MessagePackCSharpProtocol.cs 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567
  1. #if !BESTHTTP_DISABLE_SIGNALR_CORE && BESTHTTP_SIGNALR_CORE_ENABLE_MESSAGEPACK_CSHARP
  2. using System;
  3. using System.Buffers;
  4. using System.Collections.Generic;
  5. using BestHTTP.Extensions;
  6. using BestHTTP.PlatformSupport.Memory;
  7. using BestHTTP.SignalRCore.Messages;
  8. using MessagePack;
  9. namespace BestHTTP.SignalRCore.Encoders
  10. {
  11. class BufferPoolBufferWriter : IBufferWriter<byte>
  12. {
  13. private BufferPoolMemoryStream underlyingStream;
  14. private BufferSegment last;
  15. public BufferPoolBufferWriter(BufferPoolMemoryStream stream)
  16. {
  17. this.underlyingStream = stream;
  18. this.last = BufferSegment.Empty;
  19. }
  20. public void Advance(int count)
  21. {
  22. this.underlyingStream.Write(this.last.Data, this.last.Offset, this.last.Count + count);
  23. BufferPool.Release(this.last);
  24. this.last = BufferSegment.Empty;
  25. }
  26. public Memory<byte> GetMemory(int sizeHint = 0)
  27. {
  28. var buffer = BufferPool.Get(Math.Max(sizeHint, BufferPool.MinBufferSize), true);
  29. //Array.Clear(buffer, 0, buffer.Length);
  30. this.last = new BufferSegment(buffer, 0, 0);
  31. return new Memory<byte>(buffer, 0, buffer.Length);
  32. }
  33. public Span<byte> GetSpan(int sizeHint = 0)
  34. {
  35. var buffer = BufferPool.Get(Math.Max(sizeHint, BufferPool.MinBufferSize), true);
  36. //Array.Clear(buffer, 0, buffer.Length);
  37. this.last = new BufferSegment(buffer, 0, 0);
  38. return new Span<byte>(buffer, 0, buffer.Length);
  39. }
  40. }
  41. public sealed class MessagePackCSharpProtocol : BestHTTP.SignalRCore.IProtocol
  42. {
  43. public string Name { get { return "messagepack"; } }
  44. public TransferModes Type { get { return TransferModes.Binary; } }
  45. public IEncoder Encoder { get; private set; }
  46. public HubConnection Connection { get; set; }
  47. public BufferSegment EncodeMessage(Message message)
  48. {
  49. var memBuffer = BufferPool.Get(256, true);
  50. var stream = new BufferPoolMemoryStream(memBuffer, 0, memBuffer.Length, true, true, false, true);
  51. // Write 5 bytes for placeholder for length prefix
  52. stream.WriteByte(0);
  53. stream.WriteByte(0);
  54. stream.WriteByte(0);
  55. stream.WriteByte(0);
  56. stream.WriteByte(0);
  57. var bufferWriter = new BufferPoolBufferWriter(stream);
  58. var writer = new MessagePackWriter(bufferWriter);
  59. switch (message.type)
  60. {
  61. case MessageTypes.StreamItem:
  62. // https://github.com/aspnet/AspNetCore/blob/master/src/SignalR/docs/specs/HubProtocol.md#streamitem-message-encoding-1
  63. // [2, Headers, InvocationId, Item]
  64. writer.WriteArrayHeader(4);
  65. writer.Write(2);
  66. WriteHeaders(ref writer);
  67. WriteString(ref writer, message.invocationId);
  68. WriteValue(ref writer, bufferWriter, message.item);
  69. break;
  70. case MessageTypes.Completion:
  71. // https://github.com/aspnet/AspNetCore/blob/master/src/SignalR/docs/specs/HubProtocol.md#completion-message-encoding-1
  72. // [3, Headers, InvocationId, ResultKind, Result?]
  73. byte resultKind = (byte)(!string.IsNullOrEmpty(message.error) ? /*error*/ 1 : message.result != null ? /*non-void*/ 3 : /*void*/ 2);
  74. writer.WriteArrayHeader(resultKind == 2 ? 4 : 5);
  75. writer.Write(3);
  76. WriteHeaders(ref writer);
  77. WriteString(ref writer, message.invocationId);
  78. writer.Write(resultKind);
  79. if (resultKind == 1) // error
  80. WriteString(ref writer, message.error);
  81. else if (resultKind == 3) // non-void
  82. WriteValue(ref writer, bufferWriter, message.result);
  83. break;
  84. case MessageTypes.Invocation:
  85. // https://github.com/aspnet/AspNetCore/blob/master/src/SignalR/docs/specs/HubProtocol.md#invocation-message-encoding-1
  86. // [1, Headers, InvocationId, NonBlocking, Target, [Arguments], [StreamIds]]
  87. case MessageTypes.StreamInvocation:
  88. // https://github.com/aspnet/AspNetCore/blob/master/src/SignalR/docs/specs/HubProtocol.md#streaminvocation-message-encoding-1
  89. // [4, Headers, InvocationId, Target, [Arguments], [StreamIds]]
  90. writer.WriteArrayHeader(message.streamIds != null ? 6 : 5);
  91. writer.Write((int)message.type);
  92. WriteHeaders(ref writer);
  93. WriteString(ref writer, message.invocationId);
  94. WriteString(ref writer, message.target);
  95. writer.WriteArrayHeader(message.arguments != null ? message.arguments.Length : 0);
  96. if (message.arguments != null)
  97. for (int i = 0; i < message.arguments.Length; ++i)
  98. WriteValue(ref writer, bufferWriter, message.arguments[i]);
  99. if (message.streamIds != null)
  100. {
  101. writer.WriteArrayHeader(message.streamIds.Length);
  102. for (int i = 0; i < message.streamIds.Length; ++i)
  103. WriteValue(ref writer, bufferWriter, message.streamIds[i]);
  104. }
  105. break;
  106. case MessageTypes.CancelInvocation:
  107. // https://github.com/aspnet/AspNetCore/blob/master/src/SignalR/docs/specs/HubProtocol.md#cancelinvocation-message-encoding-1
  108. // [5, Headers, InvocationId]
  109. writer.WriteArrayHeader(3);
  110. writer.Write(5);
  111. WriteHeaders(ref writer);
  112. WriteString(ref writer, message.invocationId);
  113. break;
  114. case MessageTypes.Ping:
  115. // https://github.com/aspnet/AspNetCore/blob/master/src/SignalR/docs/specs/HubProtocol.md#ping-message-encoding-1
  116. // [6]
  117. writer.WriteArrayHeader(1);
  118. writer.Write(6);
  119. break;
  120. case MessageTypes.Close:
  121. // https://github.com/aspnet/AspNetCore/blob/master/src/SignalR/docs/specs/HubProtocol.md#close-message-encoding-1
  122. // [7, Error, AllowReconnect?]
  123. writer.WriteArrayHeader(string.IsNullOrEmpty(message.error) ? 1 : 2);
  124. writer.Write(7);
  125. if (!string.IsNullOrEmpty(message.error))
  126. WriteString(ref writer, message.error);
  127. break;
  128. }
  129. writer.Flush();
  130. // get how much bytes got written to the buffer. This includes the 5 placeholder bytes too.
  131. int length = (int)stream.Position;
  132. // this is the length without the 5 placeholder bytes
  133. int contentLength = length - 5;
  134. // get the stream's internal buffer. We set the releaseBuffer flag to false, so we can use it safely.
  135. var buffer = stream.GetBuffer();
  136. // add varint length prefix
  137. byte prefixBytes = GetRequiredBytesForLengthPrefix(contentLength);
  138. WriteLengthAsVarInt(buffer, 5 - prefixBytes, contentLength);
  139. // return with the final segment
  140. return new BufferSegment(buffer, 5 - prefixBytes, contentLength + prefixBytes);
  141. }
  142. private void WriteValue(ref MessagePackWriter writer, BufferPoolBufferWriter bufferWriter, object item)
  143. {
  144. if (item == null)
  145. writer.WriteNil();
  146. else
  147. {
  148. writer.Flush();
  149. MessagePackSerializer.Serialize(item.GetType(), bufferWriter, item);
  150. }
  151. }
  152. private void WriteString(ref MessagePackWriter writer, string str)
  153. {
  154. if (str == null)
  155. writer.WriteNil();
  156. else
  157. {
  158. int count = System.Text.Encoding.UTF8.GetByteCount(str);
  159. var buffer = BufferPool.Get(count, true);
  160. System.Text.Encoding.UTF8.GetBytes(str, 0, str.Length, buffer, 0);
  161. writer.WriteString(new ReadOnlySpan<byte>(buffer, 0, count));
  162. BufferPool.Release(buffer);
  163. }
  164. }
  165. private void WriteHeaders(ref MessagePackWriter writer)
  166. {
  167. writer.WriteMapHeader(0);
  168. }
  169. public void ParseMessages(BufferSegment segment, ref List<Message> messages)
  170. {
  171. int offset = segment.Offset;
  172. while (offset < segment.Count)
  173. {
  174. int length = (int)ReadVarInt(segment.Data, ref offset);
  175. var reader = new MessagePackReader(new ReadOnlyMemory<byte>(segment.Data, offset, length));
  176. int arrayLength = reader.ReadArrayHeader();
  177. int messageType = reader.ReadByte();
  178. switch ((MessageTypes)messageType)
  179. {
  180. case MessageTypes.Invocation: messages.Add(ReadInvocation(ref reader)); break;
  181. case MessageTypes.StreamItem: messages.Add(ReadStreamItem(ref reader)); break;
  182. case MessageTypes.Completion: messages.Add(ReadCompletion(ref reader)); break;
  183. case MessageTypes.StreamInvocation: messages.Add(ReadStreamInvocation(ref reader)); break;
  184. case MessageTypes.CancelInvocation: messages.Add(ReadCancelInvocation(ref reader)); break;
  185. case MessageTypes.Ping:
  186. // https://github.com/aspnet/AspNetCore/blob/master/src/SignalR/docs/specs/HubProtocol.md#ping-message-encoding-1
  187. messages.Add(new Message { type = MessageTypes.Ping });
  188. break;
  189. case MessageTypes.Close: messages.Add(ReadClose(ref reader)); break;
  190. }
  191. offset += length;
  192. }
  193. }
  194. private Message ReadClose(ref MessagePackReader reader)
  195. {
  196. // https://github.com/aspnet/AspNetCore/blob/master/src/SignalR/docs/specs/HubProtocol.md#close-message-encoding-1
  197. string error = reader.ReadString();
  198. bool allowReconnect = false;
  199. try
  200. {
  201. allowReconnect = reader.ReadBoolean();
  202. }
  203. catch { }
  204. return new Message
  205. {
  206. type = MessageTypes.Close,
  207. error = error,
  208. allowReconnect = allowReconnect
  209. };
  210. }
  211. private Message ReadCancelInvocation(ref MessagePackReader reader)
  212. {
  213. // https://github.com/aspnet/AspNetCore/blob/master/src/SignalR/docs/specs/HubProtocol.md#cancelinvocation-message-encoding-1
  214. ReadHeaders(ref reader);
  215. string invocationId = reader.ReadString();
  216. return new Message
  217. {
  218. type = MessageTypes.CancelInvocation,
  219. invocationId = invocationId
  220. };
  221. }
  222. private Message ReadStreamInvocation(ref MessagePackReader reader)
  223. {
  224. // https://github.com/aspnet/AspNetCore/blob/master/src/SignalR/docs/specs/HubProtocol.md#streaminvocation-message-encoding-1
  225. ReadHeaders(ref reader);
  226. string invocationId = reader.ReadString();
  227. string target = reader.ReadString();
  228. object[] arguments = ReadArguments(ref reader, target);
  229. string[] streamIds = ReadStreamIds(ref reader);
  230. return new Message
  231. {
  232. type = MessageTypes.StreamInvocation,
  233. invocationId = invocationId,
  234. target = target,
  235. arguments = arguments,
  236. streamIds = streamIds
  237. };
  238. }
  239. private Message ReadCompletion(ref MessagePackReader reader)
  240. {
  241. // https://github.com/aspnet/AspNetCore/blob/master/src/SignalR/docs/specs/HubProtocol.md#completion-message-encoding-1
  242. ReadHeaders(ref reader);
  243. string invocationId = reader.ReadString();
  244. byte resultKind = reader.ReadByte();
  245. switch (resultKind)
  246. {
  247. // 1 - Error result - Result contains a String with the error message
  248. case 1:
  249. string error = reader.ReadString();
  250. return new Message
  251. {
  252. type = MessageTypes.Completion,
  253. invocationId = invocationId,
  254. error = error
  255. };
  256. // 2 - Void result - Result is absent
  257. case 2:
  258. return new Message
  259. {
  260. type = MessageTypes.Completion,
  261. invocationId = invocationId
  262. };
  263. // 3 - Non-Void result - Result contains the value returned by the server
  264. case 3:
  265. object item = ReadItem(ref reader, invocationId);
  266. return new Message
  267. {
  268. type = MessageTypes.Completion,
  269. invocationId = invocationId,
  270. item = item,
  271. result = item
  272. };
  273. default:
  274. throw new NotImplementedException("Unknown resultKind: " + resultKind);
  275. }
  276. }
  277. private Message ReadStreamItem(ref MessagePackReader reader)
  278. {
  279. // https://github.com/aspnet/AspNetCore/blob/master/src/SignalR/docs/specs/HubProtocol.md#streamitem-message-encoding-1
  280. ReadHeaders(ref reader);
  281. string invocationId = reader.ReadString();
  282. object item = ReadItem(ref reader, invocationId);
  283. return new Message
  284. {
  285. type = MessageTypes.StreamItem,
  286. invocationId = invocationId,
  287. item = item
  288. };
  289. }
  290. private Message ReadInvocation(ref MessagePackReader reader)
  291. {
  292. // https://github.com/aspnet/AspNetCore/blob/master/src/SignalR/docs/specs/HubProtocol.md#invocation-message-encoding-1
  293. ReadHeaders(ref reader);
  294. string invocationId = reader.ReadString();
  295. string target = reader.ReadString();
  296. object[] arguments = ReadArguments(ref reader, target);
  297. string[] streamIds = ReadStreamIds(ref reader);
  298. return new Message
  299. {
  300. type = MessageTypes.Invocation,
  301. invocationId = invocationId,
  302. target = target,
  303. arguments = arguments,
  304. streamIds = streamIds
  305. };
  306. }
  307. private object ReadItem(ref MessagePackReader reader, string invocationId)
  308. {
  309. long longId = 0;
  310. if (long.TryParse(invocationId, out longId))
  311. {
  312. Type itemType = this.Connection.GetItemType(longId);
  313. return MessagePackSerializer.Deserialize(itemType, reader.ReadRaw());
  314. }
  315. else
  316. {
  317. reader.Skip();
  318. return null;
  319. }
  320. }
  321. private string[] ReadStreamIds(ref MessagePackReader reader)
  322. {
  323. var count = reader.ReadArrayHeader();
  324. string[] result = null;
  325. if (count > 0)
  326. {
  327. result = new string[count];
  328. for (int i = 0; i < count; i++)
  329. result[i] = reader.ReadString();
  330. }
  331. return result;
  332. }
  333. private object[] ReadArguments(ref MessagePackReader reader, string target)
  334. {
  335. var subscription = this.Connection.GetSubscription(target);
  336. object[] args = null;
  337. if (subscription == null || subscription.callbacks == null || subscription.callbacks.Count == 0)
  338. {
  339. reader.Skip();
  340. }
  341. else
  342. {
  343. int count = reader.ReadArrayHeader();
  344. if (subscription.callbacks[0].ParamTypes != null)
  345. {
  346. args = new object[subscription.callbacks[0].ParamTypes.Length];
  347. for (int i = 0; i < subscription.callbacks[0].ParamTypes.Length; ++i)
  348. args[i] = MessagePackSerializer.Deserialize(subscription.callbacks[0].ParamTypes[i], reader.ReadRaw());
  349. }
  350. else
  351. args = null;
  352. }
  353. return args;
  354. }
  355. private Dictionary<string, string> ReadHeaders(ref MessagePackReader reader)
  356. {
  357. int count = reader.ReadMapHeader();
  358. Dictionary<string, string> result = null;
  359. if (count > 0)
  360. {
  361. result = new Dictionary<string, string>(count);
  362. for (int i = 0; i < count; i++)
  363. {
  364. string key = reader.ReadString();
  365. string value = reader.ReadString();
  366. result.Add(key, value);
  367. }
  368. }
  369. return result;
  370. }
  371. public static byte GetRequiredBytesForLengthPrefix(int length)
  372. {
  373. byte bytes = 0;
  374. do
  375. {
  376. length >>= 7;
  377. bytes++;
  378. }
  379. while (length > 0);
  380. return bytes;
  381. }
  382. public static int WriteLengthAsVarInt(byte[] data, int offset, int length)
  383. {
  384. do
  385. {
  386. var current = data[offset];
  387. current = (byte)(length & 0x7f);
  388. length >>= 7;
  389. if (length > 0)
  390. {
  391. current |= 0x80;
  392. }
  393. data[offset++] = current;
  394. }
  395. while (length > 0);
  396. return offset;
  397. }
  398. public static uint ReadVarInt(byte[] data, ref int offset)
  399. {
  400. var length = 0U;
  401. var numBytes = 0;
  402. byte byteRead;
  403. do
  404. {
  405. byteRead = data[offset + numBytes];
  406. length = length | (((uint)(byteRead & 0x7f)) << (numBytes * 7));
  407. numBytes++;
  408. }
  409. while (offset + numBytes < data.Length && ((byteRead & 0x80) != 0));
  410. offset += numBytes;
  411. return length;
  412. }
  413. public object ConvertTo(Type toType, object obj)
  414. {
  415. if (obj == null)
  416. return null;
  417. #if NETFX_CORE
  418. TypeInfo typeInfo = toType.GetTypeInfo();
  419. #endif
  420. #if NETFX_CORE
  421. if (typeInfo.IsEnum)
  422. #else
  423. if (toType.IsEnum)
  424. #endif
  425. return Enum.Parse(toType, obj.ToString(), true);
  426. #if NETFX_CORE
  427. if (typeInfo.IsPrimitive)
  428. #else
  429. if (toType.IsPrimitive)
  430. #endif
  431. return Convert.ChangeType(obj, toType);
  432. if (toType == typeof(string))
  433. return obj.ToString();
  434. #if NETFX_CORE
  435. if (typeInfo.IsGenericType && toType.Name == "Nullable`1")
  436. return Convert.ChangeType(obj, toType.GenericTypeArguments[0]);
  437. #else
  438. if (toType.IsGenericType && toType.Name == "Nullable`1")
  439. return Convert.ChangeType(obj, toType.GetGenericArguments()[0]);
  440. #endif
  441. return obj;
  442. }
  443. public object[] GetRealArguments(Type[] argTypes, object[] arguments)
  444. {
  445. if (arguments == null || arguments.Length == 0)
  446. return null;
  447. if (argTypes.Length > arguments.Length)
  448. throw new Exception(string.Format("argType.Length({0}) < arguments.length({1})", argTypes.Length, arguments.Length));
  449. return arguments;
  450. }
  451. }
  452. }
  453. #endif