You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

344 lines
10 KiB

4 years ago
4 years ago
4 years ago
  1. #if !NET20
  2. using System;
  3. using System.Collections.Generic;
  4. using System.IO;
  5. using System.Linq;
  6. using System.Threading.Tasks;
  7. namespace Apewer.WebSocket
  8. {
  /// <summary>
  /// One WebSocket connection on top of a <see cref="SocketWrapper"/>. It reads incoming
  /// bytes, builds the protocol handler from the opening request, and frames and sends
  /// outgoing messages.
  /// </summary>
  public class Connection
  {

      #region internal

      /// <summary>Size, in bytes, of the buffer used for each socket read.</summary>
      private const int ReadSize = 1024 * 4;

      private readonly Action<Connection> _initialize;
      private readonly Func<HttpRequest, ComposableHandler> _handlerFactory;
      private readonly Func<IEnumerable<string>, string> _negotiateSubProtocol;
      private readonly Func<byte[], HttpRequest> _parseRequest;

      // True while a close is in progress; makes Available report false.
      private bool _closing;

      // True once the socket has been (or is being) torn down by CloseSocket.
      private bool _closed;

      /// <summary>Creates a connection around an accepted socket.</summary>
      /// <param name="socket">The socket all reads and writes go through.</param>
      /// <param name="initialize">Invoked with this connection once the handler has been created, before the handshake is sent.</param>
      /// <param name="parseRequest">Parses the bytes received so far into a request; a null result means no handler is created yet.</param>
      /// <param name="handlerFactory">Creates the protocol handler for a parsed request.</param>
      /// <param name="negotiateSubProtocol">Chooses a sub-protocol from those listed in the request.</param>
      internal Connection(SocketWrapper socket, Action<Connection> initialize, Func<byte[], HttpRequest> parseRequest, Func<HttpRequest, ComposableHandler> handlerFactory, Func<IEnumerable<string>, string> negotiateSubProtocol)
      {
          Socket = socket;

          // Every callback starts as a no-op so it can be invoked without a null check.
          OnOpen = () => { };
          OnClose = () => { };
          OnMessage = x => { };
          OnBytes = x => { };
          OnPing = x => Pong(x); // default reaction to a ping: echo the payload back as a pong
          OnPong = x => { };
          OnError = x => { };

          _initialize = initialize;
          _handlerFactory = handlerFactory;
          _parseRequest = parseRequest;
          _negotiateSubProtocol = negotiateSubProtocol;
      }

      /// <summary>The underlying socket.</summary>
      internal SocketWrapper Socket { get; set; }

      /// <summary>The protocol handler; null until <see cref="CreateHandler"/> has created it.</summary>
      internal ComposableHandler Handler { get; set; }

      /// <summary>Passed as the completion callback when the handshake is sent.</summary>
      internal Action OnOpen { get; set; }

      /// <summary>Invoked while the socket is being closed, before it is disposed.</summary>
      internal Action OnClose { get; set; }

      /// <summary>Callback for text messages. Not invoked by this class; presumably raised through the handler.</summary>
      internal Action<string> OnMessage { get; set; }

      /// <summary>Callback for binary messages. Not invoked by this class; presumably raised through the handler.</summary>
      internal Action<byte[]> OnBytes { get; set; }

      /// <summary>Callback for ping payloads. Defaults to answering with <see cref="Pong"/>.</summary>
      internal Action<byte[]> OnPing { get; set; }

      /// <summary>Callback for pong payloads. Not invoked by this class; presumably raised through the handler.</summary>
      internal Action<byte[]> OnPong { get; set; }

      /// <summary>Invoked with every read error that is not an <see cref="ObjectDisposedException"/>.</summary>
      internal Action<Exception> OnError { get; set; }

      /// <summary>Details of the opening request; null until the handler has been created.</summary>
      internal ConnectionInfo ConnectionInfo { get; private set; }

      /// <summary>Returns a task that has already completed successfully.</summary>
      /// <remarks>Built with <see cref="TaskCompletionSource{TResult}"/>, which this class already relies on, rather than a newer framework helper.</remarks>
      private static Task CompletedTask()
      {
          var done = new TaskCompletionSource<object>();
          done.SetResult(null);
          return done.Task;
      }

      /// <summary>Frames <paramref name="message"/> and writes the frame to the socket.</summary>
      /// <param name="message">The payload to frame.</param>
      /// <param name="createFrame">Turns the payload into the bytes to send. It is only invoked after the handler check, so it may safely use <see cref="Handler"/>.</param>
      /// <returns>The send task, or a faulted task carrying a <see cref="ConnectionNotAvailableException"/> when the connection is closing or closed.</returns>
      /// <exception cref="InvalidOperationException">No handler exists yet, i.e. the handshake has not happened.</exception>
      private Task Send<T>(T message, Func<T, byte[]> createFrame)
      {
          if (Handler == null) throw new InvalidOperationException("Cannot send before handshake.");

          if (!Available)
          {
              const string errorMessage = "Data sent while closing or after close. Ignoring.";
              WebSocketLog.Warn(errorMessage);

              // Report through the task instead of throwing, so callers observe it when they await.
              var failed = new TaskCompletionSource<object>();
              failed.SetException(new ConnectionNotAvailableException(errorMessage));
              return failed.Task;
          }

          return SendBytes(createFrame(message));
      }

      /// <summary>Starts the receive loop on the socket.</summary>
      internal void StartReceiving()
      {
          var data = new List<byte>(ReadSize);
          var buffer = new byte[ReadSize];
          Read(data, buffer);
      }

      /// <summary>Closes the connection, sending a close frame first when a handler exists.</summary>
      /// <param name="code">The status code handed to the handler's close framing.</param>
      internal void Close(int code)
      {
          if (!Available) return;

          _closing = true;

          // No handler means no handshake happened, so there is no close frame to send.
          if (Handler == null)
          {
              CloseSocket();
              return;
          }

          var bytes = Handler.FrameClose(code);
          if (bytes == null || bytes.Length == 0)
          {
              CloseSocket();
          }
          else
          {
              // Tear the socket down only after the close frame has been written.
              SendBytes(bytes, CloseSocket);
          }
      }

      /// <summary>
      /// Tries to parse the opening request from the bytes received so far and, on success,
      /// creates the handler, populates <see cref="ConnectionInfo"/> and sends the handshake.
      /// </summary>
      /// <param name="data">All bytes received on the connection so far.</param>
      internal void CreateHandler(IEnumerable<byte> data)
      {
          var request = _parseRequest(data.ToArray());
          if (request == null) return;

          Handler = _handlerFactory(request);
          if (Handler == null) return;

          var subProtocol = _negotiateSubProtocol(request.SubProtocols);
          ConnectionInfo = ConnectionInfo.Create(request, Socket.RemoteIpAddress, Socket.RemotePort, subProtocol);

          _initialize(this);

          var handshake = Handler.CreateHandshake(subProtocol);
          SendBytes(handshake, OnOpen);
      }

      /// <summary>Requests one read from the socket and re-arms itself after each successful read.</summary>
      /// <param name="data">Accumulates received bytes until a handler has been created.</param>
      /// <param name="buffer">Reusable receive buffer.</param>
      private void Read(List<byte> data, byte[] buffer)
      {
          if (!Available) return;

          Socket.Receive(buffer, r =>
          {
              if (r <= 0)
              {
                  WebSocketLog.Debug("0 bytes read. Closing.");
                  CloseSocket();
                  return;
              }

              WebSocketLog.Debug(r + " bytes read");
              var readBytes = buffer.Take(r);

              if (Handler != null)
              {
                  Handler.Receive(readBytes);
              }
              else
              {
                  // Still waiting for the opening request: keep accumulating and retry parsing.
                  data.AddRange(readBytes);
                  CreateHandler(data);
              }

              Read(data, buffer);
          },
          HandleReadError);
      }

      /// <summary>Reports a read error through <see cref="OnError"/> and closes the connection with a status code chosen by exception type.</summary>
      /// <param name="e">The error raised while reading.</param>
      private void HandleReadError(Exception e)
      {
          // Unwrap aggregates and handle the inner exception instead.
          var aggregate = e as AggregateException;
          if (aggregate != null)
          {
              HandleReadError(aggregate.InnerException);
              return;
          }

          if (e is ObjectDisposedException)
          {
              WebSocketLog.Debug("Swallowing ObjectDisposedException", e);
              return;
          }

          OnError(e);

          var webSocketError = e as WebSocketException;
          if (webSocketError != null)
          {
              WebSocketLog.Debug("Error while reading", e);
              Close(webSocketError.StatusCode);
          }
          else if (e is SubProtocolNegotiationFailureException)
          {
              WebSocketLog.Debug(e.Message);
              Close(StatusCodes.ProtocolError);
          }
          else if (e is IOException)
          {
              WebSocketLog.Debug("Error while reading", e);
              Close(StatusCodes.AbnormalClosure);
          }
          else
          {
              WebSocketLog.Error("Application Error", e);
              Close(StatusCodes.InternalServerError);
          }
      }

      /// <summary>Writes raw bytes to the socket.</summary>
      /// <param name="bytes">The bytes to send.</param>
      /// <param name="callback">Optional action to run after a successful send.</param>
      /// <returns>The task returned by the socket's send operation.</returns>
      private Task SendBytes(byte[] bytes, Action callback = null)
      {
          return Socket.Send(bytes, () =>
          {
              WebSocketLog.Debug("Sent " + bytes.Length + " bytes");
              if (callback != null) callback();
          }, e =>
          {
              if (e is IOException) WebSocketLog.Debug("Failed to send. Disconnecting.", e);
              else WebSocketLog.Info("Failed to send. Disconnecting.", e);
              CloseSocket();
          });
      }

      /// <summary>Raises <see cref="OnClose"/> and then closes and disposes the socket.</summary>
      /// <remarks>
      /// Only the first call does anything: a read hitting end-of-stream and a failing send can
      /// both end up here, and the close notification must fire once. The guard is a plain flag
      /// with no locking.
      /// </remarks>
      private void CloseSocket()
      {
          if (_closed) return;

          _closing = true;
          _closed = true; // set before the callback so a re-entrant call is a no-op

          try
          {
              OnClose();
          }
          finally
          {
              // Release the socket even when the close callback throws.
              Socket.Close();
              Socket.Dispose();
              _closing = false;
          }
      }

      #endregion

      #region properties

      /// <summary>True while the connection is neither closing nor closed and the socket reports itself connected.</summary>
      public bool Available
      {
          get { return !_closing && !_closed && Socket.Connected; }
      }

      /// <summary>Client IP address; empty until connection information is available.</summary>
      public string Address
      {
          get { return (ConnectionInfo == null) ? "" : ConnectionInfo.ClientIpAddress; }
      }

      /// <summary>Client port; 0 until connection information is available.</summary>
      public int Port
      {
          get { return (ConnectionInfo == null) ? 0 : ConnectionInfo.ClientPort; }
      }

      /// <summary>Gets the host. Example:
      /// <para>for the request ws://127.0.0.1:8000/page?field=value</para>
      /// <para>this yields 127.0.0.1:8000</para>
      /// Empty until connection information is available.</summary>
      public string Host
      {
          get { return (ConnectionInfo == null) ? "" : ConnectionInfo.Host; }
      }

      /// <summary>Gets the origin. Example:
      /// <para>for the request ws://127.0.0.1:8000/page?field=value</para>
      /// <para>this yields http://localhost</para>
      /// Empty until connection information is available.</summary>
      public string Origin
      {
          get { return (ConnectionInfo == null) ? "" : ConnectionInfo.Origin; }
      }

      /// <summary>Gets the URL path and query. Example:
      /// <para>for the request ws://127.0.0.1:8000/page?field=value</para>
      /// <para>this yields /page?field=value</para>
      /// Empty until connection information is available.</summary>
      public string QueryPath
      {
          get { return (ConnectionInfo == null) ? "" : ConnectionInfo.Path; }
      }

      #endregion

      #region methods

      /// <summary>Sends the given characters as one text message.</summary>
      /// <param name="message">The characters to send.</param>
      /// <returns>The send task; an already-completed task when there is nothing to send.</returns>
      /// <exception cref="InvalidOperationException">Called before the handshake.</exception>
      public Task Send(params char[] message)
      {
          if (message == null || message.Length < 1) return CompletedTask();

          // The lambda defers the Handler access until Send<T> has checked that a handler exists.
          return Send(new string(message), text => Handler.FrameText(text));
      }

      /// <summary>Sends the given strings, concatenated, as one text message.</summary>
      /// <param name="message">The text fragments to send; null fragments are skipped.</param>
      /// <returns>The send task; an already-completed task when the combined text is empty.</returns>
      /// <exception cref="InvalidOperationException">Called before the handshake.</exception>
      public Task Send(params string[] message)
      {
          if (message == null || message.Length < 1) return CompletedTask();

          // string.Concat treats null elements as empty strings.
          var combined = string.Concat(message);
          if (combined.Length < 1) return CompletedTask();

          return Send(combined, text => Handler.FrameText(text));
      }

      /// <summary>Sends a byte array as one binary message.</summary>
      /// <param name="message">The bytes to send.</param>
      /// <returns>The send task; an already-completed task when there is nothing to send.</returns>
      /// <exception cref="InvalidOperationException">Called before the handshake.</exception>
      public Task Send(params byte[] message)
      {
          if (message == null || message.Length < 1) return CompletedTask();

          return Send(message, payload => Handler.FrameBytes(payload));
      }

      /// <summary>Sends a PING.</summary>
      /// <param name="message">Optional payload; null is sent as an empty payload.</param>
      /// <exception cref="InvalidOperationException">Called before the handshake.</exception>
      public Task Ping(params byte[] message)
      {
          return Send(message ?? new byte[0], payload => Handler.FramePing(payload));
      }

      /// <summary>Sends a PONG.</summary>
      /// <param name="message">Optional payload; null is sent as an empty payload.</param>
      /// <exception cref="InvalidOperationException">Called before the handshake.</exception>
      public Task Pong(params byte[] message)
      {
          return Send(message ?? new byte[0], payload => Handler.FramePong(payload));
      }

      /// <summary>Closes the connection with the normal-closure status code.</summary>
      public void Close()
      {
          Close(StatusCodes.NormalClosure);
      }

      #endregion

  }
  284. }
  285. #endif