|
|
#if !NET20
using System; using System.Collections.Generic; using System.IO; using System.Linq; using System.Threading.Tasks;
namespace Apewer.WebSocket {
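/// <summary>Represents a single WebSocket connection: it reads from the underlying socket, performs the handshake through a protocol handler, and exposes text, binary, ping/pong and close operations.</summary>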
public class Connection {
#region internal
/// <summary>Creates a connection on top of <paramref name="socket"/> and installs default callbacks.</summary>
/// <param name="socket">Socket wrapper used for every read and write.</param>
/// <param name="initialize">Invoked with this connection once a handler and connection info exist, before the handshake is sent.</param>
/// <param name="parseRequest">Turns the bytes received so far into a request; a null result means no handler is created yet.</param>
/// <param name="handlerFactory">Produces the protocol handler for a parsed request.</param>
/// <param name="negotiateSubProtocol">Chooses the sub-protocol from the ones listed in the request.</param>
internal Connection(
    SocketWrapper socket,
    Action<Connection> initialize,
    Func<byte[], HttpRequest> parseRequest,
    Func<HttpRequest, ComposableHandler> handlerFactory,
    Func<IEnumerable<string>, string> negotiateSubProtocol)
{
    Socket = socket;

    _initialize = initialize;
    _parseRequest = parseRequest;
    _handlerFactory = handlerFactory;
    _negotiateSubProtocol = negotiateSubProtocol;

    // Every callback starts as a no-op so it can be invoked without a null check.
    OnOpen = () => { };
    OnClose = () => { };
    OnMessage = text => { };
    OnBytes = payload => { };
    OnPong = payload => { };
    OnError = error => { };

    // The default ping behaviour is to answer with a pong carrying the same payload.
    OnPing = payload => Pong(payload);
}
/// <summary>Socket wrapper that carries all reads and writes of this connection.</summary>
internal SocketWrapper Socket { get; set; }
// Collaborators supplied to the constructor: initialization callback, handler factory, sub-protocol negotiator and request parser.
private readonly Action<Connection> _initialize; private readonly Func<HttpRequest, ComposableHandler> _handlerFactory; private readonly Func<IEnumerable<string>, string> _negotiateSubProtocol; readonly Func<byte[], HttpRequest> _parseRequest;
/// <summary>Protocol handler assigned by CreateHandler; null until a request has been parsed and the factory returned a handler.</summary>
internal ComposableHandler Handler { get; set; }
// _closing and _closed record shutdown progress and feed the Available check; ReadSize (4096) sizes the receive buffer in StartReceiving.
private bool _closing; private bool _closed; private const int ReadSize = 1024 * 4;
/// <summary>Passed to SendBytes as the completion callback of the handshake write.</summary>
internal Action OnOpen { get; set; }
/// <summary>Invoked by CloseSocket before the socket is closed and disposed.</summary>
internal Action OnClose { get; set; }
/// <summary>Text message callback; defaults to a no-op and is not invoked in this class (presumably called by the handler wiring - verify).</summary>
internal Action<string> OnMessage { get; set; }
/// <summary>Binary message callback; defaults to a no-op and is not invoked in this class (presumably called by the handler wiring - verify).</summary>
internal Action<byte[]> OnBytes { get; set; }
/// <summary>Ping callback; the constructor default replies by calling Pong with the same payload.</summary>
internal Action<byte[]> OnPing { get; set; }
/// <summary>Pong callback; defaults to a no-op and is not invoked in this class.</summary>
internal Action<byte[]> OnPong { get; set; }
/// <summary>Invoked by HandleReadError for every read error except a swallowed ObjectDisposedException.</summary>
internal Action<Exception> OnError { get; set; }
/// <summary>Details of the accepted request; assigned in CreateHandler and null before that.</summary>
internal ConnectionInfo ConnectionInfo { get; private set; }
/// <summary>Builds a frame from <paramref name="message"/> with <paramref name="createFrame"/> and writes it to the socket.</summary>
/// <param name="message">Payload handed to <paramref name="createFrame"/>.</param>
/// <param name="createFrame">Converts the payload into the bytes to transmit.</param>
/// <returns>The task returned by SendBytes, or a faulted task carrying a ConnectionNotAvailableException when the connection is not available.</returns>
/// <exception cref="InvalidOperationException">No handler exists yet, i.e. the handshake has not happened.</exception>
private Task Send<T>(T message, Func<T, byte[]> createFrame)
{
    if (Handler == null)
    {
        throw new InvalidOperationException("Cannot send before handshake.");
    }

    if (Available)
    {
        return SendBytes(createFrame(message));
    }

    // Not available: report the failure through the returned task instead of throwing.
    const string errorMessage = "Data sent while closing or after close. Ignoring.";
    WebSocketLog.Warn(errorMessage);

    var failure = new TaskCompletionSource<object>();
    failure.SetException(new ConnectionNotAvailableException(errorMessage));
    return failure.Task;
}
/// <summary>Starts the receive loop with a fresh accumulation list and a receive buffer of ReadSize bytes.</summary>
internal void StartReceiving()
{
    Read(new List<byte>(ReadSize), new byte[ReadSize]);
}
/// <summary>Begins closing the connection with the given status code; does nothing when the connection is not available.</summary>
/// <param name="code">Status code handed to the handler when it builds the close frame.</param>
internal void Close(int code)
{
    if (!Available)
    {
        return;
    }

    _closing = true;

    if (Handler != null)
    {
        var closeFrame = Handler.FrameClose(code);
        if (closeFrame.Length > 0)
        {
            // The socket is torn down only after the close frame has been written.
            SendBytes(closeFrame, CloseSocket);
        }
        else
        {
            CloseSocket();
        }

        return;
    }

    // No handler means no handshake took place, so there is no close frame to send.
    CloseSocket();
}
/// <summary>Tries to parse <paramref name="data"/> as a request and, on success, creates the handler, records the connection info, runs the initialization callback and sends the handshake.</summary>
/// <param name="data">All bytes received so far on this connection.</param>
internal void CreateHandler(IEnumerable<byte> data)
{
    var parsed = _parseRequest(data.ToArray());
    if (parsed == null)
    {
        // Nothing usable yet; the caller keeps accumulating bytes.
        return;
    }

    Handler = _handlerFactory(parsed);
    if (Handler == null)
    {
        return;
    }

    var negotiated = _negotiateSubProtocol(parsed.SubProtocols);
    ConnectionInfo = ConnectionInfo.Create(parsed, Socket.RemoteIpAddress, Socket.RemotePort, negotiated);

    // Initialization runs before the handshake is sent, so OnOpen is read after it.
    _initialize(this);

    SendBytes(Handler.CreateHandshake(negotiated), OnOpen);
}
/// <summary>Issues one receive on the socket and re-arms itself after each successful read; stops when the connection is not available or a read returns no bytes.</summary>
/// <param name="data">Accumulates received bytes until a handler has been created.</param>
/// <param name="buffer">Reusable receive buffer.</param>
private void Read(List<byte> data, byte[] buffer)
{
    if (!Available)
    {
        return;
    }

    Socket.Receive(buffer, count =>
    {
        if (count <= 0)
        {
            WebSocketLog.Debug("0 bytes read. Closing.");
            CloseSocket();
            return;
        }

        WebSocketLog.Debug(count + " bytes read");
        var received = buffer.Take(count);

        if (Handler == null)
        {
            // Before the handshake: keep every byte and retry request parsing on the whole accumulation.
            data.AddRange(received);
            CreateHandler(data);
        }
        else
        {
            Handler.Receive(received);
        }

        Read(data, buffer);
    }, HandleReadError);
}
/// <summary>Reports a read error through OnError and closes the connection with a status code chosen from the exception type.</summary>
/// <param name="e">The error raised while reading; an AggregateException is unwrapped to its InnerException first.</param>
private void HandleReadError(Exception e)
{
    var aggregate = e as AggregateException;
    if (aggregate != null)
    {
        HandleReadError(aggregate.InnerException);
        return;
    }

    if (e is ObjectDisposedException)
    {
        // Not reported through OnError and does not trigger a close.
        WebSocketLog.Debug("Swallowing ObjectDisposedException", e);
        return;
    }

    OnError(e);

    var protocolError = e as WebSocketException;
    if (protocolError != null)
    {
        WebSocketLog.Debug("Error while reading", e);
        Close(protocolError.StatusCode);
        return;
    }

    if (e is SubProtocolNegotiationFailureException)
    {
        WebSocketLog.Debug(e.Message);
        Close(StatusCodes.ProtocolError);
        return;
    }

    if (e is IOException)
    {
        WebSocketLog.Debug("Error while reading", e);
        Close(StatusCodes.AbnormalClosure);
        return;
    }

    // Anything else is treated as an application failure.
    WebSocketLog.Error("Application Error", e);
    Close(StatusCodes.InternalServerError);
}
/// <summary>Writes raw bytes to the socket, logging the outcome; a failed write closes the socket.</summary>
/// <param name="bytes">Bytes to transmit.</param>
/// <param name="callback">Optional action run after a successful write.</param>
/// <returns>The task returned by the socket's send operation.</returns>
private Task SendBytes(byte[] bytes, Action callback = null)
{
    return Socket.Send(
        bytes,
        () =>
        {
            WebSocketLog.Debug("Sent " + bytes.Length + " bytes");
            if (callback == null)
            {
                return;
            }

            callback();
        },
        e =>
        {
            // I/O failures are logged at debug level, everything else at info level.
            if (e is IOException)
            {
                WebSocketLog.Debug("Failed to send. Disconnecting.", e);
            }
            else
            {
                WebSocketLog.Info("Failed to send. Disconnecting.", e);
            }

            CloseSocket();
        });
}
/// <summary>Notifies OnClose, then closes and disposes the socket. Calls made after the connection is already closed do nothing.</summary>
private void CloseSocket()
{
    // Several paths can end up here (close-frame send completion, a failed send, a zero-byte read).
    // Only the first call may notify listeners and release the socket.
    if (_closed)
    {
        return;
    }

    _closing = true;
    try
    {
        OnClose();
    }
    finally
    {
        // Release the socket even when the OnClose callback throws; the exception still propagates.
        _closed = true;
        Socket.Close();
        Socket.Dispose();
        _closing = false;
    }
}
#endregion
#region properties
/// <summary>True while the connection is neither closing nor closed and the socket reports that it is connected.</summary>
public bool Available
{
    get
    {
        if (_closing || _closed)
        {
            return false;
        }

        return Socket.Connected;
    }
}
/// <summary>Client IP address; an empty string while no connection info is available.</summary>
public string Address
{
    get
    {
        if (ConnectionInfo == null)
        {
            return "";
        }

        return ConnectionInfo.ClientIpAddress;
    }
}
/// <summary>Client port; 0 while no connection info is available.</summary>
public int Port
{
    get
    {
        if (ConnectionInfo == null)
        {
            return 0;
        }

        return ConnectionInfo.ClientPort;
    }
}
/// <summary>Gets the host information; an empty string while no connection info is available. Example:
/// <para>from the request ws://127.0.0.1:8000/page?field=value</para>
/// <para>this yields 127.0.0.1:8000</para></summary>
public string Host
{
    get
    {
        if (ConnectionInfo == null)
        {
            return "";
        }

        return ConnectionInfo.Host;
    }
}
/// <summary>Gets the origin information; an empty string while no connection info is available. Example:
/// <para>from the request ws://127.0.0.1:8000/page?field=value</para>
/// <para>this yields http://localhost</para></summary>
public string Origin
{
    get
    {
        if (ConnectionInfo == null)
        {
            return "";
        }

        return ConnectionInfo.Origin;
    }
}
/// <summary>Gets the URL query path; an empty string while no connection info is available. Example:
/// <para>from the request ws://127.0.0.1:8000/page?field=value</para>
/// <para>this yields /page?field=value</para></summary>
public string QueryPath
{
    get
    {
        if (ConnectionInfo == null)
        {
            return "";
        }

        return ConnectionInfo.Path;
    }
}
#endregion
#region methods
/// <summary>Sends the given characters as a single text message.</summary>
/// <param name="message">Characters to send, concatenated in order.</param>
/// <returns>The send task, or null when <paramref name="message"/> is null or empty (nothing is sent; callers must null-check before awaiting).</returns>
/// <exception cref="InvalidOperationException">The handshake has not completed, so no handler exists yet.</exception>
public Task Send(params char[] message)
{
    if (message == null || message.Length < 1)
    {
        return null;
    }

    // Check the handler before taking a method group from it: converting a method group on a
    // null instance throws NullReferenceException instead of the documented exception.
    var handler = Handler;
    if (handler == null)
    {
        throw new InvalidOperationException("Cannot send before handshake.");
    }

    // A char can never be null and the array is non-empty here, so the text is simply the array's contents.
    return Send(new string(message), handler.FrameText);
}
/// <summary>Sends the given strings, concatenated in order, as a single text message.</summary>
/// <param name="message">Text fragments to send; null fragments are skipped.</param>
/// <returns>The send task, or null when there is no text to send (null/empty array or only null/empty fragments; callers must null-check before awaiting).</returns>
/// <exception cref="InvalidOperationException">The handshake has not completed, so no handler exists yet.</exception>
public Task Send(params string[] message)
{
    if (message == null || message.Length < 1)
    {
        return null;
    }

    // string.Concat treats null elements as empty, which covers both the single-fragment and multi-fragment cases.
    var text = string.Concat(message);
    if (text.Length < 1)
    {
        return null;
    }

    // Check the handler before taking a method group from it: converting a method group on a
    // null instance throws NullReferenceException instead of the documented exception.
    var handler = Handler;
    if (handler == null)
    {
        throw new InvalidOperationException("Cannot send before handshake.");
    }

    return Send(text, handler.FrameText);
}
/// <summary>Sends a byte array as a binary message.</summary>
/// <param name="message">Bytes to send.</param>
/// <returns>The send task, or null when <paramref name="message"/> is null or empty (nothing is sent; callers must null-check before awaiting).</returns>
/// <exception cref="InvalidOperationException">The handshake has not completed, so no handler exists yet.</exception>
public Task Send(params byte[] message)
{
    if (message == null || message.Length < 1)
    {
        return null;
    }

    // Check the handler before taking a method group from it: converting a method group on a
    // null instance throws NullReferenceException instead of the documented exception.
    var handler = Handler;
    if (handler == null)
    {
        throw new InvalidOperationException("Cannot send before handshake.");
    }

    return Send(message, handler.FrameBytes);
}
/// <summary>Sends a PING frame.</summary>
/// <param name="message">Payload passed to the handler's ping framing.</param>
/// <returns>The send task; it is faulted with a ConnectionNotAvailableException when the connection is closing or closed.</returns>
/// <exception cref="InvalidOperationException">The handshake has not completed, so no handler exists yet.</exception>
public Task Ping(params byte[] message)
{
    // Check the handler before taking a method group from it: converting a method group on a
    // null instance throws NullReferenceException instead of the documented exception.
    var handler = Handler;
    if (handler == null)
    {
        throw new InvalidOperationException("Cannot send before handshake.");
    }

    return Send(message, handler.FramePing);
}
/// <summary>Sends a PONG frame.</summary>
/// <param name="message">Payload passed to the handler's pong framing.</param>
/// <returns>The send task; it is faulted with a ConnectionNotAvailableException when the connection is closing or closed.</returns>
/// <exception cref="InvalidOperationException">The handshake has not completed, so no handler exists yet.</exception>
public Task Pong(params byte[] message)
{
    // Check the handler before taking a method group from it: converting a method group on a
    // null instance throws NullReferenceException instead of the documented exception.
    var handler = Handler;
    if (handler == null)
    {
        throw new InvalidOperationException("Cannot send before handshake.");
    }

    return Send(message, handler.FramePong);
}
/// <summary>Closes the socket connection using the normal-closure status code.</summary>
public void Close()
{
    Close(StatusCodes.NormalClosure);
}
#endregion
}
}
#endif
|