
37 changed files with 39 additions and 1713 deletions
-
24.github/workflows/test.yml
-
2Benchmark/Benchmark.csproj
-
2EchoAgent/EchoAgent.csproj
-
2EchoTest/EchoTest.csproj
-
2HandlerTest/HandlerTest.csproj
-
480NewLife.Net/MQTT/MqttDecoder.cs
-
465NewLife.Net/MQTT/MqttEncoder.cs
-
103NewLife.Net/MQTT/MqttMessage.cs
-
56NewLife.Net/MQTT/MqttType.cs
-
17NewLife.Net/MQTT/Packets/ConnAckPacket.cs
-
40NewLife.Net/MQTT/Packets/ConnectPacket.cs
-
25NewLife.Net/MQTT/Packets/ConnectReturnCode.cs
-
23NewLife.Net/MQTT/Packets/DataPacket.cs
-
15NewLife.Net/MQTT/Packets/DisconnectPacket.cs
-
23NewLife.Net/MQTT/Packets/PacketType.cs
-
11NewLife.Net/MQTT/Packets/PacketWithId.cs
-
15NewLife.Net/MQTT/Packets/PingReqPacket.cs
-
15NewLife.Net/MQTT/Packets/PingRespPacket.cs
-
19NewLife.Net/MQTT/Packets/PubAckPacket.cs
-
19NewLife.Net/MQTT/Packets/PubCompPacket.cs
-
19NewLife.Net/MQTT/Packets/PubRecPacket.cs
-
22NewLife.Net/MQTT/Packets/PubRelPacket.cs
-
79NewLife.Net/MQTT/Packets/PublishPacket.cs
-
22NewLife.Net/MQTT/Packets/QualityOfService.cs
-
34NewLife.Net/MQTT/Packets/SubAckPacket.cs
-
28NewLife.Net/MQTT/Packets/SubscribePacket.cs
-
29NewLife.Net/MQTT/Packets/SubscriptionRequest.cs
-
18NewLife.Net/MQTT/Packets/UnsubAckPacket.cs
-
26NewLife.Net/MQTT/Packets/UnsubscribePacket.cs
-
20NewLife.Net/MQTT/QualityOfService.cs
-
35NewLife.Net/MQTT/Signatures.cs
-
43NewLife.Net/MQTT/Util.cs
-
2NewLife.Net/NewLife.Net.csproj
-
2NewLife.Net/Stun/StunServer.cs
-
2RpcTest/RpcTest.csproj
-
2Test/Test.csproj
-
11网络库.sln
@ -0,0 +1,24 @@ |
|||
name: test |
|||
|
|||
on: |
|||
push: |
|||
branches: [ '*' ] |
|||
pull_request: |
|||
branches: [ '*' ] |
|||
workflow_dispatch: |
|||
|
|||
jobs: |
|||
build-test: |
|||
|
|||
runs-on: ubuntu-latest |
|||
|
|||
steps: |
|||
- uses: actions/checkout@v3 |
|||
- name: Setup .NET |
|||
uses: actions/setup-dotnet@v2 |
|||
with: |
|||
dotnet-version: 6.0.x |
|||
- name: Build |
|||
run: dotnet build -c Release |
|||
- name: Test |
|||
run: dotnet test -c Release |
@ -1,480 +0,0 @@ |
|||
using System; |
|||
using System.Collections.Generic; |
|||
using System.Runtime.CompilerServices; |
|||
using System.Text; |
|||
using NewLife.Data; |
|||
using NewLife.Exceptions; |
|||
using NewLife.Net.Handlers; |
|||
using NewLife.Net.MQTT.Packets; |
|||
|
|||
namespace NewLife.Net.MQTT |
|||
{ |
|||
public sealed class MqttDecoder : ReplayingDecoder<MqttDecoder.ParseState> |
|||
{ |
|||
public enum ParseState |
|||
{ |
|||
Ready, |
|||
Failed |
|||
} |
|||
|
|||
readonly Boolean isServer; |
|||
readonly Int32 maxMessageSize; |
|||
|
|||
public MqttDecoder(Boolean isServer, Int32 maxMessageSize) |
|||
: base(ParseState.Ready) |
|||
{ |
|||
this.isServer = isServer; |
|||
this.maxMessageSize = maxMessageSize; |
|||
} |
|||
|
|||
protected override void Decode(IHandlerContext context, Packet input, List<Object> output) |
|||
{ |
|||
try |
|||
{ |
|||
switch (this.State) |
|||
{ |
|||
case ParseState.Ready: |
|||
if (!TryDecodePacket(input, context, out var packet)) |
|||
{ |
|||
this.RequestReplay(); |
|||
return; |
|||
} |
|||
|
|||
output.Add(packet); |
|||
this.Checkpoint(); |
|||
break; |
|||
case ParseState.Failed: |
|||
// read out data until connection is closed
|
|||
input.SkipBytes(input.ReadableBytes); |
|||
return; |
|||
default: |
|||
throw new ArgumentOutOfRangeException(); |
|||
} |
|||
} |
|||
catch (DecoderException) |
|||
{ |
|||
input.SkipBytes(input.ReadableBytes); |
|||
this.Checkpoint(ParseState.Failed); |
|||
throw; |
|||
} |
|||
} |
|||
|
|||
Boolean TryDecodePacket(Packet buffer, IHandlerContext context, out DataPacket packet) |
|||
{ |
|||
if (!buffer.IsReadable(2)) // packet consists of at least 2 bytes
|
|||
{ |
|||
packet = null; |
|||
return false; |
|||
} |
|||
|
|||
Int32 signature = buffer.ReadByte(); |
|||
|
|||
if (!TryDecodeRemainingLength(buffer, out var remainingLength) || !buffer.IsReadable(remainingLength)) |
|||
{ |
|||
packet = null; |
|||
return false; |
|||
} |
|||
|
|||
packet = DecodePacketInternal(buffer, signature, ref remainingLength, context); |
|||
|
|||
if (remainingLength > 0) |
|||
{ |
|||
throw new DecoderException($"Declared remaining length is bigger than packet data size by {remainingLength}."); |
|||
} |
|||
|
|||
return true; |
|||
} |
|||
|
|||
DataPacket DecodePacketInternal(Packet buffer, Int32 packetSignature, ref Int32 remainingLength, IHandlerContext context) |
|||
{ |
|||
if (Signatures.IsPublish(packetSignature)) |
|||
{ |
|||
var qualityOfService = (QualityOfService)((packetSignature >> 1) & 0x3); // take bits #1 and #2 ONLY and convert them into QoS value
|
|||
if (qualityOfService == QualityOfService.Reserved) |
|||
{ |
|||
throw new DecoderException($"Unexpected QoS value of {(Int32)qualityOfService} for {PacketType.PUBLISH} packet."); |
|||
} |
|||
|
|||
var duplicate = (packetSignature & 0x8) == 0x8; // test bit#3
|
|||
var retain = (packetSignature & 0x1) != 0; // test bit#0
|
|||
var packet = new PublishPacket(qualityOfService, duplicate, retain); |
|||
DecodePublishPacket(buffer, packet, ref remainingLength); |
|||
return packet; |
|||
} |
|||
|
|||
switch (packetSignature) // strict match checks for valid message type + correct values in flags part
|
|||
{ |
|||
case Signatures.PubAck: |
|||
var pubAckPacket = new PubAckPacket(); |
|||
DecodePacketIdVariableHeader(buffer, pubAckPacket, ref remainingLength); |
|||
return pubAckPacket; |
|||
case Signatures.PubRec: |
|||
var pubRecPacket = new PubRecPacket(); |
|||
DecodePacketIdVariableHeader(buffer, pubRecPacket, ref remainingLength); |
|||
return pubRecPacket; |
|||
case Signatures.PubRel: |
|||
var pubRelPacket = new PubRelPacket(); |
|||
DecodePacketIdVariableHeader(buffer, pubRelPacket, ref remainingLength); |
|||
return pubRelPacket; |
|||
case Signatures.PubComp: |
|||
var pubCompPacket = new PubCompPacket(); |
|||
DecodePacketIdVariableHeader(buffer, pubCompPacket, ref remainingLength); |
|||
return pubCompPacket; |
|||
case Signatures.PingReq: |
|||
ValidateServerPacketExpected(packetSignature); |
|||
return PingReqPacket.Instance; |
|||
case Signatures.Subscribe: |
|||
ValidateServerPacketExpected(packetSignature); |
|||
var subscribePacket = new SubscribePacket(); |
|||
DecodePacketIdVariableHeader(buffer, subscribePacket, ref remainingLength); |
|||
DecodeSubscribePayload(buffer, subscribePacket, ref remainingLength); |
|||
return subscribePacket; |
|||
case Signatures.Unsubscribe: |
|||
ValidateServerPacketExpected(packetSignature); |
|||
var unsubscribePacket = new UnsubscribePacket(); |
|||
DecodePacketIdVariableHeader(buffer, unsubscribePacket, ref remainingLength); |
|||
DecodeUnsubscribePayload(buffer, unsubscribePacket, ref remainingLength); |
|||
return unsubscribePacket; |
|||
case Signatures.Connect: |
|||
ValidateServerPacketExpected(packetSignature); |
|||
var connectPacket = new ConnectPacket(); |
|||
DecodeConnectPacket(buffer, connectPacket, ref remainingLength, context); |
|||
return connectPacket; |
|||
case Signatures.Disconnect: |
|||
ValidateServerPacketExpected(packetSignature); |
|||
return DisconnectPacket.Instance; |
|||
case Signatures.ConnAck: |
|||
ValidateClientPacketExpected(packetSignature); |
|||
var connAckPacket = new ConnAckPacket(); |
|||
DecodeConnAckPacket(buffer, connAckPacket, ref remainingLength); |
|||
return connAckPacket; |
|||
case Signatures.SubAck: |
|||
ValidateClientPacketExpected(packetSignature); |
|||
var subAckPacket = new SubAckPacket(); |
|||
DecodePacketIdVariableHeader(buffer, subAckPacket, ref remainingLength); |
|||
DecodeSubAckPayload(buffer, subAckPacket, ref remainingLength); |
|||
return subAckPacket; |
|||
case Signatures.UnsubAck: |
|||
ValidateClientPacketExpected(packetSignature); |
|||
var unsubAckPacket = new UnsubAckPacket(); |
|||
DecodePacketIdVariableHeader(buffer, unsubAckPacket, ref remainingLength); |
|||
return unsubAckPacket; |
|||
case Signatures.PingResp: |
|||
ValidateClientPacketExpected(packetSignature); |
|||
return PingRespPacket.Instance; |
|||
default: |
|||
throw new DecoderException($"First packet byte value of `{packetSignature}` is invalid."); |
|||
} |
|||
} |
|||
|
|||
void ValidateServerPacketExpected(Int32 signature) |
|||
{ |
|||
if (!isServer) |
|||
{ |
|||
throw new DecoderException($"DataPacket type determined through first packet byte `{signature}` is not supported by MQTT client."); |
|||
} |
|||
} |
|||
|
|||
void ValidateClientPacketExpected(Int32 signature) |
|||
{ |
|||
if (isServer) |
|||
{ |
|||
throw new DecoderException($"DataPacket type determined through first packet byte `{signature}` is not supported by MQTT server."); |
|||
} |
|||
} |
|||
|
|||
Boolean TryDecodeRemainingLength(Packet buffer, out Int32 value) |
|||
{ |
|||
Int32 readable = buffer.ReadableBytes; |
|||
|
|||
var result = 0; |
|||
var multiplier = 1; |
|||
Byte digit; |
|||
var read = 0; |
|||
do |
|||
{ |
|||
if (readable < read + 1) |
|||
{ |
|||
value = default(Int32); |
|||
return false; |
|||
} |
|||
digit = buffer.ReadByte(); |
|||
result += (digit & 0x7f) * multiplier; |
|||
multiplier <<= 7; |
|||
read++; |
|||
} |
|||
while ((digit & 0x80) != 0 && read < 4); |
|||
|
|||
if (read == 4 && (digit & 0x80) != 0) |
|||
{ |
|||
throw new DecoderException("Remaining length exceeds 4 bytes in length"); |
|||
} |
|||
|
|||
var completeMessageSize = result + 1 + read; |
|||
if (completeMessageSize > maxMessageSize) |
|||
{ |
|||
throw new DecoderException("Message is too big: " + completeMessageSize); |
|||
} |
|||
|
|||
value = result; |
|||
return true; |
|||
} |
|||
|
|||
static void DecodeConnectPacket(Packet buffer, ConnectPacket packet, ref Int32 remainingLength, IHandlerContext context) |
|||
{ |
|||
var protocolName = DecodeString(buffer, ref remainingLength); |
|||
if (!Util.ProtocolName.Equals(protocolName, StringComparison.Ordinal)) |
|||
{ |
|||
throw new DecoderException($"Unexpected protocol name. Expected: {Util.ProtocolName}. Actual: {protocolName}"); |
|||
} |
|||
packet.ProtocolName = Util.ProtocolName; |
|||
|
|||
DecreaseRemainingLength(ref remainingLength, 1); |
|||
packet.ProtocolLevel = buffer.ReadByte(); |
|||
|
|||
if (packet.ProtocolLevel != Util.ProtocolLevel) |
|||
{ |
|||
var connAckPacket = new ConnAckPacket(); |
|||
connAckPacket.ReturnCode = ConnectReturnCode.RefusedUnacceptableProtocolVersion; |
|||
context.WriteAndFlushAsync(connAckPacket); |
|||
throw new DecoderException($"Unexpected protocol level. Expected: {Util.ProtocolLevel}. Actual: {packet.ProtocolLevel}"); |
|||
} |
|||
|
|||
DecreaseRemainingLength(ref remainingLength, 1); |
|||
Int32 connectFlags = buffer.ReadByte(); |
|||
|
|||
packet.CleanSession = (connectFlags & 0x02) == 0x02; |
|||
|
|||
var hasWill = (connectFlags & 0x04) == 0x04; |
|||
if (hasWill) |
|||
{ |
|||
packet.HasWill = true; |
|||
packet.WillRetain = (connectFlags & 0x20) == 0x20; |
|||
packet.WillQualityOfService = (QualityOfService)((connectFlags & 0x18) >> 3); |
|||
if (packet.WillQualityOfService == QualityOfService.Reserved) |
|||
{ |
|||
throw new DecoderException($"[MQTT-3.1.2-14] Unexpected Will QoS value of {(Int32)packet.WillQualityOfService}."); |
|||
} |
|||
packet.WillTopicName = String.Empty; |
|||
} |
|||
else if ((connectFlags & 0x38) != 0) // bits 3,4,5 [MQTT-3.1.2-11]
|
|||
{ |
|||
throw new DecoderException("[MQTT-3.1.2-11]"); |
|||
} |
|||
|
|||
packet.HasUsername = (connectFlags & 0x80) == 0x80; |
|||
packet.HasPassword = (connectFlags & 0x40) == 0x40; |
|||
if (packet.HasPassword && !packet.HasUsername) |
|||
{ |
|||
throw new DecoderException("[MQTT-3.1.2-22]"); |
|||
} |
|||
if ((connectFlags & 0x1) != 0) // [MQTT-3.1.2-3]
|
|||
{ |
|||
throw new DecoderException("[MQTT-3.1.2-3]"); |
|||
} |
|||
|
|||
packet.KeepAliveInSeconds = DecodeUnsignedShort(buffer, ref remainingLength); |
|||
|
|||
var clientId = DecodeString(buffer, ref remainingLength); |
|||
Util.ValidateClientId(clientId); |
|||
packet.ClientId = clientId; |
|||
|
|||
if (hasWill) |
|||
{ |
|||
packet.WillTopicName = DecodeString(buffer, ref remainingLength); |
|||
var willMessageLength = DecodeUnsignedShort(buffer, ref remainingLength); |
|||
DecreaseRemainingLength(ref remainingLength, willMessageLength); |
|||
packet.WillMessage = buffer.ReadBytes(willMessageLength); |
|||
} |
|||
|
|||
if (packet.HasUsername) |
|||
{ |
|||
packet.Username = DecodeString(buffer, ref remainingLength); |
|||
} |
|||
|
|||
if (packet.HasPassword) |
|||
{ |
|||
packet.Password = DecodeString(buffer, ref remainingLength); |
|||
} |
|||
} |
|||
|
|||
static void DecodeConnAckPacket(Packet buffer, ConnAckPacket packet, ref Int32 remainingLength) |
|||
{ |
|||
var ackData = DecodeUnsignedShort(buffer, ref remainingLength); |
|||
packet.SessionPresent = ((ackData >> 8) & 0x1) != 0; |
|||
packet.ReturnCode = (ConnectReturnCode)(ackData & 0xFF); |
|||
} |
|||
|
|||
static void DecodePublishPacket(Packet buffer, PublishPacket packet, ref Int32 remainingLength) |
|||
{ |
|||
var topicName = DecodeString(buffer, ref remainingLength, 1); |
|||
Util.ValidateTopicName(topicName); |
|||
|
|||
packet.TopicName = topicName; |
|||
if (packet.QualityOfService > QualityOfService.AtMostOnce) |
|||
{ |
|||
DecodePacketIdVariableHeader(buffer, packet, ref remainingLength); |
|||
} |
|||
|
|||
Packet payload; |
|||
if (remainingLength > 0) |
|||
{ |
|||
payload = buffer.ReadSlice(remainingLength); |
|||
payload.Retain(); |
|||
remainingLength = 0; |
|||
} |
|||
else |
|||
{ |
|||
payload = Unpooled.Empty; |
|||
} |
|||
packet.Payload = payload; |
|||
} |
|||
|
|||
static void DecodePacketIdVariableHeader(Packet buffer, PacketWithId packet, ref Int32 remainingLength) |
|||
{ |
|||
var packetId = packet.PacketId = DecodeUnsignedShort(buffer, ref remainingLength); |
|||
if (packetId == 0) |
|||
{ |
|||
throw new DecoderException("[MQTT-2.3.1-1]"); |
|||
} |
|||
} |
|||
|
|||
static void DecodeSubscribePayload(Packet buffer, SubscribePacket packet, ref Int32 remainingLength) |
|||
{ |
|||
var subscribeTopics = new List<SubscriptionRequest>(); |
|||
while (remainingLength > 0) |
|||
{ |
|||
var topicFilter = DecodeString(buffer, ref remainingLength); |
|||
ValidateTopicFilter(topicFilter); |
|||
|
|||
DecreaseRemainingLength(ref remainingLength, 1); |
|||
Int32 qos = buffer.ReadByte(); |
|||
if (qos >= (Int32)QualityOfService.Reserved) |
|||
{ |
|||
throw new DecoderException($"[MQTT-3.8.3-4]. Invalid QoS value: {qos}."); |
|||
} |
|||
|
|||
subscribeTopics.Add(new SubscriptionRequest(topicFilter, (QualityOfService)qos)); |
|||
} |
|||
|
|||
if (subscribeTopics.Count == 0) |
|||
{ |
|||
throw new DecoderException("[MQTT-3.8.3-3]"); |
|||
} |
|||
|
|||
packet.Requests = subscribeTopics; |
|||
} |
|||
|
|||
static void ValidateTopicFilter(String topicFilter) |
|||
{ |
|||
var length = topicFilter.Length; |
|||
if (length == 0) |
|||
{ |
|||
throw new DecoderException("[MQTT-4.7.3-1]"); |
|||
} |
|||
|
|||
for (var i = 0; i < length; i++) |
|||
{ |
|||
var c = topicFilter[i]; |
|||
switch (c) |
|||
{ |
|||
case '+': |
|||
if ((i > 0 && topicFilter[i - 1] != '/') || (i < length - 1 && topicFilter[i + 1] != '/')) |
|||
{ |
|||
throw new DecoderException($"[MQTT-4.7.1-3]. Invalid topic filter: {topicFilter}"); |
|||
} |
|||
break; |
|||
case '#': |
|||
if (i < length - 1 || (i > 0 && topicFilter[i - 1] != '/')) |
|||
{ |
|||
throw new DecoderException($"[MQTT-4.7.1-2]. Invalid topic filter: {topicFilter}"); |
|||
} |
|||
break; |
|||
} |
|||
} |
|||
} |
|||
|
|||
static void DecodeSubAckPayload(Packet buffer, SubAckPacket packet, ref Int32 remainingLength) |
|||
{ |
|||
var returnCodes = new QualityOfService[remainingLength]; |
|||
for (var i = 0; i < remainingLength; i++) |
|||
{ |
|||
var returnCode = (QualityOfService)buffer.ReadByte(); |
|||
if (returnCode > QualityOfService.ExactlyOnce && returnCode != QualityOfService.Failure) |
|||
{ |
|||
throw new DecoderException($"[MQTT-3.9.3-2]. Invalid return code: {returnCode}"); |
|||
} |
|||
returnCodes[i] = returnCode; |
|||
} |
|||
packet.ReturnCodes = returnCodes; |
|||
|
|||
remainingLength = 0; |
|||
} |
|||
|
|||
static void DecodeUnsubscribePayload(Packet buffer, UnsubscribePacket packet, ref Int32 remainingLength) |
|||
{ |
|||
var unsubscribeTopics = new List<String>(); |
|||
while (remainingLength > 0) |
|||
{ |
|||
var topicFilter = DecodeString(buffer, ref remainingLength); |
|||
ValidateTopicFilter(topicFilter); |
|||
unsubscribeTopics.Add(topicFilter); |
|||
} |
|||
|
|||
if (unsubscribeTopics.Count == 0) |
|||
{ |
|||
throw new DecoderException("[MQTT-3.10.3-2]"); |
|||
} |
|||
|
|||
packet.TopicFilters = unsubscribeTopics; |
|||
|
|||
remainingLength = 0; |
|||
} |
|||
|
|||
static Int32 DecodeUnsignedShort(Packet buffer, ref Int32 remainingLength) |
|||
{ |
|||
DecreaseRemainingLength(ref remainingLength, 2); |
|||
return buffer.ReadUnsignedShort(); |
|||
} |
|||
|
|||
static String DecodeString(Packet buffer, ref Int32 remainingLength) => DecodeString(buffer, ref remainingLength, 0, Int32.MaxValue); |
|||
|
|||
static String DecodeString(Packet buffer, ref Int32 remainingLength, Int32 minBytes) => DecodeString(buffer, ref remainingLength, minBytes, Int32.MaxValue); |
|||
|
|||
static String DecodeString(Packet buffer, ref Int32 remainingLength, Int32 minBytes, Int32 maxBytes) |
|||
{ |
|||
var size = DecodeUnsignedShort(buffer, ref remainingLength); |
|||
|
|||
if (size < minBytes) |
|||
{ |
|||
throw new DecoderException($"String value is shorter than minimum allowed {minBytes}. Advertised length: {size}"); |
|||
} |
|||
if (size > maxBytes) |
|||
{ |
|||
throw new DecoderException($"String value is longer than maximum allowed {maxBytes}. Advertised length: {size}"); |
|||
} |
|||
|
|||
if (size == 0) |
|||
{ |
|||
return String.Empty; |
|||
} |
|||
|
|||
DecreaseRemainingLength(ref remainingLength, size); |
|||
|
|||
var value = buffer.ToString(buffer.ReaderIndex, size, Encoding.UTF8); |
|||
// todo: enforce string definition by MQTT spec
|
|||
buffer.SetReaderIndex(buffer.ReaderIndex + size); |
|||
return value; |
|||
} |
|||
|
|||
[MethodImpl(MethodImplOptions.AggressiveInlining)] // we don't care about the method being on exception's stack so it's OK to inline
|
|||
static void DecreaseRemainingLength(ref Int32 remainingLength, Int32 minExpectedLength) |
|||
{ |
|||
if (remainingLength < minExpectedLength) |
|||
{ |
|||
throw new DecoderException($"Current Remaining Length of {remainingLength} is smaller than expected {minExpectedLength}."); |
|||
} |
|||
remainingLength -= minExpectedLength; |
|||
} |
|||
} |
|||
} |
@ -1,465 +0,0 @@ |
|||
using System; |
|||
using System.Collections.Generic; |
|||
using System.Text; |
|||
using NewLife.Data; |
|||
using NewLife.Net.MQTT.Packets; |
|||
|
|||
namespace NewLife.Net.MQTT |
|||
{ |
|||
public sealed class MqttEncoder : MessageToMessageEncoder<DataPacket> |
|||
{ |
|||
public static readonly MqttEncoder Instance = new MqttEncoder(); |
|||
const Int32 PacketIdLength = 2; |
|||
const Int32 StringSizeLength = 2; |
|||
const Int32 MaxVariableLength = 4; |
|||
|
|||
protected override void Encode(IHandlerContext context, DataPacket message, List<Object> output) => DoEncode(context.Allocator, message, output); |
|||
|
|||
public override Boolean IsSharable => true; |
|||
|
|||
/// <summary>
|
|||
/// This is the main encoding method.
|
|||
/// It's only visible for testing.
|
|||
/// @param bufferAllocator Allocates ByteBuf
|
|||
/// @param packet MQTT packet to encode
|
|||
/// @return ByteBuf with encoded bytes
|
|||
/// </summary>
|
|||
internal static void DoEncode(IByteBufferAllocator bufferAllocator, DataPacket packet, List<Object> output) |
|||
{ |
|||
switch (packet.PacketType) |
|||
{ |
|||
case PacketType.CONNECT: |
|||
EncodeConnectMessage(bufferAllocator, (ConnectPacket)packet, output); |
|||
break; |
|||
case PacketType.CONNACK: |
|||
EncodeConnAckMessage(bufferAllocator, (ConnAckPacket)packet, output); |
|||
break; |
|||
case PacketType.PUBLISH: |
|||
EncodePublishMessage(bufferAllocator, (PublishPacket)packet, output); |
|||
break; |
|||
case PacketType.PUBACK: |
|||
case PacketType.PUBREC: |
|||
case PacketType.PUBREL: |
|||
case PacketType.PUBCOMP: |
|||
case PacketType.UNSUBACK: |
|||
EncodePacketWithIdOnly(bufferAllocator, (PacketWithId)packet, output); |
|||
break; |
|||
case PacketType.SUBSCRIBE: |
|||
EncodeSubscribeMessage(bufferAllocator, (SubscribePacket)packet, output); |
|||
break; |
|||
case PacketType.SUBACK: |
|||
EncodeSubAckMessage(bufferAllocator, (SubAckPacket)packet, output); |
|||
break; |
|||
case PacketType.UNSUBSCRIBE: |
|||
EncodeUnsubscribeMessage(bufferAllocator, (UnsubscribePacket)packet, output); |
|||
break; |
|||
case PacketType.PINGREQ: |
|||
case PacketType.PINGRESP: |
|||
case PacketType.DISCONNECT: |
|||
EncodePacketWithFixedHeaderOnly(bufferAllocator, packet, output); |
|||
break; |
|||
default: |
|||
throw new ArgumentException("Unknown packet type: " + packet.PacketType, nameof(packet)); |
|||
} |
|||
} |
|||
|
|||
static void EncodeConnectMessage(IByteBufferAllocator bufferAllocator, ConnectPacket packet, List<Object> output) |
|||
{ |
|||
var payloadBufferSize = 0; |
|||
|
|||
// Client id
|
|||
var clientId = packet.ClientId; |
|||
Util.ValidateClientId(clientId); |
|||
var clientIdBytes = EncodeStringInUtf8(clientId); |
|||
payloadBufferSize += StringSizeLength + clientIdBytes.Length; |
|||
|
|||
Byte[] willTopicBytes; |
|||
Packet willMessage; |
|||
if (packet.HasWill) |
|||
{ |
|||
// Will topic and message
|
|||
var willTopic = packet.WillTopicName; |
|||
willTopicBytes = EncodeStringInUtf8(willTopic); |
|||
willMessage = packet.WillMessage; |
|||
payloadBufferSize += StringSizeLength + willTopicBytes.Length; |
|||
payloadBufferSize += 2 + willMessage.ReadableBytes; |
|||
} |
|||
else |
|||
{ |
|||
willTopicBytes = null; |
|||
willMessage = null; |
|||
} |
|||
|
|||
var userName = packet.Username; |
|||
Byte[] userNameBytes; |
|||
if (packet.HasUsername) |
|||
{ |
|||
userNameBytes = EncodeStringInUtf8(userName); |
|||
payloadBufferSize += StringSizeLength + userNameBytes.Length; |
|||
} |
|||
else |
|||
{ |
|||
userNameBytes = null; |
|||
} |
|||
|
|||
Byte[] passwordBytes; |
|||
if (packet.HasPassword) |
|||
{ |
|||
var password = packet.Password; |
|||
passwordBytes = EncodeStringInUtf8(password); |
|||
payloadBufferSize += StringSizeLength + passwordBytes.Length; |
|||
} |
|||
else |
|||
{ |
|||
passwordBytes = null; |
|||
} |
|||
|
|||
// Fixed header
|
|||
var protocolNameBytes = EncodeStringInUtf8(Util.ProtocolName); |
|||
var variableHeaderBufferSize = StringSizeLength + protocolNameBytes.Length + 4; |
|||
var variablePartSize = variableHeaderBufferSize + payloadBufferSize; |
|||
var fixedHeaderBufferSize = 1 + MaxVariableLength; |
|||
Packet buf = null; |
|||
try |
|||
{ |
|||
buf = bufferAllocator.Buffer(fixedHeaderBufferSize + variablePartSize); |
|||
buf.WriteByte(CalculateFirstByteOfFixedHeader(packet)); |
|||
WriteVariableLengthInt(buf, variablePartSize); |
|||
|
|||
buf.WriteShort(protocolNameBytes.Length); |
|||
buf.WriteBytes(protocolNameBytes); |
|||
|
|||
buf.WriteByte(Util.ProtocolLevel); |
|||
buf.WriteByte(CalculateConnectFlagsByte(packet)); |
|||
buf.WriteShort(packet.KeepAliveInSeconds); |
|||
|
|||
// Payload
|
|||
buf.WriteShort(clientIdBytes.Length); |
|||
buf.WriteBytes(clientIdBytes, 0, clientIdBytes.Length); |
|||
if (packet.HasWill) |
|||
{ |
|||
buf.WriteShort(willTopicBytes.Length); |
|||
buf.WriteBytes(willTopicBytes, 0, willTopicBytes.Length); |
|||
buf.WriteShort(willMessage.ReadableBytes); |
|||
if (willMessage.IsReadable()) |
|||
{ |
|||
buf.WriteBytes(willMessage); |
|||
} |
|||
willMessage.Release(); |
|||
willMessage = null; |
|||
} |
|||
if (packet.HasUsername) |
|||
{ |
|||
buf.WriteShort(userNameBytes.Length); |
|||
buf.WriteBytes(userNameBytes, 0, userNameBytes.Length); |
|||
|
|||
if (packet.HasPassword) |
|||
{ |
|||
buf.WriteShort(passwordBytes.Length); |
|||
buf.WriteBytes(passwordBytes, 0, passwordBytes.Length); |
|||
} |
|||
} |
|||
|
|||
output.Add(buf); |
|||
buf = null; |
|||
} |
|||
finally |
|||
{ |
|||
buf?.SafeRelease(); |
|||
willMessage?.SafeRelease(); |
|||
} |
|||
} |
|||
|
|||
static Int32 CalculateConnectFlagsByte(ConnectPacket packet) |
|||
{ |
|||
var flagByte = 0; |
|||
if (packet.HasUsername) |
|||
{ |
|||
flagByte |= 0x80; |
|||
} |
|||
if (packet.HasPassword) |
|||
{ |
|||
flagByte |= 0x40; |
|||
} |
|||
if (packet.HasWill) |
|||
{ |
|||
flagByte |= 0x04; |
|||
flagByte |= ((Int32)packet.WillQualityOfService & 0x03) << 3; |
|||
if (packet.WillRetain) |
|||
{ |
|||
flagByte |= 0x20; |
|||
} |
|||
} |
|||
if (packet.CleanSession) |
|||
{ |
|||
flagByte |= 0x02; |
|||
} |
|||
return flagByte; |
|||
} |
|||
|
|||
static void EncodeConnAckMessage(IByteBufferAllocator bufferAllocator, ConnAckPacket message, List<Object> output) |
|||
{ |
|||
Packet buffer = null; |
|||
try |
|||
{ |
|||
buffer = bufferAllocator.Buffer(4); |
|||
buffer.WriteByte(CalculateFirstByteOfFixedHeader(message)); |
|||
buffer.WriteByte(2); // remaining length
|
|||
if (message.SessionPresent) |
|||
{ |
|||
buffer.WriteByte(1); // 7 reserved 0-bits and SP = 1
|
|||
} |
|||
else |
|||
{ |
|||
buffer.WriteByte(0); // 7 reserved 0-bits and SP = 0
|
|||
} |
|||
buffer.WriteByte((Byte)message.ReturnCode); |
|||
|
|||
|
|||
output.Add(buffer); |
|||
buffer = null; |
|||
} |
|||
finally |
|||
{ |
|||
buffer?.SafeRelease(); |
|||
} |
|||
} |
|||
|
|||
static void EncodePublishMessage(IByteBufferAllocator bufferAllocator, PublishPacket packet, List<Object> output) |
|||
{ |
|||
Packet payload = packet.Payload ?? Unpooled.Empty; |
|||
|
|||
var topicName = packet.TopicName; |
|||
Util.ValidateTopicName(topicName); |
|||
var topicNameBytes = EncodeStringInUtf8(topicName); |
|||
|
|||
Int32 variableHeaderBufferSize = StringSizeLength + topicNameBytes.Length + |
|||
(packet.QualityOfService > QualityOfService.AtMostOnce ? PacketIdLength : 0); |
|||
Int32 payloadBufferSize = payload.ReadableBytes; |
|||
var variablePartSize = variableHeaderBufferSize + payloadBufferSize; |
|||
var fixedHeaderBufferSize = 1 + MaxVariableLength; |
|||
|
|||
Packet buf = null; |
|||
try |
|||
{ |
|||
buf = bufferAllocator.Buffer(fixedHeaderBufferSize + variablePartSize); |
|||
buf.WriteByte(CalculateFirstByteOfFixedHeader(packet)); |
|||
WriteVariableLengthInt(buf, variablePartSize); |
|||
buf.WriteShort(topicNameBytes.Length); |
|||
buf.WriteBytes(topicNameBytes); |
|||
if (packet.QualityOfService > QualityOfService.AtMostOnce) |
|||
{ |
|||
buf.WriteShort(packet.PacketId); |
|||
} |
|||
|
|||
output.Add(buf); |
|||
buf = null; |
|||
} |
|||
finally |
|||
{ |
|||
buf?.SafeRelease(); |
|||
} |
|||
|
|||
if (payload.IsReadable()) |
|||
{ |
|||
output.Add(payload.Retain()); |
|||
} |
|||
} |
|||
|
|||
static void EncodePacketWithIdOnly(IByteBufferAllocator bufferAllocator, PacketWithId packet, List<Object> output) |
|||
{ |
|||
var msgId = packet.PacketId; |
|||
|
|||
const Int32 VariableHeaderBufferSize = PacketIdLength; // variable part only has a packet id
|
|||
var fixedHeaderBufferSize = 1 + MaxVariableLength; |
|||
Packet buffer = null; |
|||
try |
|||
{ |
|||
buffer = bufferAllocator.Buffer(fixedHeaderBufferSize + VariableHeaderBufferSize); |
|||
buffer.WriteByte(CalculateFirstByteOfFixedHeader(packet)); |
|||
WriteVariableLengthInt(buffer, VariableHeaderBufferSize); |
|||
buffer.WriteShort(msgId); |
|||
|
|||
output.Add(buffer); |
|||
buffer = null; |
|||
} |
|||
finally |
|||
{ |
|||
buffer?.SafeRelease(); |
|||
} |
|||
} |
|||
|
|||
static void EncodeSubscribeMessage(IByteBufferAllocator bufferAllocator, SubscribePacket packet, List<Object> output) |
|||
{ |
|||
const Int32 VariableHeaderSize = PacketIdLength; |
|||
var payloadBufferSize = 0; |
|||
|
|||
ThreadLocalObjectList encodedTopicFilters = ThreadLocalObjectList.NewInstance(); |
|||
|
|||
Packet buf = null; |
|||
try |
|||
{ |
|||
foreach (var topic in packet.Requests) |
|||
{ |
|||
var topicFilterBytes = EncodeStringInUtf8(topic.TopicFilter); |
|||
payloadBufferSize += StringSizeLength + topicFilterBytes.Length + 1; // length, value, QoS
|
|||
encodedTopicFilters.Add(topicFilterBytes); |
|||
} |
|||
|
|||
var variablePartSize = VariableHeaderSize + payloadBufferSize; |
|||
var fixedHeaderBufferSize = 1 + MaxVariableLength; |
|||
|
|||
buf = bufferAllocator.Buffer(fixedHeaderBufferSize + variablePartSize); |
|||
buf.WriteByte(CalculateFirstByteOfFixedHeader(packet)); |
|||
WriteVariableLengthInt(buf, variablePartSize); |
|||
|
|||
// Variable Header
|
|||
buf.WriteShort(packet.PacketId); // todo: review: validate?
|
|||
|
|||
// Payload
|
|||
for (var i = 0; i < encodedTopicFilters.Count; i++) |
|||
{ |
|||
var topicFilterBytes = (Byte[])encodedTopicFilters[i]; |
|||
buf.WriteShort(topicFilterBytes.Length); |
|||
buf.WriteBytes(topicFilterBytes, 0, topicFilterBytes.Length); |
|||
buf.WriteByte((Int32)packet.Requests[i].QualityOfService); |
|||
} |
|||
|
|||
output.Add(buf); |
|||
buf = null; |
|||
} |
|||
finally |
|||
{ |
|||
buf?.SafeRelease(); |
|||
encodedTopicFilters.Return(); |
|||
} |
|||
} |
|||
|
|||
static void EncodeSubAckMessage(IByteBufferAllocator bufferAllocator, SubAckPacket message, List<Object> output) |
|||
{ |
|||
var payloadBufferSize = message.ReturnCodes.Count; |
|||
var variablePartSize = PacketIdLength + payloadBufferSize; |
|||
var fixedHeaderBufferSize = 1 + MaxVariableLength; |
|||
Packet buf = null; |
|||
try |
|||
{ |
|||
buf = bufferAllocator.Buffer(fixedHeaderBufferSize + variablePartSize); |
|||
buf.WriteByte(CalculateFirstByteOfFixedHeader(message)); |
|||
WriteVariableLengthInt(buf, variablePartSize); |
|||
buf.WriteShort(message.PacketId); |
|||
foreach (QualityOfService qos in message.ReturnCodes) |
|||
{ |
|||
buf.WriteByte((Byte)qos); |
|||
} |
|||
|
|||
output.Add(buf); |
|||
buf = null; |
|||
|
|||
} |
|||
finally |
|||
{ |
|||
buf?.SafeRelease(); |
|||
} |
|||
} |
|||
|
|||
static void EncodeUnsubscribeMessage(IByteBufferAllocator bufferAllocator, UnsubscribePacket packet, List<Object> output) |
|||
{ |
|||
const Int32 VariableHeaderSize = 2; |
|||
var payloadBufferSize = 0; |
|||
|
|||
ThreadLocalObjectList encodedTopicFilters = ThreadLocalObjectList.NewInstance(); |
|||
|
|||
Packet buf = null; |
|||
try |
|||
{ |
|||
foreach (var topic in packet.TopicFilters) |
|||
{ |
|||
var topicFilterBytes = EncodeStringInUtf8(topic); |
|||
payloadBufferSize += StringSizeLength + topicFilterBytes.Length; // length, value
|
|||
encodedTopicFilters.Add(topicFilterBytes); |
|||
} |
|||
|
|||
var variablePartSize = VariableHeaderSize + payloadBufferSize; |
|||
var fixedHeaderBufferSize = 1 + MaxVariableLength; |
|||
|
|||
buf = bufferAllocator.Buffer(fixedHeaderBufferSize + variablePartSize); |
|||
buf.WriteByte(CalculateFirstByteOfFixedHeader(packet)); |
|||
WriteVariableLengthInt(buf, variablePartSize); |
|||
|
|||
// Variable Header
|
|||
buf.WriteShort(packet.PacketId); // todo: review: validate?
|
|||
|
|||
// Payload
|
|||
for (var i = 0; i < encodedTopicFilters.Count; i++) |
|||
{ |
|||
var topicFilterBytes = (Byte[])encodedTopicFilters[i]; |
|||
buf.WriteShort(topicFilterBytes.Length); |
|||
buf.WriteBytes(topicFilterBytes, 0, topicFilterBytes.Length); |
|||
} |
|||
|
|||
output.Add(buf); |
|||
buf = null; |
|||
} |
|||
finally |
|||
{ |
|||
buf?.SafeRelease(); |
|||
encodedTopicFilters.Return(); |
|||
} |
|||
} |
|||
|
|||
static void EncodePacketWithFixedHeaderOnly(IByteBufferAllocator bufferAllocator, DataPacket packet, List<Object> output) |
|||
{ |
|||
Packet buffer = null; |
|||
try |
|||
{ |
|||
buffer = bufferAllocator.Buffer(2); |
|||
buffer.WriteByte(CalculateFirstByteOfFixedHeader(packet)); |
|||
buffer.WriteByte(0); |
|||
|
|||
output.Add(buffer); |
|||
buffer = null; |
|||
} |
|||
finally |
|||
{ |
|||
buffer?.SafeRelease(); |
|||
} |
|||
} |
|||
|
|||
static Int32 CalculateFirstByteOfFixedHeader(DataPacket packet) |
|||
{ |
|||
var ret = 0; |
|||
ret |= (Int32)packet.PacketType << 4; |
|||
if (packet.Duplicate) |
|||
{ |
|||
ret |= 0x08; |
|||
} |
|||
ret |= (Int32)packet.QualityOfService << 1; |
|||
if (packet.RetainRequested) |
|||
{ |
|||
ret |= 0x01; |
|||
} |
|||
return ret; |
|||
} |
|||
|
|||
static void WriteVariableLengthInt(Packet buffer, Int32 value) |
|||
{ |
|||
do |
|||
{ |
|||
var digit = value % 128; |
|||
value /= 128; |
|||
if (value > 0) |
|||
{ |
|||
digit |= 0x80; |
|||
} |
|||
buffer.WriteByte(digit); |
|||
} |
|||
while (value > 0); |
|||
} |
|||
|
|||
static Byte[] EncodeStringInUtf8(String s) |
|||
{ |
|||
// todo: validate against extra limitations per MQTT's UTF-8 string definition
|
|||
return Encoding.UTF8.GetBytes(s); |
|||
} |
|||
} |
|||
} |
@ -1,103 +0,0 @@ |
|||
using System; |
|||
using System.IO; |
|||
using NewLife.Serialization; |
|||
|
|||
namespace NewLife.Net.MQTT |
|||
{ |
|||
/// <summary>MQTT(Message Queue Telemetry Transport),遥测传输协议</summary>
|
|||
/// <remarks>
|
|||
/// 提供订阅/发布模式,更为简约、轻量,易于使用,针对受限环境(带宽低、网络延迟高、网络通信不稳定),可以简单概括为物联网打造,官方总结特点如下:
|
|||
/// 1.使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。
|
|||
/// 2. 对负载内容屏蔽的消息传输。
|
|||
/// 3. 使用 TCP/IP 提供网络连接。
|
|||
/// 4. 有三种消息发布服务质量:
|
|||
/// “至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
|
|||
/// “至少一次”,确保消息到达,但消息重复可能会发生。
|
|||
/// “只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
|
|||
/// 5. 小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量。
|
|||
/// 6. 使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制。
|
|||
/// </remarks>
|
|||
public class MqttMessage : IAccessor |
|||
{ |
|||
#region 属性
|
|||
/// <summary>消息类型</summary>
|
|||
public Byte Type { get; set; } |
|||
|
|||
/// <summary>打开标识。值为1时表示当前消息先前已经被传送过</summary>
|
|||
/// <remarks>
|
|||
/// 保证消息可靠传输,默认为0,只占用一个字节,表示第一次发送。不能用于检测消息重复发送等。只适用于客户端或服务器端尝试重发PUBLISH, PUBREL, SUBSCRIBE 或 UNSUBSCRIBE消息,注意需要满足以下条件:
|
|||
/// 当QoS > 0
|
|||
/// 消息需要回复确认
|
|||
/// 此时,在可变头部需要包含消息ID。当值为1时,表示当前消息先前已经被传送过。
|
|||
/// </remarks>
|
|||
public Byte Dup { get; set; } |
|||
|
|||
/// <summary>QoS等级</summary>
|
|||
public Byte QoS { get; set; } |
|||
|
|||
/// <summary>保持。仅针对PUBLISH消息。不同值,不同含义</summary>
|
|||
/// <remarks>
|
|||
/// 1:表示发送的消息需要一直持久保存(不受服务器重启影响),不但要发送给当前的订阅者,并且以后新来的订阅了此Topic name的订阅者会马上得到推送。
|
|||
/// 备注:新来乍到的订阅者,只会取出最新的一个RETAIN flag = 1的消息推送。
|
|||
/// 0:仅仅为当前订阅者推送此消息。
|
|||
/// 假如服务器收到一个空消息体(zero-length payload)、RETAIN = 1、已存在Topic name的PUBLISH消息,服务器可以删除掉对应的已被持久化的PUBLISH消息。
|
|||
/// </remarks>
|
|||
public Byte Retain { get; set; } |
|||
|
|||
/// <summary>长度。7位压缩编码整数</summary>
|
|||
/// <remarks>
|
|||
/// 在当前消息中剩余的byte(字节)数,包含可变头部和负荷(内容)。
|
|||
/// 单个字节最大值:01111111,16进制:0x7F,10进制为127。
|
|||
/// MQTT协议规定,第八位(最高位)若为1,则表示还有后续字节存在。
|
|||
/// </remarks>
|
|||
public Byte Length { get; set; } |
|||
#endregion
|
|||
|
|||
#region 核心读写方法
|
|||
/// <summary>从数据流中读取消息</summary>
|
|||
/// <param name="stream">数据流</param>
|
|||
/// <param name="context">上下文</param>
|
|||
/// <returns>是否成功</returns>
|
|||
public virtual Boolean Read(Stream stream, Object context) |
|||
{ |
|||
var flag = stream.ReadByte(); |
|||
if (flag < 0) return false; |
|||
|
|||
Type = (Byte)((flag & 0b1111_0000) >> 4); |
|||
Dup = (Byte)((flag & 0b0000_1000) >> 3); |
|||
QoS = (Byte)((flag & 0b0000_0110) >> 1); |
|||
Retain = (Byte)((flag & 0b0000_0001) >> 0); |
|||
|
|||
Length = (Byte)stream.ReadByte(); |
|||
|
|||
return true; |
|||
} |
|||
|
|||
/// <summary>把消息写入到数据流中</summary>
|
|||
/// <param name="stream">数据流</param>
|
|||
/// <param name="context">上下文</param>
|
|||
public virtual Boolean Write(Stream stream, Object context) |
|||
{ |
|||
var flag = 0; |
|||
flag |= (Type << 4) & 0b1111_0000; |
|||
flag |= (Dup << 3) & 0b0000_1000; |
|||
flag |= (QoS << 1) & 0b0000_0110; |
|||
flag |= (Retain << 0) & 0b0000_0001; |
|||
|
|||
stream.Write((Byte)flag); |
|||
stream.Write(Length); |
|||
|
|||
return true; |
|||
} |
|||
|
|||
/// <summary>消息转为字节数组</summary>
|
|||
/// <returns></returns>
|
|||
public virtual Byte[] ToArray() |
|||
{ |
|||
var ms = new MemoryStream(); |
|||
Write(ms, null); |
|||
return ms.ToArray(); |
|||
} |
|||
#endregion
|
|||
} |
|||
} |
@ -1,56 +0,0 @@ |
|||
using System; |
|||
|
|||
namespace NewLife.Net.MQTT |
|||
{ |
|||
/// <summary>消息类型</summary>
|
|||
public enum MqttType : Byte |
|||
{ |
|||
/// <summary>保留</summary>
|
|||
Reserved = 0, |
|||
|
|||
/// <summary>连接</summary>
|
|||
Connect, |
|||
|
|||
/// <summary>连接确认</summary>
|
|||
ConnAck, |
|||
|
|||
/// <summary>发布消息</summary>
|
|||
Publish, |
|||
|
|||
/// <summary>发布确认</summary>
|
|||
PubAck, |
|||
|
|||
/// <summary>发布已接收</summary>
|
|||
PubRec, |
|||
|
|||
/// <summary>发布已释放</summary>
|
|||
PubRel, |
|||
|
|||
/// <summary>发布已完成</summary>
|
|||
PubComp, |
|||
|
|||
/// <summary>客户端订阅请求</summary>
|
|||
Subscribe, |
|||
|
|||
/// <summary>订阅确认</summary>
|
|||
SubAck, |
|||
|
|||
/// <summary>取消订阅</summary>
|
|||
UnSubscribe, |
|||
|
|||
/// <summary>取消订阅确认</summary>
|
|||
UnSubAck, |
|||
|
|||
/// <summary>Ping请求</summary>
|
|||
PingReq, |
|||
|
|||
/// <summary>Ping响应</summary>
|
|||
PingResp, |
|||
|
|||
/// <summary>断开连接</summary>
|
|||
Disconnect, |
|||
|
|||
/// <summary>保留</summary>
|
|||
Reserved2 |
|||
} |
|||
} |
@ -1,17 +0,0 @@ |
|||
using System; |
|||
|
|||
namespace NewLife.Net.MQTT.Packets |
|||
{ |
|||
/// <summary>连接响应包</summary>
|
|||
public sealed class ConnAckPacket : DataPacket |
|||
{ |
|||
/// <summary>包类型</summary>
|
|||
public override PacketType PacketType => PacketType.CONNACK; |
|||
|
|||
/// <summary>会话</summary>
|
|||
public Boolean SessionPresent { get; set; } |
|||
|
|||
/// <summary>响应代码</summary>
|
|||
public ConnectReturnCode ReturnCode { get; set; } |
|||
} |
|||
} |
@ -1,40 +0,0 @@ |
|||
using System; |
|||
using NewLife.Data; |
|||
|
|||
namespace NewLife.Net.MQTT.Packets |
|||
{ |
|||
/// <summary>连接包</summary>
|
|||
public sealed class ConnectPacket : DataPacket |
|||
{ |
|||
/// <summary>包类型</summary>
|
|||
public override PacketType PacketType => PacketType.CONNECT; |
|||
|
|||
public String ProtocolName { get; set; } |
|||
|
|||
public Int32 ProtocolLevel { get; set; } |
|||
|
|||
public Boolean CleanSession { get; set; } |
|||
|
|||
public Boolean HasWill { get; set; } |
|||
|
|||
public QualityOfService WillQualityOfService { get; set; } |
|||
|
|||
public Boolean WillRetain { get; set; } |
|||
|
|||
public Boolean HasPassword { get; set; } |
|||
|
|||
public Boolean HasUsername { get; set; } |
|||
|
|||
public Int32 KeepAliveInSeconds { get; set; } |
|||
|
|||
public String Username { get; set; } |
|||
|
|||
public String Password { get; set; } |
|||
|
|||
public String ClientId { get; set; } |
|||
|
|||
public String WillTopicName { get; set; } |
|||
|
|||
public Packet WillMessage { get; set; } |
|||
} |
|||
} |
@ -1,25 +0,0 @@ |
|||
|
|||
namespace NewLife.Net.MQTT.Packets |
|||
{ |
|||
/// <summary>连接返回代码</summary>
|
|||
public enum ConnectReturnCode |
|||
{ |
|||
/// <summary>已接受</summary>
|
|||
Accepted = 0x00, |
|||
|
|||
/// <summary>拒绝不可用协议版本</summary>
|
|||
RefusedUnacceptableProtocolVersion = 0X01, |
|||
|
|||
/// <summary>拒绝标识</summary>
|
|||
RefusedIdentifierRejected = 0x02, |
|||
|
|||
/// <summary>服务不可用</summary>
|
|||
RefusedServerUnavailable = 0x03, |
|||
|
|||
/// <summary>错误用户名密码</summary>
|
|||
RefusedBadUsernameOrPassword = 0x04, |
|||
|
|||
/// <summary>为认证</summary>
|
|||
RefusedNotAuthorized = 0x05 |
|||
} |
|||
} |
@ -1,23 +0,0 @@ |
|||
using System; |
|||
|
|||
namespace NewLife.Net.MQTT.Packets |
|||
{ |
|||
/// <summary>数据包</summary>
|
|||
public abstract class DataPacket |
|||
{ |
|||
/// <summary>包类型</summary>
|
|||
public abstract PacketType PacketType { get; } |
|||
|
|||
/// <summary>双向</summary>
|
|||
public virtual Boolean Duplicate => false; |
|||
|
|||
/// <summary>服务质量</summary>
|
|||
public virtual QualityOfService QualityOfService => QualityOfService.AtMostOnce; |
|||
|
|||
/// <summary>保留请求</summary>
|
|||
public virtual Boolean RetainRequested => false; |
|||
|
|||
/// <summary>已重载</summary>
|
|||
public override String ToString() => $"{GetType().Name}[Type={PacketType}, QualityOfService={QualityOfService}, Duplicate={Duplicate}, Retain={RetainRequested}]"; |
|||
} |
|||
} |
@ -1,15 +0,0 @@ |
|||
|
|||
namespace NewLife.Net.MQTT.Packets |
|||
{ |
|||
/// <summary>断开连接</summary>
|
|||
public sealed class DisconnectPacket : DataPacket |
|||
{ |
|||
/// <summary>实例</summary>
|
|||
public static readonly DisconnectPacket Instance = new DisconnectPacket(); |
|||
|
|||
DisconnectPacket() { } |
|||
|
|||
/// <summary>包类型</summary>
|
|||
public override PacketType PacketType => PacketType.DISCONNECT; |
|||
} |
|||
} |
@ -1,23 +0,0 @@ |
|||
|
|||
namespace NewLife.Net.MQTT.Packets |
|||
{ |
|||
/// <summary>包类型</summary>
|
|||
public enum PacketType |
|||
{ |
|||
// ReSharper disable InconsistentNaming
|
|||
CONNECT = 1, |
|||
CONNACK = 2, |
|||
PUBLISH = 3, |
|||
PUBACK = 4, |
|||
PUBREC = 5, |
|||
PUBREL = 6, |
|||
PUBCOMP = 7, |
|||
SUBSCRIBE = 8, |
|||
SUBACK = 9, |
|||
UNSUBSCRIBE = 10, |
|||
UNSUBACK = 11, |
|||
PINGREQ = 12, |
|||
PINGRESP = 13, |
|||
DISCONNECT = 14 |
|||
} |
|||
} |
@ -1,11 +0,0 @@ |
|||
using System; |
|||
|
|||
namespace NewLife.Net.MQTT.Packets |
|||
{ |
|||
/// <summary>带ID数据包</summary>
|
|||
public abstract class PacketWithId : DataPacket |
|||
{ |
|||
/// <summary>包ID</summary>
|
|||
public Int32 PacketId { get; set; } |
|||
} |
|||
} |
@ -1,15 +0,0 @@ |
|||
|
|||
namespace NewLife.Net.MQTT.Packets |
|||
{ |
|||
/// <summary>心跳请求</summary>
|
|||
public sealed class PingReqPacket : DataPacket |
|||
{ |
|||
/// <summary>实例</summary>
|
|||
public static readonly PingReqPacket Instance = new PingReqPacket(); |
|||
|
|||
PingReqPacket() { } |
|||
|
|||
/// <summary>包类型</summary>
|
|||
public override PacketType PacketType => PacketType.PINGREQ; |
|||
} |
|||
} |
@ -1,15 +0,0 @@ |
|||
|
|||
namespace NewLife.Net.MQTT.Packets |
|||
{ |
|||
/// <summary>心跳响应</summary>
|
|||
public sealed class PingRespPacket : DataPacket |
|||
{ |
|||
/// <summary>实例</summary>
|
|||
public static readonly PingRespPacket Instance = new PingRespPacket(); |
|||
|
|||
PingRespPacket() { } |
|||
|
|||
/// <summary>包类型</summary>
|
|||
public override PacketType PacketType => PacketType.PINGRESP; |
|||
} |
|||
} |
@ -1,19 +0,0 @@ |
|||
|
|||
namespace NewLife.Net.MQTT.Packets |
|||
{ |
|||
/// <summary>发布确认</summary>
|
|||
public sealed class PubAckPacket : PacketWithId |
|||
{ |
|||
/// <summary>包类型</summary>
|
|||
public override PacketType PacketType => PacketType.PUBACK; |
|||
|
|||
/// <summary>在响应中</summary>
|
|||
public static PubAckPacket InResponseTo(PublishPacket publishPacket) |
|||
{ |
|||
return new PubAckPacket |
|||
{ |
|||
PacketId = publishPacket.PacketId |
|||
}; |
|||
} |
|||
} |
|||
} |
@ -1,19 +0,0 @@ |
|||
|
|||
namespace NewLife.Net.MQTT.Packets |
|||
{ |
|||
/// <summary>发布完成</summary>
|
|||
public sealed class PubCompPacket : PacketWithId |
|||
{ |
|||
/// <summary>包类型</summary>
|
|||
public override PacketType PacketType => PacketType.PUBCOMP; |
|||
|
|||
/// <summary>包类型</summary>
|
|||
public static PubCompPacket InResponseTo(PubRelPacket publishPacket) |
|||
{ |
|||
return new PubCompPacket |
|||
{ |
|||
PacketId = publishPacket.PacketId |
|||
}; |
|||
} |
|||
} |
|||
} |
@ -1,19 +0,0 @@ |
|||
|
|||
namespace NewLife.Net.MQTT.Packets |
|||
{ |
|||
/// <summary>发布</summary>
|
|||
public sealed class PubRecPacket : PacketWithId |
|||
{ |
|||
/// <summary>包类型</summary>
|
|||
public override PacketType PacketType => PacketType.PUBREC; |
|||
|
|||
/// <summary>在响应中</summary>
|
|||
public static PubRecPacket InResponseTo(PublishPacket publishPacket) |
|||
{ |
|||
return new PubRecPacket |
|||
{ |
|||
PacketId = publishPacket.PacketId |
|||
}; |
|||
} |
|||
} |
|||
} |
@ -1,22 +0,0 @@ |
|||
|
|||
namespace NewLife.Net.MQTT.Packets |
|||
{ |
|||
/// <summary>发布</summary>
|
|||
public sealed class PubRelPacket : PacketWithId |
|||
{ |
|||
/// <summary>包类型</summary>
|
|||
public override PacketType PacketType => PacketType.PUBREL; |
|||
|
|||
/// <summary>服务质量</summary>
|
|||
public override QualityOfService QualityOfService => QualityOfService.AtLeastOnce; |
|||
|
|||
/// <summary>包类型</summary>
|
|||
public static PubRelPacket InResponseTo(PubRecPacket publishPacket) |
|||
{ |
|||
return new PubRelPacket |
|||
{ |
|||
PacketId = publishPacket.PacketId |
|||
}; |
|||
} |
|||
} |
|||
} |
@ -1,79 +0,0 @@ |
|||
|
|||
using System; |
|||
|
|||
namespace NewLife.Net.MQTT.Packets |
|||
{ |
|||
/// <summary>发布包</summary>
|
|||
public sealed class PublishPacket : PacketWithId, IByteBufferHolder |
|||
{ |
|||
readonly QualityOfService qos; |
|||
readonly Boolean duplicate; |
|||
readonly Boolean retainRequested; |
|||
|
|||
public PublishPacket(QualityOfService qos, Boolean duplicate, Boolean retain) |
|||
{ |
|||
this.qos = qos; |
|||
this.duplicate = duplicate; |
|||
retainRequested = retain; |
|||
} |
|||
|
|||
/// <summary>包类型</summary>
|
|||
public override PacketType PacketType => PacketType.PUBLISH; |
|||
|
|||
public override Boolean Duplicate => duplicate; |
|||
|
|||
public override QualityOfService QualityOfService => qos; |
|||
|
|||
public override Boolean RetainRequested => retainRequested; |
|||
|
|||
public String TopicName { get; set; } |
|||
|
|||
public Packet Payload { get; set; } |
|||
|
|||
public Int32 ReferenceCount => Payload.ReferenceCount; |
|||
|
|||
public IReferenceCounted Retain() |
|||
{ |
|||
Payload.Retain(); |
|||
return this; |
|||
} |
|||
|
|||
public IReferenceCounted Retain(Int32 increment) |
|||
{ |
|||
Payload.Retain(increment); |
|||
return this; |
|||
} |
|||
|
|||
public IReferenceCounted Touch() |
|||
{ |
|||
Payload.Touch(); |
|||
return this; |
|||
} |
|||
|
|||
public IReferenceCounted Touch(Object hint) |
|||
{ |
|||
Payload.Touch(hint); |
|||
return this; |
|||
} |
|||
|
|||
public Boolean Release() => Payload.Release(); |
|||
|
|||
public Boolean Release(Int32 decrement) => Payload.Release(decrement); |
|||
|
|||
Packet IByteBufferHolder.Content => Payload; |
|||
|
|||
public IByteBufferHolder Copy() => this.Replace(Payload.Copy()); |
|||
|
|||
public IByteBufferHolder Replace(Packet content) |
|||
{ |
|||
var result = new PublishPacket(qos, duplicate, retainRequested); |
|||
result.TopicName = TopicName; |
|||
result.Payload = content; |
|||
return result; |
|||
} |
|||
|
|||
IByteBufferHolder IByteBufferHolder.Duplicate() => this.Replace(Payload.Duplicate()); |
|||
|
|||
public IByteBufferHolder RetainedDuplicate() => this.Replace(Payload.RetainedDuplicate()); |
|||
} |
|||
} |
@ -1,22 +0,0 @@ |
|||
|
|||
namespace NewLife.Net.MQTT.Packets |
|||
{ |
|||
/// <summary>服务质量</summary>
|
|||
public enum QualityOfService |
|||
{ |
|||
/// <summary>最多一次</summary>
|
|||
AtMostOnce = 0, |
|||
|
|||
/// <summary>至少一次</summary>
|
|||
AtLeastOnce = 0x1, |
|||
|
|||
/// <summary>刚好一次</summary>
|
|||
ExactlyOnce = 0x2, |
|||
|
|||
/// <summary>保留</summary>
|
|||
Reserved = 0x3, |
|||
|
|||
/// <summary>失败</summary>
|
|||
Failure = 0x80 |
|||
} |
|||
} |
@ -1,34 +0,0 @@ |
|||
using System; |
|||
using System.Collections.Generic; |
|||
|
|||
namespace NewLife.Net.MQTT.Packets |
|||
{ |
|||
/// <summary>发布确认</summary>
|
|||
public sealed class SubAckPacket : PacketWithId |
|||
{ |
|||
/// <summary>包类型</summary>
|
|||
public override PacketType PacketType => PacketType.SUBACK; |
|||
|
|||
/// <summary>返回代码</summary>
|
|||
public IReadOnlyList<QualityOfService> ReturnCodes { get; set; } |
|||
|
|||
public static SubAckPacket InResponseTo(SubscribePacket subscribePacket, QualityOfService maxQoS) |
|||
{ |
|||
var subAckPacket = new SubAckPacket |
|||
{ |
|||
PacketId = subscribePacket.PacketId |
|||
}; |
|||
var subscriptionRequests = subscribePacket.Requests; |
|||
var returnCodes = new QualityOfService[subscriptionRequests.Count]; |
|||
for (var i = 0; i < subscriptionRequests.Count; i++) |
|||
{ |
|||
var requestedQos = subscriptionRequests[i].QualityOfService; |
|||
returnCodes[i] = requestedQos <= maxQoS ? requestedQos : maxQoS; |
|||
} |
|||
|
|||
subAckPacket.ReturnCodes = returnCodes; |
|||
|
|||
return subAckPacket; |
|||
} |
|||
} |
|||
} |
@ -1,28 +0,0 @@ |
|||
|
|||
using System.Collections.Generic; |
|||
|
|||
namespace NewLife.Net.MQTT.Packets |
|||
{ |
|||
/// <summary>订阅</summary>
|
|||
public sealed class SubscribePacket : PacketWithId |
|||
{ |
|||
public SubscribePacket() |
|||
{ |
|||
} |
|||
|
|||
public SubscribePacket(System.Int32 packetId, params SubscriptionRequest[] requests) |
|||
{ |
|||
PacketId = packetId; |
|||
Requests = requests; |
|||
} |
|||
|
|||
/// <summary>包类型</summary>
|
|||
public override PacketType PacketType => PacketType.SUBSCRIBE; |
|||
|
|||
/// <summary>服务质量</summary>
|
|||
public override QualityOfService QualityOfService => QualityOfService.AtLeastOnce; |
|||
|
|||
/// <summary>请求集合</summary>
|
|||
public IReadOnlyList<SubscriptionRequest> Requests { get; set; } |
|||
} |
|||
} |
@ -1,29 +0,0 @@ |
|||
using System; |
|||
using System.Diagnostics.Contracts; |
|||
|
|||
namespace NewLife.Net.MQTT.Packets |
|||
{ |
|||
/// <summary>包类型</summary>
|
|||
public class SubscriptionRequest : IEquatable<SubscriptionRequest> |
|||
{ |
|||
public SubscriptionRequest(String topicFilter, QualityOfService qualityOfService) |
|||
{ |
|||
Contract.Requires(!String.IsNullOrEmpty(topicFilter)); |
|||
|
|||
TopicFilter = topicFilter; |
|||
QualityOfService = qualityOfService; |
|||
} |
|||
|
|||
public String TopicFilter { get; } |
|||
|
|||
public QualityOfService QualityOfService { get; } |
|||
|
|||
public Boolean Equals(SubscriptionRequest other) |
|||
{ |
|||
return QualityOfService == other.QualityOfService |
|||
&& TopicFilter.Equals(other.TopicFilter, StringComparison.Ordinal); |
|||
} |
|||
|
|||
public override String ToString() => $"{GetType().Name}[TopicFilter={TopicFilter}, QualityOfService={QualityOfService}]"; |
|||
} |
|||
} |
@ -1,18 +0,0 @@ |
|||
|
|||
namespace NewLife.Net.MQTT.Packets |
|||
{ |
|||
/// <summary>取消订阅确认</summary>
|
|||
public sealed class UnsubAckPacket : PacketWithId |
|||
{ |
|||
/// <summary>包类型</summary>
|
|||
public override PacketType PacketType => PacketType.UNSUBACK; |
|||
|
|||
public static UnsubAckPacket InResponseTo(UnsubscribePacket unsubscribePacket) |
|||
{ |
|||
return new UnsubAckPacket |
|||
{ |
|||
PacketId = unsubscribePacket.PacketId |
|||
}; |
|||
} |
|||
} |
|||
} |
@ -1,26 +0,0 @@ |
|||
using System; |
|||
using System.Collections.Generic; |
|||
|
|||
namespace NewLife.Net.MQTT.Packets |
|||
{ |
|||
/// <summary>取消订阅</summary>
|
|||
public sealed class UnsubscribePacket : PacketWithId |
|||
{ |
|||
public UnsubscribePacket() { } |
|||
|
|||
public UnsubscribePacket(Int32 packetId, params String[] topicFilters) |
|||
{ |
|||
PacketId = packetId; |
|||
TopicFilters = topicFilters; |
|||
} |
|||
|
|||
/// <summary>包类型</summary>
|
|||
public override PacketType PacketType => PacketType.UNSUBSCRIBE; |
|||
|
|||
/// <summary>服务质量</summary>
|
|||
public override QualityOfService QualityOfService => QualityOfService.AtLeastOnce; |
|||
|
|||
/// <summary>主题过滤器</summary>
|
|||
public IEnumerable<String> TopicFilters { get; set; } |
|||
} |
|||
} |
@ -1,20 +0,0 @@ |
|||
using System; |
|||
|
|||
namespace NewLife.Net.MQTT |
|||
{ |
|||
/// <summary>服务质量</summary>
|
|||
public enum QualityOfService : Byte |
|||
{ |
|||
/// <summary>至多一次 发完即丢弃</summary>
|
|||
Q0 = 0, |
|||
|
|||
/// <summary>至少一次 需要确认回复</summary>
|
|||
Q1, |
|||
|
|||
/// <summary>只有一次 需要确认回复</summary>
|
|||
Q2, |
|||
|
|||
/// <summary>保留</summary>
|
|||
Q3 |
|||
} |
|||
} |
@ -1,35 +0,0 @@ |
|||
using System; |
|||
using System.Runtime.CompilerServices; |
|||
using NewLife.Net.MQTT.Packets; |
|||
|
|||
namespace NewLife.Net.MQTT |
|||
{ |
|||
/// <summary>签名</summary>
|
|||
static class Signatures |
|||
{ |
|||
const Byte QoS1Signature = (Int32)QualityOfService.AtLeastOnce << 1; |
|||
|
|||
// most often used (anticipated) come first
|
|||
|
|||
[MethodImpl(MethodImplOptions.AggressiveInlining)] |
|||
public static Boolean IsPublish(Int32 signature) |
|||
{ |
|||
const Byte TypeOnlyMask = 0xf << 4; |
|||
return (signature & TypeOnlyMask) == ((Int32)PacketType.PUBLISH << 4); |
|||
} |
|||
|
|||
public const Byte PubAck = (Int32)PacketType.PUBACK << 4; |
|||
public const Byte PubRec = (Int32)PacketType.PUBREC << 4; |
|||
public const Byte PubRel = ((Int32)PacketType.PUBREL << 4) | QoS1Signature; |
|||
public const Byte PubComp = (Int32)PacketType.PUBCOMP << 4; |
|||
public const Byte Connect = (Int32)PacketType.CONNECT << 4; |
|||
public const Byte ConnAck = (Int32)PacketType.CONNACK << 4; |
|||
public const Byte Subscribe = ((Int32)PacketType.SUBSCRIBE << 4) | QoS1Signature; |
|||
public const Byte SubAck = (Int32)PacketType.SUBACK << 4; |
|||
public const Byte PingReq = (Int32)PacketType.PINGREQ << 4; |
|||
public const Byte PingResp = (Int32)PacketType.PINGRESP << 4; |
|||
public const Byte Disconnect = (Int32)PacketType.DISCONNECT << 4; |
|||
public const Byte Unsubscribe = ((Int32)PacketType.UNSUBSCRIBE << 4) | QoS1Signature; |
|||
public const Byte UnsubAck = (Int32)PacketType.UNSUBACK << 4; |
|||
} |
|||
} |
@ -1,43 +0,0 @@ |
|||
using System; |
|||
using NewLife.Exceptions; |
|||
|
|||
namespace NewLife.Net.MQTT |
|||
{ |
|||
/// <summary>工具类</summary>
|
|||
static class Util |
|||
{ |
|||
public const String ProtocolName = "MQTT"; |
|||
public const Int32 ProtocolLevel = 4; |
|||
|
|||
static readonly Char[] TopicWildcards = { '#', '+' }; |
|||
|
|||
public static void ValidateTopicName(String topicName) |
|||
{ |
|||
if (topicName.Length == 0) |
|||
{ |
|||
throw new DecoderException("[MQTT-4.7.3-1]"); |
|||
} |
|||
|
|||
if (topicName.IndexOfAny(TopicWildcards) > 0) |
|||
{ |
|||
throw new DecoderException($"Invalid PUBLISH topic name: {topicName}"); |
|||
} |
|||
} |
|||
|
|||
public static void ValidatePacketId(Int32 packetId) |
|||
{ |
|||
if (packetId < 1) |
|||
{ |
|||
throw new DecoderException("Invalid packet identifier: " + packetId); |
|||
} |
|||
} |
|||
|
|||
public static void ValidateClientId(String clientId) |
|||
{ |
|||
if (clientId == null) |
|||
{ |
|||
throw new DecoderException("Client identifier is required."); |
|||
} |
|||
} |
|||
} |
|||
} |
Write
Preview
Loading…
Cancel
Save
Reference in new issue