You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

467 lines
16 KiB

/* Copyright 2010 10gen Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using MongoDB.Bson;
using MongoDB.Driver.Internal;
namespace MongoDB.Driver {
public class MongoServer {
#region private static fields
// taken by Create(MongoUrl) around every access to the servers cache;
// readonly so the lock object can never be swapped out from under a waiting thread
private static readonly object staticLock = new object();
// one MongoServer instance per distinct MongoUrl, populated by Create(MongoUrl)
private static readonly Dictionary<MongoUrl, MongoServer> servers = new Dictionary<MongoUrl, MongoServer>();
#endregion
#region private fields
// taken while connecting, disconnecting and looking up databases;
// readonly so the lock object can never be replaced while another thread is waiting on it
private readonly object serverLock = new object();
private MongoServerState state = MongoServerState.Disconnected;
// addresses parsed from the "hosts" element of the connect reply (null when the reply had no such element)
private IEnumerable<MongoServerAddress> replicaSet;
// MongoDatabase instances handed out by GetDatabase, keyed by database name (plus credentials when supplied)
private readonly Dictionary<string, MongoDatabase> databases = new Dictionary<string, MongoDatabase>();
private MongoConnectionPool connectionPool;
// the credentials and the url are assigned only by the constructor
private readonly MongoCredentials adminCredentials;
private readonly MongoCredentials defaultCredentials;
private readonly MongoUrl url;
#endregion
#region constructors
// Creates a server object for the given URL. No connection is opened here.
public MongoServer(
    MongoUrl url
) {
    this.url = url;

    // credentials in the URL belong to the server as a whole only when the URL names no database
    var credentials = url.Credentials;
    if (credentials == null || url.DatabaseName != null) {
        return;
    }

    if (credentials.Admin) {
        this.adminCredentials = credentials;
    } else {
        this.defaultCredentials = credentials;
    }
}
#endregion
#region factory methods
// Returns the server object for the default connection string "mongodb://localhost".
public static MongoServer Create() {
    const string defaultConnectionString = "mongodb://localhost";
    return Create(defaultConnectionString);
}
// Returns the server object for the URL produced by the given connection string builder.
public static MongoServer Create(
    MongoConnectionStringBuilder builder
) {
    var url = builder.ToMongoUrl();
    return Create(url);
}
// Returns the MongoServer registered for the given URL, creating and registering one on first request.
// The lookup and the insertion both happen under staticLock.
public static MongoServer Create(
    MongoUrl url
) {
    lock (staticLock) {
        MongoServer existing;
        if (servers.TryGetValue(url, out existing)) {
            return existing;
        }

        var created = new MongoServer(url);
        servers.Add(url, created);
        return created;
    }
}
// Returns the server object for a connection string.
// Strings starting with "mongodb://" are parsed as a MongoUrl; anything else is handed to
// MongoConnectionStringBuilder.
// Throws ArgumentNullException when connectionString is null.
public static MongoServer Create(
    string connectionString
) {
    if (connectionString == null) {
        throw new ArgumentNullException("connectionString");
    }

    // the scheme prefix is a protocol token, so compare it ordinally instead of with the current culture
    if (connectionString.StartsWith("mongodb://", StringComparison.Ordinal)) {
        var url = MongoUrl.Create(connectionString);
        return Create(url);
    }

    MongoConnectionStringBuilder builder = new MongoConnectionStringBuilder(connectionString);
    return Create(builder.ToMongoUrl());
}
// Returns the server object for a Uri by converting it to its string form and parsing that as a MongoUrl.
public static MongoServer Create(
    Uri uri
) {
    string uriText = uri.ToString();
    var url = MongoUrl.Create(uriText);
    return Create(url);
}
#endregion
#region public properties
// admin credentials captured from the URL by the constructor (null if there were none)
public MongoCredentials AdminCredentials {
    get {
        return adminCredentials;
    }
}

// the "admin" database, accessed with the admin credentials
public MongoDatabase AdminDatabase {
    get {
        return GetDatabase("admin", adminCredentials);
    }
}

// non-admin credentials captured from the URL by the constructor (null if there were none)
public MongoCredentials DefaultCredentials {
    get {
        return defaultCredentials;
    }
}

// addresses reported in the "hosts" element of the reply received while connecting (null if none were reported)
public IEnumerable<MongoServerAddress> ReplicaSet {
    get {
        return replicaSet;
    }
}

// SafeMode setting taken from the URL
public SafeMode SafeMode {
    get {
        return url.SafeMode;
    }
}

// the server addresses listed in the URL
public IEnumerable<MongoServerAddress> SeedList {
    get {
        return url.Servers;
    }
}

// SlaveOk setting taken from the URL
public bool SlaveOk {
    get {
        return url.SlaveOk;
    }
}

// current connection state (read without taking serverLock)
public MongoServerState State {
    get {
        return state;
    }
}

// the URL this server object was created from
public MongoUrl Url {
    get {
        return url;
    }
}
#endregion
#region public indexers
// Shorthand for GetDatabase(databaseName).
public MongoDatabase this[
    string databaseName
] {
    get {
        var database = GetDatabase(databaseName);
        return database;
    }
}
// Shorthand for GetDatabase(databaseName, credentials).
public MongoDatabase this[
    string databaseName,
    MongoCredentials credentials
] {
    get {
        var database = GetDatabase(databaseName, credentials);
        return database;
    }
}
#endregion
#region public methods
// Not implemented yet: always throws NotImplementedException.
public void CloneDatabase(string fromHost) {
    throw new NotImplementedException();
}
// Connects using the default timeout from MongoDefaults.ConnectTimeout.
public void Connect() {
    var defaultTimeout = MongoDefaults.ConnectTimeout;
    Connect(defaultTimeout);
}
// Connects to the server if it is not already connected.
// While the attempt is in progress the state is Connecting; it becomes Connected on success and is
// reset to Disconnected (with the exception rethrown) on failure.
public void Connect(
    TimeSpan timeout
) {
    lock (serverLock) {
        if (state == MongoServerState.Connected) {
            return; // nothing to do
        }

        state = MongoServerState.Connecting;
        try {
            // TODO: implement ConnectDirectly
            var found = ConnectToReplicaSet(timeout);
            var reply = found.CommandResult;

            List<MongoServerAddress> members = null;
            if (reply.Contains("hosts")) {
                members = new List<MongoServerAddress>();
                foreach (BsonString host in reply["hosts"].AsBsonArray.Values) {
                    // an address we cannot parse must not prevent us from connecting;
                    // it is simply left out of the reported replica set
                    MongoServerAddress parsed;
                    if (MongoServerAddress.TryParse(host.Value, out parsed)) {
                        members.Add(parsed);
                    }
                }
            }
            this.replicaSet = members;

            // the connection opened while locating the server is handed to the new connection pool
            connectionPool = new MongoConnectionPool(this, found.Address, found.Connection);
            state = MongoServerState.Connected;
        } catch {
            state = MongoServerState.Disconnected;
            throw;
        }
    }
}
// Not implemented yet: always throws NotImplementedException.
// TODO: fromHost parameter?
public void CopyDatabase(string from, string to) {
    throw new NotImplementedException();
}
// Closes the connection pool and marks the server as Disconnected. Does nothing unless the state is Connected.
// normally called from a connection when there is a SocketException
// but anyone can call it if they want to close all sockets to the server
public void Disconnect() {
    lock (serverLock) {
        if (state == MongoServerState.Connected) {
            try {
                connectionPool.Close();
            } finally {
                // reset even when Close throws: otherwise the server would stay Connected and
                // GetConnectionPool would keep handing out a pool that failed part-way through closing
                connectionPool = null;
                state = MongoServerState.Disconnected;
            }
        }
    }
}
// Runs the { dropDatabase : 1 } command against the named database and returns the command's reply.
public BsonDocument DropDatabase(
    string databaseName
) {
    return GetDatabase(databaseName).RunCommand(new BsonDocument("dropDatabase", 1));
}
// Fetches the document a DBRef points to as a BsonDocument (see FetchDBRefAs).
public BsonDocument FetchDBRef(
    MongoDBRef dbRef
) {
    BsonDocument document = FetchDBRefAs<BsonDocument>(dbRef);
    return document;
}
// Fetches the document a DBRef points to, deserialized as TDocument, by delegating to the
// database named in the DBRef.
// Throws ArgumentNullException when dbRef is null and ArgumentException when it carries no database name.
public TDocument FetchDBRefAs<TDocument>(
    MongoDBRef dbRef
) {
    // fail with a descriptive argument exception rather than a NullReferenceException on the next line
    if (dbRef == null) {
        throw new ArgumentNullException("dbRef");
    }
    // at server level there is no current database to fall back on, so the DBRef must name one
    if (dbRef.DatabaseName == null) {
        throw new ArgumentException("MongoDBRef DatabaseName missing");
    }
    var database = GetDatabase(dbRef.DatabaseName);
    return database.FetchDBRefAs<TDocument>(dbRef);
}
// Returns the named database, accessed with the server's default credentials (which may be null).
public MongoDatabase GetDatabase(
    string databaseName
) {
    var database = GetDatabase(databaseName, defaultCredentials);
    return database;
}
// Returns the MongoDatabase object for the given name and credentials.
// Instances are cached, so repeated calls with the same name and credentials return the same object.
public MongoDatabase GetDatabase(
    string databaseName,
    MongoCredentials credentials
) {
    lock (serverLock) {
        bool anonymous = credentials == null;

        // the cache key is the bare name, or the name followed by the credentials in brackets
        string cacheKey = anonymous
            ? databaseName
            : string.Format("{0}[{1}]", databaseName, credentials);

        MongoDatabase cached;
        if (databases.TryGetValue(cacheKey, out cached)) {
            return cached;
        }

        MongoDatabase created = anonymous
            ? new MongoDatabase(this, databaseName)
            : new MongoDatabase(this, databaseName, credentials);
        databases.Add(cacheKey, created);
        return created;
    }
}
// Lists the database names using the server's admin credentials (which may be null).
public IEnumerable<string> GetDatabaseNames() {
    var names = GetDatabaseNames(adminCredentials);
    return names;
}
// Runs "listDatabases" against the admin database with the given credentials and returns the
// "name" of every entry in the reply's "databases" array, sorted.
public IEnumerable<string> GetDatabaseNames(
    MongoCredentials adminCredentials
) {
    var listing = GetDatabase("admin", adminCredentials).RunCommand("listDatabases");

    var names = new List<string>();
    foreach (BsonDocument entry in listing["databases"].AsBsonArray.Values) {
        names.Add(entry["name"].AsString);
    }
    names.Sort();
    return names;
}
// Disconnects (if connected) and then connects again with the default timeout.
public void Reconnect() {
    // serverLock is held across both calls so the pair is not interleaved with other callers that take the lock
    lock (serverLock) {
        this.Disconnect();
        this.Connect();
    }
}
// Runs the given command object against the "admin" database using the supplied credentials
// and returns the command's reply.
public BsonDocument RunAdminCommand<TCommand>(
    MongoCredentials adminCredentials,
    TCommand command
) {
    return GetDatabase("admin", adminCredentials).RunCommand(command);
}
// Runs the given command object against the "admin" database using the server's admin credentials.
public BsonDocument RunAdminCommand<TCommand>(
    TCommand command
) {
    return RunAdminCommand<TCommand>(adminCredentials, command);
}
// Runs the command { commandName : true } against the "admin" database using the supplied
// credentials and returns the command's reply.
public BsonDocument RunAdminCommand(
    MongoCredentials adminCredentials,
    string commandName
) {
    return GetDatabase("admin", adminCredentials).RunCommand(new BsonDocument(commandName, true));
}
// Runs the named command against the "admin" database using the server's admin credentials.
public BsonDocument RunAdminCommand(
    string commandName
) {
    BsonDocument reply = RunAdminCommand(adminCredentials, commandName);
    return reply;
}
#endregion
#region private methods
// Locates a server to connect to by querying every server in the seed list in parallel.
// Returns the first result that comes from a primary (or from any responding server when url.SlaveOk
// is set); the returned result carries the open connection that produced it.
// Throws MongoConnectionException when the loop ends without such a result (no replies left to wait
// for, or Dequeue returned null -- presumably because the deadline passed).
private QueryServerResults ConnectToReplicaSet(
    TimeSpan timeout
) {
    DateTime deadline = DateTime.UtcNow + timeout;

    // query all servers in seed list in parallel (they will report results back through the resultsQueue)
    var resultsQueue = new BlockingQueue<QueryServerResults>();
    var queriedServers = new HashSet<MongoServerAddress>();
    int pendingReplies = 0;
    foreach (var address in url.Servers) {
        QueueServerQuery(address, resultsQueue);
        queriedServers.Add(address);
        pendingReplies++;
    }

    // process the results as they come back and stop as soon as we find the primary
    // stragglers will continue to report results to the resultsQueue but no one will read them
    // and eventually it will all get garbage collected
    QueryServerResults results;
    while (pendingReplies > 0 && (results = resultsQueue.Dequeue(deadline)) != null) {
        pendingReplies--;

        if (results.Exception != null) {
            // TODO: how to report exceptions
            continue;
        }

        if (results.IsPrimary || url.SlaveOk) {
            return results;
        }
        results.Connection.Close();

        // look for additional members of the replica set that might not have been in the seed list and query them also
        var commandResult = results.CommandResult;
        if (commandResult.Contains("hosts")) {
            foreach (BsonString host in commandResult["hosts"].AsBsonArray.Values) {
                // don't let an unparsable address abort the search (same policy as Connect):
                // a member whose address we cannot parse is simply not queried
                MongoServerAddress address;
                if (!MongoServerAddress.TryParse(host.Value, out address)) {
                    continue;
                }
                // HashSet.Add returns false for an address that has already been queried
                if (queriedServers.Add(address)) {
                    QueueServerQuery(address, resultsQueue);
                    pendingReplies++;
                }
            }
        }
    }

    throw new MongoConnectionException("Unable to connect to server");
}

// Queues a thread pool work item that queries one server and reports the outcome on resultsQueue.
private void QueueServerQuery(
    MongoServerAddress address,
    BlockingQueue<QueryServerResults> resultsQueue
) {
    var args = new QueryServerParameters {
        Address = address,
        ResultsQueue = resultsQueue
    };
    ThreadPool.QueueUserWorkItem(QueryServerWorkItem, args);
}
// Sends an "ismaster" query to a single server and reports the outcome on the caller's results queue.
// parameters must be a QueryServerParameters (it is cast unconditionally).
// Exactly one QueryServerResults is enqueued per call: on success it carries the reply document and the
// still-open connection, on failure it carries the exception (the connection, if one was created, is closed).
// note: this method will run on a thread from the ThreadPool
private void QueryServerWorkItem(
object parameters
) {
// this method has to work at a very low level because the connection pool isn't set up yet
var args = (QueryServerParameters) parameters;
var results = new QueryServerResults { Address = args.Address };
try {
var connection = new MongoConnection(null, args.Address); // no connection pool
try {
var command = new BsonDocument("ismaster", 1);
// the message is only needed until it has been sent, so it is disposed before waiting for the reply
using (
var message = new MongoQueryMessage<BsonDocument>(
"admin.$cmd",
QueryFlags.SlaveOk,
0, // numberToSkip
1, // numberToReturn
command,
null // fields
)
) {
connection.SendMessage(message, SafeMode.False);
}
var reply = connection.ReceiveMessage<BsonDocument>();
results.CommandResult = reply.Documents[0];
results.Connection = connection; // might become the first connection in the connection pool
// the server counts as primary only when both "ok" and "ismaster" are true (each defaults to false if absent)
if (
results.CommandResult["ok", false].ToBoolean() &&
results.CommandResult["ismaster", false].ToBoolean()
) {
results.IsPrimary = true;
}
} catch {
// close the connection on any failure, then let the outer catch record the original exception
try { connection.Close(); } catch { } // ignore exceptions
throw;
}
} catch (Exception ex) {
// never let an exception escape the work item; hand it to the consumer through the results instead
results.Exception = ex;
}
// always report back, success or failure, so the consumer can account for this query
args.ResultsQueue.Enqueue(results);
}
#endregion
#region internal methods
// Returns the connection pool, connecting first (with the default timeout) if no pool exists yet.
internal MongoConnectionPool GetConnectionPool() {
    lock (serverLock) {
        if (connectionPool != null) {
            return connectionPool;
        }

        Connect();
        return connectionPool;
    }
}
#endregion
#region private nested classes
// argument bundle passed to QueryServerWorkItem through ThreadPool.QueueUserWorkItem
// note: OK to use automatic properties on private helper class
private class QueryServerParameters {
    // queue on which the work item reports its outcome
    public BlockingQueue<QueryServerResults> ResultsQueue { get; set; }

    // server the work item should query
    public MongoServerAddress Address { get; set; }
}
// outcome of one QueryServerWorkItem run, reported back through the results queue
// note: OK to use automatic properties on private helper class
private class QueryServerResults {
    // server that was queried
    public MongoServerAddress Address { get; set; }

    // connection the query was sent on (it may become the first connection in the connection pool)
    public MongoConnection Connection { get; set; }

    // first document of the server's reply to the "ismaster" command
    public BsonDocument CommandResult { get; set; }

    // true when the reply had both "ok" and "ismaster" set
    public bool IsPrimary { get; set; }

    // exception caught while querying the server, or null if the query succeeded
    public Exception Exception { get; set; }
}
#endregion
}
}