123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567 |
- #if !BESTHTTP_DISABLE_SIGNALR_CORE && BESTHTTP_SIGNALR_CORE_ENABLE_MESSAGEPACK_CSHARP
- using System;
- using System.Buffers;
- using System.Collections.Generic;
- using BestHTTP.Extensions;
- using BestHTTP.PlatformSupport.Memory;
- using BestHTTP.SignalRCore.Messages;
- using MessagePack;
- namespace BestHTTP.SignalRCore.Encoders
- {
- class BufferPoolBufferWriter : IBufferWriter<byte>
- {
- private BufferPoolMemoryStream underlyingStream;
- private BufferSegment last;
- public BufferPoolBufferWriter(BufferPoolMemoryStream stream)
- {
- this.underlyingStream = stream;
- this.last = BufferSegment.Empty;
- }
- public void Advance(int count)
- {
- this.underlyingStream.Write(this.last.Data, this.last.Offset, this.last.Count + count);
- BufferPool.Release(this.last);
- this.last = BufferSegment.Empty;
- }
- public Memory<byte> GetMemory(int sizeHint = 0)
- {
- var buffer = BufferPool.Get(Math.Max(sizeHint, BufferPool.MinBufferSize), true);
- //Array.Clear(buffer, 0, buffer.Length);
- this.last = new BufferSegment(buffer, 0, 0);
- return new Memory<byte>(buffer, 0, buffer.Length);
- }
- public Span<byte> GetSpan(int sizeHint = 0)
- {
- var buffer = BufferPool.Get(Math.Max(sizeHint, BufferPool.MinBufferSize), true);
- //Array.Clear(buffer, 0, buffer.Length);
- this.last = new BufferSegment(buffer, 0, 0);
- return new Span<byte>(buffer, 0, buffer.Length);
- }
- }
- public sealed class MessagePackCSharpProtocol : BestHTTP.SignalRCore.IProtocol
- {
- public string Name { get { return "messagepack"; } }
- public TransferModes Type { get { return TransferModes.Binary; } }
- public IEncoder Encoder { get; private set; }
- public HubConnection Connection { get; set; }
- public BufferSegment EncodeMessage(Message message)
- {
- var memBuffer = BufferPool.Get(256, true);
- var stream = new BufferPoolMemoryStream(memBuffer, 0, memBuffer.Length, true, true, false, true);
- // Write 5 bytes for placeholder for length prefix
- stream.WriteByte(0);
- stream.WriteByte(0);
- stream.WriteByte(0);
- stream.WriteByte(0);
- stream.WriteByte(0);
- var bufferWriter = new BufferPoolBufferWriter(stream);
- var writer = new MessagePackWriter(bufferWriter);
- switch (message.type)
- {
- case MessageTypes.StreamItem:
- // https://github.com/aspnet/AspNetCore/blob/master/src/SignalR/docs/specs/HubProtocol.md#streamitem-message-encoding-1
- // [2, Headers, InvocationId, Item]
- writer.WriteArrayHeader(4);
- writer.Write(2);
- WriteHeaders(ref writer);
- WriteString(ref writer, message.invocationId);
- WriteValue(ref writer, bufferWriter, message.item);
- break;
- case MessageTypes.Completion:
- // https://github.com/aspnet/AspNetCore/blob/master/src/SignalR/docs/specs/HubProtocol.md#completion-message-encoding-1
- // [3, Headers, InvocationId, ResultKind, Result?]
- byte resultKind = (byte)(!string.IsNullOrEmpty(message.error) ? /*error*/ 1 : message.result != null ? /*non-void*/ 3 : /*void*/ 2);
- writer.WriteArrayHeader(resultKind == 2 ? 4 : 5);
- writer.Write(3);
- WriteHeaders(ref writer);
- WriteString(ref writer, message.invocationId);
- writer.Write(resultKind);
- if (resultKind == 1) // error
- WriteString(ref writer, message.error);
- else if (resultKind == 3) // non-void
- WriteValue(ref writer, bufferWriter, message.result);
- break;
- case MessageTypes.Invocation:
- // https://github.com/aspnet/AspNetCore/blob/master/src/SignalR/docs/specs/HubProtocol.md#invocation-message-encoding-1
- // [1, Headers, InvocationId, NonBlocking, Target, [Arguments], [StreamIds]]
- case MessageTypes.StreamInvocation:
- // https://github.com/aspnet/AspNetCore/blob/master/src/SignalR/docs/specs/HubProtocol.md#streaminvocation-message-encoding-1
- // [4, Headers, InvocationId, Target, [Arguments], [StreamIds]]
- writer.WriteArrayHeader(message.streamIds != null ? 6 : 5);
- writer.Write((int)message.type);
- WriteHeaders(ref writer);
- WriteString(ref writer, message.invocationId);
- WriteString(ref writer, message.target);
- writer.WriteArrayHeader(message.arguments != null ? message.arguments.Length : 0);
- if (message.arguments != null)
- for (int i = 0; i < message.arguments.Length; ++i)
- WriteValue(ref writer, bufferWriter, message.arguments[i]);
- if (message.streamIds != null)
- {
- writer.WriteArrayHeader(message.streamIds.Length);
- for (int i = 0; i < message.streamIds.Length; ++i)
- WriteValue(ref writer, bufferWriter, message.streamIds[i]);
- }
- break;
- case MessageTypes.CancelInvocation:
- // https://github.com/aspnet/AspNetCore/blob/master/src/SignalR/docs/specs/HubProtocol.md#cancelinvocation-message-encoding-1
- // [5, Headers, InvocationId]
- writer.WriteArrayHeader(3);
- writer.Write(5);
- WriteHeaders(ref writer);
- WriteString(ref writer, message.invocationId);
- break;
- case MessageTypes.Ping:
- // https://github.com/aspnet/AspNetCore/blob/master/src/SignalR/docs/specs/HubProtocol.md#ping-message-encoding-1
- // [6]
- writer.WriteArrayHeader(1);
- writer.Write(6);
- break;
- case MessageTypes.Close:
- // https://github.com/aspnet/AspNetCore/blob/master/src/SignalR/docs/specs/HubProtocol.md#close-message-encoding-1
- // [7, Error, AllowReconnect?]
- writer.WriteArrayHeader(string.IsNullOrEmpty(message.error) ? 1 : 2);
- writer.Write(7);
- if (!string.IsNullOrEmpty(message.error))
- WriteString(ref writer, message.error);
- break;
- }
- writer.Flush();
- // get how much bytes got written to the buffer. This includes the 5 placeholder bytes too.
- int length = (int)stream.Position;
- // this is the length without the 5 placeholder bytes
- int contentLength = length - 5;
- // get the stream's internal buffer. We set the releaseBuffer flag to false, so we can use it safely.
- var buffer = stream.GetBuffer();
- // add varint length prefix
- byte prefixBytes = GetRequiredBytesForLengthPrefix(contentLength);
- WriteLengthAsVarInt(buffer, 5 - prefixBytes, contentLength);
- // return with the final segment
- return new BufferSegment(buffer, 5 - prefixBytes, contentLength + prefixBytes);
- }
- private void WriteValue(ref MessagePackWriter writer, BufferPoolBufferWriter bufferWriter, object item)
- {
- if (item == null)
- writer.WriteNil();
- else
- {
- writer.Flush();
- MessagePackSerializer.Serialize(item.GetType(), bufferWriter, item);
- }
- }
- private void WriteString(ref MessagePackWriter writer, string str)
- {
- if (str == null)
- writer.WriteNil();
- else
- {
- int count = System.Text.Encoding.UTF8.GetByteCount(str);
- var buffer = BufferPool.Get(count, true);
- System.Text.Encoding.UTF8.GetBytes(str, 0, str.Length, buffer, 0);
- writer.WriteString(new ReadOnlySpan<byte>(buffer, 0, count));
- BufferPool.Release(buffer);
- }
- }
- private void WriteHeaders(ref MessagePackWriter writer)
- {
- writer.WriteMapHeader(0);
- }
- public void ParseMessages(BufferSegment segment, ref List<Message> messages)
- {
- int offset = segment.Offset;
- while (offset < segment.Count)
- {
- int length = (int)ReadVarInt(segment.Data, ref offset);
- var reader = new MessagePackReader(new ReadOnlyMemory<byte>(segment.Data, offset, length));
- int arrayLength = reader.ReadArrayHeader();
- int messageType = reader.ReadByte();
- switch ((MessageTypes)messageType)
- {
- case MessageTypes.Invocation: messages.Add(ReadInvocation(ref reader)); break;
- case MessageTypes.StreamItem: messages.Add(ReadStreamItem(ref reader)); break;
- case MessageTypes.Completion: messages.Add(ReadCompletion(ref reader)); break;
- case MessageTypes.StreamInvocation: messages.Add(ReadStreamInvocation(ref reader)); break;
- case MessageTypes.CancelInvocation: messages.Add(ReadCancelInvocation(ref reader)); break;
- case MessageTypes.Ping:
- // https://github.com/aspnet/AspNetCore/blob/master/src/SignalR/docs/specs/HubProtocol.md#ping-message-encoding-1
- messages.Add(new Message { type = MessageTypes.Ping });
- break;
- case MessageTypes.Close: messages.Add(ReadClose(ref reader)); break;
- }
- offset += length;
- }
- }
- private Message ReadClose(ref MessagePackReader reader)
- {
- // https://github.com/aspnet/AspNetCore/blob/master/src/SignalR/docs/specs/HubProtocol.md#close-message-encoding-1
- string error = reader.ReadString();
- bool allowReconnect = false;
- try
- {
- allowReconnect = reader.ReadBoolean();
- }
- catch { }
- return new Message
- {
- type = MessageTypes.Close,
- error = error,
- allowReconnect = allowReconnect
- };
- }
- private Message ReadCancelInvocation(ref MessagePackReader reader)
- {
- // https://github.com/aspnet/AspNetCore/blob/master/src/SignalR/docs/specs/HubProtocol.md#cancelinvocation-message-encoding-1
- ReadHeaders(ref reader);
- string invocationId = reader.ReadString();
- return new Message
- {
- type = MessageTypes.CancelInvocation,
- invocationId = invocationId
- };
- }
- private Message ReadStreamInvocation(ref MessagePackReader reader)
- {
- // https://github.com/aspnet/AspNetCore/blob/master/src/SignalR/docs/specs/HubProtocol.md#streaminvocation-message-encoding-1
- ReadHeaders(ref reader);
- string invocationId = reader.ReadString();
- string target = reader.ReadString();
- object[] arguments = ReadArguments(ref reader, target);
- string[] streamIds = ReadStreamIds(ref reader);
- return new Message
- {
- type = MessageTypes.StreamInvocation,
- invocationId = invocationId,
- target = target,
- arguments = arguments,
- streamIds = streamIds
- };
- }
- private Message ReadCompletion(ref MessagePackReader reader)
- {
- // https://github.com/aspnet/AspNetCore/blob/master/src/SignalR/docs/specs/HubProtocol.md#completion-message-encoding-1
- ReadHeaders(ref reader);
- string invocationId = reader.ReadString();
- byte resultKind = reader.ReadByte();
- switch (resultKind)
- {
- // 1 - Error result - Result contains a String with the error message
- case 1:
- string error = reader.ReadString();
- return new Message
- {
- type = MessageTypes.Completion,
- invocationId = invocationId,
- error = error
- };
- // 2 - Void result - Result is absent
- case 2:
- return new Message
- {
- type = MessageTypes.Completion,
- invocationId = invocationId
- };
- // 3 - Non-Void result - Result contains the value returned by the server
- case 3:
- object item = ReadItem(ref reader, invocationId);
- return new Message
- {
- type = MessageTypes.Completion,
- invocationId = invocationId,
- item = item,
- result = item
- };
- default:
- throw new NotImplementedException("Unknown resultKind: " + resultKind);
- }
- }
- private Message ReadStreamItem(ref MessagePackReader reader)
- {
- // https://github.com/aspnet/AspNetCore/blob/master/src/SignalR/docs/specs/HubProtocol.md#streamitem-message-encoding-1
- ReadHeaders(ref reader);
- string invocationId = reader.ReadString();
- object item = ReadItem(ref reader, invocationId);
- return new Message
- {
- type = MessageTypes.StreamItem,
- invocationId = invocationId,
- item = item
- };
- }
- private Message ReadInvocation(ref MessagePackReader reader)
- {
- // https://github.com/aspnet/AspNetCore/blob/master/src/SignalR/docs/specs/HubProtocol.md#invocation-message-encoding-1
- ReadHeaders(ref reader);
- string invocationId = reader.ReadString();
- string target = reader.ReadString();
- object[] arguments = ReadArguments(ref reader, target);
- string[] streamIds = ReadStreamIds(ref reader);
- return new Message
- {
- type = MessageTypes.Invocation,
- invocationId = invocationId,
- target = target,
- arguments = arguments,
- streamIds = streamIds
- };
- }
- private object ReadItem(ref MessagePackReader reader, string invocationId)
- {
- long longId = 0;
- if (long.TryParse(invocationId, out longId))
- {
- Type itemType = this.Connection.GetItemType(longId);
- return MessagePackSerializer.Deserialize(itemType, reader.ReadRaw());
- }
- else
- {
- reader.Skip();
- return null;
- }
- }
- private string[] ReadStreamIds(ref MessagePackReader reader)
- {
- var count = reader.ReadArrayHeader();
- string[] result = null;
- if (count > 0)
- {
- result = new string[count];
- for (int i = 0; i < count; i++)
- result[i] = reader.ReadString();
- }
- return result;
- }
- private object[] ReadArguments(ref MessagePackReader reader, string target)
- {
- var subscription = this.Connection.GetSubscription(target);
- object[] args = null;
- if (subscription == null || subscription.callbacks == null || subscription.callbacks.Count == 0)
- {
- reader.Skip();
- }
- else
- {
- int count = reader.ReadArrayHeader();
- if (subscription.callbacks[0].ParamTypes != null)
- {
- args = new object[subscription.callbacks[0].ParamTypes.Length];
- for (int i = 0; i < subscription.callbacks[0].ParamTypes.Length; ++i)
- args[i] = MessagePackSerializer.Deserialize(subscription.callbacks[0].ParamTypes[i], reader.ReadRaw());
- }
- else
- args = null;
- }
- return args;
- }
- private Dictionary<string, string> ReadHeaders(ref MessagePackReader reader)
- {
- int count = reader.ReadMapHeader();
- Dictionary<string, string> result = null;
- if (count > 0)
- {
- result = new Dictionary<string, string>(count);
- for (int i = 0; i < count; i++)
- {
- string key = reader.ReadString();
- string value = reader.ReadString();
- result.Add(key, value);
- }
- }
- return result;
- }
- public static byte GetRequiredBytesForLengthPrefix(int length)
- {
- byte bytes = 0;
- do
- {
- length >>= 7;
- bytes++;
- }
- while (length > 0);
- return bytes;
- }
- public static int WriteLengthAsVarInt(byte[] data, int offset, int length)
- {
- do
- {
- var current = data[offset];
- current = (byte)(length & 0x7f);
- length >>= 7;
- if (length > 0)
- {
- current |= 0x80;
- }
- data[offset++] = current;
- }
- while (length > 0);
- return offset;
- }
- public static uint ReadVarInt(byte[] data, ref int offset)
- {
- var length = 0U;
- var numBytes = 0;
- byte byteRead;
- do
- {
- byteRead = data[offset + numBytes];
- length = length | (((uint)(byteRead & 0x7f)) << (numBytes * 7));
- numBytes++;
- }
- while (offset + numBytes < data.Length && ((byteRead & 0x80) != 0));
- offset += numBytes;
- return length;
- }
- public object ConvertTo(Type toType, object obj)
- {
- if (obj == null)
- return null;
- #if NETFX_CORE
- TypeInfo typeInfo = toType.GetTypeInfo();
- #endif
- #if NETFX_CORE
- if (typeInfo.IsEnum)
- #else
- if (toType.IsEnum)
- #endif
- return Enum.Parse(toType, obj.ToString(), true);
- #if NETFX_CORE
- if (typeInfo.IsPrimitive)
- #else
- if (toType.IsPrimitive)
- #endif
- return Convert.ChangeType(obj, toType);
- if (toType == typeof(string))
- return obj.ToString();
- #if NETFX_CORE
- if (typeInfo.IsGenericType && toType.Name == "Nullable`1")
- return Convert.ChangeType(obj, toType.GenericTypeArguments[0]);
- #else
- if (toType.IsGenericType && toType.Name == "Nullable`1")
- return Convert.ChangeType(obj, toType.GetGenericArguments()[0]);
- #endif
- return obj;
- }
- public object[] GetRealArguments(Type[] argTypes, object[] arguments)
- {
- if (arguments == null || arguments.Length == 0)
- return null;
- if (argTypes.Length > arguments.Length)
- throw new Exception(string.Format("argType.Length({0}) < arguments.length({1})", argTypes.Length, arguments.Length));
- return arguments;
- }
- }
- }
- #endif
|