You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

208 lines
8.3 KiB

  1. /* Copyright 2010 10gen Inc.
  2. *
  3. * Licensed under the Apache License, Version 2.0 (the "License");
  4. * you may not use this file except in compliance with the License.
  5. * You may obtain a copy of the License at
  6. *
  7. * http://www.apache.org/licenses/LICENSE-2.0
  8. *
  9. * Unless required by applicable law or agreed to in writing, software
  10. * distributed under the License is distributed on an "AS IS" BASIS,
  11. * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. * See the License for the specific language governing permissions and
  13. * limitations under the License.
  14. */
  15. using System;
  16. using System.Collections.Generic;
  17. using System.Linq;
  18. using System.Text;
  19. using System.Threading;
  20. using MongoDB.Bson;
  namespace MongoDB.Driver.Internal {
      /// <summary>
      /// Connects to a replica set: sends "ismaster" to every server in the seed list (and to any
      /// additional members those servers report), and records the primary connection, the
      /// secondary connections (when SlaveOk is set on the url) and the reported member addresses.
      /// </summary>
      internal class ReplicaSetConnector {
          #region private fields
          private readonly MongoUrl url;
          // addresses for which a query work item has been queued; only accessed from the thread running Connect
          private readonly HashSet<MongoServerAddress> queries = new HashSet<MongoServerAddress>();
          // responses dequeued so far, keyed by address; only accessed from the thread running Connect
          private readonly Dictionary<MongoServerAddress, QueryNodeResponse> responses = new Dictionary<MongoServerAddress, QueryNodeResponse>();
          private MongoConnection primaryConnection;
          private readonly List<MongoConnection> secondaryConnections = new List<MongoConnection>();
          private List<MongoServerAddress> replicaSet;
          #endregion

          #region constructors
          /// <summary>
          /// Creates a connector for the servers described by <paramref name="url"/>.
          /// </summary>
          /// <param name="url">Supplies the seed list (Servers), SlaveOk and the optional ReplicaSetName.</param>
          public ReplicaSetConnector(
              MongoUrl url
          ) {
              this.url = url;
          }
          #endregion

          #region public properties
          /// <summary>
          /// Gets the connection to the primary (null until Connect has found one).
          /// </summary>
          public MongoConnection PrimaryConnection {
              get { return primaryConnection; }
          }

          /// <summary>
          /// Gets the connections to secondaries (only populated when SlaveOk is set on the url).
          /// </summary>
          public List<MongoConnection> SecondaryConnections {
              get { return secondaryConnections; }
          }

          /// <summary>
          /// Gets the member addresses reported by the primary (null until Connect has found one).
          /// </summary>
          public IEnumerable<MongoServerAddress> ReplicaSet {
              get { return replicaSet; }
          }
          #endregion

          #region public methods
          /// <summary>
          /// Queries the seed list (and any members discovered along the way) until a primary is found,
          /// every queried server has answered, or the timeout elapses.
          /// </summary>
          /// <param name="timeout">Maximum time to wait for responses.</param>
          /// <exception cref="MongoConnectionException">
          /// No primary was found, or a responding server did not report a "hosts" list.
          /// Connections retained by this connector are closed before the exception propagates.
          /// </exception>
          public void Connect(
              TimeSpan timeout
          ) {
              DateTime deadline = DateTime.UtcNow + timeout;

              // query all servers in seed list in parallel (they will report responses back through the responsesQueue)
              var responsesQueue = QuerySeedListNodes();

              // process the responses as they come back and stop as soon as we find the primary (unless slaveOk is true)
              // stragglers will continue to report responses to the responsesQueue but no one will read them
              // and eventually it will all get garbage collected
              var exceptions = new List<Exception>();
              try {
                  while (responses.Count < queries.Count) {
                      var response = responsesQueue.Dequeue(deadline);
                      if (response == null) {
                          break; // we timed out
                      }
                      responses.Add(response.Address, response);

                      if (response.Exception != null) {
                          exceptions.Add(response.Exception);
                          continue;
                      }

                      // the connection is tracked (or closed) BEFORE the host list is parsed, so that a
                      // failure in GetHostAddresses can never orphan it
                      List<MongoServerAddress> hostAddresses;
                      if (response.IsPrimary) {
                          primaryConnection = response.Connection;
                          hostAddresses = GetHostAddresses(response);
                          replicaSet = hostAddresses;
                          if (!url.SlaveOk) {
                              break; // if we're not going to use the secondaries no need to wait for their replies
                          }
                      } else {
                          if (url.SlaveOk) {
                              secondaryConnections.Add(response.Connection);
                          } else {
                              response.Connection.Close();
                          }
                          hostAddresses = GetHostAddresses(response);
                      }

                      // look for additional members of the replica set that might not have been in the seed list and query them also
                      foreach (var address in hostAddresses) {
                          if (queries.Add(address)) { // Add returns false when the address has already been queried
                              QueueQuery(address, responsesQueue);
                          }
                      }
                  }
              } catch {
                  // don't leave half-established connections open when discovery is aborted
                  CloseConnections();
                  throw;
              }

              if (primaryConnection == null) {
                  // without a primary the connector is unusable, so release any secondaries we collected
                  CloseConnections();

                  var innerException = exceptions.FirstOrDefault();
                  var exception = new MongoConnectionException("Unable to connect to server", innerException);
                  if (exceptions.Count > 1) {
                      exception.Data.Add("InnerExceptions", exceptions);
                  }
                  throw exception;
              }
          }
          #endregion

          #region private methods
          // closes (best effort) and forgets every connection currently retained by this connector
          private void CloseConnections() {
              if (primaryConnection != null) {
                  CloseQuietly(primaryConnection);
                  primaryConnection = null;
              }
              foreach (var connection in secondaryConnections) {
                  CloseQuietly(connection);
              }
              secondaryConnections.Clear();
          }

          // closes a connection that is being discarded; a failure to close must not mask the original error
          private static void CloseQuietly(
              MongoConnection connection
          ) {
              try { connection.Close(); } catch { } // ignore exceptions
          }

          // parses the "hosts" array of an ismaster result into server addresses
          // throws MongoConnectionException when the result has no "hosts" element
          private List<MongoServerAddress> GetHostAddresses(
              QueryNodeResponse response
          ) {
              if (!response.IsMasterResult.Contains("hosts")) {
                  var message = string.Format("Server is not a member of a replica set: {0}", response.Address);
                  throw new MongoConnectionException(message);
              }

              var nodes = new List<MongoServerAddress>();
              foreach (BsonString host in response.IsMasterResult["hosts"].AsBsonArray.Values) {
                  var address = MongoServerAddress.Parse(host.Value);
                  nodes.Add(address);
              }
              return nodes;
          }

          // queues a thread pool work item that queries one server and reports back through responseQueue
          private void QueueQuery(
              MongoServerAddress address,
              BlockingQueue<QueryNodeResponse> responseQueue
          ) {
              var args = new QueryNodeParameters {
                  Address = address,
                  ResponseQueue = responseQueue
              };
              ThreadPool.QueueUserWorkItem(QueryNodeWorkItem, args);
          }

          // starts one query per distinct seed list address and returns the queue the responses arrive on
          private BlockingQueue<QueryNodeResponse> QuerySeedListNodes() {
              var responseQueue = new BlockingQueue<QueryNodeResponse>();
              foreach (var address in url.Servers) {
                  // a seed list may name the same server twice; querying it twice would produce two
                  // responses for a single entry in queries and a duplicate key in responses
                  if (queries.Add(address)) {
                      QueueQuery(address, responseQueue);
                  }
              }
              return responseQueue;
          }

          // note: this method will run on a thread from the ThreadPool
          // it always enqueues exactly one response: either a usable connection or the exception that occurred
          private void QueryNodeWorkItem(
              object parameters
          ) {
              // this method has to work at a very low level because the connection pool isn't set up yet
              var args = (QueryNodeParameters) parameters;
              var response = new QueryNodeResponse { Address = args.Address };

              try {
                  var connection = new MongoConnection(null, args.Address); // no connection pool
                  try {
                      var isMasterCommand = new BsonDocument("ismaster", 1);
                      var isMasterResult = connection.RunCommand("admin.$cmd", QueryFlags.SlaveOk, isMasterCommand);
                      response.IsMasterResult = isMasterResult;
                      response.IsPrimary = isMasterResult["ismaster", false].ToBoolean();

                      if (url.ReplicaSetName != null) {
                          var getStatusCommand = new BsonDocument("replSetGetStatus", 1);
                          var getStatusResult = connection.RunCommand("admin.$cmd", QueryFlags.SlaveOk, getStatusCommand);

                          var replicaSetName = getStatusResult["set"].AsString;
                          if (replicaSetName != url.ReplicaSetName) {
                              var message = string.Format("Host {0} belongs to a different replica set: {1}", args.Address, replicaSetName);
                              throw new MongoConnectionException(message);
                          }
                      }

                      // hand the connection over only after every check has passed, so a failed response
                      // never carries a connection that has already been closed
                      response.Connection = connection; // might become the first connection in the connection pool
                  } catch {
                      try { connection.Close(); } catch { } // ignore exceptions
                      throw;
                  }
              } catch (Exception ex) {
                  response.Exception = ex;
              }

              args.ResponseQueue.Enqueue(response);
          }
          #endregion

          #region private nested classes
          // arguments handed to QueryNodeWorkItem
          // note: OK to use automatic properties on private helper class
          private class QueryNodeParameters {
              public MongoServerAddress Address { get; set; }
              public BlockingQueue<QueryNodeResponse> ResponseQueue { get; set; }
          }

          // outcome of querying one server; Exception is non-null when the query failed
          // note: OK to use automatic properties on private helper class
          private class QueryNodeResponse {
              public MongoServerAddress Address { get; set; }
              public BsonDocument IsMasterResult { get; set; }
              public bool IsPrimary { get; set; }
              public MongoConnection Connection { get; set; }
              public Exception Exception { get; set; }
          }
          #endregion
      }
  }