HubConnection.cs 55 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372
  1. #if !BESTHTTP_DISABLE_SIGNALR_CORE
  2. using System.Threading;
  3. #if CSHARP_7_OR_LATER
  4. using System.Threading.Tasks;
  5. #endif
  6. using BestHTTP.Futures;
  7. using BestHTTP.SignalRCore.Authentication;
  8. using BestHTTP.SignalRCore.Messages;
  9. using System;
  10. using System.Collections.Generic;
  11. using BestHTTP.Logger;
  12. using System.Collections.Concurrent;
  13. using BestHTTP.PlatformSupport.Threading;
  14. namespace BestHTTP.SignalRCore
  15. {
  16. public sealed class HubConnection : BestHTTP.Extensions.IHeartbeat
  17. {
  18. public static readonly object[] EmptyArgs = new object[0];
  19. /// <summary>
  20. /// Uri of the Hub endpoint
  21. /// </summary>
  22. public Uri Uri { get; private set; }
  23. /// <summary>
  24. /// Current state of this connection.
  25. /// </summary>
  26. public ConnectionStates State {
  27. get { return (ConnectionStates)this._state; }
  28. private set {
  29. Interlocked.Exchange(ref this._state, (int)value);
  30. }
  31. }
  32. private volatile int _state;
  33. /// <summary>
  34. /// Current, active ITransport instance.
  35. /// </summary>
  36. public ITransport Transport { get; private set; }
  37. /// <summary>
  38. /// The IProtocol implementation that will parse, encode and decode messages.
  39. /// </summary>
  40. public IProtocol Protocol { get; private set; }
  41. /// <summary>
  42. /// This event is called when the connection is redirected to a new uri.
  43. /// </summary>
  44. public event Action<HubConnection, Uri, Uri> OnRedirected;
  45. /// <summary>
  46. /// This event is called when successfully connected to the hub.
  47. /// </summary>
  48. public event Action<HubConnection> OnConnected;
  49. /// <summary>
  50. /// This event is called when an unexpected error happen and the connection is closed.
  51. /// </summary>
  52. public event Action<HubConnection, string> OnError;
  53. /// <summary>
  54. /// This event is called when the connection is gracefully terminated.
  55. /// </summary>
  56. public event Action<HubConnection> OnClosed;
  57. /// <summary>
  58. /// This event is called for every server-sent message. When returns false, no further processing of the message is done by the plugin.
  59. /// </summary>
  60. public event Func<HubConnection, Message, bool> OnMessage;
  61. /// <summary>
  62. /// Called when the HubConnection start its reconnection process after loosing its underlying connection.
  63. /// </summary>
  64. public event Action<HubConnection, string> OnReconnecting;
  65. /// <summary>
  66. /// Called after a successful reconnection.
  67. /// </summary>
  68. public event Action<HubConnection> OnReconnected;
  69. /// <summary>
  70. /// Called for transport related events.
  71. /// </summary>
  72. public event Action<HubConnection, ITransport, TransportEvents> OnTransportEvent;
  73. /// <summary>
  74. /// An IAuthenticationProvider implementation that will be used to authenticate the connection.
  75. /// </summary>
  76. public IAuthenticationProvider AuthenticationProvider { get; set; }
  77. /// <summary>
  78. /// Negotiation response sent by the server.
  79. /// </summary>
  80. public NegotiationResult NegotiationResult { get; private set; }
  81. /// <summary>
  82. /// Options that has been used to create the HubConnection.
  83. /// </summary>
  84. public HubOptions Options { get; private set; }
  85. /// <summary>
  86. /// How many times this connection is redirected.
  87. /// </summary>
  88. public int RedirectCount { get; private set; }
  89. /// <summary>
  90. /// The reconnect policy that will be used when the underlying connection is lost. Its default value is null.
  91. /// </summary>
  92. public IRetryPolicy ReconnectPolicy { get; set; }
  93. /// <summary>
  94. /// Logging context of this HubConnection instance.
  95. /// </summary>
  96. public LoggingContext Context { get; private set; }
  97. /// <summary>
  98. /// This will be increment to add a unique id to every message the plugin will send.
  99. /// </summary>
  100. private long lastInvocationId = 1;
  101. /// <summary>
  102. /// Id of the last streaming parameter.
  103. /// </summary>
  104. private int lastStreamId = 1;
  105. /// <summary>
  106. /// Store the callback for all sent message that expect a return value from the server. All sent message has
  107. /// a unique invocationId that will be sent back from the server.
  108. /// </summary>
  109. private ConcurrentDictionary<long, InvocationDefinition> invocations = new ConcurrentDictionary<long, InvocationDefinition>();
  110. /// <summary>
  111. /// This is where we store the methodname => callback mapping.
  112. /// </summary>
  113. private ConcurrentDictionary<string, Subscription> subscriptions = new ConcurrentDictionary<string, Subscription>(StringComparer.OrdinalIgnoreCase);
  114. /// <summary>
  115. /// When we sent out the last message to the server.
  116. /// </summary>
  117. private DateTime lastMessageSentAt;
  118. private DateTime lastMessageReceivedAt;
  119. private DateTime connectionStartedAt;
  120. private RetryContext currentContext;
  121. private DateTime reconnectStartTime = DateTime.MinValue;
  122. private DateTime reconnectAt;
  123. private List<TransportTypes> triedoutTransports = new List<TransportTypes>();
  124. private ReaderWriterLockSlim rwLock = new ReaderWriterLockSlim(LockRecursionPolicy.NoRecursion);
  125. public HubConnection(Uri hubUri, IProtocol protocol)
  126. : this(hubUri, protocol, new HubOptions())
  127. {
  128. }
  129. public HubConnection(Uri hubUri, IProtocol protocol, HubOptions options)
  130. {
  131. this.Context = new LoggingContext(this);
  132. this.Uri = hubUri;
  133. this.State = ConnectionStates.Initial;
  134. this.Options = options;
  135. this.Protocol = protocol;
  136. this.Protocol.Connection = this;
  137. this.AuthenticationProvider = new DefaultAccessTokenAuthenticator(this);
  138. }
  139. public void StartConnect()
  140. {
  141. if (this.State != ConnectionStates.Initial &&
  142. this.State != ConnectionStates.Redirected &&
  143. this.State != ConnectionStates.Reconnecting)
  144. {
  145. HTTPManager.Logger.Warning("HubConnection", "StartConnect - Expected Initial or Redirected state, got " + this.State.ToString(), this.Context);
  146. return;
  147. }
  148. if (this.State == ConnectionStates.Initial)
  149. {
  150. this.connectionStartedAt = DateTime.Now;
  151. HTTPManager.Heartbeats.Subscribe(this);
  152. }
  153. HTTPManager.Logger.Verbose("HubConnection", $"StartConnect State: {this.State}, connectionStartedAt: {this.connectionStartedAt.ToString(System.Globalization.CultureInfo.InvariantCulture)}", this.Context);
  154. if (this.AuthenticationProvider != null && this.AuthenticationProvider.IsPreAuthRequired)
  155. {
  156. HTTPManager.Logger.Information("HubConnection", "StartConnect - Authenticating", this.Context);
  157. SetState(ConnectionStates.Authenticating);
  158. this.AuthenticationProvider.OnAuthenticationSucceded += OnAuthenticationSucceded;
  159. this.AuthenticationProvider.OnAuthenticationFailed += OnAuthenticationFailed;
  160. // Start the authentication process
  161. this.AuthenticationProvider.StartAuthentication();
  162. }
  163. else
  164. StartNegotiation();
  165. }
  166. #if CSHARP_7_OR_LATER
  167. TaskCompletionSource<HubConnection> connectAsyncTaskCompletionSource;
  168. public Task<HubConnection> ConnectAsync()
  169. {
  170. if (this.State != ConnectionStates.Initial && this.State != ConnectionStates.Redirected && this.State != ConnectionStates.Reconnecting)
  171. throw new Exception("HubConnection - ConnectAsync - Expected Initial or Redirected state, got " + this.State.ToString());
  172. if (this.connectAsyncTaskCompletionSource != null)
  173. throw new Exception("Connect process already started!");
  174. this.connectAsyncTaskCompletionSource = new TaskCompletionSource<HubConnection>();
  175. this.OnConnected += OnAsyncConnectedCallback;
  176. this.OnError += OnAsyncConnectFailedCallback;
  177. this.StartConnect();
  178. return connectAsyncTaskCompletionSource.Task;
  179. }
  180. private void OnAsyncConnectedCallback(HubConnection hub)
  181. {
  182. this.OnConnected -= OnAsyncConnectedCallback;
  183. this.OnError -= OnAsyncConnectFailedCallback;
  184. this.connectAsyncTaskCompletionSource.TrySetResult(this);
  185. this.connectAsyncTaskCompletionSource = null;
  186. }
  187. private void OnAsyncConnectFailedCallback(HubConnection hub, string error)
  188. {
  189. this.OnConnected -= OnAsyncConnectedCallback;
  190. this.OnError -= OnAsyncConnectFailedCallback;
  191. this.connectAsyncTaskCompletionSource.TrySetException(new Exception(error));
  192. this.connectAsyncTaskCompletionSource = null;
  193. }
  194. #endif
  195. private void OnAuthenticationSucceded(IAuthenticationProvider provider)
  196. {
  197. HTTPManager.Logger.Verbose("HubConnection", "OnAuthenticationSucceded", this.Context);
  198. this.AuthenticationProvider.OnAuthenticationSucceded -= OnAuthenticationSucceded;
  199. this.AuthenticationProvider.OnAuthenticationFailed -= OnAuthenticationFailed;
  200. StartNegotiation();
  201. }
  202. private void OnAuthenticationFailed(IAuthenticationProvider provider, string reason)
  203. {
  204. HTTPManager.Logger.Error("HubConnection", "OnAuthenticationFailed: " + reason, this.Context);
  205. this.AuthenticationProvider.OnAuthenticationSucceded -= OnAuthenticationSucceded;
  206. this.AuthenticationProvider.OnAuthenticationFailed -= OnAuthenticationFailed;
  207. SetState(ConnectionStates.Closed, reason);
  208. }
  209. private void StartNegotiation()
  210. {
  211. HTTPManager.Logger.Verbose("HubConnection", "StartNegotiation", this.Context);
  212. if (this.State == ConnectionStates.CloseInitiated)
  213. {
  214. SetState(ConnectionStates.Closed);
  215. return;
  216. }
  217. #if !BESTHTTP_DISABLE_WEBSOCKET
  218. if (this.Options.SkipNegotiation && this.Options.PreferedTransport == TransportTypes.WebSocket)
  219. {
  220. HTTPManager.Logger.Verbose("HubConnection", "Skipping negotiation", this.Context);
  221. ConnectImpl(this.Options.PreferedTransport);
  222. return;
  223. }
  224. #endif
  225. SetState(ConnectionStates.Negotiating);
  226. // https://github.com/dotnet/aspnetcore/blob/master/src/SignalR/docs/specs/TransportProtocols.md#post-endpoint-basenegotiate-request
  227. // Send out a negotiation request. While we could skip it and connect right with the websocket transport
  228. // it might return with additional information that could be useful.
  229. UriBuilder builder = new UriBuilder(this.Uri);
  230. if (builder.Path.EndsWith("/"))
  231. builder.Path += "negotiate";
  232. else
  233. builder.Path += "/negotiate";
  234. string query = builder.Query;
  235. if (string.IsNullOrEmpty(query))
  236. query = "negotiateVersion=1";
  237. else
  238. query = query.Remove(0, 1) + "&negotiateVersion=1";
  239. builder.Query = query;
  240. var request = new HTTPRequest(builder.Uri, HTTPMethods.Post, OnNegotiationRequestFinished);
  241. request.Context.Add("Hub", this.Context);
  242. if (this.AuthenticationProvider != null)
  243. this.AuthenticationProvider.PrepareRequest(request);
  244. request.Send();
  245. }
  246. private void ConnectImpl(TransportTypes transport)
  247. {
  248. HTTPManager.Logger.Verbose("HubConnection", "ConnectImpl - " + transport, this.Context);
  249. switch (transport)
  250. {
  251. #if !BESTHTTP_DISABLE_WEBSOCKET
  252. case TransportTypes.WebSocket:
  253. if (this.NegotiationResult != null && !IsTransportSupported("WebSockets"))
  254. {
  255. SetState(ConnectionStates.Closed, "Couldn't use preferred transport, as the 'WebSockets' transport isn't supported by the server!");
  256. return;
  257. }
  258. this.Transport = new Transports.WebSocketTransport(this);
  259. this.Transport.OnStateChanged += Transport_OnStateChanged;
  260. break;
  261. #endif
  262. case TransportTypes.LongPolling:
  263. if (this.NegotiationResult != null && !IsTransportSupported("LongPolling"))
  264. {
  265. SetState(ConnectionStates.Closed, "Couldn't use preferred transport, as the 'LongPolling' transport isn't supported by the server!");
  266. return;
  267. }
  268. this.Transport = new Transports.LongPollingTransport(this);
  269. this.Transport.OnStateChanged += Transport_OnStateChanged;
  270. break;
  271. default:
  272. SetState(ConnectionStates.Closed, "Unsupported transport: " + transport);
  273. break;
  274. }
  275. try
  276. {
  277. if (this.OnTransportEvent != null)
  278. this.OnTransportEvent(this, this.Transport, TransportEvents.SelectedToConnect);
  279. }
  280. catch(Exception ex)
  281. {
  282. HTTPManager.Logger.Exception("HubConnection", "ConnectImpl - OnTransportEvent exception in user code!", ex, this.Context);
  283. }
  284. this.Transport.StartConnect();
  285. }
  286. private bool IsTransportSupported(string transportName)
  287. {
  288. // https://github.com/dotnet/aspnetcore/blob/master/src/SignalR/docs/specs/TransportProtocols.md#post-endpoint-basenegotiate-request
  289. // If the negotiation response contains only the url and accessToken, no 'availableTransports' list is sent
  290. if (this.NegotiationResult.SupportedTransports == null)
  291. return true;
  292. for (int i = 0; i < this.NegotiationResult.SupportedTransports.Count; ++i)
  293. if (this.NegotiationResult.SupportedTransports[i].Name.Equals(transportName, StringComparison.OrdinalIgnoreCase))
  294. return true;
  295. return false;
  296. }
  297. private void OnNegotiationRequestFinished(HTTPRequest req, HTTPResponse resp)
  298. {
  299. if (this.State == ConnectionStates.Closed)
  300. return;
  301. if (this.State == ConnectionStates.CloseInitiated)
  302. {
  303. SetState(ConnectionStates.Closed);
  304. return;
  305. }
  306. string errorReason = null;
  307. switch (req.State)
  308. {
  309. // The request finished without any problem.
  310. case HTTPRequestStates.Finished:
  311. if (resp.IsSuccess)
  312. {
  313. HTTPManager.Logger.Information("HubConnection", "Negotiation Request Finished Successfully! Response: " + resp.DataAsText, this.Context);
  314. // Parse negotiation
  315. this.NegotiationResult = NegotiationResult.Parse(resp, out errorReason, this);
  316. // Room for improvement: check validity of the negotiation result:
  317. // If url and accessToken is present, the other two must be null.
  318. // https://github.com/dotnet/aspnetcore/blob/master/src/SignalR/docs/specs/TransportProtocols.md#post-endpoint-basenegotiate-request
  319. if (string.IsNullOrEmpty(errorReason))
  320. {
  321. if (this.NegotiationResult.Url != null)
  322. {
  323. this.SetState(ConnectionStates.Redirected);
  324. if (++this.RedirectCount >= this.Options.MaxRedirects)
  325. errorReason = string.Format("MaxRedirects ({0:N0}) reached!", this.Options.MaxRedirects);
  326. else
  327. {
  328. var oldUri = this.Uri;
  329. this.Uri = this.NegotiationResult.Url;
  330. if (this.OnRedirected != null)
  331. {
  332. try
  333. {
  334. this.OnRedirected(this, oldUri, Uri);
  335. }
  336. catch (Exception ex)
  337. {
  338. HTTPManager.Logger.Exception("HubConnection", "OnNegotiationRequestFinished - OnRedirected", ex, this.Context);
  339. }
  340. }
  341. StartConnect();
  342. }
  343. }
  344. else
  345. ConnectImpl(this.Options.PreferedTransport);
  346. }
  347. }
  348. else // Internal server error?
  349. errorReason = string.Format("Negotiation Request Finished Successfully, but the server sent an error. Status Code: {0}-{1} Message: {2}",
  350. resp.StatusCode,
  351. resp.Message,
  352. resp.DataAsText);
  353. break;
  354. // The request finished with an unexpected error. The request's Exception property may contain more info about the error.
  355. case HTTPRequestStates.Error:
  356. errorReason = "Negotiation Request Finished with Error! " + (req.Exception != null ? (req.Exception.Message + "\n" + req.Exception.StackTrace) : "No Exception");
  357. break;
  358. // The request aborted, initiated by the user.
  359. case HTTPRequestStates.Aborted:
  360. errorReason = "Negotiation Request Aborted!";
  361. break;
  362. // Connecting to the server is timed out.
  363. case HTTPRequestStates.ConnectionTimedOut:
  364. errorReason = "Negotiation Request - Connection Timed Out!";
  365. break;
  366. // The request didn't finished in the given time.
  367. case HTTPRequestStates.TimedOut:
  368. errorReason = "Negotiation Request - Processing the request Timed Out!";
  369. break;
  370. }
  371. if (errorReason != null)
  372. {
  373. this.NegotiationResult = new NegotiationResult();
  374. this.NegotiationResult.NegotiationResponse = resp;
  375. SetState(ConnectionStates.Closed, errorReason);
  376. }
  377. }
  378. public void StartClose()
  379. {
  380. HTTPManager.Logger.Verbose("HubConnection", "StartClose", this.Context);
  381. switch(this.State)
  382. {
  383. case ConnectionStates.Initial:
  384. SetState(ConnectionStates.Closed);
  385. break;
  386. case ConnectionStates.Authenticating:
  387. this.AuthenticationProvider.OnAuthenticationSucceded -= OnAuthenticationSucceded;
  388. this.AuthenticationProvider.OnAuthenticationFailed -= OnAuthenticationFailed;
  389. this.AuthenticationProvider.Cancel();
  390. SetState(ConnectionStates.Closed);
  391. break;
  392. case ConnectionStates.Reconnecting:
  393. SetState(ConnectionStates.Closed);
  394. break;
  395. case ConnectionStates.CloseInitiated:
  396. case ConnectionStates.Closed:
  397. // Already initiated/closed
  398. break;
  399. default:
  400. SetState(ConnectionStates.CloseInitiated);
  401. if (this.Transport != null)
  402. this.Transport.StartClose();
  403. break;
  404. }
  405. }
  406. #if CSHARP_7_OR_LATER
  407. TaskCompletionSource<HubConnection> closeAsyncTaskCompletionSource;
  408. public Task<HubConnection> CloseAsync()
  409. {
  410. if (this.closeAsyncTaskCompletionSource != null)
  411. throw new Exception("CloseAsync already called!");
  412. this.closeAsyncTaskCompletionSource = new TaskCompletionSource<HubConnection>();
  413. this.OnClosed += OnClosedAsyncCallback;
  414. this.OnError += OnClosedAsyncErrorCallback;
  415. // Avoid race condition by caching task prior to StartClose,
  416. // which asynchronously calls OnClosedAsyncCallback, which nulls
  417. // this.closeAsyncTaskCompletionSource immediately before we have
  418. // a chance to read from it.
  419. var task = this.closeAsyncTaskCompletionSource.Task;
  420. this.StartClose();
  421. return task;
  422. }
  423. void OnClosedAsyncCallback(HubConnection hub)
  424. {
  425. this.OnClosed -= OnClosedAsyncCallback;
  426. this.OnError -= OnClosedAsyncErrorCallback;
  427. this.closeAsyncTaskCompletionSource.TrySetResult(this);
  428. this.closeAsyncTaskCompletionSource = null;
  429. }
  430. void OnClosedAsyncErrorCallback(HubConnection hub, string error)
  431. {
  432. this.OnClosed -= OnClosedAsyncCallback;
  433. this.OnError -= OnClosedAsyncErrorCallback;
  434. this.closeAsyncTaskCompletionSource.TrySetException(new Exception(error));
  435. this.closeAsyncTaskCompletionSource = null;
  436. }
  437. #endif
  438. public IFuture<TResult> Invoke<TResult>(string target, params object[] args)
  439. {
  440. Future<TResult> future = new Future<TResult>();
  441. long id = InvokeImp(target,
  442. args,
  443. (message) =>
  444. {
  445. bool isSuccess = string.IsNullOrEmpty(message.error);
  446. if (isSuccess)
  447. future.Assign((TResult)this.Protocol.ConvertTo(typeof(TResult), message.result));
  448. else
  449. future.Fail(new Exception(message.error));
  450. },
  451. typeof(TResult));
  452. if (id < 0)
  453. future.Fail(new Exception("Not in Connected state! Current state: " + this.State));
  454. return future;
  455. }
  456. #if CSHARP_7_OR_LATER
  457. public Task<TResult> InvokeAsync<TResult>(string target, params object[] args)
  458. {
  459. return InvokeAsync<TResult>(target, default(CancellationToken), args);
  460. }
  461. public Task<TResult> InvokeAsync<TResult>(string target, CancellationToken cancellationToken = default, params object[] args)
  462. {
  463. TaskCompletionSource<TResult> tcs = new TaskCompletionSource<TResult>();
  464. long id = InvokeImp(target,
  465. args,
  466. (message) =>
  467. {
  468. if (cancellationToken.IsCancellationRequested)
  469. {
  470. tcs.TrySetCanceled(cancellationToken);
  471. return;
  472. }
  473. bool isSuccess = string.IsNullOrEmpty(message.error);
  474. if (isSuccess)
  475. tcs.TrySetResult((TResult)this.Protocol.ConvertTo(typeof(TResult), message.result));
  476. else
  477. tcs.TrySetException(new Exception(message.error));
  478. },
  479. typeof(TResult));
  480. if (id < 0)
  481. tcs.TrySetException(new Exception("Not in Connected state! Current state: " + this.State));
  482. else
  483. cancellationToken.Register(() => tcs.TrySetCanceled());
  484. return tcs.Task;
  485. }
  486. #endif
  487. public IFuture<object> Send(string target, params object[] args)
  488. {
  489. Future<object> future = new Future<object>();
  490. long id = InvokeImp(target,
  491. args,
  492. (message) =>
  493. {
  494. bool isSuccess = string.IsNullOrEmpty(message.error);
  495. if (isSuccess)
  496. future.Assign(message.item);
  497. else
  498. future.Fail(new Exception(message.error));
  499. },
  500. typeof(object));
  501. if (id < 0)
  502. future.Fail(new Exception("Not in Connected state! Current state: " + this.State));
  503. return future;
  504. }
  505. #if CSHARP_7_OR_LATER
  506. public Task<object> SendAsync(string target, params object[] args)
  507. {
  508. return SendAsync(target, default(CancellationToken), args);
  509. }
  510. public Task<object> SendAsync(string target, CancellationToken cancellationToken = default, params object[] args)
  511. {
  512. TaskCompletionSource<object> tcs = new TaskCompletionSource<object>();
  513. long id = InvokeImp(target,
  514. args,
  515. (message) =>
  516. {
  517. if (cancellationToken.IsCancellationRequested)
  518. {
  519. tcs.TrySetCanceled(cancellationToken);
  520. return;
  521. }
  522. bool isSuccess = string.IsNullOrEmpty(message.error);
  523. if (isSuccess)
  524. tcs.TrySetResult(message.item);
  525. else
  526. tcs.TrySetException(new Exception(message.error));
  527. },
  528. typeof(object));
  529. if (id < 0)
  530. tcs.TrySetException(new Exception("Not in Connected state! Current state: " + this.State));
  531. else
  532. cancellationToken.Register(() => tcs.TrySetCanceled());
  533. return tcs.Task;
  534. }
  535. #endif
  536. private long InvokeImp(string target, object[] args, Action<Message> callback, Type itemType, bool isStreamingInvocation = false)
  537. {
  538. if (this.State != ConnectionStates.Connected)
  539. return -1;
  540. bool blockingInvocation = callback == null;
  541. long invocationId = blockingInvocation ? 0 : System.Threading.Interlocked.Increment(ref this.lastInvocationId);
  542. var message = new Message
  543. {
  544. type = isStreamingInvocation ? MessageTypes.StreamInvocation : MessageTypes.Invocation,
  545. invocationId = blockingInvocation ? null : invocationId.ToString(),
  546. target = target,
  547. arguments = args,
  548. nonblocking = callback == null,
  549. };
  550. SendMessage(message);
  551. if (!blockingInvocation)
  552. if (!this.invocations.TryAdd(invocationId, new InvocationDefinition { callback = callback, returnType = itemType }))
  553. HTTPManager.Logger.Warning("HubConnection", "InvokeImp - invocations already contains id: " + invocationId);
  554. return invocationId;
  555. }
  556. internal void SendMessage(Message message)
  557. {
  558. if (HTTPManager.Logger.Level == Logger.Loglevels.All)
  559. HTTPManager.Logger.Verbose("HubConnection", "SendMessage: " + message.ToString(), this.Context);
  560. try
  561. {
  562. using (new WriteLock(this.rwLock))
  563. {
  564. var encoded = this.Protocol.EncodeMessage(message);
  565. if (encoded.Data != null)
  566. {
  567. this.lastMessageSentAt = DateTime.Now;
  568. this.Transport.Send(encoded);
  569. }
  570. }
  571. }
  572. catch (Exception ex)
  573. {
  574. HTTPManager.Logger.Exception("HubConnection", "SendMessage", ex, this.Context);
  575. }
  576. }
  577. public DownStreamItemController<TDown> GetDownStreamController<TDown>(string target, params object[] args)
  578. {
  579. long invocationId = System.Threading.Interlocked.Increment(ref this.lastInvocationId);
  580. var future = new Future<TDown>();
  581. future.BeginProcess();
  582. var controller = new DownStreamItemController<TDown>(this, invocationId, future);
  583. Action<Message> callback = (Message msg) =>
  584. {
  585. switch (msg.type)
  586. {
  587. // StreamItem message contains only one item.
  588. case MessageTypes.StreamItem:
  589. {
  590. if (controller.IsCanceled)
  591. break;
  592. TDown item = (TDown)this.Protocol.ConvertTo(typeof(TDown), msg.item);
  593. future.AssignItem(item);
  594. break;
  595. }
  596. case MessageTypes.Completion:
  597. {
  598. bool isSuccess = string.IsNullOrEmpty(msg.error);
  599. if (isSuccess)
  600. {
  601. // While completion message must not contain any result, this should be future-proof
  602. if (!controller.IsCanceled && msg.result != null)
  603. {
  604. TDown result = (TDown)this.Protocol.ConvertTo(typeof(TDown), msg.result);
  605. future.AssignItem(result);
  606. }
  607. future.Finish();
  608. }
  609. else
  610. future.Fail(new Exception(msg.error));
  611. break;
  612. }
  613. }
  614. };
  615. var message = new Message
  616. {
  617. type = MessageTypes.StreamInvocation,
  618. invocationId = invocationId.ToString(),
  619. target = target,
  620. arguments = args,
  621. nonblocking = false,
  622. };
  623. SendMessage(message);
  624. if (callback != null)
  625. if (!this.invocations.TryAdd(invocationId, new InvocationDefinition { callback = callback, returnType = typeof(TDown) }))
  626. HTTPManager.Logger.Warning("HubConnection", "GetDownStreamController - invocations already contains id: " + invocationId);
  627. return controller;
  628. }
  629. public UpStreamItemController<TResult> GetUpStreamController<TResult>(string target, int paramCount, bool downStream, object[] args)
  630. {
  631. Future<TResult> future = new Future<TResult>();
  632. future.BeginProcess();
  633. long invocationId = System.Threading.Interlocked.Increment(ref this.lastInvocationId);
  634. string[] streamIds = new string[paramCount];
  635. for (int i = 0; i < paramCount; i++)
  636. streamIds[i] = System.Threading.Interlocked.Increment(ref this.lastStreamId).ToString();
  637. var controller = new UpStreamItemController<TResult>(this, invocationId, streamIds, future);
  638. Action<Message> callback = (Message msg) => {
  639. switch (msg.type)
  640. {
  641. // StreamItem message contains only one item.
  642. case MessageTypes.StreamItem:
  643. {
  644. if (controller.IsCanceled)
  645. break;
  646. TResult item = (TResult)this.Protocol.ConvertTo(typeof(TResult), msg.item);
  647. future.AssignItem(item);
  648. break;
  649. }
  650. case MessageTypes.Completion:
  651. {
  652. bool isSuccess = string.IsNullOrEmpty(msg.error);
  653. if (isSuccess)
  654. {
  655. // While completion message must not contain any result, this should be future-proof
  656. if (!controller.IsCanceled && msg.result != null)
  657. {
  658. TResult result = (TResult)this.Protocol.ConvertTo(typeof(TResult), msg.result);
  659. future.AssignItem(result);
  660. }
  661. future.Finish();
  662. }
  663. else
  664. {
  665. var ex = new Exception(msg.error);
  666. future.Fail(ex);
  667. }
  668. break;
  669. }
  670. }
  671. };
  672. var messageToSend = new Message
  673. {
  674. type = downStream ? MessageTypes.StreamInvocation : MessageTypes.Invocation,
  675. invocationId = invocationId.ToString(),
  676. target = target,
  677. arguments = args,
  678. streamIds = streamIds,
  679. nonblocking = false,
  680. };
  681. SendMessage(messageToSend);
  682. if (!this.invocations.TryAdd(invocationId, new InvocationDefinition { callback = callback, returnType = typeof(TResult) }))
  683. HTTPManager.Logger.Warning("HubConnection", "GetUpStreamController - invocations already contains id: " + invocationId);
  684. return controller;
  685. }
  686. public void On(string methodName, Action callback)
  687. {
  688. On(methodName, null, (args) => callback());
  689. }
  690. public void On<T1>(string methodName, Action<T1> callback)
  691. {
  692. On(methodName, new Type[] { typeof(T1) }, (args) => callback((T1)args[0]));
  693. }
  694. public void On<T1, T2>(string methodName, Action<T1, T2> callback)
  695. {
  696. On(methodName,
  697. new Type[] { typeof(T1), typeof(T2) },
  698. (args) => callback((T1)args[0], (T2)args[1]));
  699. }
  700. public void On<T1, T2, T3>(string methodName, Action<T1, T2, T3> callback)
  701. {
  702. On(methodName,
  703. new Type[] { typeof(T1), typeof(T2), typeof(T3) },
  704. (args) => callback((T1)args[0], (T2)args[1], (T3)args[2]));
  705. }
  706. public void On<T1, T2, T3, T4>(string methodName, Action<T1, T2, T3, T4> callback)
  707. {
  708. On(methodName,
  709. new Type[] { typeof(T1), typeof(T2), typeof(T3), typeof(T4) },
  710. (args) => callback((T1)args[0], (T2)args[1], (T3)args[2], (T4)args[3]));
  711. }
  712. private void On(string methodName, Type[] paramTypes, Action<object[]> callback)
  713. {
  714. if (this.State >= ConnectionStates.CloseInitiated)
  715. throw new Exception("Hub connection already closing or closed!");
  716. this.subscriptions.GetOrAdd(methodName, _ => new Subscription())
  717. .Add(paramTypes, callback);
  718. }
  719. /// <summary>
  720. /// Remove all event handlers for <paramref name="methodName"/> that subscribed with an On call.
  721. /// </summary>
  722. public void Remove(string methodName)
  723. {
  724. if (this.State >= ConnectionStates.CloseInitiated)
  725. throw new Exception("Hub connection already closing or closed!");
  726. Subscription _;
  727. this.subscriptions.TryRemove(methodName, out _);
  728. }
  729. internal Subscription GetSubscription(string methodName)
  730. {
  731. Subscription subscribtion = null;
  732. this.subscriptions.TryGetValue(methodName, out subscribtion);
  733. return subscribtion;
  734. }
  735. internal Type GetItemType(long invocationId)
  736. {
  737. InvocationDefinition def;
  738. this.invocations.TryGetValue(invocationId, out def);
  739. return def.returnType;
  740. }
  741. internal void OnMessages(List<Message> messages)
  742. {
  743. this.lastMessageReceivedAt = DateTime.Now;
  744. for (int messageIdx = 0; messageIdx < messages.Count; ++messageIdx)
  745. {
  746. var message = messages[messageIdx];
  747. if (this.OnMessage != null)
  748. {
  749. try
  750. {
  751. if (!this.OnMessage(this, message))
  752. continue;
  753. }
  754. catch (Exception ex)
  755. {
  756. HTTPManager.Logger.Exception("HubConnection", "Exception in OnMessage user code!", ex, this.Context);
  757. }
  758. }
  759. switch (message.type)
  760. {
  761. case MessageTypes.Invocation:
  762. {
  763. Subscription subscribtion = null;
  764. if (this.subscriptions.TryGetValue(message.target, out subscribtion))
  765. {
  766. for (int i = 0; i < subscribtion.callbacks.Count; ++i)
  767. {
  768. var callbackDesc = subscribtion.callbacks[i];
  769. object[] realArgs = null;
  770. try
  771. {
  772. realArgs = this.Protocol.GetRealArguments(callbackDesc.ParamTypes, message.arguments);
  773. }
  774. catch (Exception ex)
  775. {
  776. HTTPManager.Logger.Exception("HubConnection", "OnMessages - Invocation - GetRealArguments", ex, this.Context);
  777. }
  778. try
  779. {
  780. callbackDesc.Callback.Invoke(realArgs);
  781. }
  782. catch (Exception ex)
  783. {
  784. HTTPManager.Logger.Exception("HubConnection", "OnMessages - Invocation - Invoke", ex, this.Context);
  785. }
  786. }
  787. }
  788. break;
  789. }
  790. case MessageTypes.StreamItem:
  791. {
  792. long invocationId;
  793. if (long.TryParse(message.invocationId, out invocationId))
  794. {
  795. InvocationDefinition def;
  796. if (this.invocations.TryGetValue(invocationId, out def) && def.callback != null)
  797. {
  798. try
  799. {
  800. def.callback(message);
  801. }
  802. catch (Exception ex)
  803. {
  804. HTTPManager.Logger.Exception("HubConnection", "OnMessages - StreamItem - callback", ex, this.Context);
  805. }
  806. }
  807. }
  808. break;
  809. }
  810. case MessageTypes.Completion:
  811. {
  812. long invocationId;
  813. if (long.TryParse(message.invocationId, out invocationId))
  814. {
  815. InvocationDefinition def;
  816. if (this.invocations.TryRemove(invocationId, out def) && def.callback != null)
  817. {
  818. try
  819. {
  820. def.callback(message);
  821. }
  822. catch (Exception ex)
  823. {
  824. HTTPManager.Logger.Exception("HubConnection", "OnMessages - Completion - callback", ex, this.Context);
  825. }
  826. }
  827. }
  828. break;
  829. }
  830. case MessageTypes.Ping:
  831. // Send back an answer
  832. SendMessage(new Message() { type = MessageTypes.Ping });
  833. break;
  834. case MessageTypes.Close:
  835. SetState(ConnectionStates.Closed, message.error, message.allowReconnect);
  836. if (this.Transport != null)
  837. this.Transport.StartClose();
  838. return;
  839. }
  840. }
  841. }
  842. private void Transport_OnStateChanged(TransportStates oldState, TransportStates newState)
  843. {
  844. HTTPManager.Logger.Verbose("HubConnection", string.Format("Transport_OnStateChanged - oldState: {0} newState: {1}", oldState.ToString(), newState.ToString()), this.Context);
  845. if (this.State == ConnectionStates.Closed)
  846. {
  847. HTTPManager.Logger.Verbose("HubConnection", "Transport_OnStateChanged - already closed!", this.Context);
  848. return;
  849. }
  850. switch (newState)
  851. {
  852. case TransportStates.Connected:
  853. try
  854. {
  855. if (this.OnTransportEvent != null)
  856. this.OnTransportEvent(this, this.Transport, TransportEvents.Connected);
  857. }
  858. catch (Exception ex)
  859. {
  860. HTTPManager.Logger.Exception("HubConnection", "Exception in OnTransportEvent user code!", ex, this.Context);
  861. }
  862. SetState(ConnectionStates.Connected);
  863. break;
  864. case TransportStates.Failed:
  865. if (this.State == ConnectionStates.Negotiating && !HTTPManager.IsQuitting)
  866. {
  867. try
  868. {
  869. if (this.OnTransportEvent != null)
  870. this.OnTransportEvent(this, this.Transport, TransportEvents.FailedToConnect);
  871. }
  872. catch (Exception ex)
  873. {
  874. HTTPManager.Logger.Exception("HubConnection", "Exception in OnTransportEvent user code!", ex, this.Context);
  875. }
  876. this.triedoutTransports.Add(this.Transport.TransportType);
  877. var nextTransport = GetNextTransportToTry();
  878. if (nextTransport == null)
  879. SetState(ConnectionStates.Closed, this.Transport.ErrorReason);
  880. else
  881. ConnectImpl(nextTransport.Value);
  882. }
  883. else
  884. {
  885. try
  886. {
  887. if (this.OnTransportEvent != null)
  888. this.OnTransportEvent(this, this.Transport, TransportEvents.ClosedWithError);
  889. }
  890. catch (Exception ex)
  891. {
  892. HTTPManager.Logger.Exception("HubConnection", "Exception in OnTransportEvent user code!", ex, this.Context);
  893. }
  894. SetState(ConnectionStates.Closed, HTTPManager.IsQuitting ? null : this.Transport.ErrorReason);
  895. }
  896. break;
  897. case TransportStates.Closed:
  898. {
  899. try
  900. {
  901. if (this.OnTransportEvent != null)
  902. this.OnTransportEvent(this, this.Transport, TransportEvents.Closed);
  903. }
  904. catch (Exception ex)
  905. {
  906. HTTPManager.Logger.Exception("HubConnection", "Exception in OnTransportEvent user code!", ex, this.Context);
  907. }
  908. SetState(ConnectionStates.Closed);
  909. }
  910. break;
  911. }
  912. }
  913. private TransportTypes? GetNextTransportToTry()
  914. {
  915. foreach (TransportTypes val in Enum.GetValues(typeof(TransportTypes)))
  916. if (!this.triedoutTransports.Contains(val) && IsTransportSupported(val.ToString()))
  917. return val;
  918. return null;
  919. }
  920. private void SetState(ConnectionStates state, string errorReason = null, bool allowReconnect = true)
  921. {
  922. if (string.IsNullOrEmpty(errorReason))
  923. HTTPManager.Logger.Information("HubConnection", string.Format("SetState - from State: '{0}' to State: '{1}', allowReconnect: {2}", this.State, state, allowReconnect), this.Context);
  924. else
  925. HTTPManager.Logger.Information("HubConnection", string.Format("SetState - from State: '{0}' to State: '{1}', errorReason: '{2}', allowReconnect: {3}", this.State, state, errorReason, allowReconnect), this.Context);
  926. if (this.State == state)
  927. return;
  928. var previousState = this.State;
  929. this.State = state;
  930. switch (state)
  931. {
  932. case ConnectionStates.Initial:
  933. case ConnectionStates.Authenticating:
  934. case ConnectionStates.Negotiating:
  935. case ConnectionStates.CloseInitiated:
  936. break;
  937. case ConnectionStates.Reconnecting:
  938. break;
  939. case ConnectionStates.Connected:
  940. // If reconnectStartTime isn't its default value we reconnected
  941. if (this.reconnectStartTime != DateTime.MinValue)
  942. {
  943. try
  944. {
  945. if (this.OnReconnected != null)
  946. this.OnReconnected(this);
  947. }
  948. catch (Exception ex)
  949. {
  950. HTTPManager.Logger.Exception("HubConnection", "OnReconnected", ex, this.Context);
  951. }
  952. }
  953. else
  954. {
  955. try
  956. {
  957. if (this.OnConnected != null)
  958. this.OnConnected(this);
  959. }
  960. catch (Exception ex)
  961. {
  962. HTTPManager.Logger.Exception("HubConnection", "Exception in OnConnected user code!", ex, this.Context);
  963. }
  964. }
  965. this.lastMessageSentAt = DateTime.Now;
  966. this.lastMessageReceivedAt = DateTime.Now;
  967. // Clean up reconnect related fields
  968. this.currentContext = new RetryContext();
  969. this.reconnectStartTime = DateTime.MinValue;
  970. this.reconnectAt = DateTime.MinValue;
  971. break;
  972. case ConnectionStates.Closed:
  973. // Go through all invocations and cancel them.
  974. var error = new Message();
  975. error.type = MessageTypes.Close;
  976. error.error = errorReason;
  977. foreach (var kvp in this.invocations)
  978. {
  979. try
  980. {
  981. kvp.Value.callback(error);
  982. }
  983. catch
  984. { }
  985. }
  986. this.invocations.Clear();
  987. // No errorReason? It's an expected closure.
  988. if (errorReason == null)
  989. {
  990. if (this.OnClosed != null)
  991. {
  992. try
  993. {
  994. this.OnClosed(this);
  995. }
  996. catch(Exception ex)
  997. {
  998. HTTPManager.Logger.Exception("HubConnection", "Exception in OnClosed user code!", ex, this.Context);
  999. }
  1000. }
  1001. }
  1002. else
  1003. {
  1004. // If possible, try to reconnect
  1005. if (allowReconnect && this.ReconnectPolicy != null && (previousState == ConnectionStates.Connected || this.reconnectStartTime != DateTime.MinValue))
  1006. {
  1007. // It's the first attempt after a successful connection
  1008. if (this.reconnectStartTime == DateTime.MinValue)
  1009. {
  1010. this.connectionStartedAt = this.reconnectStartTime = DateTime.Now;
  1011. try
  1012. {
  1013. if (this.OnReconnecting != null)
  1014. this.OnReconnecting(this, errorReason);
  1015. }
  1016. catch (Exception ex)
  1017. {
  1018. HTTPManager.Logger.Exception("HubConnection", "SetState - ConnectionStates.Reconnecting", ex, this.Context);
  1019. }
  1020. }
  1021. RetryContext context = new RetryContext
  1022. {
  1023. ElapsedTime = DateTime.Now - this.reconnectStartTime,
  1024. PreviousRetryCount = this.currentContext.PreviousRetryCount,
  1025. RetryReason = errorReason
  1026. };
  1027. TimeSpan? nextAttempt = null;
  1028. try
  1029. {
  1030. nextAttempt = this.ReconnectPolicy.GetNextRetryDelay(context);
  1031. }
  1032. catch (Exception ex)
  1033. {
  1034. HTTPManager.Logger.Exception("HubConnection", "ReconnectPolicy.GetNextRetryDelay", ex, this.Context);
  1035. }
  1036. // No more reconnect attempt, we are closing
  1037. if (nextAttempt == null)
  1038. {
  1039. HTTPManager.Logger.Warning("HubConnecction", "No more reconnect attempt!", this.Context);
  1040. // Clean up everything
  1041. this.currentContext = new RetryContext();
  1042. this.reconnectStartTime = DateTime.MinValue;
  1043. this.reconnectAt = DateTime.MinValue;
  1044. }
  1045. else
  1046. {
  1047. HTTPManager.Logger.Information("HubConnecction", "Next reconnect attempt after " + nextAttempt.Value.ToString(), this.Context);
  1048. this.currentContext = context;
  1049. this.currentContext.PreviousRetryCount += 1;
  1050. this.reconnectAt = DateTime.Now + nextAttempt.Value;
  1051. this.SetState(ConnectionStates.Reconnecting);
  1052. return;
  1053. }
  1054. }
  1055. if (this.OnError != null)
  1056. {
  1057. try
  1058. {
  1059. this.OnError(this, errorReason);
  1060. }
  1061. catch(Exception ex)
  1062. {
  1063. HTTPManager.Logger.Exception("HubConnection", "Exception in OnError user code!", ex, this.Context);
  1064. }
  1065. }
  1066. }
  1067. HTTPManager.Heartbeats.Unsubscribe(this);
  1068. this.rwLock.Dispose();
  1069. this.rwLock = null;
  1070. break;
  1071. }
  1072. }
  1073. void BestHTTP.Extensions.IHeartbeat.OnHeartbeatUpdate(TimeSpan dif)
  1074. {
  1075. switch (this.State)
  1076. {
  1077. case ConnectionStates.Negotiating:
  1078. case ConnectionStates.Authenticating:
  1079. case ConnectionStates.Redirected:
  1080. if (DateTime.Now >= this.connectionStartedAt + this.Options.ConnectTimeout)
  1081. {
  1082. if (this.AuthenticationProvider != null)
  1083. {
  1084. this.AuthenticationProvider.OnAuthenticationSucceded -= OnAuthenticationSucceded;
  1085. this.AuthenticationProvider.OnAuthenticationFailed -= OnAuthenticationFailed;
  1086. try
  1087. {
  1088. this.AuthenticationProvider.Cancel();
  1089. }
  1090. catch(Exception ex)
  1091. {
  1092. HTTPManager.Logger.Exception("HubConnection", "Exception in AuthenticationProvider.Cancel !", ex, this.Context);
  1093. }
  1094. }
  1095. if (this.Transport != null)
  1096. {
  1097. this.Transport.OnStateChanged -= Transport_OnStateChanged;
  1098. this.Transport.StartClose();
  1099. }
  1100. SetState(ConnectionStates.Closed, string.Format("Couldn't connect in the given time({0})!", this.Options.ConnectTimeout));
  1101. }
  1102. break;
  1103. case ConnectionStates.Connected:
  1104. if (this.Options.PingInterval != TimeSpan.Zero && DateTime.Now - this.lastMessageReceivedAt >= this.Options.PingTimeoutInterval)
  1105. {
  1106. // The transport itself can be in a failure state or in a completely valid one, so while we do not want to receive anything from it, we have to try to close it
  1107. if (this.Transport != null)
  1108. {
  1109. this.Transport.OnStateChanged -= Transport_OnStateChanged;
  1110. this.Transport.StartClose();
  1111. }
  1112. SetState(ConnectionStates.Closed, string.Format("PingInterval set to '{0}' and no message is received since '{1}'. PingTimeoutInterval: '{2}'", this.Options.PingInterval, this.lastMessageReceivedAt, this.Options.PingTimeoutInterval));
  1113. }
  1114. else if (this.Options.PingInterval != TimeSpan.Zero && DateTime.Now - this.lastMessageSentAt >= this.Options.PingInterval)
  1115. SendMessage(new Message() { type = MessageTypes.Ping });
  1116. break;
  1117. case ConnectionStates.Reconnecting:
  1118. if (this.reconnectAt != DateTime.MinValue && DateTime.Now >= this.reconnectAt)
  1119. {
  1120. this.connectionStartedAt = DateTime.Now;
  1121. this.reconnectAt = DateTime.MinValue;
  1122. this.triedoutTransports.Clear();
  1123. this.StartConnect();
  1124. }
  1125. break;
  1126. }
  1127. }
  1128. }
  1129. }
  1130. #endif