Browse Source

Wrestling with replica set failover. Probably still needs more testing. Added waitFor parameter to Connect to specify what to wait for when connecting to a replica set. Added connectionAttempt counter to MongoServer. Changed how current primary is tracked in MongoServer. Added new ConnectedToSubset value to MongoServerState.

pull/65/head
rstam 14 years ago
parent
commit
1fa7d1d566
  1. 39
      Driver/Core/ConnectWaitFor.cs
  2. 157
      Driver/Core/MongoServer.cs
  3. 19
      Driver/Core/MongoServerInstance.cs
  4. 6
      Driver/Core/MongoServerState.cs
  5. 1
      Driver/Driver.csproj
  6. 5
      Driver/Internal/DirectConnector.cs
  7. 62
      Driver/Internal/ReplicaSetConnector.cs

39
Driver/Core/ConnectWaitFor.cs

@ -0,0 +1,39 @@
/* Copyright 2010-2011 10gen Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace MongoDB.Driver {
/// <summary>
/// Used with the Connect method when connecting to a replica set to specify what subset of the replica set must be connected before returning.
/// </summary>
public enum ConnectWaitFor {
/// <summary>
/// Wait for all members of the replica set to be connected.
/// </summary>
All,
/// <summary>
/// Wait for the primary member of the replica set to be connected.
/// </summary>
Primary,
/// <summary>
/// Wait for any slaveOk member of the replica set to be connected (includes primary, secondaries and passives).
/// </summary>
AnySlaveOk
}
}

157
Driver/Core/MongoServer.cs

@ -41,7 +41,9 @@ namespace MongoDB.Driver {
private MongoServerSettings settings;
private MongoServerState state = MongoServerState.Disconnected;
private object stateLock = new object(); // synchronize state changes
private int connectionAttempt;
private List<MongoServerInstance> instances = new List<MongoServerInstance>();
private MongoServerInstance primary;
private string replicaSetName;
private int loadBalancingInstanceIndex; // used to distribute reads across secondaries in round robin fashion
private Dictionary<MongoDatabaseSettings, MongoDatabase> databases = new Dictionary<MongoDatabaseSettings, MongoDatabase>();
@ -248,6 +250,13 @@ namespace MongoDB.Driver {
}
}
/// <summary>
/// Gets the most recent connection attempt number.
/// </summary>
public int ConnectionAttempt {
get { return connectionAttempt; }
}
/// <summary>
/// Gets the index cache (used by EnsureIndex) for this server.
/// </summary>
@ -299,7 +308,7 @@ namespace MongoDB.Driver {
public MongoServerInstance Primary {
get {
lock (stateLock) {
return instances.Where(i => i.IsPrimary).SingleOrDefault();
return primary;
}
}
}
@ -451,6 +460,17 @@ namespace MongoDB.Driver {
Connect(settings.ConnectTimeout);
}
/// <summary>
/// Connects to the server. Normally there is no need to call this method as
/// the driver will connect to the server automatically when needed.
/// </summary>
/// <param name="waitFor">What to wait for before returning.</param>
public virtual void Connect(
ConnectWaitFor waitFor
) {
Connect(settings.ConnectTimeout, waitFor);
}
/// <summary>
/// Connects to the server. Normally there is no need to call this method as
/// the driver will connect to the server automatically when needed.
@ -458,21 +478,71 @@ namespace MongoDB.Driver {
/// <param name="timeout">How long to wait before timing out.</param>
public virtual void Connect(
TimeSpan timeout
) {
var waitFor = settings.SlaveOk ? ConnectWaitFor.AnySlaveOk : ConnectWaitFor.Primary;
Connect(timeout, waitFor);
}
/// <summary>
/// Connects to the server. Normally there is no need to call this method as
/// the driver will connect to the server automatically when needed.
/// </summary>
/// <param name="timeout">How long to wait before timing out.</param>
/// <param name="waitFor">What to wait for before returning.</param>
public virtual void Connect(
TimeSpan timeout,
ConnectWaitFor waitFor
) {
lock (serverLock) {
if (state == MongoServerState.Disconnected) {
switch (settings.ConnectionMode) {
case ConnectionMode.Direct:
var directConnector = new DirectConnector(this);
switch (settings.ConnectionMode) {
case ConnectionMode.Direct:
if (state == MongoServerState.Disconnected) {
var directConnector = new DirectConnector(this, ++connectionAttempt);
directConnector.Connect(timeout);
break;
case ConnectionMode.ReplicaSet:
var replicaSetConnector = new ReplicaSetConnector(this);
replicaSetConnector.Connect(timeout);
break;
default:
throw new MongoInternalException("Invalid ConnectionMode.");
}
}
return;
case ConnectionMode.ReplicaSet:
var timeoutAt = DateTime.UtcNow + timeout;
while (true) {
lock (stateLock) {
switch (waitFor) {
case ConnectWaitFor.All:
if (instances.All(i => i.State == MongoServerState.Connected)) {
return;
}
break;
case ConnectWaitFor.AnySlaveOk:
if (instances.Any(i => (i.IsPrimary || i.IsSecondary || i.IsPassive) && i.State == MongoServerState.Connected)) {
return;
}
break;
case ConnectWaitFor.Primary:
if (primary != null && primary.State == MongoServerState.Connected) {
return;
}
break;
default:
throw new ArgumentException("Invalid ConnectWaitMode.");
}
}
// a replica set connector might have exited early and still be working in the background
if (state == MongoServerState.Connecting) {
if (DateTime.UtcNow > timeoutAt) {
throw new TimeoutException("Timeout while connecting to server.");
}
Thread.Sleep(TimeSpan.FromMilliseconds(20));
} else {
break;
}
}
var replicaSetConnector = new ReplicaSetConnector(this, ++connectionAttempt);
var remainingTimeout = timeoutAt - DateTime.UtcNow;
replicaSetConnector.Connect(remainingTimeout, waitFor);
return;
default:
throw new MongoInternalException("Invalid ConnectionMode.");
}
}
}
@ -527,15 +597,7 @@ namespace MongoDB.Driver {
// normally called from a connection when there is a SocketException
// but anyone can call it if they want to close all sockets to the server
lock (serverLock) {
MongoServerInstance[] instances;
lock (stateLock) {
if (state != MongoServerState.Connected) {
return;
}
instances = Instances;
}
foreach (var instance in instances) {
foreach (var instance in Instances) {
instance.Disconnect();
}
@ -942,7 +1004,7 @@ namespace MongoDB.Driver {
return request.Connection;
}
var serverInstance = GetServerInstance(slaveOk);
var serverInstance = ChooseServerInstance(slaveOk);
return serverInstance.AcquireConnection(database);
}
}
@ -986,13 +1048,12 @@ namespace MongoDB.Driver {
}
}
internal MongoServerInstance GetServerInstance(
internal MongoServerInstance ChooseServerInstance(
bool slaveOk
) {
lock (serverLock) {
if (state == MongoServerState.Disconnected) {
Connect();
}
var waitFor = slaveOk ? ConnectWaitFor.AnySlaveOk : ConnectWaitFor.Primary;
Connect(waitFor);
if (settings.ConnectionMode == ConnectionMode.ReplicaSet) {
if (slaveOk) {
@ -1008,13 +1069,14 @@ namespace MongoDB.Driver {
}
}
var primary = Primary;
if (primary == null) {
throw new MongoConnectionException("Primary server not found.");
lock (stateLock) {
if (primary == null) {
throw new MongoConnectionException("Primary server not found.");
}
return primary;
}
return primary;
} else {
return Instances.First();
return Instance;
}
}
}
@ -1054,28 +1116,27 @@ namespace MongoDB.Driver {
object args
) {
lock (stateLock) {
if (instances.Any(i => i.State == MongoServerState.Connected && i.IsPrimary)) {
// Console.WriteLine("Server state: Connected");
state = MongoServerState.Connected;
return;
}
var instance = (MongoServerInstance) sender;
if (settings.SlaveOk) {
if (instances.Any(i => i.State == MongoServerState.Connected && (i.IsSecondary || i.IsPassive))) {
// Console.WriteLine("Server state: Connected");
state = MongoServerState.Connected;
return;
if (instances.Contains(instance)) {
if (instance.IsPrimary && instance.State == MongoServerState.Connected && primary != instance) {
primary = instance; // new primary
}
} else {
if (primary == instance) {
primary = null; // no primary until we find one again
}
}
if (instances.Any(i => i.State == MongoServerState.Connecting)) {
// Console.WriteLine("Server state: Connecting");
if (instances.All(i => i.State == MongoServerState.Connected)) {
state = MongoServerState.Connected;
} else if (instances.Any(i => i.State == MongoServerState.Connecting)) {
state = MongoServerState.Connecting;
return;
} else if (instances.Any(i => i.State == MongoServerState.Connected)) {
state = MongoServerState.ConnectedToSubset;
} else {
state = MongoServerState.Disconnected;
}
// Console.WriteLine("Server state: Disconnected");
state = MongoServerState.Disconnected;
}
}
#endregion

19
Driver/Core/MongoServerInstance.cs

@ -216,17 +216,18 @@ namespace MongoDB.Driver {
bool slaveOk
) {
lock (serverInstanceLock) {
if (state != MongoServerState.Disconnected) {
var message = string.Format("MongoServerInstance.Connect can only be called when state is Disconnected, not when state is {0}.", state);
throw new InvalidOperationException(message);
}
// note: don't check that state is Disconnected here
// when reconnecting to a replica set state can transition from Connected -> Connecting -> Connected
State = MongoServerState.Connecting;
connectException = null;
try {
endPoint = address.ToIPEndPoint(server.Settings.AddressFamily);
var connectionPool = new MongoConnectionPool(this);
if (connectionPool == null) {
connectionPool = new MongoConnectionPool(this);
}
try {
var connection = connectionPool.AcquireConnection(null);
try {
@ -260,10 +261,13 @@ namespace MongoDB.Driver {
buildInfoResult.Response["version"].AsString // versionString
);
} finally {
connectionPool.ReleaseConnection(connection);
connection.ConnectionPool.ReleaseConnection(connection);
}
} catch {
connectionPool.Close();
if (connectionPool != null) {
connectionPool.Close();
connectionPool = null;
}
throw;
}
@ -273,7 +277,6 @@ namespace MongoDB.Driver {
}
State = MongoServerState.Connected;
this.connectionPool = connectionPool;
} catch (Exception ex) {
State = MongoServerState.Disconnected;
connectException = ex;

6
Driver/Core/MongoServerState.cs

@ -38,6 +38,10 @@ namespace MongoDB.Driver {
/// <summary>
/// Connected to the server.
/// </summary>
Connected
Connected,
/// <summary>
/// Connected to a subset of the replica set members.
/// </summary>
ConnectedToSubset
}
}

1
Driver/Driver.csproj

@ -98,6 +98,7 @@
<Compile Include="Core\CommandResults\GetLastErrorResult.cs" />
<Compile Include="Core\CommandResults\SafeModeResult.cs" />
<Compile Include="Core\CommandResults\ValidateCollectionResult.cs" />
<Compile Include="Core\ConnectWaitFor.cs" />
<Compile Include="Core\Interfaces\IMongoCollectionOptions.cs" />
<Compile Include="Core\Interfaces\IMongoCommand.cs" />
<Compile Include="Core\Interfaces\IMongoFields.cs" />

5
Driver/Internal/DirectConnector.cs

@ -25,13 +25,16 @@ namespace MongoDB.Driver.Internal {
internal class DirectConnector {
#region private fields
private MongoServer server;
private int connectionAttempt;
#endregion
#region constructors
internal DirectConnector(
MongoServer server
MongoServer server,
int connectionAttempt
) {
this.server = server;
this.connectionAttempt = connectionAttempt;
}
#endregion

62
Driver/Internal/ReplicaSetConnector.cs

@ -26,6 +26,7 @@ namespace MongoDB.Driver.Internal {
internal class ReplicaSetConnector {
#region private fields
private MongoServer server;
private int connectionAttempt;
private DateTime timeoutAt;
private BlockingQueue<ConnectResponse> responseQueue = new BlockingQueue<ConnectResponse>();
private List<ConnectArgs> connects = new List<ConnectArgs>();
@ -35,15 +36,18 @@ namespace MongoDB.Driver.Internal {
#region constructors
internal ReplicaSetConnector(
MongoServer server
MongoServer server,
int connectionAttempt
) {
this.server = server;
this.connectionAttempt = connectionAttempt;
}
#endregion
#region internal methods
internal void Connect(
TimeSpan timeout
TimeSpan timeout,
ConnectWaitFor waitFor
) {
timeoutAt = DateTime.UtcNow + timeout;
@ -65,23 +69,47 @@ namespace MongoDB.Driver.Internal {
ProcessResponse(response);
// return as soon as we've connected to the primary
var serverInstance = response.ServerInstance;
if (serverInstance.State == MongoServerState.Connected && serverInstance.IsPrimary) {
// make sure this serverInstance is still an instance of this server
// it might have been removed if the connection string used a DNS alias for the host
// (in which case we have already queued a connect using the official host name and should see that response shortly)
if (server.Instances.Contains(serverInstance)) {
// return as soon as we can (according to the waitFor mode specified)
bool exitEarly = false;
switch (waitFor) {
case ConnectWaitFor.All:
if (server.Instances.All(i => i.State == MongoServerState.Connected)) {
exitEarly = true;
}
break;
case ConnectWaitFor.AnySlaveOk:
if (server.Instances.Any(i => (i.IsPrimary || i.IsSecondary || i.IsPassive) && i.State == MongoServerState.Connected)) {
exitEarly = true;
}
break;
case ConnectWaitFor.Primary:
var primary = server.Primary;
if (primary != null && primary.State == MongoServerState.Connected) {
exitEarly = true;
}
break;
default:
throw new ArgumentException("Invalid ConnectWaitFor value.");
}
if (exitEarly) {
if (responses.Count < connects.Count) {
// process any additional responses in the background
ThreadPool.QueueUserWorkItem(ProcessAdditionalResponsesWorkItem);
return;
}
return;
}
}
var exceptions = responses.Select(r => r.ServerInstance.ConnectException).Where(e => e != null).ToArray();
var innerException = exceptions.FirstOrDefault();
var exception = new MongoConnectionException("Unable to connect to server.", innerException);
string message;
if (innerException == null) {
message = "Unable to connect to server.";
} else {
message = string.Format("Unable to connect to server: {0}.", innerException.Message);
}
var exception = new MongoConnectionException(message, innerException);
exception.Data.Add("InnerExceptions", exceptions);
throw exception;
}
@ -99,8 +127,8 @@ namespace MongoDB.Driver.Internal {
try {
serverInstance.Connect(true); // slaveOk
response.IsMasterResult = serverInstance.IsMasterResult;
} catch {
// exception is already stored in serverInstance.ConnectException and will be handled later
} catch (Exception ex) {
response.Exception = ex;
}
args.ResponseQueue.Enqueue(response);
@ -190,7 +218,12 @@ namespace MongoDB.Driver.Internal {
ConnectResponse response
) {
responses.Add(response);
if (response.ServerInstance.ConnectException != null) {
// don't process response if it threw an exception or if the connect attempt has been abandoned
if (response.Exception != null) {
return;
}
if (server.State == MongoServerState.Disconnected || connectionAttempt != server.ConnectionAttempt) {
return;
}
@ -239,6 +272,7 @@ namespace MongoDB.Driver.Internal {
private class ConnectResponse {
public MongoServerInstance ServerInstance { get; set; }
public CommandResult IsMasterResult { get; set; }
public Exception Exception { get; set; }
}
#endregion
}

Loading…
Cancel
Save