// SocketManager.cs
  1. #if !BESTHTTP_DISABLE_SOCKETIO
  2. using System;
  3. using System.Collections.Generic;
  4. using BestHTTP.SocketIO3.Transports;
  5. using BestHTTP.Extensions;
  6. using BestHTTP.SocketIO3.Parsers;
  7. using BestHTTP.SocketIO3.Events;
  8. using BestHTTP.Logger;
  9. namespace BestHTTP.SocketIO3
  10. {
  11. public sealed class SocketManager : IHeartbeat, IManager
  12. {
  /// <summary>
  /// Lifecycle states a SocketManager instance can be in.
  /// </summary>
  public enum States
  {
      /// <summary>
      /// The state assigned by the constructor, before any connection attempt.
      /// </summary>
      Initial,

      /// <summary>
      /// Open() was called and the transport is being opened.
      /// </summary>
      Opening,

      /// <summary>
      /// The manager is open; events can be sent to the server.
      /// </summary>
      Open,

      /// <summary>
      /// Sending is paused while the manager waits to switch to the upgraded transport.
      /// </summary>
      Paused,

      /// <summary>
      /// An error occurred and the manager is waiting to connect to the server again.
      /// </summary>
      Reconnecting,

      /// <summary>
      /// The manager is closed, either on the user's or on the server's initiative.
      /// </summary>
      Closed
  }
  /// <summary>
  /// The Socket.IO protocol version this manager supports; the getter always returns 4.
  /// </summary>
  public int ProtocolVersion
  {
      get { return 4; }
  }
  47. #region Public Properties
  /// <summary>
  /// Current state of this Socket.IO manager. Assigning a new value first copies
  /// the old one into PreviousState.
  /// </summary>
  public States State
  {
      get
      {
          return state;
      }

      private set
      {
          PreviousState = state;
          state = value;
      }
  }

  private States state;

  /// <summary>
  /// The SocketOptions instance used by this manager.
  /// </summary>
  public SocketOptions Options { get; private set; }

  /// <summary>
  /// The Uri of the Socket.IO endpoint.
  /// </summary>
  public Uri Uri { get; private set; }

  /// <summary>
  /// Handshake data taken from the transport's Open packet; reset to null when the manager closes.
  /// </summary>
  public HandshakeData Handshake { get; private set; }

  /// <summary>
  /// The main transport currently in use; null while the manager is closed.
  /// </summary>
  public ITransport Transport { get; private set; }

  /// <summary>
  /// Request counter exposed for request-based transports. It is not modified inside this class.
  /// </summary>
  public ulong RequestCounter { get; internal set; }

  /// <summary>
  /// The socket of the root ("/") namespace.
  /// </summary>
  public Socket Socket
  {
      get { return GetSocket(); }
  }

  /// <summary>
  /// Returns the socket associated with the given namespace.
  /// </summary>
  public Socket this[string nsp]
  {
      get { return GetSocket(nsp); }
  }

  /// <summary>
  /// Number of reconnect attempts made so far.
  /// </summary>
  public int ReconnectAttempts { get; private set; }

  /// <summary>
  /// Parser used to create outgoing packets and to parse incoming ones.
  /// </summary>
  public IParser Parser { get; set; }

  /// <summary>
  /// Logging context passed along with this manager's log entries.
  /// </summary>
  public LoggingContext Context { get; private set; }
  93. #endregion
  94. #region Internal Properties
  /// <summary>
  /// Milliseconds elapsed between 1970-01-01 and the current UTC time, for request-based transports.
  /// </summary>
  internal UInt64 Timestamp
  {
      get
      {
          TimeSpan sinceEpoch = DateTime.UtcNow.Subtract(new DateTime(1970, 1, 1));
          return (UInt64)sinceEpoch.TotalMilliseconds;
      }
  }

  /// <summary>
  /// Returns a new ack id on every read by atomically incrementing the backing counter.
  /// </summary>
  internal int NextAckId
  {
      get { return System.Threading.Interlocked.Increment(ref nextAckId); }
  }

  private int nextAckId;

  /// <summary>
  /// The state the manager was in before the most recent State assignment.
  /// </summary>
  internal States PreviousState { get; private set; }

  /// <summary>
  /// The transport that is being upgraded to, or null when no upgrade is in progress.
  /// </summary>
  internal ITransport UpgradingTransport { get; set; }
  112. #endregion
  113. #region Privates
  /// <summary>
  /// Maps a namespace name to its Socket.
  /// </summary>
  private Dictionary<string, Socket> Namespaces = new Dictionary<string, Socket>();

  /// <summary>
  /// The same sockets as a list, so they can be iterated by index.
  /// </summary>
  private List<Socket> Sockets = new List<Socket>();

  /// <summary>
  /// Packets that could not be sent yet. Created lazily the first time a packet has to be stashed.
  /// </summary>
  private List<OutgoingPacket> OfflinePackets;

  /// <summary>
  /// Set when the open manager's heartbeat first runs; DateTime.MinValue means "not set yet".
  /// </summary>
  private DateTime LastHeartbeat = DateTime.MinValue;

  /// <summary>
  /// The time at which the next reconnect attempt is due.
  /// </summary>
  private DateTime ReconnectAt;

  /// <summary>
  /// The time at which the current connection attempt was started.
  /// </summary>
  private DateTime ConnectionStarted;

  /// <summary>
  /// Re-entrancy flag that prevents nested Close calls.
  /// </summary>
  private bool closing;

  /// <summary>
  /// When the last Ping packet arrived. In Engine.io v4 / socket.io v3 the server sends the pings, not the client.
  /// </summary>
  private DateTime lastPingReceived;
  146. #endregion
  147. #region Constructors
  /// <summary>
  /// Creates a manager for the given uri with a DefaultJsonParser and default SocketOptions.
  /// </summary>
  public SocketManager(Uri uri)
      : this(uri, new DefaultJsonParser(), new SocketOptions())
  { }

  /// <summary>
  /// Creates a manager for the given uri with a custom parser and default SocketOptions.
  /// </summary>
  public SocketManager(Uri uri, IParser parser)
      : this(uri, parser, new SocketOptions())
  { }

  /// <summary>
  /// Creates a manager for the given uri with a DefaultJsonParser and custom options.
  /// </summary>
  public SocketManager(Uri uri, SocketOptions options)
      : this(uri, new DefaultJsonParser(), options)
  { }

  /// <summary>
  /// Creates a manager in the Initial state. No connection is started here.
  /// </summary>
  /// <param name="uri">Endpoint to connect to. When it has no path, "socket.io/" is appended.</param>
  /// <param name="parser">Packet parser; a DefaultJsonParser is used when null.</param>
  /// <param name="options">Connection options; a new SocketOptions is used when null.
  /// For a uri whose scheme starts with "ws", ConnectWith is set to WebSocket on this instance.</param>
  /// <exception cref="ArgumentNullException">Thrown when <paramref name="uri"/> is null.</exception>
  public SocketManager(Uri uri, IParser parser, SocketOptions options)
  {
      if (uri == null)
          throw new ArgumentNullException("uri");

      this.Context = new LoggingContext(this);

      // Resolve the fallback before the options are touched; previously a null
      // 'options' combined with a ws:// uri was dereferenced and threw.
      options = options ?? new SocketOptions();

      if (uri.Scheme.StartsWith("ws", StringComparison.Ordinal))
          options.ConnectWith = TransportTypes.WebSocket;

      string path = uri.PathAndQuery;
      if (path.Length <= 1)
      {
          // No path given: point the uri at the default "socket.io/" endpoint,
          // taking care not to double the separating slash.
          string original = uri.OriginalString;
          string append = original.EndsWith("/", StringComparison.Ordinal) ? "socket.io/" : "/socket.io/";

          uri = new Uri(original + append);
      }

      this.Uri = uri;
      this.Options = options;
      this.State = States.Initial;
      this.PreviousState = States.Initial;
      this.Parser = parser ?? new DefaultJsonParser();
  }
  184. #endregion
  /// <summary>
  /// Returns the socket of the root ("/") namespace; equivalent to the Socket property.
  /// </summary>
  public Socket GetSocket()
  {
      const string rootNamespace = "/";

      return GetSocket(rootNamespace);
  }
  /// <summary>
  /// Returns the socket of the given namespace. When none is registered yet, a new
  /// Socket is created, registered with this manager and opened.
  /// </summary>
  /// <param name="nsp">Namespace name. It is used verbatim as the lookup key; no '/' prefix is added.</param>
  /// <returns>The existing or newly created socket.</returns>
  /// <exception cref="ArgumentNullException">Thrown when <paramref name="nsp"/> is null or empty.</exception>
  public Socket GetSocket(string nsp)
  {
      // The single-string ArgumentNullException constructor takes the parameter name,
      // not the message, so both are passed explicitly.
      if (string.IsNullOrEmpty(nsp))
          throw new ArgumentNullException("nsp", "Namespace parameter is null or empty!");

      Socket socket = null;
      if (!Namespaces.TryGetValue(nsp, out socket))
      {
          // No socket found, create one
          socket = new Socket(nsp, this);

          Namespaces.Add(nsp, socket);
          Sockets.Add(socket);

          (socket as ISocket).Open();
      }

      return socket;
  }
  /// <summary>
  /// Unregisters a Socket from this manager. Removing the last socket closes the manager.
  /// </summary>
  /// <param name="socket">The socket to drop from both the namespace map and the socket list.</param>
  void IManager.Remove(Socket socket)
  {
      Namespaces.Remove(socket.Namespace);
      Sockets.Remove(socket);

      bool noSocketLeft = Sockets.Count == 0;
      if (noSocketLeft)
      {
          Close();
      }
  }
  223. #region Connection to the server, and upgrading
  /// <summary>
  /// Starts opening the Socket.IO connection: creates the transport selected by
  /// Options.ConnectWith, opens it, emits "connecting", switches to the Opening state
  /// and subscribes to the heartbeat. Does nothing unless the manager is in the
  /// Initial, Closed or Reconnecting state.
  /// If the Options' AutoConnect is true, it will be called automatically.
  /// </summary>
  public void Open()
  {
      if (State != States.Initial &&
          State != States.Closed &&
          State != States.Reconnecting)
          return;

      HTTPManager.Logger.Information("SocketManager", "Opening", this.Context);

      ReconnectAt = DateTime.MinValue;

      switch (Options.ConnectWith)
      {
          case TransportTypes.Polling:
              Transport = new PollingTransport(this);
              break;

#if !BESTHTTP_DISABLE_WEBSOCKET
          case TransportTypes.WebSocket:
              Transport = new WebSocketTransport(this);
              break;
#endif

          default:
              // Reached when the requested transport is not compiled in (e.g. WebSocket with
              // BESTHTTP_DISABLE_WEBSOCKET defined). Without a fallback Transport would stay
              // null and the Open() call below would throw a NullReferenceException.
              HTTPManager.Logger.Warning("SocketManager", "Opening - transport \"" + Options.ConnectWith.ToString() + "\" is not available, falling back to Polling", this.Context);
              Transport = new PollingTransport(this);
              break;
      }

      Transport.Open();

      (this as IManager).EmitEvent("connecting");

      State = States.Opening;

      // Used by the heartbeat to detect a connection timeout.
      ConnectionStarted = DateTime.UtcNow;

      HTTPManager.Heartbeats.Subscribe(this);

      // The root namespace will be opened by default
      //GetSocket("/");
  }
  /// <summary>
  /// Closes this Socket.IO connection and removes every socket from the manager.
  /// </summary>
  public void Close()
  {
      IManager self = this;

      self.Close(true);
  }
  /// <summary>
  /// Closes this Socket.IO connection: unsubscribes from the heartbeat, disconnects the
  /// sockets, switches to the Closed state and closes both the main and the upgrading transport.
  /// Does nothing when the manager is already closed or a close is already in progress.
  /// </summary>
  /// <param name="removeSockets">
  /// When true the sockets, their namespace mapping and any stashed offline packets are dropped.
  /// When false the sockets are only disconnected and stay registered.
  /// </param>
  void IManager.Close(bool removeSockets)
  {
      if (State == States.Closed || closing)
          return;

      closing = true;

      try
      {
          HTTPManager.Logger.Information("SocketManager", "Closing", this.Context);

          HTTPManager.Heartbeats.Unsubscribe(this);

          // Disconnect the sockets. The Disconnect function will call the Remove function to remove it from the Sockets list.
          if (removeSockets)
          {
              while (Sockets.Count > 0)
                  (Sockets[Sockets.Count - 1] as ISocket).Disconnect(removeSockets);
          }
          else
          {
              for (int i = 0; i < Sockets.Count; ++i)
                  (Sockets[i] as ISocket).Disconnect(removeSockets);
          }

          // Set to Closed after Socket's Disconnect. This way we can send the disconnect events to the server.
          State = States.Closed;

          LastHeartbeat = DateTime.MinValue;
          lastPingReceived = DateTime.MinValue;

          if (removeSockets && OfflinePackets != null)
              OfflinePackets.Clear();

          // Remove the references from the dictionary too.
          if (removeSockets)
              Namespaces.Clear();

          Handshake = null;

          if (Transport != null)
              Transport.Close();
          Transport = null;

          if (UpgradingTransport != null)
              UpgradingTransport.Close();
          UpgradingTransport = null;
      }
      finally
      {
          // Always release the guard: if a socket or transport throws while closing,
          // a stuck flag would turn every later Close call into a no-op.
          closing = false;
      }
  }
  /// <summary>
  /// Called from an ITransport implementation when an error occurs and we may have to try to reconnect.
  /// Closes the manager instead when reconnection is disabled, the application is quitting,
  /// or the configured number of attempts is reached (emitting "reconnect_failed" in that case).
  /// Otherwise schedules the next attempt, closes the connection while keeping the sockets,
  /// and switches to the Reconnecting state.
  /// </summary>
  void IManager.TryToReconnect()
  {
      if (State == States.Reconnecting ||
          State == States.Closed)
          return;

      if (!Options.Reconnection || HTTPManager.IsQuitting)
      {
          Close();

          return;
      }

      if (++ReconnectAttempts >= Options.ReconnectionAttempts)
      {
          (this as IManager).EmitEvent("reconnect_failed");
          Close();

          return;
      }

      // The delay grows linearly with the number of attempts. It is computed in double and
      // clamped afterwards: the former int multiplication could overflow after many attempts,
      // producing negative or inverted bounds that make Random.Next(min, max) throw.
      double delay = Math.Truncate(Options.ReconnectionDelay.TotalMilliseconds) * ReconnectAttempts;
      double jitter = delay * Options.RandomizationFactor;

      int lower = ClampReconnectDelay(delay - jitter);
      int upper = ClampReconnectDelay(delay + jitter);
      if (upper < lower)
      {
          // A negative randomization factor swaps the bounds.
          int swap = lower;
          lower = upper;
          upper = swap;
      }

      int delayMax = ClampReconnectDelay(Options.ReconnectionDelayMax.TotalMilliseconds);

      Random rand = new Random();
      ReconnectAt = DateTime.UtcNow + TimeSpan.FromMilliseconds(Math.Min(rand.Next(lower, upper), delayMax));

      (this as IManager).Close(false);

      State = States.Reconnecting;

      for (int i = 0; i < Sockets.Count; ++i)
          (Sockets[i] as ISocket).Open();

      // In the Close() function we unregistered
      HTTPManager.Heartbeats.Subscribe(this);

      HTTPManager.Logger.Information("SocketManager", "Reconnecting", this.Context);
  }

  /// <summary>
  /// Converts a delay in milliseconds to an int, clamped to the [0, int.MaxValue] range.
  /// </summary>
  private static int ClampReconnectDelay(double milliseconds)
  {
      if (double.IsNaN(milliseconds) || milliseconds <= 0)
          return 0;

      if (milliseconds >= int.MaxValue)
          return int.MaxValue;

      return (int)milliseconds;
  }
  /// <summary>
  /// Called by transports when they are connected to the server. Moves the manager from
  /// Opening to Open, notifies the sockets, flushes the stashed offline packets and, when
  /// the handshake offers it, starts upgrading to the WebSocket transport.
  /// </summary>
  /// <param name="trans">The transport that connected; only its type is used, for logging.</param>
  /// <returns>False when the manager is not in the Opening state, otherwise true.</returns>
  bool IManager.OnTransportConnected(ITransport trans)
  {
      HTTPManager.Logger.Information("SocketManager", string.Format("OnTransportConnected State: {0}, PreviousState: {1}, Current Transport: {2}, Upgrading Transport: {3}", this.State, this.PreviousState, trans.Type, UpgradingTransport != null ? UpgradingTransport.Type.ToString() : "null"), this.Context);

      if (State != States.Opening)
          return false;

      // Capture this before State changes: the State setter overwrites PreviousState,
      // so checking it again after the assignment below could never match and
      // "reconnect_before_offline_packets" was never emitted.
      bool isReconnect = PreviousState == States.Reconnecting;

      if (isReconnect)
          (this as IManager).EmitEvent("reconnect");

      State = States.Open;

      if (isReconnect)
          (this as IManager).EmitEvent("reconnect_before_offline_packets");

      for (int i = 0; i < Sockets.Count; ++i)
      {
          var socket = Sockets[i];
          if (socket != null)
              socket.OnTransportOpen();
      }

      ReconnectAttempts = 0;

      // Send out packets that we collected while there were no available transport.
      SendOfflinePackets();

#if !BESTHTTP_DISABLE_WEBSOCKET
      // Can we upgrade to WebSocket transport?
      // Transport and Handshake are null when the manager got closed in the meantime,
      // or when the Open packet carried no handshake data.
      if (Transport != null &&
          Transport.Type != TransportTypes.WebSocket &&
          Handshake != null &&
          Handshake.Upgrades != null &&
          Handshake.Upgrades.Contains("websocket"))
      {
          UpgradingTransport = new WebSocketTransport(this);
          UpgradingTransport.Open();
      }
#endif

      return true;
  }
  /// <summary>
  /// Called by a transport when it runs into an error. The error is emitted, the transport
  /// is closed and a reconnect is attempted. While an upgrade is in progress, errors from
  /// any transport other than the upgrading one are ignored.
  /// </summary>
  /// <param name="trans">The transport reporting the error.</param>
  /// <param name="err">The error message to emit.</param>
  void IManager.OnTransportError(ITransport trans, string err)
  {
      bool upgradeInProgress = UpgradingTransport != null;
      if (upgradeInProgress && trans != UpgradingTransport)
      {
          return;
      }

      IManager self = this;

      self.EmitError(err);
      trans.Close();
      self.TryToReconnect();
  }
  /// <summary>
  /// Called when the "probe" packet is received on a transport. Remembers that transport's
  /// type for later connects and puts the manager into the Paused state.
  /// </summary>
  /// <param name="trans">The transport that was probed successfully.</param>
  void IManager.OnTransportProbed(ITransport trans)
  {
      HTTPManager.Logger.Information("SocketManager", "\"probe\" packet received", this.Context);

      // Should we have to reconnect, go straight for the transport we managed to upgrade to.
      var probedType = trans.Type;
      Options.ConnectWith = probedType;

      // Pause and let the heartbeat wait for any running send and receive turn to finish.
      State = States.Paused;
  }
  377. #endregion
  378. #region Packet Handling
  /// <summary>
  /// Picks the transport to send packets with.
  /// </summary>
  /// <returns>
  /// The main transport, or null when the manager is not open, has no transport,
  /// or the transport still has a request in progress.
  /// </returns>
  private ITransport SelectTransport()
  {
      bool canSend = State == States.Open && Transport != null;
      if (!canSend)
      {
          return null;
      }

      if (Transport.IsRequestInProgress)
      {
          return null;
      }

      return Transport;
  }
  /// <summary>
  /// Sends every packet stashed in the OfflinePackets list in one go and then empties
  /// the list. Does nothing when no transport is selectable or nothing is stashed.
  /// </summary>
  private void SendOfflinePackets()
  {
      ITransport target = SelectTransport();

      // This runs before the event handlers get the 'connected' event, so in theory
      // the order of the packets is preserved.
      if (target == null || OfflinePackets == null || OfflinePackets.Count == 0)
      {
          return;
      }

      target.Send(OfflinePackets);
      OfflinePackets.Clear();
  }
  /// <summary>
  /// Called from the Socket class to send a packet. The packet goes out immediately when a
  /// transport is selectable; otherwise it is stashed in the OfflinePackets list, unless it
  /// is volatile, in which case it is dropped. An exception thrown while sending is
  /// reported through EmitError instead of being propagated.
  /// </summary>
  /// <param name="packet">The packet to send or stash.</param>
  void IManager.SendPacket(OutgoingPacket packet)
  {
      HTTPManager.Logger.Information("SocketManager", "SendPacket " + packet.ToString(), this.Context);

      ITransport target = SelectTransport();

      if (target == null)
      {
          // Volatile packets are not kept for later.
          if (packet.IsVolatile)
              return;

          HTTPManager.Logger.Information("SocketManager", "SendPacket - Offline stashing packet", this.Context);

          if (OfflinePackets == null)
              OfflinePackets = new List<OutgoingPacket>();

          // The same packet can be sent through multiple Sockets.
          OfflinePackets.Add(packet);
          return;
      }

      try
      {
          target.Send(packet);
      }
      catch (Exception ex)
      {
          (this as IManager).EmitError(ex.Message + " " + ex.StackTrace);
      }
  }
  /// <summary>
  /// Called from the currently operating transport for every incoming packet. Handles the
  /// transport-level Open and Ping events, then forwards the packet to the socket registered
  /// for the packet's namespace. Packets arriving while the manager is closed are ignored.
  /// </summary>
  /// <param name="packet">The packet received by the transport.</param>
  void IManager.OnPacket(IncomingPacket packet)
  {
      if (State == States.Closed)
      {
          HTTPManager.Logger.Information("SocketManager", "OnPacket - State == States.Closed", this.Context);
          return;
      }

      IManager self = this;
      var transportEvent = packet.TransportEvent;

      if (transportEvent == TransportEventTypes.Open)
      {
          if (Handshake == null)
          {
              // First Open packet: it carries the handshake data, nothing to forward.
              Handshake = packet.DecodedArg as HandshakeData;

              self.OnTransportConnected(Transport);

              return;
          }

          HTTPManager.Logger.Information("SocketManager", "OnPacket - Already received handshake data!", this.Context);
      }
      else if (transportEvent == TransportEventTypes.Ping)
      {
          // Remember when the server pinged us and answer with a Pong.
          lastPingReceived = DateTime.UtcNow;

          self.SendPacket(this.Parser.CreateOutgoing(TransportEventTypes.Pong, null));
      }
      // Pong and every other event type need no transport-level handling.

      Socket target = null;
      if (Namespaces.TryGetValue(packet.Namespace, out target))
      {
          (target as ISocket).OnPacket(packet);
      }
      else if (packet.TransportEvent == TransportEventTypes.Message)
      {
          HTTPManager.Logger.Warning("SocketManager", "Namespace \"" + packet.Namespace + "\" not found!", this.Context);
      }
  }
  468. #endregion
  /// <summary>
  /// Emits an event on every socket registered with this manager.
  /// </summary>
  /// <param name="eventName">Name of the event to emit.</param>
  /// <param name="args">Arguments passed along with the event.</param>
  public void EmitAll(string eventName, params object[] args)
  {
      int index = 0;
      while (index < Sockets.Count)
      {
          Sockets[index].Emit(eventName, args);
          ++index;
      }
  }
  /// <summary>
  /// Emits an internal, packet-less event on the root ("/") namespace. The root socket
  /// is not created when it does not exist yet; the event is dropped in that case.
  /// </summary>
  /// <param name="eventName">Name of the event to emit.</param>
  /// <param name="args">Arguments passed to the event.</param>
  void IManager.EmitEvent(string eventName, params object[] args)
  {
      Socket root = null;
      if (!Namespaces.TryGetValue("/", out root))
      {
          return;
      }

      (root as ISocket).EmitEvent(eventName, args);
  }
  /// <summary>
  /// Emits an internal, packet-less event on the root namespace, using the event name
  /// that EventNames.GetNameFor returns for the given event type.
  /// </summary>
  /// <param name="type">Event type that is translated to an event name.</param>
  /// <param name="args">Arguments passed to the event.</param>
  void IManager.EmitEvent(SocketIOEventTypes type, params object[] args)
  {
      string eventName = EventNames.GetNameFor(type);

      IManager self = this;
      self.EmitEvent(eventName, args);
  }
  /// <summary>
  /// Emits an Error event on the root namespace. The message is wrapped into an Error
  /// object, encoded and parsed back with the current Parser, and the decoded argument(s)
  /// are handed to the event handlers.
  /// </summary>
  /// <param name="msg">The error message to deliver.</param>
  void IManager.EmitError(string msg)
  {
      // The packet is built for the first registered socket. With no socket at all there is
      // also no root socket to receive the event, so only log the error instead of failing
      // with an ArgumentOutOfRangeException on Sockets[0].
      if (Sockets.Count == 0)
      {
          HTTPManager.Logger.Warning("SocketManager", "EmitError - no socket to deliver the error to: " + msg, this.Context);
          return;
      }

      var outgoing = this.Parser.CreateOutgoing(this.Sockets[0], SocketIOEventTypes.Error, -1, null, new Error(msg));

      IncomingPacket inc = IncomingPacket.Empty;
      if (outgoing.IsBinary)
          inc = this.Parser.Parse(this, outgoing.PayloadData);
      else
          inc = this.Parser.Parse(this, outgoing.Payload);

      (this as IManager).EmitEvent(SocketIOEventTypes.Error, inc.DecodedArg ?? inc.DecodedArgs);
  }
  /// <summary>
  /// Emits an internal, packet-less event on every socket registered with this manager.
  /// </summary>
  /// <param name="eventName">Name of the event to emit.</param>
  /// <param name="args">Arguments passed to the event.</param>
  void IManager.EmitAll(string eventName, params object[] args)
  {
      int index = 0;
      while (index < Sockets.Count)
      {
          (Sockets[index] as ISocket).EmitEvent(eventName, args);
          ++index;
      }
  }
  508. #region IHeartbeat Implementation
  /// <summary>
  /// Periodic heartbeat callback that drives the manager according to its current state:
  /// finishes a transport upgrade (Paused), detects connection timeouts (Opening), starts
  /// due reconnect attempts (Reconnecting), and polls, flushes offline packets and watches
  /// for missing server pings (Open).
  /// </summary>
  /// <param name="dif">Not used by this implementation.</param>
  void IHeartbeat.OnHeartbeatUpdate(TimeSpan dif)
  {
      IManager self = this;

      switch (State)
      {
          case States.Paused:
              // To ensure no messages are lost, the upgrade packet is only sent once the
              // existing transport has no request or poll in flight any more.
              if (Transport.IsRequestInProgress || Transport.IsPollingInProgress)
                  break;

              State = States.Open;

              // Close the current transport
              Transport.Close();

              // and switch to the newly upgraded one
              Transport = UpgradingTransport;
              UpgradingTransport = null;

              // We will send an Upgrade("5") packet.
              Transport.Send(this.Parser.CreateOutgoing(TransportEventTypes.Upgrade, null));

              goto case States.Open;

          case States.Opening:
              if (DateTime.UtcNow - ConnectionStarted < Options.Timeout)
                  break;

              // The connection attempt took longer than the configured timeout.
              self.EmitError("Connection timed out!");
              self.EmitEvent("connect_error");
              self.EmitEvent("connect_timeout");
              self.TryToReconnect();
              break;

          case States.Reconnecting:
              if (ReconnectAt == DateTime.MinValue || DateTime.UtcNow < ReconnectAt)
                  break;

              self.EmitEvent("reconnect_attempt");
              self.EmitEvent("reconnecting");

              Open();
              break;

          case States.Open:
              // Nothing to do until the main transport is actually open.
              ITransport active = Transport;
              if (active == null || active.State != TransportStates.Open)
                  return;

              // Start to poll the server for events
              active.Poll();

              // Start to send out unsent packets
              SendOfflinePackets();

              // First time we reached this point: we have just opened, so start both timers now.
              if (LastHeartbeat == DateTime.MinValue)
              {
                  LastHeartbeat = DateTime.UtcNow;
                  lastPingReceived = DateTime.UtcNow;
                  return;
              }

              // No ping within interval + timeout: treat the connection as lost.
              TimeSpan sinceLastPing = DateTime.UtcNow - lastPingReceived;
              TimeSpan allowedSilence = TimeSpan.FromMilliseconds(Handshake.PingInterval + Handshake.PingTimeout);
              if (sinceLastPing > allowedSilence)
                  self.TryToReconnect();

              break; // case States.Open:
      }
  }
  573. #endregion
  574. }
  575. }
  576. #endif