You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

248 lines
7.4 KiB

  1. #if !NET20
  2. using System;
  3. using System.Collections.Generic;
  4. using System.IO;
  5. using System.Threading;
  6. namespace Apewer.WebSocket
  7. {
  /// <summary>
  /// Wraps a stream and serializes asynchronous write operations: at most one
  /// write is in flight on the wrapped stream, the rest wait in a FIFO queue.
  /// Useful for wrapping SslStream as it does not support multiple simultaneous write operations.
  /// </summary>
  /// <remarks>
  /// Reads, seeking and flushing are forwarded to the wrapped stream unchanged.
  /// Synchronous <see cref="Write"/> is not supported; use BeginWrite/EndWrite.
  /// </remarks>
  internal class QueuedStream : Stream
  {
      readonly Stream _stream;

      // Writes waiting for the in-flight write to finish. Also used as the lock
      // object guarding both the queue and _pendingWrite.
      readonly Queue<WriteData> _queue = new Queue<WriteData>();

      // Number of writes currently handed to the wrapped stream (0 or 1).
      int _pendingWrite;

      bool _disposed;

      /// <summary>Creates a queued wrapper around <paramref name="stream"/>.</summary>
      /// <param name="stream">The stream that receives all forwarded operations.</param>
      public QueuedStream(Stream stream)
      {
          _stream = stream;
      }

      public override bool CanRead
      {
          get { return _stream.CanRead; }
      }

      public override bool CanSeek
      {
          get { return _stream.CanSeek; }
      }

      public override bool CanWrite
      {
          get { return _stream.CanWrite; }
      }

      public override long Length
      {
          get { return _stream.Length; }
      }

      public override long Position
      {
          get { return _stream.Position; }
          set { _stream.Position = value; }
      }

      public override int Read(byte[] buffer, int offset, int count)
      {
          return _stream.Read(buffer, offset, count);
      }

      public override long Seek(long offset, SeekOrigin origin)
      {
          return _stream.Seek(offset, origin);
      }

      public override void SetLength(long value)
      {
          _stream.SetLength(value);
      }

      /// <summary>Not supported; always throws.</summary>
      /// <exception cref="NotSupportedException">Always.</exception>
      public override void Write(byte[] buffer, int offset, int count)
      {
          throw new NotSupportedException("QueuedStream does not support synchronous write operations yet.");
      }

      public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
      {
          return _stream.BeginRead(buffer, offset, count, callback, state);
      }

      /// <summary>
      /// Starts the write immediately when no other write is in flight; otherwise
      /// enqueues it so it is started once the preceding writes have completed.
      /// </summary>
      /// <param name="callback">Invoked when the write has completed; may be null.</param>
      /// <returns>A wrapper result that must be passed to <see cref="EndWrite"/>.</returns>
      public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
      {
          lock (_queue)
          {
              var data = new WriteData(buffer, offset, count, callback, state);
              if (_pendingWrite > 0)
              {
                  _queue.Enqueue(data);
                  return data.AsyncResult;
              }

              return BeginWriteInternal(buffer, offset, count, callback, state, data);
          }
      }

      public override int EndRead(IAsyncResult asyncResult)
      {
          return _stream.EndRead(asyncResult);
      }

      /// <summary>
      /// Observes the outcome of a write started with <see cref="BeginWrite"/>.
      /// Does not block: it must be called after the write's callback has fired.
      /// </summary>
      /// <exception cref="ArgumentException">
      /// <paramref name="asyncResult"/> is null or was not produced by this class.
      /// </exception>
      /// <exception cref="NotSupportedException">
      /// The write is still queued and has not been handed to the wrapped stream yet.
      /// </exception>
      public override void EndWrite(IAsyncResult asyncResult)
      {
          var queuedResult = asyncResult as QueuedWriteResult;
          if (queuedResult == null)
          {
              throw new ArgumentException("The IAsyncResult was not returned by QueuedStream.BeginWrite.", "asyncResult");
          }

          // Failure captured either when starting or when completing the write.
          if (queuedResult.Exception != null) throw queuedResult.Exception;

          if (queuedResult.ActualResult == null)
          {
              throw new NotSupportedException(
                  "QueuedStream does not support synchronous write operations. Please wait for callback to be invoked before calling EndWrite.");
          }

          // EndWrite on actual stream should already be invoked.
      }

      public override void Flush()
      {
          _stream.Flush();
      }

      /// <summary>Closes this stream and the wrapped stream.</summary>
      public override void Close()
      {
          // Stream.Close() calls Dispose(true), which disposes the wrapped stream
          // and records the disposed state; cleanup lives in Dispose(bool) only.
          base.Close();
      }

      protected override void Dispose(bool disposing)
      {
          if (!_disposed)
          {
              if (disposing)
              {
                  _stream.Dispose();
              }
              _disposed = true;
          }
          base.Dispose(disposing);
      }

      /// <summary>
      /// Hands one write to the wrapped stream. Must be called while holding the
      /// lock on <c>_queue</c>. When the write completes, the next queued write
      /// (if any) is started and then the caller's callback is invoked.
      /// </summary>
      /// <returns>The wrapper result of <paramref name="queued"/>.</returns>
      IAsyncResult BeginWriteInternal(byte[] buffer, int offset, int count, AsyncCallback callback, object state, WriteData queued)
      {
          _pendingWrite++;

          // Set by the completion handler; it releases the pending slot itself,
          // so the failure path below must not release it a second time.
          var completionStarted = false;

          IAsyncResult result;
          try
          {
              result = _stream.BeginWrite(buffer, offset, count, ar =>
              {
                  completionStarted = true;

                  // callback can be executed even before return value of BeginWrite is stored below
                  queued.AsyncResult.ActualResult = ar;
                  try
                  {
                      // so that we can call BeginWrite again
                      _stream.EndWrite(ar);
                  }
                  catch (Exception exc)
                  {
                      queued.AsyncResult.Exception = exc;
                  }

                  // one down, another is good to go
                  lock (_queue)
                  {
                      _pendingWrite--;
                      while (_queue.Count > 0)
                      {
                          var data = _queue.Dequeue();
                          try
                          {
                              // BeginWriteInternal stores the real IAsyncResult on the wrapper itself.
                              // Its return value IS the wrapper, so it must not be assigned to
                              // ActualResult (that would make the wrapper reference itself).
                              BeginWriteInternal(data.Buffer, data.Offset, data.Count, data.Callback, data.State, data);
                              break;
                          }
                          catch (Exception exc)
                          {
                              // The pending slot was already released by BeginWriteInternal.
                              // Report the failure and try the next queued write.
                              data.AsyncResult.Exception = exc;
                              InvokeCallback(data.Callback, data.AsyncResult);
                          }
                      }

                      InvokeCallback(callback, queued.AsyncResult);
                  }
              }, state);
          }
          catch
          {
              // The write never started: release the slot so later writes are not
              // queued forever behind an operation that does not exist.
              if (!completionStarted)
              {
                  _pendingWrite--;
              }
              throw;
          }

          // always return the wrapped async result.
          // this is especially important if the underlying stream completed the operation synchronously (hence "result.CompletedSynchronously" is true!)
          queued.AsyncResult.ActualResult = result;
          return queued.AsyncResult;
      }

      /// <summary>Invokes <paramref name="callback"/> unless it is null.</summary>
      static void InvokeCallback(AsyncCallback callback, IAsyncResult result)
      {
          if (callback != null)
          {
              callback(result);
          }
      }

      #region Nested type: WriteData

      /// <summary>Arguments of one BeginWrite call together with its wrapper result.</summary>
      class WriteData
      {
          public readonly byte[] Buffer;
          public readonly int Offset;
          public readonly int Count;
          public readonly AsyncCallback Callback;
          public readonly object State;
          public readonly QueuedWriteResult AsyncResult;

          public WriteData(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
          {
              Buffer = buffer;
              Offset = offset;
              Count = count;
              Callback = callback;
              State = state;
              AsyncResult = new QueuedWriteResult(state);
          }
      }

      #endregion

      #region Nested type: QueuedWriteResult

      /// <summary>
      /// The IAsyncResult handed to callers of BeginWrite. It exists before the
      /// wrapped stream has been asked to write, so <see cref="ActualResult"/>
      /// is null while the write is still waiting in the queue.
      /// </summary>
      class QueuedWriteResult : IAsyncResult
      {
          readonly object _state;

          public QueuedWriteResult(object state)
          {
              _state = state;
          }

          /// <summary>Exception captured while starting or completing the write, if any.</summary>
          public Exception Exception { get; set; }

          /// <summary>Result returned by the wrapped stream; null until the write is started.</summary>
          public IAsyncResult ActualResult { get; set; }

          public object AsyncState
          {
              get { return _state; }
          }

          public WaitHandle AsyncWaitHandle
          {
              get { throw new NotSupportedException("Queued write operations do not support wait handle."); }
          }

          public bool CompletedSynchronously
          {
              get { return false; }
          }

          public bool IsCompleted
          {
              get { return ActualResult != null && ActualResult.IsCompleted; }
          }
      }

      #endregion
  }
  211. }
  212. #endif