Browse Source

More work on CSHARP-183, CSHARP-201, CSHARP-302, CSHARP-303. Added SequentialId property to MongoServer and MongoServerInstance to use in logging. Added new Disconnect*ing* MongoServerState. Changed ChooseServerInstance to make first attempt to find an instance using current state, and only call VerifyUnknownStates or Connect if first attempt fails. Changed InstanceStateChanged to handle new Disconnecting state. No longer try to start and stop the connection pool timer (there are too many edge cases); instead let the timer fire continuously but do nothing when Disconnected.

pull/66/merge
rstam 14 years ago
parent
commit
5778f994d4
  1. 1
      Driver/Core/MongoDatabase.cs
  2. 145
      Driver/Core/MongoServer.cs
  3. 34
      Driver/Core/MongoServerInstance.cs
  4. 14
      Driver/Core/MongoServerState.cs
  5. 18
      Driver/Internal/MongoConnectionPool.cs
  6. 9
      Driver/Internal/ReplicaSetConnector.cs

1
Driver/Core/MongoDatabase.cs

@ -861,6 +861,7 @@ namespace MongoDB.Driver {
commandResult.Initialize(command, response); // so two phase construction required
if (!commandResult.Ok) {
if (commandResult.ErrorMessage == "not master") {
// TODO: figure out which instance gave the error and set its state to Unknown
server.Disconnect();
}
throw new MongoCommandException(commandResult);

145
Driver/Core/MongoServer.cs

@ -33,12 +33,14 @@ namespace MongoDB.Driver {
#region private static fields
private static object staticLock = new object();
private static Dictionary<MongoServerSettings, MongoServer> servers = new Dictionary<MongoServerSettings, MongoServer>();
private static int nextSequentialId;
private static int maxServerCount = 100;
#endregion
#region private fields
private object serverLock = new object();
private MongoServerSettings settings;
private int sequentialId;
private MongoServerState state = MongoServerState.Disconnected;
private object stateLock = new object(); // synchronize state changes
private int connectionAttempt;
@ -61,6 +63,8 @@ namespace MongoDB.Driver {
MongoServerSettings settings
) {
this.settings = settings.Freeze();
this.sequentialId = Interlocked.Increment(ref nextSequentialId);
// Console.WriteLine("MongoServer[{0}]: {1}", sequentialId, settings);
if (settings.ConnectionMode == ConnectionMode.ReplicaSet) {
// initialize the set of server instances from the seed list (might change once we connect)
@ -366,6 +370,13 @@ namespace MongoDB.Driver {
}
}
/// <summary>
/// Gets the unique sequential Id for this server.
/// </summary>
public virtual int SequentialId {
get { return sequentialId; }
}
/// <summary>
/// Gets the settings for this server.
/// </summary>
@ -464,7 +475,7 @@ namespace MongoDB.Driver {
/// Connects to the server. Normally there is no need to call this method as
/// the driver will connect to the server automatically when needed.
/// </summary>
/// <param name="waitFor">What to wait for before returning.</param>
/// <param name="waitFor">What to wait for before returning (when connecting to a replica set).</param>
public virtual void Connect(
ConnectWaitFor waitFor
) {
@ -488,7 +499,7 @@ namespace MongoDB.Driver {
/// the driver will connect to the server automatically when needed.
/// </summary>
/// <param name="timeout">How long to wait before timing out.</param>
/// <param name="waitFor">What to wait for before returning.</param>
/// <param name="waitFor">What to wait for before returning (when connecting to a replica set).</param>
public virtual void Connect(
TimeSpan timeout,
ConnectWaitFor waitFor
@ -497,10 +508,13 @@ namespace MongoDB.Driver {
switch (settings.ConnectionMode) {
case ConnectionMode.Direct:
if (state == MongoServerState.Disconnected) {
var directConnector = new DirectConnector(this, ++connectionAttempt);
connectionAttempt += 1;
// Console.WriteLine("MongoServer[{0}]: Connect(waitFor={1},attempt={2}).", sequentialId, waitFor, connectionAttempt);
var directConnector = new DirectConnector(this, connectionAttempt);
directConnector.Connect(timeout);
}
return;
case ConnectionMode.ReplicaSet:
var timeoutAt = DateTime.UtcNow + timeout;
while (true) {
@ -512,7 +526,7 @@ namespace MongoDB.Driver {
}
break;
case ConnectWaitFor.AnySlaveOk:
if (instances.Any(i => (i.IsPrimary || i.IsSecondary || i.IsPassive) && i.State == MongoServerState.Connected)) {
if (instances.Any(i => i.State == MongoServerState.Connected && (i.IsPrimary || i.IsSecondary || i.IsPassive))) {
return;
}
break;
@ -537,10 +551,13 @@ namespace MongoDB.Driver {
}
}
var replicaSetConnector = new ReplicaSetConnector(this, ++connectionAttempt);
connectionAttempt += 1;
// Console.WriteLine("MongoServer[{0}]: Connect(waitFor={1},attempt={2}).", sequentialId, waitFor, connectionAttempt);
var replicaSetConnector = new ReplicaSetConnector(this, connectionAttempt);
var remainingTimeout = timeoutAt - DateTime.UtcNow;
replicaSetConnector.Connect(remainingTimeout, waitFor);
return;
default:
throw new MongoInternalException("Invalid ConnectionMode.");
}
@ -594,16 +611,17 @@ namespace MongoDB.Driver {
/// you should be sure to have a good reason to call it.
/// </summary>
public virtual void Disconnect() {
// normally called from a connection when there is a SocketException
// but anyone can call it if they want to close all sockets to the server
lock (serverLock) {
foreach (var instance in Instances) {
instance.Disconnect();
}
// note: server state should have changed in response to InstanceStateChanged events
if (state != MongoServerState.Disconnected) {
throw new MongoInternalException("Disconnect failed to change MongoServerState to Disconnected.");
// Console.WriteLine("MongoServer[{0}]: Disconnect called.", sequentialId);
state = MongoServerState.Disconnecting;
try {
foreach (var instance in Instances) {
instance.Disconnect();
}
} finally {
state = MongoServerState.Disconnected;
}
}
}
}
@ -1050,6 +1068,7 @@ namespace MongoDB.Driver {
MongoServerInstance instance
) {
lock (stateLock) {
// Console.WriteLine("MongoServer[{0}]: Add MongoServerInstance[{1}].", sequentialId, instance.SequentialId);
if (instances.Any(i => i.Address == instance.Address)) {
var message = string.Format("A server instance already exists for address '{0}'.", instance.Address);
throw new ArgumentException(message);
@ -1064,36 +1083,48 @@ namespace MongoDB.Driver {
bool slaveOk
) {
lock (serverLock) {
if (state == MongoServerState.Unknown) {
VerifyUnknownStates();
}
var waitFor = slaveOk ? ConnectWaitFor.AnySlaveOk : ConnectWaitFor.Primary;
Connect(waitFor); // will do nothing if already sufficiently connected
// first try to choose an instance given the current state
// and only try to verify state or connect if necessary
for (int attempt = 1; attempt <= 2; attempt++) {
if (settings.ConnectionMode == ConnectionMode.ReplicaSet) {
if (slaveOk) {
// round robin the connected secondaries, fall back to primary if no secondary found
lock (stateLock) {
for (int i = 0; i < instances.Count; i++) {
// round robin (use if statement instead of mod because instances.Count can change
if (++loadBalancingInstanceIndex >= instances.Count) {
loadBalancingInstanceIndex = 0;
}
var instance = instances[loadBalancingInstanceIndex];
if (instance.State == MongoServerState.Connected && (instance.IsSecondary || instance.IsPassive)) {
return instance;
}
}
}
}
if (settings.ConnectionMode == ConnectionMode.ReplicaSet) {
if (slaveOk) {
// round robin the connected secondaries, fall back to primary if no secondary found
lock (stateLock) {
for (int i = 0; i < instances.Count; i++) {
loadBalancingInstanceIndex = (loadBalancingInstanceIndex + 1) % instances.Count; // round robin
var instance = instances[loadBalancingInstanceIndex];
if (instance.State == MongoServerState.Connected && (instance.IsSecondary || instance.IsPassive)) {
return instance;
}
if (primary != null && primary.State == MongoServerState.Connected) {
return primary;
}
}
} else {
var instance = Instance;
if (instance.State == MongoServerState.Connected) {
return instance;
}
}
lock (stateLock) {
if (primary == null) {
throw new MongoConnectionException("Primary server not found.");
if (attempt == 1) {
if (state == MongoServerState.Unknown) {
VerifyUnknownStates();
}
return primary;
var waitFor = slaveOk ? ConnectWaitFor.AnySlaveOk : ConnectWaitFor.Primary;
Connect(waitFor); // will do nothing if already sufficiently connected
}
} else {
return Instance;
}
throw new MongoConnectionException("Unable to choose a server instance.");
}
}
@ -1119,6 +1150,7 @@ namespace MongoDB.Driver {
MongoServerInstance instance
) {
lock (stateLock) {
// Console.WriteLine("MongoServer[{0}]: Remove MongoServerInstance[{1}].", sequentialId, instance.SequentialId);
instance.StateChanged -= InstanceStateChanged;
instances.Remove(instance);
InstanceStateChanged(instance, null); // removing an instance can change server state
@ -1133,6 +1165,7 @@ namespace MongoDB.Driver {
) {
lock (stateLock) {
var instance = (MongoServerInstance) sender;
// Console.WriteLine("MongoServer[{0}]: MongoServerInstance[{1}] state changed.", sequentialId, instance.SequentialId);
if (instance.IsPrimary && instance.State == MongoServerState.Connected && instances.Contains(instance)) {
if (primary != instance) {
@ -1144,35 +1177,29 @@ namespace MongoDB.Driver {
}
}
var oldState = state;
if (instances.All(i => i.State == MongoServerState.Connected)) {
state = MongoServerState.Connected;
} else if (instances.All(i => i.State == MongoServerState.Disconnected)) {
state = MongoServerState.Disconnected;
} else if (instances.Any(i => i.State == MongoServerState.Unknown)) {
state = MongoServerState.Unknown;
} else if (instances.Any(i => i.State == MongoServerState.Connecting)) {
state = MongoServerState.Connecting;
} else if (instances.Any(i => i.State == MongoServerState.Connected)) {
state = MongoServerState.ConnectedToSubset;
// the order of the tests is significant
// and resolves ambiguities when more than one state might match
if (state == MongoServerState.Disconnecting) {
if (instances.All(i => i.State == MongoServerState.Disconnected)) {
state = MongoServerState.Disconnected;
}
} else {
throw new MongoInternalException("Unexpected server instance states.");
}
// when transitioning from or to Disconnected state the connection pool timers need to be started or stopped
if (state != oldState && (oldState == MongoServerState.Disconnected || state == MongoServerState.Disconnected)) {
TimeSpan dueTime;
TimeSpan period;
if (oldState == MongoServerState.Disconnected) {
dueTime = TimeSpan.Zero;
period = TimeSpan.FromSeconds(10);
if (instances.All(i => i.State == MongoServerState.Disconnected)) {
state = MongoServerState.Disconnected;
} else if (instances.All(i => i.State == MongoServerState.Connected)) {
state = MongoServerState.Connected;
} else if (instances.Any(i => i.State == MongoServerState.Connecting)) {
state = MongoServerState.Connecting;
} else if (instances.Any(i => i.State == MongoServerState.Unknown)) {
state = MongoServerState.Unknown;
} else if (instances.Any(i => i.State == MongoServerState.Connected)) {
state = MongoServerState.ConnectedToSubset;
} else {
dueTime = period = TimeSpan.FromMilliseconds(-1); // disables the timer
}
foreach (var serverInstance in instances) {
serverInstance.ConnectionPool.Timer.Change(dueTime, period);
throw new MongoInternalException("Unexpected server instance states.");
}
}
// Console.WriteLine("MongoServer[{0}]: State={1}, Primary={2}.", sequentialId, state, (primary == null) ? "null" : primary.Address.ToString());
}
}

34
Driver/Core/MongoServerInstance.cs

@ -29,6 +29,10 @@ namespace MongoDB.Driver {
/// Represents an instance of a MongoDB server host (in the case of a replica set a MongoServer uses multiple MongoServerInstances).
/// </summary>
public class MongoServerInstance {
#region private static fields
private static int nextSequentialId;
#endregion
#region public events
/// <summary>
/// Occurs when the value of the State property changes.
@ -50,6 +54,7 @@ namespace MongoDB.Driver {
private bool isSecondary;
private int maxDocumentSize;
private int maxMessageLength;
private int sequentialId;
private MongoServer server;
private MongoServerState state; // always use property to set value so event gets raised
#endregion
@ -61,10 +66,12 @@ namespace MongoDB.Driver {
) {
this.server = server;
this.address = address;
this.sequentialId = Interlocked.Increment(ref nextSequentialId);
this.maxDocumentSize = MongoDefaults.MaxDocumentSize;
this.maxMessageLength = MongoDefaults.MaxMessageLength;
this.state = MongoServerState.Disconnected;
this.connectionPool = new MongoConnectionPool(this);
// Console.WriteLine("MongoServerInstance[{0}]: {1}", sequentialId, address);
}
#endregion
@ -162,6 +169,13 @@ namespace MongoDB.Driver {
get { return maxMessageLength; }
}
/// <summary>
/// Gets the unique sequential Id for this server instance.
/// </summary>
public int SequentialId {
get { return sequentialId; }
}
/// <summary>
/// Gets the server for this server instance.
/// </summary>
@ -177,7 +191,7 @@ namespace MongoDB.Driver {
internal set {
lock (serverInstanceLock) {
if (state != value) {
// Console.WriteLine("{0} state: {1}{2}", address, value, isPrimary ? " (Primary)" : "");
// Console.WriteLine("MongoServerInstance[{0}]: State changed: state={1}{2}", sequentialId, value, isPrimary ? " (Primary)" : "");
state = value;
if (StateChanged != null) {
try { StateChanged(this, null); } catch { } // ignore exceptions
@ -207,17 +221,24 @@ namespace MongoDB.Driver {
/// </summary>
public void VerifyState() {
lock (serverInstanceLock) {
// Console.WriteLine("MongoServerInstance[{0}]: VerifyState called.", sequentialId);
// if ping fails assume all connections in the connection pool are doomed
try {
Ping();
} catch {
} catch (Exception ex) {
// Console.WriteLine("MongoServerInstance[{0}]: Ping failed: {1}.", sequentialId, ex.Message);
connectionPool.Clear();
}
var connection = connectionPool.AcquireConnection(null);
try {
var previousState = state;
VerifyState(connection);
try {
VerifyState(connection);
} catch (Exception ex) {
// ignore exceptions (if any occured state will already be set to Disconnected)
// Console.WriteLine("MongoServerInstance[{0}]: VerifyState failed: {1}.", sequentialId, ex.Message);
}
if (state != previousState && state == MongoServerState.Disconnected) {
connectionPool.Clear();
}
@ -256,6 +277,7 @@ namespace MongoDB.Driver {
internal void Connect(
bool slaveOk
) {
// Console.WriteLine("MongoServerInstance[{0}]: Connect(slaveOk={1}) called.", sequentialId, slaveOk);
lock (serverInstanceLock) {
// note: don't check that state is Disconnected here
// when reconnecting to a replica set state can transition from Connected -> Connecting -> Connected
@ -290,9 +312,14 @@ namespace MongoDB.Driver {
}
internal void Disconnect() {
// Console.WriteLine("MongoServerInstance[{0}]: Disconnect called.", sequentialId);
lock (serverInstanceLock) {
if (state == MongoServerState.Disconnecting) {
throw new MongoInternalException("Disconnect called while disconnecting.");
}
if (state != MongoServerState.Disconnected) {
try {
State = MongoServerState.Disconnecting;
connectionPool.Clear();
} finally {
State = MongoServerState.Disconnected;
@ -360,6 +387,7 @@ namespace MongoDB.Driver {
this.maxMessageLength = MongoDefaults.MaxMessageLength;
this.buildInfo = null;
this.State = MongoServerState.Disconnected;
throw;
}
}
#endregion

14
Driver/Core/MongoServerState.cs

@ -23,10 +23,6 @@ namespace MongoDB.Driver {
/// The state of a MongoServer instance.
/// </summary>
public enum MongoServerState {
/// <summary>
/// The state has not yet been determined.
/// </summary>
Unknown = 0,
/// <summary>
/// Disconnected from the server.
/// </summary>
@ -42,6 +38,14 @@ namespace MongoDB.Driver {
/// <summary>
/// Connected to a subset of the replica set members.
/// </summary>
ConnectedToSubset
ConnectedToSubset,
/// <summary>
/// The state is temporarily unknown.
/// </summary>
Unknown,
/// <summary>
/// Disconnecting from the server (in progress).
/// </summary>
Disconnecting
}
}

18
Driver/Internal/MongoConnectionPool.cs

@ -46,8 +46,9 @@ namespace MongoDB.Driver.Internal {
this.serverInstance = serverInstance;
poolSize = 0;
var disabled = TimeSpan.FromMilliseconds(-1);
timer = new Timer(TimerCallback, null, disabled, disabled); // will get started when server instance state changes to Connected
var dueTime = TimeSpan.FromSeconds(0);
var period = TimeSpan.FromSeconds(10);
timer = new Timer(TimerCallback, null, dueTime, period);
}
#endregion
@ -81,12 +82,6 @@ namespace MongoDB.Driver.Internal {
}
#endregion
#region internal properties
internal Timer Timer {
get { return timer; }
}
#endregion
#region internal methods
internal MongoConnection AcquireConnection(
MongoDatabase database
@ -254,11 +249,18 @@ namespace MongoDB.Driver.Internal {
private void TimerCallback(
object state // not used
) {
var server = serverInstance.Server;
if (server.State == MongoServerState.Disconnected || server.State == MongoServerState.Disconnecting) {
return;
}
// if another timer callback occurs before we are done with the first one just exit
if (inTimerCallback) {
// Console.WriteLine("MongoConnectionPool[{0}] TimerCallback skipped because previous callback has not completed.", serverInstance.SequentialId);
return;
}
// Console.WriteLine("MongoConnectionPool[{0}]: TimerCallback called.", serverInstance.SequentialId);
inTimerCallback = true;
try {
// on every timer callback verify the state of the server instance because it might have changed

9
Driver/Internal/ReplicaSetConnector.cs

@ -227,10 +227,17 @@ namespace MongoDB.Driver.Internal {
) {
responses.Add(response);
// don't process response if it threw an exception or if the connect attempt has been abandoned
// don't process response if it threw an exception
if (response.Exception != null) {
return;
}
// don't process response if Disconnect was called before the response was received
if (server.State == MongoServerState.Disconnected || server.State == MongoServerState.Disconnecting) {
return;
}
// don't process response if it was for a previous connection attempt
if (server.State == MongoServerState.Disconnected || connectionAttempt != server.ConnectionAttempt) {
return;
}

Loading…
Cancel
Save