Browse Source

Mostly completed work on CSHARP-183, CSHARP-201, CSHARP-302, CSHARP-303: improvements to replica set failover. In MongoServer: added VerifyState and VerifyUnknownStates, ChooseServerInstance now calls VerifyUnknownStates when server state is Unknown, InstanceStateChanged now knows about Unknown and handles starting and stopping the connection pool timer. In MongoServerInstance: now creates a single instance of connectionPool at constructor time (so it's never null and never replaced), added VerifyState, Connect now uses VerifyState. In MongoServerState: added Unknown state. In MongoConnection: removed Damaged from MongoConnectionState, added generationId. In MongoConnectionPool: removed closed and added generationId, added inTimerCallback, timer is now started and stopped by MongoServer, Clear closes all connections and increments the generationId, renamed CreateInitialConnectionsWorkItem to EnsureMinConnectionPoolSizeWorkItem and now queue it from the timer rather than from the constructor, TimerCallback now calls VerifyState to discover when offline replica set members come back online. Initial testing looks good but more testing is required.

pull/66/merge
rstam 14 years ago
parent
commit
c9ce8c1f16
  1. 53
      Driver/Core/MongoServer.cs
  2. 137
      Driver/Core/MongoServerInstance.cs
  3. 2
      Driver/Core/MongoServerState.cs
  4. 47
      Driver/Internal/MongoConnection.cs
  5. 121
      Driver/Internal/MongoConnectionPool.cs

53
Driver/Core/MongoServer.cs

@ -986,6 +986,17 @@ namespace MongoDB.Driver {
}
}
}
/// <summary>
/// Verifies the state of the server (in the case of a replica set all members are contacted one at a time).
/// </summary>
public void VerifyState() {
lock (serverLock) {
foreach (var instance in instances) {
instance.VerifyState();
}
}
}
#endregion
#region internal methods
@ -1053,8 +1064,12 @@ namespace MongoDB.Driver {
bool slaveOk
) {
lock (serverLock) {
if (state == MongoServerState.Unknown) {
VerifyUnknownStates();
}
var waitFor = slaveOk ? ConnectWaitFor.AnySlaveOk : ConnectWaitFor.Primary;
Connect(waitFor);
Connect(waitFor); // will do nothing if already sufficiently connected
if (settings.ConnectionMode == ConnectionMode.ReplicaSet) {
if (slaveOk) {
@ -1119,8 +1134,8 @@ namespace MongoDB.Driver {
lock (stateLock) {
var instance = (MongoServerInstance) sender;
if (instances.Contains(instance)) {
if (instance.IsPrimary && instance.State == MongoServerState.Connected && primary != instance) {
if (instance.IsPrimary && instance.State == MongoServerState.Connected && instances.Contains(instance)) {
if (primary != instance) {
primary = instance; // new primary
}
} else {
@ -1129,14 +1144,44 @@ namespace MongoDB.Driver {
}
}
var oldState = state;
if (instances.All(i => i.State == MongoServerState.Connected)) {
state = MongoServerState.Connected;
} else if (instances.All(i => i.State == MongoServerState.Disconnected)) {
state = MongoServerState.Disconnected;
} else if (instances.Any(i => i.State == MongoServerState.Unknown)) {
state = MongoServerState.Unknown;
} else if (instances.Any(i => i.State == MongoServerState.Connecting)) {
state = MongoServerState.Connecting;
} else if (instances.Any(i => i.State == MongoServerState.Connected)) {
state = MongoServerState.ConnectedToSubset;
} else {
state = MongoServerState.Disconnected;
throw new MongoInternalException("Unexpected server instance states.");
}
// when transitioning from or to Disconnected state the connection pool timers need to be started or stopped
if (state != oldState && (oldState == MongoServerState.Disconnected || state == MongoServerState.Disconnected)) {
TimeSpan dueTime;
TimeSpan period;
if (oldState == MongoServerState.Disconnected) {
dueTime = TimeSpan.Zero;
period = TimeSpan.FromSeconds(10);
} else {
dueTime = period = TimeSpan.FromMilliseconds(-1); // disables the timer
}
foreach (var serverInstance in instances) {
serverInstance.ConnectionPool.Timer.Change(dueTime, period);
}
}
}
}
private void VerifyUnknownStates() {
lock (serverLock) {
foreach (var instance in instances) {
if (instance.State == MongoServerState.Unknown) {
instance.VerifyState();
}
}
}
}

137
Driver/Core/MongoServerInstance.cs

@ -64,6 +64,7 @@ namespace MongoDB.Driver {
this.maxDocumentSize = MongoDefaults.MaxDocumentSize;
this.maxMessageLength = MongoDefaults.MaxMessageLength;
this.state = MongoServerState.Disconnected;
this.connectionPool = new MongoConnectionPool(this);
}
#endregion
@ -187,7 +188,7 @@ namespace MongoDB.Driver {
}
#endregion
#region public method
#region public methods
/// <summary>
/// Checks whether the server is alive (throws an exception if not).
/// </summary>
@ -197,7 +198,32 @@ namespace MongoDB.Driver {
var pingCommand = new CommandDocument("ping", 1);
connection.RunCommand("admin.$cmd", QueryFlags.SlaveOk, pingCommand);
} finally {
connection.ConnectionPool.ReleaseConnection(connection);
connectionPool.ReleaseConnection(connection);
}
}
/// <summary>
/// Verifies the state of the server instance.
/// </summary>
public void VerifyState() {
lock (serverInstanceLock) {
// if ping fails assume all connections in the connection pool are doomed
try {
Ping();
} catch {
connectionPool.Clear();
}
var connection = connectionPool.AcquireConnection(null);
try {
var previousState = state;
VerifyState(connection);
if (state != previousState && state == MongoServerState.Disconnected) {
connectionPool.Clear();
}
} finally {
ReleaseConnection(connection);
}
}
}
#endregion
@ -239,58 +265,21 @@ namespace MongoDB.Driver {
try {
endPoint = address.ToIPEndPoint(server.Settings.AddressFamily);
if (connectionPool == null) {
connectionPool = new MongoConnectionPool(this);
}
try {
var connection = connectionPool.AcquireConnection(null);
try {
try {
var isMasterCommand = new CommandDocument("ismaster", 1);
isMasterResult = connection.RunCommand("admin.$cmd", QueryFlags.SlaveOk, isMasterCommand);
} catch (MongoCommandException ex) {
isMasterResult = ex.CommandResult;
throw;
}
isPrimary = isMasterResult.Response["ismaster", false].ToBoolean();
isSecondary = isMasterResult.Response["secondary", false].ToBoolean();
isPassive = isMasterResult.Response["passive", false].ToBoolean();
isArbiter = isMasterResult.Response["arbiterOnly", false].ToBoolean();
// workaround for CSHARP-273
if (isPassive && isArbiter) { isPassive = false; }
VerifyState(connection);
if (!isPrimary && !slaveOk) {
throw new MongoConnectionException("Server is not a primary and SlaveOk is false.");
}
maxDocumentSize = isMasterResult.Response["maxBsonObjectSize", MongoDefaults.MaxDocumentSize].ToInt32();
maxMessageLength = Math.Max(MongoDefaults.MaxMessageLength, maxDocumentSize + 1024); // derived from maxDocumentSize
var buildInfoCommand = new CommandDocument("buildinfo", 1);
var buildInfoResult = connection.RunCommand("admin.$cmd", QueryFlags.SlaveOk, buildInfoCommand);
buildInfo = new MongoServerBuildInfo(
buildInfoResult.Response["bits"].ToInt32(), // bits
buildInfoResult.Response["gitVersion"].AsString, // gitVersion
buildInfoResult.Response["sysInfo"].AsString, // sysInfo
buildInfoResult.Response["version"].AsString // versionString
);
} finally {
connection.ConnectionPool.ReleaseConnection(connection);
connectionPool.ReleaseConnection(connection);
}
} catch {
if (connectionPool != null) {
connectionPool.Close();
connectionPool = null;
}
connectionPool.Clear();
throw;
}
// for the primary only immediately start creating connections to reach MinConnectionPoolSize
if (isPrimary) {
connectionPool.CreateInitialConnections(); // will be done on a background thread
}
State = MongoServerState.Connected;
} catch (Exception ex) {
State = MongoServerState.Disconnected;
@ -304,11 +293,7 @@ namespace MongoDB.Driver {
lock (serverInstanceLock) {
if (state != MongoServerState.Disconnected) {
try {
// if we fail during Connect the connectionPool field will still be null
if (connectionPool != null) {
connectionPool.Close();
connectionPool = null;
}
connectionPool.Clear();
} finally {
State = MongoServerState.Disconnected;
}
@ -320,9 +305,61 @@ namespace MongoDB.Driver {
MongoConnection connection
) {
lock (serverInstanceLock) {
// the connection might belong to a connection pool that has already been discarded
// so always release it to the connection pool it came from and not the current pool
connection.ConnectionPool.ReleaseConnection(connection);
connectionPool.ReleaseConnection(connection);
}
}
internal void VerifyState(
MongoConnection connection
) {
CommandResult isMasterResult = null;
try {
try {
var isMasterCommand = new CommandDocument("ismaster", 1);
isMasterResult = connection.RunCommand("admin.$cmd", QueryFlags.SlaveOk, isMasterCommand);
} catch (MongoCommandException ex) {
isMasterResult = ex.CommandResult;
throw;
}
var isPrimary = isMasterResult.Response["ismaster", false].ToBoolean();
var isSecondary = isMasterResult.Response["secondary", false].ToBoolean();
var isPassive = isMasterResult.Response["passive", false].ToBoolean();
var isArbiter = isMasterResult.Response["arbiterOnly", false].ToBoolean();
// workaround for CSHARP-273
if (isPassive && isArbiter) { isPassive = false; }
var maxDocumentSize = isMasterResult.Response["maxBsonObjectSize", MongoDefaults.MaxDocumentSize].ToInt32();
var maxMessageLength = Math.Max(MongoDefaults.MaxMessageLength, maxDocumentSize + 1024); // derived from maxDocumentSize
var buildInfoCommand = new CommandDocument("buildinfo", 1);
var buildInfoResult = connection.RunCommand("admin.$cmd", QueryFlags.SlaveOk, buildInfoCommand);
var buildInfo = new MongoServerBuildInfo(
buildInfoResult.Response["bits"].ToInt32(), // bits
buildInfoResult.Response["gitVersion"].AsString, // gitVersion
buildInfoResult.Response["sysInfo"].AsString, // sysInfo
buildInfoResult.Response["version"].AsString // versionString
);
this.isMasterResult = isMasterResult;
this.isPrimary = isPrimary;
this.isSecondary = isSecondary;
this.isPassive = isPassive;
this.isArbiter = isArbiter;
this.maxDocumentSize = maxDocumentSize;
this.maxMessageLength = maxMessageLength;
this.buildInfo = buildInfo;
this.State = MongoServerState.Connected;
} catch {
this.isMasterResult = isMasterResult;
this.isPrimary = false;
this.isSecondary = false;
this.isPassive = false;
this.isArbiter = false;
this.maxDocumentSize = MongoDefaults.MaxDocumentSize;
this.maxMessageLength = MongoDefaults.MaxMessageLength;
this.buildInfo = null;
this.State = MongoServerState.Disconnected;
}
}
#endregion

2
Driver/Core/MongoServerState.cs

@ -26,7 +26,7 @@ namespace MongoDB.Driver {
/// <summary>
/// The state has not yet been determined.
/// </summary>
None = 0,
Unknown = 0,
/// <summary>
/// Disconnected from the server.
/// </summary>

47
Driver/Internal/MongoConnection.cs

@ -39,10 +39,6 @@ namespace MongoDB.Driver.Internal {
/// </summary>
Open,
/// <summary>
/// The connection is damaged.
/// </summary>
Damaged,
/// <summary>
/// The connection is closed.
/// </summary>
Closed
@ -56,6 +52,7 @@ namespace MongoDB.Driver.Internal {
private object connectionLock = new object();
private MongoServerInstance serverInstance;
private MongoConnectionPool connectionPool;
private int generationId; // the generationId of the connection pool at the time this connection was created
private MongoConnectionState state;
private TcpClient tcpClient;
private DateTime createdAt;
@ -71,6 +68,7 @@ namespace MongoDB.Driver.Internal {
) {
this.serverInstance = connectionPool.ServerInstance;
this.connectionPool = connectionPool;
this.generationId = connectionPool.GenerationId;
this.createdAt = DateTime.UtcNow;
this.state = MongoConnectionState.Initial;
}
@ -91,6 +89,13 @@ namespace MongoDB.Driver.Internal {
get { return createdAt; }
}
/// <summary>
/// Gets the generation of the connection pool that this connection belongs to.
/// </summary>
public int GenerationId {
get { return generationId; }
}
/// <summary>
/// Gets the DateTime that this connection was last used at.
/// </summary>
@ -462,42 +467,44 @@ namespace MongoDB.Driver.Internal {
) {
// there are three possible situations:
// 1. we can keep using the connection
// 2. just this one connection needs to be discarded
// 3. the whole connection pool needs to be discarded
// 2. just this one connection needs to be closed
// 3. the whole connection pool needs to be cleared
switch (DetermineAction(ex)) {
case HandleExceptionAction.KeepConnection:
break;
case HandleExceptionAction.DiscardConnection:
state = MongoConnectionState.Damaged;
case HandleExceptionAction.CloseConnection:
Close();
break;
case HandleExceptionAction.DiscardConnectionPool:
state = MongoConnectionState.Damaged;
try {
serverInstance.Disconnect();
} catch { } // ignore exceptions
case HandleExceptionAction.ClearConnectionPool:
Close();
connectionPool.Clear();
break;
default:
throw new MongoInternalException("Invalid HandleExceptionAction");
}
// forces a call to VerifyState before the next message is sent to this server instance
// this is a bit drastic but at least it's safe (and perhaps we can optimize a bit in the future)
serverInstance.State = MongoServerState.Unknown;
}
private enum HandleExceptionAction {
KeepConnection,
DiscardConnection,
DiscardConnectionPool
CloseConnection,
ClearConnectionPool
}
private HandleExceptionAction DetermineAction(
Exception ex
) {
// TODO: figure out when to return KeepConnection or DiscardConnectionPool (if ever)
// TODO: figure out when to return KeepConnection or ClearConnectionPool (if ever)
// don't return DiscardConnectionPool unless you are *sure* it is the right action
// definitely don't make DiscardConnectionPool the default action
// returning DiscardConnectionPool frequently can result in Connect/Disconnect storms
// don't return ClearConnectionPool unless you are *sure* it is the right action
// definitely don't make ClearConnectionPool the default action
// returning ClearConnectionPool frequently can result in Connect/Disconnect storms
return HandleExceptionAction.DiscardConnection; // this should always be the default action
return HandleExceptionAction.CloseConnection; // this should always be the default action
}
#endregion

121
Driver/Internal/MongoConnectionPool.cs

@ -29,11 +29,12 @@ namespace MongoDB.Driver.Internal {
private object connectionPoolLock = new object();
private MongoServer server;
private MongoServerInstance serverInstance;
private bool closed = false;
private int poolSize;
private List<MongoConnection> availableConnections = new List<MongoConnection>();
private int generationId; // whenever the pool is cleared the generationId is incremented
private int waitQueueSize;
private Timer timer;
private bool inTimerCallback;
private int connectionsRemovedSinceLastTimerTick;
#endregion
@ -43,9 +44,10 @@ namespace MongoDB.Driver.Internal {
) {
this.server = serverInstance.Server;
this.serverInstance = serverInstance;
poolSize = 0;
timer = new Timer(TimerCallback, null, TimeSpan.FromSeconds(1), TimeSpan.FromSeconds(10));
var disabled = TimeSpan.FromMilliseconds(-1);
timer = new Timer(TimerCallback, null, disabled, disabled); // will get started when server instance state changes to Connected
}
#endregion
@ -64,6 +66,13 @@ namespace MongoDB.Driver.Internal {
get { return poolSize; }
}
/// <summary>
/// Gets the current generation Id of the connection pool.
/// </summary>
public int GenerationId {
get { return generationId; }
}
/// <summary>
/// Gets the server instance.
/// </summary>
@ -72,6 +81,12 @@ namespace MongoDB.Driver.Internal {
}
#endregion
#region internal properties
internal Timer Timer {
get { return timer; }
}
#endregion
#region internal methods
internal MongoConnection AcquireConnection(
MongoDatabase database
@ -89,10 +104,6 @@ namespace MongoDB.Driver.Internal {
try {
DateTime timeoutAt = DateTime.UtcNow + server.Settings.WaitQueueTimeout;
while (true) {
if (closed) {
throw new InvalidOperationException("Attempt to get a connection from a closed connection pool.");
}
if (availableConnections.Count > 0) {
// first try to find the most recently used connection that is already authenticated for this database
for (int i = availableConnections.Count - 1; i >= 0; i--) {
@ -141,31 +152,26 @@ namespace MongoDB.Driver.Internal {
}
}
internal void Close() {
internal void Clear() {
lock (connectionPoolLock) {
if (!closed) {
foreach (var connection in availableConnections) {
connection.Close();
}
availableConnections = null;
closed = true;
Monitor.Pulse(connectionPoolLock);
foreach (var connection in availableConnections) {
connection.Close();
}
availableConnections.Clear();
generationId += 1;
Monitor.Pulse(connectionPoolLock);
}
}
internal void CreateInitialConnections() {
ThreadPool.QueueUserWorkItem(CreateInitialConnectionsWorkItem);
}
internal void CreateInitialConnectionsWorkItem(
internal void EnsureMinConnectionPoolSizeWorkItem(
object state // ignored
) {
// keep creating connections one at a time until MinConnectionPoolSize is reached
var forGenerationId = (int) state;
while (true) {
lock (connectionPoolLock) {
// stop if connection pool has been closed or we have already reached MinConnectionPoolSize
if (closed || poolSize >= server.Settings.MinConnectionPoolSize) {
// stop if the connection pool generationId has changed or we have already reached MinConnectionPoolSize
if (generationId != forGenerationId || poolSize >= server.Settings.MinConnectionPoolSize) {
return;
}
}
@ -179,7 +185,7 @@ namespace MongoDB.Driver.Internal {
// and we don't want to throw this one away unless we would exceed MaxConnectionPoolSize
var added = false;
lock (connectionPoolLock) {
if (poolSize < server.Settings.MaxConnectionPoolSize) {
if (generationId == forGenerationId && poolSize < server.Settings.MaxConnectionPoolSize) {
availableConnections.Add(connection);
poolSize++;
added = true;
@ -206,13 +212,13 @@ namespace MongoDB.Driver.Internal {
}
lock (connectionPoolLock) {
// if connection pool is closed just close connection
if (closed) {
// if connection is from another generation of the pool just close it
if (connection.GenerationId != generationId) {
connection.Close();
return;
}
// don't put closed or damaged connections back in the pool
// if the connection is no longer open don't remove it from the pool
if (connection.State != MongoConnectionState.Open) {
RemoveConnection(connection);
return;
@ -248,35 +254,52 @@ namespace MongoDB.Driver.Internal {
private void TimerCallback(
object state // not used
) {
lock (connectionPoolLock) {
// if connection pool is closed stop the timer
if (closed) {
timer.Dispose();
return;
}
// if another timer callback occurs before we are done with the first one just exit
if (inTimerCallback) {
return;
}
// only remove one connection per timer tick to avoid reconnection storms
if (connectionsRemovedSinceLastTimerTick == 0) {
MongoConnection oldestConnection = null;
MongoConnection lruConnection = null;
foreach (var connection in availableConnections) {
if (oldestConnection == null || connection.CreatedAt < oldestConnection.CreatedAt) {
oldestConnection = connection;
inTimerCallback = true;
try {
// on every timer callback verify the state of the server instance because it might have changed
// we do this even if this one instance is currently Disconnected so we can discover when a disconnected instance comes back online
serverInstance.VerifyState();
lock (connectionPoolLock) {
// note: the state could have changed to Disconnected when VerifyState was called
if (serverInstance.State == MongoServerState.Disconnected) {
return;
}
// only remove one connection per timer tick to avoid reconnection storms
if (connectionsRemovedSinceLastTimerTick == 0) {
MongoConnection oldestConnection = null;
MongoConnection lruConnection = null;
foreach (var connection in availableConnections) {
if (oldestConnection == null || connection.CreatedAt < oldestConnection.CreatedAt) {
oldestConnection = connection;
}
if (lruConnection == null || connection.LastUsedAt < lruConnection.LastUsedAt) {
lruConnection = connection;
}
}
if (lruConnection == null || connection.LastUsedAt < lruConnection.LastUsedAt) {
lruConnection = connection;
// remove old connections before idle connections
var now = DateTime.UtcNow;
if (oldestConnection != null && now > oldestConnection.CreatedAt + server.Settings.MaxConnectionLifeTime) {
RemoveConnection(oldestConnection);
} else if (poolSize > server.Settings.MinConnectionPoolSize && lruConnection != null && now > lruConnection.LastUsedAt + server.Settings.MaxConnectionIdleTime) {
RemoveConnection(lruConnection);
}
}
connectionsRemovedSinceLastTimerTick = 0;
}
// remove old connections before idle connections
var now = DateTime.UtcNow;
if (oldestConnection != null && now > oldestConnection.CreatedAt + server.Settings.MaxConnectionLifeTime) {
RemoveConnection(oldestConnection);
} else if (poolSize > server.Settings.MinConnectionPoolSize && lruConnection != null && now > lruConnection.LastUsedAt + server.Settings.MaxConnectionIdleTime) {
RemoveConnection(lruConnection);
}
if (poolSize < server.Settings.MinConnectionPoolSize) {
ThreadPool.QueueUserWorkItem(EnsureMinConnectionPoolSizeWorkItem, generationId);
}
connectionsRemovedSinceLastTimerTick = 0;
} finally {
inTimerCallback = false;
}
}
#endregion

Loading…
Cancel
Save