SocketIO.cs 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Diagnostics;
  4. using System.Net.Http;
  5. using System.Net.WebSockets;
  6. using System.Threading;
  7. using System.Threading.Tasks;
  8. using SocketIOClient.JsonSerializer;
  9. using SocketIOClient.Messages;
  10. using SocketIOClient.Transport;
  11. using SocketIOClient.UriConverters;
  12. namespace SocketIOClient
  13. {
  14. /// <summary>
  15. /// socket.io client class
  16. /// </summary>
  17. public class SocketIO : IDisposable
  18. {
  /// <summary>
  /// Creates a client for the given server address using default options.
  /// </summary>
  /// <param name="uri">Server address; parsed with <see cref="Uri"/>.</param>
  public SocketIO(string uri) : this(new Uri(uri)) { }

  /// <summary>
  /// Creates a client for the given server address using a fresh <see cref="SocketIOOptions"/>.
  /// </summary>
  /// <param name="uri">Server address.</param>
  public SocketIO(Uri uri) : this(uri, new SocketIOOptions()) { }

  /// <summary>
  /// Creates a client for the given server address using the supplied options.
  /// </summary>
  /// <param name="uri">Server address; parsed with <see cref="Uri"/>.</param>
  /// <param name="options">Client options.</param>
  public SocketIO(string uri, SocketIOOptions options) : this(new Uri(uri), options) { }

  /// <summary>
  /// Creates a client for the given server address using the supplied options.
  /// All other constructors delegate to this one.
  /// </summary>
  /// <param name="uri">Server address.</param>
  /// <param name="options">Client options.</param>
  /// <exception cref="ArgumentNullException">
  /// <paramref name="uri"/> or <paramref name="options"/> is null.
  /// </exception>
  public SocketIO(Uri uri, SocketIOOptions options)
  {
      // nameof keeps the reported parameter names in sync with the signature.
      ServerUri = uri ?? throw new ArgumentNullException(nameof(uri));
      Options = options ?? throw new ArgumentNullException(nameof(options));
      Initialize();
  }
  private Uri _serverUri;

  /// <summary>
  /// Address of the server. Assigning a different, non-null value whose absolute path
  /// is not "/" also copies that path into <see cref="Namespace"/>.
  /// </summary>
  public Uri ServerUri
  {
      get { return _serverUri; }
      set
      {
          if (_serverUri == value)
          {
              // Nothing changed; leave Namespace untouched.
              return;
          }

          _serverUri = value;

          bool hasNamespacePath = value != null && value.AbsolutePath != "/";
          if (hasNamespacePath)
          {
              Namespace = value.AbsolutePath;
          }
      }
  }
  /// <summary>
  /// Identifier of the socket session. Assigned from the server's connected message.
  /// </summary>
  public string Id { get; set; }

  /// <summary>
  /// Namespace path taken from <see cref="ServerUri"/> when that path is not "/".
  /// </summary>
  public string Namespace { get; private set; }

  /// <summary>
  /// True once the server's connected message has been handled and until a disconnect is processed.
  /// </summary>
  public bool Connected { get; private set; }

  /// <summary>
  /// Number of failed connection attempts in the current connect loop; reset to 0 on connect.
  /// </summary>
  public int Attempts { get; private set; }

  /// <summary>
  /// Negation of <see cref="Connected"/>.
  /// </summary>
  public bool Disconnected
  {
      get { return !Connected; }
  }

  /// <summary>Options supplied at construction.</summary>
  public SocketIOOptions Options { get; }

  /// <summary>Serializer used for outgoing payloads and handed to the transport.</summary>
  public IJsonSerializer JsonSerializer { get; set; }

  /// <summary>Builds the actual server URI used for a given transport.</summary>
  public IUriConverter UriConverter { get; set; }

  /// <summary>HTTP client used for the transport probe and for the polling transport.</summary>
  public HttpClient HttpClient { get; set; }

  /// <summary>Factory invoked to create the web socket when the WebSocket transport is chosen.</summary>
  public Func<IClientWebSocket> ClientWebSocketProvider { get; set; }

  // Web socket created for the current WebSocket transport, if any.
  private IClientWebSocket _clientWebsocket;

  // Current transport. Public field in the original; kept public so external users keep compiling.
  public BaseTransport _transport;

  // Exception types the connect loop treats as retryable connection failures.
  private List<Type> _expectedExceptions;

  // Id of the most recently emitted ack-expecting packet; -1 when none has been sent.
  private int _packetId;

  // True while the background connect loop is running; polled by ConnectAsync.
  private bool _isConnectCoreRunning;

  // URI produced by UriConverter for the transport currently in use.
  private Uri _realServerUri;

  // Failure recorded by the connect loop for ConnectAsync to surface.
  private Exception _connectCoreException;

  // Pending acknowledgement callbacks keyed by packet id.
  private Dictionary<int, Action<SocketIOResponse>> _ackHandlers;

  // Handlers invoked for every incoming event, in list order.
  private List<OnAnyHandler> _onAnyHandlers;

  // Per-event callbacks keyed by event name.
  private Dictionary<string, Action<SocketIOResponse>> _eventHandlers;

  // Cancels the connect loop that belongs to the current connection.
  private CancellationTokenSource _connectionTokenSource;

  // Current delay between reconnection attempts, passed to Task.Delay as milliseconds.
  private double _reconnectionDelay;
  #region Socket.IO event

  /// <summary>
  /// Raised when the server's connected message has been handled.
  /// </summary>
  public event EventHandler OnConnected;

  //public event EventHandler<string> OnConnectError;
  //public event EventHandler<string> OnConnectTimeout;

  /// <summary>
  /// Raised with the message text of an error message received from the server.
  /// </summary>
  public event EventHandler<string> OnError;

  /// <summary>
  /// Raised with the disconnect reason when a connected socket becomes disconnected.
  /// </summary>
  public event EventHandler<string> OnDisconnected;

  /// <summary>
  /// Raised after a successful connection that followed at least one failed attempt;
  /// the argument is the attempt count.
  /// </summary>
  public event EventHandler<int> OnReconnected;

  /// <summary>
  /// Raised before each connection attempt after the first; the argument is the attempt count.
  /// </summary>
  public event EventHandler<int> OnReconnectAttempt;

  /// <summary>
  /// Raised with the exception when a reconnection attempt fails.
  /// </summary>
  public event EventHandler<Exception> OnReconnectError;

  /// <summary>
  /// Raised when the attempt count exceeds the configured reconnection attempts.
  /// </summary>
  public event EventHandler OnReconnectFailed;

  /// <summary>
  /// Raised when a ping message is received.
  /// </summary>
  public event EventHandler OnPing;

  /// <summary>
  /// Raised when a pong message is received; the argument is the message's duration.
  /// </summary>
  public event EventHandler<TimeSpan> OnPong;

  #endregion

  #region Observable Event
  //Subject<Unit> _onConnected;
  //public IObservable<Unit> ConnectedObservable { get; private set; }
  #endregion
  /// <summary>
  /// Creates the handler tables, installs the default serializer, URI converter,
  /// HTTP client and web socket factory, and seeds the list of exception types
  /// that the connect loop treats as retryable.
  /// </summary>
  private void Initialize()
  {
      // No ack-expecting packet has been sent yet.
      _packetId = -1;

      _ackHandlers = new Dictionary<int, Action<SocketIOResponse>>();
      _eventHandlers = new Dictionary<string, Action<SocketIOResponse>>();
      _onAnyHandlers = new List<OnAnyHandler>();

      JsonSerializer = new SystemTextJsonSerializer();
      UriConverter = new UriConverter();
      HttpClient = new HttpClient();
      ClientWebSocketProvider = () => new SystemNetWebSocketsClientWebSocket(Options.EIO);

      var retryable = new List<Type>();
      retryable.Add(typeof(TimeoutException));
      retryable.Add(typeof(WebSocketException));
      retryable.Add(typeof(HttpRequestException));
      retryable.Add(typeof(OperationCanceledException));
      retryable.Add(typeof(TaskCanceledException));
      _expectedExceptions = retryable;
  }
  /// <summary>
  /// Resolves the transport protocol to use, stores it in <see cref="Options"/>,
  /// creates the matching transport, assigns it the current namespace and applies
  /// the configured extra headers.
  /// </summary>
  private async Task CreateTransportAsync()
  {
      // ConfigureAwait(false) matches the other awaits in this class and avoids
      // resuming on a captured context the caller may be blocking.
      Options.Transport = await GetProtocolAsync().ConfigureAwait(false);

      if (Options.Transport == TransportProtocol.Polling)
      {
          HttpPollingHandler handler;
          if (Options.EIO == 3)
          {
              handler = new Eio3HttpPollingHandler(HttpClient);
          }
          else
          {
              handler = new Eio4HttpPollingHandler(HttpClient);
          }

          _transport = new HttpTransport(HttpClient, handler, Options, JsonSerializer);
      }
      else
      {
          _clientWebsocket = ClientWebSocketProvider();
          _transport = new WebSocketTransport(_clientWebsocket, Options, JsonSerializer);
      }

      _transport.Namespace = Namespace;
      SetHeaders();
  }
  /// <summary>
  /// Copies every entry of <c>Options.ExtraHeaders</c>, when present, onto the current transport.
  /// </summary>
  private void SetHeaders()
  {
      var extraHeaders = Options.ExtraHeaders;
      if (extraHeaders == null)
      {
          return;
      }

      foreach (var header in extraHeaders)
      {
          _transport.AddHeader(header.Key, header.Value);
      }
  }
  /// <summary>
  /// Records a connect-loop failure and marks the loop as finished so that
  /// <see cref="ConnectAsync"/> stops polling and can rethrow it.
  /// </summary>
  /// <param name="e">The failure to hand over.</param>
  private void SyncExceptionToMain(Exception e)
  {
      // Store the exception first: ConnectAsync reads it as soon as the flag drops.
      this._connectCoreException = e;
      this._isConnectCoreRunning = false;
  }
  /// <summary>
  /// Starts a background loop that creates a transport and connects it.
  /// Failures whose type is in the expected-exception list are retried (when
  /// reconnection is enabled) until the configured number of attempts is exceeded.
  /// Any other failure, or an expected one with reconnection disabled, is handed to
  /// <see cref="SyncExceptionToMain"/> and ends the loop.
  /// </summary>
  /// <remarks>
  /// Completion is signalled by clearing <c>_isConnectCoreRunning</c>, which
  /// <see cref="ConnectAsync"/> polls.
  /// </remarks>
  private void ConnectCore()
  {
      DisposeForReconnect();
      _reconnectionDelay = Options.ReconnectionDelay;
      _connectionTokenSource = new CancellationTokenSource();
      var cct = _connectionTokenSource.Token;
      _isConnectCoreRunning = true;
      _connectCoreException = null;

      // Task.Run unwraps the async delegate and always uses the default scheduler.
      _ = Task.Run(async () =>
      {
          while (true)
          {
              try
              {
                  // Check cancellation before touching the shared transport so a
                  // superseded loop does not dispose a newer loop's transport.
                  if (cct.IsCancellationRequested)
                  {
                      break;
                  }

                  _clientWebsocket?.Dispose();
                  _transport?.Dispose();

                  // Awaited (not .Wait()) and inside the try: a failure here is now
                  // classified below instead of leaving _isConnectCoreRunning stuck.
                  await CreateTransportAsync().ConfigureAwait(false);
                  _realServerUri = UriConverter.GetServerUri(Options.Transport == TransportProtocol.WebSocket, ServerUri, Options.EIO, Options.Path, Options.Query);

                  if (Attempts > 0)
                  {
                      OnReconnectAttempt?.Invoke(this, Attempts);
                  }

                  // Dispose the timeout source once the connect attempt has finished.
                  using (var timeoutCts = new CancellationTokenSource(Options.ConnectionTimeout))
                  {
                      _transport.Subscribe(OnMessageReceived, OnErrorReceived);
                      await _transport.ConnectAsync(_realServerUri, timeoutCts.Token).ConfigureAwait(false);
                  }

                  break;
              }
              catch (Exception e)
              {
                  bool retryable = _expectedExceptions.Contains(e.GetType()) && Options.Reconnection;
                  if (!retryable)
                  {
                      SyncExceptionToMain(e);
                      throw;
                  }

                  if (Attempts > 0)
                  {
                      OnReconnectError?.Invoke(this, e);
                  }

                  Attempts++;
                  if (Attempts > Options.ReconnectionAttempts)
                  {
                      OnReconnectFailed?.Invoke(this, EventArgs.Empty);
                      break;
                  }

                  // Grow the delay, capped at the configured maximum.
                  if (_reconnectionDelay < Options.ReconnectionDelayMax)
                  {
                      _reconnectionDelay += 2 * Options.RandomizationFactor;
                  }
                  if (_reconnectionDelay > Options.ReconnectionDelayMax)
                  {
                      _reconnectionDelay = Options.ReconnectionDelayMax;
                  }

                  await Task.Delay((int)_reconnectionDelay).ConfigureAwait(false);
              }
          }

          _isConnectCoreRunning = false;
      });
  }
  /// <summary>
  /// Decides which transport protocol to use. When polling is configured and
  /// auto-upgrade is enabled, the polling endpoint is fetched and WebSocket is chosen
  /// if the response text contains "websocket"; otherwise the configured transport
  /// is returned unchanged.
  /// </summary>
  /// <returns>The protocol to create the transport for.</returns>
  private async Task<TransportProtocol> GetProtocolAsync()
  {
      if (Options.Transport == TransportProtocol.Polling && Options.AutoUpgrade)
      {
          Uri uri = UriConverter.GetServerUri(false, ServerUri, Options.EIO, Options.Path, Options.Query);
          try
          {
              string text = await HttpClient.GetStringAsync(uri).ConfigureAwait(false);
              if (text.Contains("websocket"))
              {
                  return TransportProtocol.WebSocket;
              }
          }
          catch (Exception e)
          {
              // The probe is best-effort: fall back to the configured transport,
              // but leave a trace instead of swallowing the failure silently.
              Debug.WriteLine(e);
          }
      }

      return Options.Transport;
  }
  /// <summary>
  /// Starts the connect loop and completes once that loop has finished.
  /// </summary>
  /// <returns>A task that completes when the connect loop stops running.</returns>
  /// <remarks>
  /// If the loop recorded a failure, that exception is rethrown here with its
  /// original stack trace preserved.
  /// </remarks>
  public async Task ConnectAsync()
  {
      ConnectCore();

      // The loop signals completion by clearing the flag; poll until it does.
      while (_isConnectCoreRunning)
      {
          await Task.Delay(20).ConfigureAwait(false);
      }

      var failure = _connectCoreException;
      if (failure != null)
      {
          // A plain "throw failure;" would overwrite the stack trace captured in the loop.
          System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(failure).Throw();
      }
  }
  /// <summary>
  /// Raises <see cref="OnPing"/>.
  /// </summary>
  private void PingHandler() => OnPing?.Invoke(this, EventArgs.Empty);
  /// <summary>
  /// Raises <see cref="OnPong"/> with the duration carried by the message.
  /// </summary>
  /// <param name="msg">The received pong message.</param>
  private void PongHandler(PongMessage msg) => OnPong?.Invoke(this, msg.Duration);
  /// <summary>
  /// Handles the server's connected message: stores the session id, marks the socket
  /// connected, raises <see cref="OnConnected"/>, raises <see cref="OnReconnected"/>
  /// when earlier attempts had failed, and resets the attempt counter.
  /// </summary>
  /// <param name="msg">The received connected message.</param>
  private void ConnectedHandler(ConnectedMessage msg)
  {
      Id = msg.Sid;
      Connected = true;
      OnConnected?.Invoke(this, EventArgs.Empty);

      bool isReconnect = Attempts > 0;
      if (isReconnect)
      {
          OnReconnected?.Invoke(this, Attempts);
      }

      Attempts = 0;
  }
  /// <summary>
  /// Handles a disconnected message by running the disconnect sequence with the
  /// server-disconnect reason.
  /// </summary>
  private void DisconnectedHandler() => InvokeDisconnect(DisconnectReason.IOServerDisconnect);
  /// <summary>
  /// Dispatches an incoming event: first to every catch-all handler, then to the
  /// handler registered for the event name, if any. Exceptions thrown by handlers
  /// are written to the debug output and do not stop dispatch.
  /// </summary>
  /// <param name="m">The received event message.</param>
  private void EventMessageHandler(EventMessage m)
  {
      var res = new SocketIOResponse(m.JsonElements, this)
      {
          PacketId = m.Id
      };

      // Iterate a snapshot so a handler may call OnAny/OffAny/PrependAny without
      // invalidating the enumeration and aborting the remaining dispatch.
      foreach (var anyHandler in _onAnyHandlers.ToArray())
      {
          try
          {
              anyHandler(m.Event, res);
          }
          catch (Exception e)
          {
              Debug.WriteLine(e);
          }
      }

      // Single lookup instead of ContainsKey followed by the indexer.
      if (_eventHandlers.TryGetValue(m.Event, out var eventHandler))
      {
          try
          {
              eventHandler(res);
          }
          catch (Exception e)
          {
              Debug.WriteLine(e);
          }
      }
  }
  /// <summary>
  /// Invokes the acknowledgement callback registered for the message id, if any,
  /// and removes it afterwards even when the callback throws.
  /// </summary>
  /// <param name="m">The received acknowledgement message.</param>
  private void AckMessageHandler(ClientAckMessage m)
  {
      Action<SocketIOResponse> callback;
      if (!_ackHandlers.TryGetValue(m.Id, out callback))
      {
          return;
      }

      var response = new SocketIOResponse(m.JsonElements, this);
      try
      {
          callback(response);
      }
      finally
      {
          _ackHandlers.Remove(m.Id);
      }
  }
  /// <summary>
  /// Raises <see cref="OnError"/> with the text carried by the message.
  /// </summary>
  /// <param name="msg">The received error message.</param>
  private void ErrorMessageHandler(ErrorMessage msg) => OnError?.Invoke(this, msg.Message);
  /// <summary>
  /// Dispatches an incoming binary event: first to every catch-all handler, then to
  /// the handler registered for the event name, if any. Exceptions are written to
  /// the debug output and do not stop dispatch.
  /// </summary>
  /// <param name="msg">The received binary message.</param>
  private void BinaryMessageHandler(BinaryMessage msg)
  {
      SocketIOResponse response;
      try
      {
          response = new SocketIOResponse(msg.JsonElements, this)
          {
              PacketId = msg.Id
          };
          response.InComingBytes.AddRange(msg.IncomingBytes);
      }
      catch (Exception e)
      {
          Debug.WriteLine(e);
          return;
      }

      // Catch-all handlers see binary events too, as they do for plain events.
      // A snapshot lets a handler change the registration list while running.
      foreach (var anyHandler in _onAnyHandlers.ToArray())
      {
          try
          {
              anyHandler(msg.Event, response);
          }
          catch (Exception e)
          {
              Debug.WriteLine(e);
          }
      }

      if (_eventHandlers.TryGetValue(msg.Event, out var eventHandler))
      {
          try
          {
              eventHandler(response);
          }
          catch (Exception e)
          {
              Debug.WriteLine(e);
          }
      }
  }
  /// <summary>
  /// Invokes the acknowledgement callback registered for the message id, if any,
  /// passing the JSON payload and the received binary attachments. Exceptions are
  /// written to the debug output.
  /// </summary>
  /// <param name="msg">The received binary acknowledgement message.</param>
  private void BinaryAckMessageHandler(ClientBinaryAckMessage msg)
  {
      if (!_ackHandlers.TryGetValue(msg.Id, out var callback))
      {
          return;
      }

      try
      {
          var response = new SocketIOResponse(msg.JsonElements, this)
          {
              PacketId = msg.Id,
          };
          response.InComingBytes.AddRange(msg.IncomingBytes);
          callback(response);
      }
      catch (Exception e)
      {
          Debug.WriteLine(e);
      }
      finally
      {
          // Mirror AckMessageHandler: drop the callback once it has been used so
          // it does not linger in the table until the next reconnect.
          _ackHandlers.Remove(msg.Id);
      }
  }
  /// <summary>
  /// Transport error callback: runs the disconnect sequence with the
  /// transport-close reason. The exception itself is not inspected.
  /// </summary>
  /// <param name="ex">The error reported by the transport.</param>
  private void OnErrorReceived(Exception ex) => InvokeDisconnect(DisconnectReason.TransportClose);
  /// <summary>
  /// Transport message callback: routes the message to the handler for its type.
  /// Message types without a case are ignored. Any exception raised while handling
  /// is written to the debug output.
  /// </summary>
  /// <param name="msg">The message delivered by the transport.</param>
  private void OnMessageReceived(IMessage msg)
  {
      try
      {
          var messageType = msg.Type;
          switch (messageType)
          {
              case MessageType.Ping:
                  PingHandler();
                  break;

              case MessageType.Pong:
                  PongHandler(msg as PongMessage);
                  break;

              case MessageType.Connected:
                  ConnectedHandler(msg as ConnectedMessage);
                  break;

              case MessageType.Disconnected:
                  DisconnectedHandler();
                  break;

              case MessageType.EventMessage:
                  EventMessageHandler(msg as EventMessage);
                  break;

              case MessageType.AckMessage:
                  AckMessageHandler(msg as ClientAckMessage);
                  break;

              case MessageType.ErrorMessage:
                  ErrorMessageHandler(msg as ErrorMessage);
                  break;

              case MessageType.BinaryMessage:
                  BinaryMessageHandler(msg as BinaryMessage);
                  break;

              case MessageType.BinaryAckMessage:
                  BinaryAckMessageHandler(msg as ClientBinaryAckMessage);
                  break;
          }
      }
      catch (Exception handlingError)
      {
          Debug.WriteLine(handlingError);
      }
  }
  /// <summary>
  /// When connected, sends a disconnect message for the current namespace and then
  /// runs the disconnect sequence with the client-disconnect reason. A failure while
  /// sending is written to the debug output and does not prevent the disconnect.
  /// Does nothing when not connected.
  /// </summary>
  public async Task DisconnectAsync()
  {
      if (!Connected)
      {
          return;
      }

      var goodbye = new DisconnectedMessage
      {
          Namespace = Namespace
      };

      try
      {
          await _transport.SendAsync(goodbye, CancellationToken.None).ConfigureAwait(false);
      }
      catch (Exception sendError)
      {
          Debug.WriteLine(sendError);
      }

      InvokeDisconnect(DisconnectReason.IOClientDisconnect);
  }
  /// <summary>
  /// Registers the handler for the given event, replacing any handler already
  /// registered under that name.
  /// </summary>
  /// <param name="eventName">Name of the event to listen for.</param>
  /// <param name="callback">Handler invoked with the event's response.</param>
  public void On(string eventName, Action<SocketIOResponse> callback)
  {
      // The indexer overwrites an existing entry or adds a new one.
      _eventHandlers[eventName] = callback;
  }
  /// <summary>
  /// Unregisters the handler for the given event, if one is registered.
  /// </summary>
  /// <param name="eventName">Name of the event to stop listening for.</param>
  public void Off(string eventName)
  {
      // Remove is a no-op when the key is absent.
      _eventHandlers.Remove(eventName);
  }
  /// <summary>
  /// Appends a catch-all handler to the end of the handler list. A null handler is ignored.
  /// </summary>
  /// <param name="handler">Handler to add.</param>
  public void OnAny(OnAnyHandler handler)
  {
      if (handler == null)
      {
          return;
      }

      _onAnyHandlers.Add(handler);
  }
  /// <summary>
  /// Inserts a catch-all handler at the front of the handler list. A null handler is ignored.
  /// </summary>
  /// <param name="handler">Handler to add.</param>
  public void PrependAny(OnAnyHandler handler)
  {
      if (handler == null)
      {
          return;
      }

      _onAnyHandlers.Insert(0, handler);
  }
  /// <summary>
  /// Removes the first occurrence of a catch-all handler from the handler list.
  /// A null handler is ignored.
  /// </summary>
  /// <param name="handler">Handler to remove.</param>
  public void OffAny(OnAnyHandler handler)
  {
      if (handler == null)
      {
          return;
      }

      _onAnyHandlers.Remove(handler);
  }
  /// <summary>
  /// Returns a copy of the registered catch-all handlers, in invocation order.
  /// </summary>
  /// <returns>A new array holding the current handlers.</returns>
  public OnAnyHandler[] ListenersAny()
  {
      return _onAnyHandlers.ToArray();
  }
  /// <summary>
  /// Sends an acknowledgement for a packet received from the server. A binary
  /// acknowledgement is sent when serializing <paramref name="data"/> yields byte
  /// attachments; otherwise a plain one is sent, with or without a JSON payload.
  /// </summary>
  /// <param name="packetId">Id of the packet being acknowledged.</param>
  /// <param name="cancellationToken">Token passed to the transport send.</param>
  /// <param name="data">Optional payload values.</param>
  internal async Task ClientAckAsync(int packetId, CancellationToken cancellationToken, params object[] data)
  {
      IMessage ackMessage;

      bool hasPayload = data != null && data.Length > 0;
      if (!hasPayload)
      {
          ackMessage = new ServerAckMessage
          {
              Namespace = Namespace,
              Id = packetId
          };
      }
      else
      {
          var serialized = JsonSerializer.Serialize(data);
          if (serialized.Bytes.Count > 0)
          {
              ackMessage = new ServerBinaryAckMessage
              {
                  Id = packetId,
                  Namespace = Namespace,
                  Json = serialized.Json
              };
              ackMessage.OutgoingBytes = new List<byte[]>(serialized.Bytes);
          }
          else
          {
              ackMessage = new ServerAckMessage
              {
                  Namespace = Namespace,
                  Id = packetId,
                  Json = serialized.Json
              };
          }
      }

      await _transport.SendAsync(ackMessage, cancellationToken).ConfigureAwait(false);
  }
  /// <summary>
  /// Emits an event to the server without a cancellation token.
  /// </summary>
  /// <param name="eventName">Name of the event.</param>
  /// <param name="data">Optional payload values, passed through to the serializer.</param>
  /// <returns>A task that completes when the message has been handed to the transport.</returns>
  public async Task EmitAsync(string eventName, params object[] data)
  {
      // Delegates to the overload that takes a token.
      var noCancellation = CancellationToken.None;
      await EmitAsync(eventName, noCancellation, data).ConfigureAwait(false);
  }
  /// <summary>
  /// Emits an event to the server. A binary message is sent when serializing
  /// <paramref name="data"/> yields byte attachments; otherwise an event message is
  /// sent, with or without a JSON payload.
  /// </summary>
  /// <param name="eventName">Name of the event.</param>
  /// <param name="cancellationToken">Token passed to the transport send.</param>
  /// <param name="data">Optional payload values.</param>
  public async Task EmitAsync(string eventName, CancellationToken cancellationToken, params object[] data)
  {
      IMessage outgoing;

      if (data == null || data.Length == 0)
      {
          outgoing = new EventMessage
          {
              Namespace = Namespace,
              Event = eventName
          };
      }
      else
      {
          var serialized = JsonSerializer.Serialize(data);
          if (serialized.Bytes.Count > 0)
          {
              outgoing = new BinaryMessage
              {
                  Namespace = Namespace,
                  OutgoingBytes = new List<byte[]>(serialized.Bytes),
                  Event = eventName,
                  Json = serialized.Json
              };
          }
          else
          {
              outgoing = new EventMessage
              {
                  Namespace = Namespace,
                  Event = eventName,
                  Json = serialized.Json
              };
          }
      }

      await _transport.SendAsync(outgoing, cancellationToken).ConfigureAwait(false);
  }
  /// <summary>
  /// Emits an event to the server and registers a callback for the server's
  /// acknowledgement, without a cancellation token.
  /// </summary>
  /// <param name="eventName">Name of the event.</param>
  /// <param name="ack">Callback invoked with the server's acknowledgement.</param>
  /// <param name="data">Optional payload values, passed through to the serializer.</param>
  /// <returns>A task that completes when the message has been handed to the transport.</returns>
  public async Task EmitAsync(string eventName, Action<SocketIOResponse> ack, params object[] data)
  {
      // Delegates to the overload that takes a token.
      var noCancellation = CancellationToken.None;
      await EmitAsync(eventName, noCancellation, ack, data).ConfigureAwait(false);
  }
  /// <summary>
  /// Emits an event to the server and registers <paramref name="ack"/> under a new
  /// packet id so it can be invoked when the matching acknowledgement arrives.
  /// A binary message is sent when serializing <paramref name="data"/> yields byte
  /// attachments; otherwise a plain ack-expecting message is sent.
  /// </summary>
  /// <param name="eventName">Name of the event.</param>
  /// <param name="cancellationToken">Token passed to the transport send.</param>
  /// <param name="ack">Callback invoked with the server's acknowledgement.</param>
  /// <param name="data">Optional payload values.</param>
  public async Task EmitAsync(string eventName, CancellationToken cancellationToken, Action<SocketIOResponse> ack, params object[] data)
  {
      // Allocate the next packet id and remember the callback under it.
      int packetId = ++_packetId;
      _ackHandlers.Add(packetId, ack);

      IMessage outgoing;
      if (data == null || data.Length == 0)
      {
          outgoing = new ClientAckMessage
          {
              Event = eventName,
              Namespace = Namespace,
              Id = packetId
          };
      }
      else
      {
          var serialized = JsonSerializer.Serialize(data);
          if (serialized.Bytes.Count > 0)
          {
              outgoing = new ClientBinaryAckMessage
              {
                  Event = eventName,
                  Namespace = Namespace,
                  Json = serialized.Json,
                  Id = packetId,
                  OutgoingBytes = new List<byte[]>(serialized.Bytes)
              };
          }
          else
          {
              outgoing = new ClientAckMessage
              {
                  Event = eventName,
                  Namespace = Namespace,
                  Id = packetId,
                  Json = serialized.Json
              };
          }
      }

      await _transport.SendAsync(outgoing, cancellationToken).ConfigureAwait(false);
  }
  /// <summary>
  /// Runs the disconnect sequence when the socket is connected: clears
  /// <see cref="Connected"/>, raises <see cref="OnDisconnected"/>, disconnects the
  /// transport, and restarts the connect loop when the disconnect was not explicit
  /// and reconnection is enabled. Does nothing when not connected.
  /// </summary>
  /// <param name="reason">One of the <c>DisconnectReason</c> values.</param>
  /// <remarks>
  /// The method is <c>async void</c>, so no caller can observe an exception from it.
  /// Every step is therefore guarded and failures are written to the debug output.
  /// </remarks>
  private async void InvokeDisconnect(string reason)
  {
      if (!Connected)
      {
          return;
      }

      Connected = false;

      try
      {
          OnDisconnected?.Invoke(this, reason);
      }
      catch (Exception e)
      {
          // A faulty subscriber must not prevent transport teardown or reconnection.
          Debug.WriteLine(e);
      }

      try
      {
          await _transport.DisconnectAsync(CancellationToken.None).ConfigureAwait(false);
      }
      catch (Exception e)
      {
          // Best-effort teardown; the transport may already be closed.
          Debug.WriteLine(e);
      }

      // After an explicit disconnect (by server or client) the client does not
      // reconnect on its own; the caller has to connect again.
      bool explicitDisconnect = reason == DisconnectReason.IOServerDisconnect
          || reason == DisconnectReason.IOClientDisconnect;
      if (explicitDisconnect || !Options.Reconnection)
      {
          return;
      }

      try
      {
          ConnectCore();
      }
      catch (Exception e)
      {
          Debug.WriteLine(e);
      }
  }
  /// <summary>
  /// Adds an exception type to the list the connect loop treats as retryable.
  /// A type already in the list is not added again.
  /// </summary>
  /// <param name="type">Exception type to add.</param>
  public void AddExpectedException(Type type)
  {
      bool alreadyListed = _expectedExceptions.Contains(type);
      if (alreadyListed)
      {
          return;
      }

      _expectedExceptions.Add(type);
  }
  /// <summary>
  /// Resets per-connection state before a new connect loop starts: the packet id
  /// counter, the pending acknowledgement callbacks, and the previous connection's
  /// cancellation source (cancelled, then disposed) if there is one.
  /// </summary>
  private void DisposeForReconnect()
  {
      _packetId = -1;
      _ackHandlers.Clear();

      var previousSource = _connectionTokenSource;
      if (previousSource == null)
      {
          return;
      }

      previousSource.Cancel();
      previousSource.Dispose();
  }
  /// <summary>
  /// Releases the HTTP client, the current transport and the connection's
  /// cancellation source, and clears all registered handlers.
  /// </summary>
  /// <remarks>
  /// Safe to call before <see cref="ConnectAsync"/> has ever run: the transport and
  /// the cancellation source do not exist yet at that point and are skipped.
  /// </remarks>
  public void Dispose()
  {
      HttpClient?.Dispose();
      _transport?.Dispose();

      _ackHandlers.Clear();
      _onAnyHandlers.Clear();
      _eventHandlers.Clear();

      var tokenSource = _connectionTokenSource;
      if (tokenSource != null)
      {
          // Drop the reference first so a repeated Dispose does not call Cancel
          // on an already disposed source.
          _connectionTokenSource = null;
          tokenSource.Cancel();
          tokenSource.Dispose();
      }
  }
  678. }
  679. }