HttpTransport.cs 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. using System;
  2. using System.Net.Http;
  3. using System.Threading;
  4. using System.Threading.Tasks;
  5. using SocketIOClient.JsonSerializer;
  6. using SocketIOClient.Messages;
  7. using SocketIOClient.UriConverters;
  8. namespace SocketIOClient.Transport
  9. {
  10. public class HttpTransport : BaseTransport
  11. {
  12. public HttpTransport(HttpClient http,
  13. IHttpPollingHandler pollingHandler,
  14. SocketIOOptions options,
  15. IJsonSerializer jsonSerializer) : base(options, jsonSerializer)
  16. {
  17. _http = http;
  18. _httpPollingHandler = pollingHandler;
  19. _httpPollingHandler.TextObservable.Subscribe(this);
  20. _httpPollingHandler.BytesObservable.Subscribe(this);
  21. }
  22. string _httpUri;
  23. CancellationTokenSource _pollingTokenSource;
  24. readonly HttpClient _http;
  25. readonly IHttpPollingHandler _httpPollingHandler;
  26. private void StartPolling(CancellationToken cancellationToken)
  27. {
  28. Task.Factory.StartNew(async () =>
  29. {
  30. int retry = 0;
  31. while (!cancellationToken.IsCancellationRequested)
  32. {
  33. if (!_httpUri.Contains("&sid="))
  34. {
  35. await Task.Delay(20);
  36. continue;
  37. }
  38. try
  39. {
  40. await _httpPollingHandler.GetAsync(_httpUri, CancellationToken.None).ConfigureAwait(false);
  41. }
  42. catch (Exception e)
  43. {
  44. retry++;
  45. if (retry >= 3)
  46. {
  47. MessageSubject.OnError(e);
  48. break;
  49. }
  50. await Task.Delay(100 * (int)Math.Pow(2, retry));
  51. }
  52. }
  53. }, TaskCreationOptions.LongRunning);
  54. }
  55. public override async Task ConnectAsync(Uri uri, CancellationToken cancellationToken)
  56. {
  57. var req = new HttpRequestMessage(HttpMethod.Get, uri);
  58. // if (_options.ExtraHeaders != null)
  59. // {
  60. // foreach (var item in _options.ExtraHeaders)
  61. // {
  62. // req.Headers.Add(item.Key, item.Value);
  63. // }
  64. // }
  65. _httpUri = uri.ToString();
  66. await _httpPollingHandler.SendAsync(req, new CancellationTokenSource(Options.ConnectionTimeout).Token).ConfigureAwait(false);
  67. if (_pollingTokenSource != null)
  68. {
  69. _pollingTokenSource.Cancel();
  70. }
  71. _pollingTokenSource = new CancellationTokenSource();
  72. StartPolling(_pollingTokenSource.Token);
  73. }
  74. public override Task DisconnectAsync(CancellationToken cancellationToken)
  75. {
  76. _pollingTokenSource.Cancel();
  77. if (PingTokenSource != null)
  78. {
  79. PingTokenSource.Cancel();
  80. }
  81. return Task.CompletedTask;
  82. }
  83. public override void AddHeader(string key, string val)
  84. {
  85. _http.DefaultRequestHeaders.Add(key, val);
  86. }
  87. public override void Dispose()
  88. {
  89. base.Dispose();
  90. _httpPollingHandler.Dispose();
  91. }
  92. public override async Task SendAsync(Payload payload, CancellationToken cancellationToken)
  93. {
  94. await _httpPollingHandler.PostAsync(_httpUri, payload.Text, cancellationToken);
  95. if (payload.Bytes != null && payload.Bytes.Count > 0)
  96. {
  97. await _httpPollingHandler.PostAsync(_httpUri, payload.Bytes, cancellationToken);
  98. }
  99. }
  100. protected override async Task OpenAsync(OpenedMessage msg)
  101. {
  102. //if (!_httpUri.Contains("&sid="))
  103. //{
  104. //}
  105. _httpUri += "&sid=" + msg.Sid;
  106. await base.OpenAsync(msg);
  107. }
  108. }
  109. }