WebSocketTransport.cs 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990
  1. using System;
  2. using System.Reactive.Linq;
  3. using System.Text;
  4. using System.Threading;
  5. using System.Threading.Tasks;
  6. using SocketIOClient.JsonSerializer;
  7. namespace SocketIOClient.Transport
  8. {
  /// <summary>
  /// Transport that sends payloads through an <see cref="IClientWebSocket"/> and
  /// registers itself as the observer of that socket's text and binary streams.
  /// </summary>
  public class WebSocketTransport : BaseTransport
  {
      /// <summary>Declared for the receive side; not referenced anywhere in this class.</summary>
      private const int ReceiveChunkSize = 1024 * 8;

      /// <summary>Maximum number of bytes handed to the socket in a single frame.</summary>
      private const int SendChunkSize = 1024 * 8;

      private readonly IClientWebSocket _ws;

      /// <summary>Serializes whole payloads so frames of concurrent sends never interleave.</summary>
      private readonly SemaphoreSlim _sendLock;

      // Subscription handles are kept so Dispose can detach this transport from the socket.
      private readonly IDisposable _textSubscription;
      private readonly IDisposable _bytesSubscription;

      /// <summary>
      /// Creates the transport and subscribes it to the socket's text and byte observables.
      /// </summary>
      /// <param name="ws">Socket used for all network operations.</param>
      /// <param name="options">Client options, forwarded to the base transport.</param>
      /// <param name="jsonSerializer">Serializer, forwarded to the base transport.</param>
      /// <exception cref="ArgumentNullException"><paramref name="ws"/> is null.</exception>
      public WebSocketTransport(IClientWebSocket ws, SocketIOOptions options, IJsonSerializer jsonSerializer) : base(options, jsonSerializer)
      {
          if (ws == null)
          {
              throw new ArgumentNullException(nameof(ws));
          }

          _ws = ws;
          _sendLock = new SemaphoreSlim(1, 1);
          _textSubscription = _ws.TextObservable.Subscribe(this);
          _bytesSubscription = _ws.BytesObservable.Subscribe(this);
      }

      /// <summary>
      /// Sends one logical message, split into frames of at most <see cref="SendChunkSize"/> bytes.
      /// The caller must already hold <see cref="_sendLock"/>.
      /// </summary>
      /// <param name="type">Whether the frames are sent as text or binary.</param>
      /// <param name="bytes">Message content. A zero-length array results in no frame being sent.</param>
      /// <param name="cancellationToken">Token forwarded to every socket send.</param>
      private async Task SendAsync(TransportMessageType type, byte[] bytes, CancellationToken cancellationToken)
      {
          if (type == TransportMessageType.Binary && Options.EIO == 3)
          {
              // EIO 3 binary messages get a leading byte of 4.
              // NOTE(review): presumably the Engine.IO "message" packet type marker - confirm against the protocol.
              byte[] prefixed = new byte[bytes.Length + 1];
              prefixed[0] = 4;
              Buffer.BlockCopy(bytes, 0, prefixed, 1, bytes.Length);
              bytes = prefixed;
          }

          int offset = 0;
          while (offset < bytes.Length)
          {
              int length = Math.Min(SendChunkSize, bytes.Length - offset);

              // The socket abstraction takes a whole array, so each frame is copied out.
              byte[] chunk = new byte[length];
              Buffer.BlockCopy(bytes, offset, chunk, 0, length);
              offset += length;

              bool endOfMessage = offset == bytes.Length;
              await _ws.SendAsync(chunk, type, endOfMessage, cancellationToken).ConfigureAwait(false);
          }
      }

      /// <summary>Connects the underlying socket to <paramref name="uri"/>.</summary>
      /// <param name="uri">Endpoint to connect to.</param>
      /// <param name="cancellationToken">Token forwarded to the socket.</param>
      public override async Task ConnectAsync(Uri uri, CancellationToken cancellationToken)
      {
          await _ws.ConnectAsync(uri, cancellationToken).ConfigureAwait(false);
      }

      /// <summary>Disconnects the underlying socket.</summary>
      /// <param name="cancellationToken">Token forwarded to the socket.</param>
      public override async Task DisconnectAsync(CancellationToken cancellationToken)
      {
          await _ws.DisconnectAsync(cancellationToken).ConfigureAwait(false);
      }

      /// <summary>
      /// Sends the payload's text followed by each of its binary items, as one
      /// uninterrupted sequence with respect to other calls on this instance.
      /// </summary>
      /// <param name="payload">Payload to send; a null text part is skipped.</param>
      /// <param name="cancellationToken">
      /// Cancels both the wait for the send lock and the socket sends.
      /// </param>
      /// <exception cref="ArgumentNullException"><paramref name="payload"/> is null.</exception>
      /// <exception cref="OperationCanceledException">
      /// The token was cancelled while waiting for the send lock.
      /// </exception>
      public override async Task SendAsync(Payload payload, CancellationToken cancellationToken)
      {
          if (payload == null)
          {
              throw new ArgumentNullException(nameof(payload));
          }

          // Acquire outside the try block: if the wait throws (cancellation, disposal)
          // the lock was never taken and must not be released.
          await _sendLock.WaitAsync(cancellationToken).ConfigureAwait(false);
          try
          {
              // The lock spans the text frame and all binary frames so that two
              // concurrent payloads cannot mix their frames on the wire.
              if (payload.Text != null)
              {
                  byte[] textBytes = Encoding.UTF8.GetBytes(payload.Text);
                  await SendAsync(TransportMessageType.Text, textBytes, cancellationToken).ConfigureAwait(false);
              }

              if (payload.Bytes != null)
              {
                  foreach (var item in payload.Bytes)
                  {
                      await SendAsync(TransportMessageType.Binary, item, cancellationToken).ConfigureAwait(false);
                  }
              }
          }
          finally
          {
              _sendLock.Release();
          }
      }

      /// <summary>Adds a header on the underlying socket.</summary>
      /// <param name="key">Header name.</param>
      /// <param name="val">Header value.</param>
      public override void AddHeader(string key, string val) => _ws.AddHeader(key, val);

      /// <summary>
      /// Detaches from the socket's observables, disposes the base transport and
      /// releases the send lock. The socket itself is not disposed here.
      /// </summary>
      public override void Dispose()
      {
          // Unsubscribe first so no further messages are pushed into a disposed transport.
          _textSubscription?.Dispose();
          _bytesSubscription?.Dispose();
          base.Dispose();
          _sendLock.Dispose();
      }
  }
  81. }