BaseTransport.cs 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Diagnostics;
  4. using System.Reactive.Subjects;
  5. using System.Threading;
  6. using System.Threading.Tasks;
  7. using SocketIOClient.JsonSerializer;
  8. using SocketIOClient.Messages;
  9. using SocketIOClient.UriConverters;
  10. namespace SocketIOClient.Transport
  11. {
  12. public abstract class BaseTransport : IObserver<string>, IObserver<byte[]>, IObservable<IMessage>, IDisposable
  13. {
  14. public BaseTransport(SocketIOOptions options, IJsonSerializer jsonSerializer)
  15. {
  16. Options = options;
  17. MessageSubject = new Subject<IMessage>();
  18. JsonSerializer = jsonSerializer;
  19. UriConverter = new UriConverter();
  20. _messageQueue = new Queue<IMessage>();
  21. }
  22. DateTime _pingTime;
  23. readonly Queue<IMessage> _messageQueue;
  24. protected SocketIOOptions Options { get; }
  25. protected Subject<IMessage> MessageSubject { get; }
  26. protected IJsonSerializer JsonSerializer { get; }
  27. protected CancellationTokenSource PingTokenSource { get; private set; }
  28. protected OpenedMessage OpenedMessage { get; private set; }
  29. public string Namespace { get; set; }
  30. public IUriConverter UriConverter { get; set; }
  31. public async Task SendAsync(IMessage msg, CancellationToken cancellationToken)
  32. {
  33. msg.Eio = Options.EIO;
  34. msg.Protocol = Options.Transport;
  35. var payload = new Payload
  36. {
  37. Text = msg.Write()
  38. };
  39. if (msg.OutgoingBytes != null)
  40. {
  41. payload.Bytes = msg.OutgoingBytes;
  42. }
  43. await SendAsync(payload, cancellationToken).ConfigureAwait(false);
  44. }
  45. protected virtual async Task OpenAsync(OpenedMessage msg)
  46. {
  47. OpenedMessage = msg;
  48. if (Options.EIO == 3 && string.IsNullOrEmpty(Namespace))
  49. {
  50. return;
  51. }
  52. var connectMsg = new ConnectedMessage
  53. {
  54. Namespace = Namespace,
  55. Eio = Options.EIO,
  56. Query = Options.Query,
  57. };
  58. if (Options.EIO == 4)
  59. {
  60. if (Options.Auth != null)
  61. {
  62. connectMsg.AuthJsonStr = JsonSerializer.Serialize(new[] { Options.Auth }).Json.TrimStart('[').TrimEnd(']');
  63. }
  64. }
  65. for (int i = 1; i <= 3; i++)
  66. {
  67. try
  68. {
  69. await SendAsync(connectMsg, CancellationToken.None).ConfigureAwait(false);
  70. break;
  71. }
  72. catch (Exception e)
  73. {
  74. if (i == 3)
  75. OnError(e);
  76. else
  77. await Task.Delay(TimeSpan.FromMilliseconds(Math.Pow(2, i) * 100));
  78. }
  79. }
  80. }
  81. /// <summary>
  82. /// <para>Eio3 ping is sent by the client</para>
  83. /// <para>Eio4 ping is sent by the server</para>
  84. /// </summary>
  85. /// <param name="cancellationToken"></param>
  86. private void StartPing(CancellationToken cancellationToken)
  87. {
  88. Debug.WriteLine($"[Ping] Interval: {OpenedMessage.PingInterval}");
  89. Task.Factory.StartNew(async () =>
  90. {
  91. while (!cancellationToken.IsCancellationRequested)
  92. {
  93. await Task.Delay(OpenedMessage.PingInterval);
  94. if (cancellationToken.IsCancellationRequested)
  95. {
  96. break;
  97. }
  98. try
  99. {
  100. var ping = new PingMessage();
  101. Debug.WriteLine($"[Ping] Sending");
  102. await SendAsync(ping, CancellationToken.None).ConfigureAwait(false);
  103. Debug.WriteLine($"[Ping] Has been sent");
  104. _pingTime = DateTime.Now;
  105. MessageSubject.OnNext(ping);
  106. }
  107. catch (Exception e)
  108. {
  109. Debug.WriteLine($"[Ping] Failed to send, {e.Message}");
  110. MessageSubject.OnError(e);
  111. break;
  112. }
  113. }
  114. }, TaskCreationOptions.LongRunning);
  115. }
  116. public abstract Task ConnectAsync(Uri uri, CancellationToken cancellationToken);
  117. public abstract Task DisconnectAsync(CancellationToken cancellationToken);
  118. public abstract void AddHeader(string key, string val);
  119. public virtual void Dispose()
  120. {
  121. MessageSubject.Dispose();
  122. _messageQueue.Clear();
  123. if (PingTokenSource != null)
  124. {
  125. PingTokenSource.Cancel();
  126. PingTokenSource.Dispose();
  127. }
  128. }
  129. public abstract Task SendAsync(Payload payload, CancellationToken cancellationToken);
  130. public void OnCompleted()
  131. {
  132. throw new NotImplementedException();
  133. }
  134. public void OnError(Exception error)
  135. {
  136. MessageSubject.OnError(error);
  137. }
  138. public void OnNext(string text)
  139. {
  140. Debug.WriteLine($"[Receive] {text}");
  141. var msg = MessageFactory.CreateMessage(Options.EIO, text);
  142. if (msg == null)
  143. {
  144. return;
  145. }
  146. if (msg.BinaryCount > 0)
  147. {
  148. msg.IncomingBytes = new List<byte[]>(msg.BinaryCount);
  149. _messageQueue.Enqueue(msg);
  150. return;
  151. }
  152. if (msg.Type == MessageType.Opened)
  153. {
  154. OpenAsync(msg as OpenedMessage).ConfigureAwait(false);
  155. }
  156. if (Options.EIO == 3)
  157. {
  158. if (msg.Type == MessageType.Connected)
  159. {
  160. var connectMsg = msg as ConnectedMessage;
  161. connectMsg.Sid = OpenedMessage.Sid;
  162. if ((string.IsNullOrEmpty(Namespace) && string.IsNullOrEmpty(connectMsg.Namespace)) || connectMsg.Namespace == Namespace)
  163. {
  164. if (PingTokenSource != null)
  165. {
  166. PingTokenSource.Cancel();
  167. }
  168. PingTokenSource = new CancellationTokenSource();
  169. StartPing(PingTokenSource.Token);
  170. }
  171. else
  172. {
  173. return;
  174. }
  175. }
  176. else if (msg.Type == MessageType.Pong)
  177. {
  178. var pong = msg as PongMessage;
  179. pong.Duration = DateTime.Now - _pingTime;
  180. }
  181. }
  182. MessageSubject.OnNext(msg);
  183. if (msg.Type == MessageType.Ping)
  184. {
  185. _pingTime = DateTime.Now;
  186. try
  187. {
  188. SendAsync(new PongMessage(), CancellationToken.None).ConfigureAwait(false);
  189. MessageSubject.OnNext(new PongMessage
  190. {
  191. Eio = Options.EIO,
  192. Protocol = Options.Transport,
  193. Duration = DateTime.Now - _pingTime
  194. });
  195. }
  196. catch (Exception e)
  197. {
  198. OnError(e);
  199. }
  200. }
  201. }
  202. public void OnNext(byte[] bytes)
  203. {
  204. Debug.WriteLine($"[Receive] binary message");
  205. if (_messageQueue.Count > 0)
  206. {
  207. var msg = _messageQueue.Peek();
  208. msg.IncomingBytes.Add(bytes);
  209. if (msg.IncomingBytes.Count == msg.BinaryCount)
  210. {
  211. MessageSubject.OnNext(msg);
  212. _messageQueue.Dequeue();
  213. }
  214. }
  215. }
  216. public IDisposable Subscribe(IObserver<IMessage> observer)
  217. {
  218. return MessageSubject.Subscribe(observer);
  219. }
  220. }
  221. }