123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120 |
- using System;
- using System.Net.Http;
- using System.Threading;
- using System.Threading.Tasks;
- using SocketIOClient.JsonSerializer;
- using SocketIOClient.Messages;
- using SocketIOClient.UriConverters;
- namespace SocketIOClient.Transport
- {
- public class HttpTransport : BaseTransport
- {
- public HttpTransport(HttpClient http,
- IHttpPollingHandler pollingHandler,
- SocketIOOptions options,
- IJsonSerializer jsonSerializer) : base(options, jsonSerializer)
- {
- _http = http;
- _httpPollingHandler = pollingHandler;
- _httpPollingHandler.TextObservable.Subscribe(this);
- _httpPollingHandler.BytesObservable.Subscribe(this);
- }
- string _httpUri;
- CancellationTokenSource _pollingTokenSource;
- readonly HttpClient _http;
- readonly IHttpPollingHandler _httpPollingHandler;
- private void StartPolling(CancellationToken cancellationToken)
- {
- Task.Factory.StartNew(async () =>
- {
- int retry = 0;
- while (!cancellationToken.IsCancellationRequested)
- {
- if (!_httpUri.Contains("&sid="))
- {
- await Task.Delay(20);
- continue;
- }
- try
- {
- await _httpPollingHandler.GetAsync(_httpUri, CancellationToken.None).ConfigureAwait(false);
- }
- catch (Exception e)
- {
- retry++;
- if (retry >= 3)
- {
- MessageSubject.OnError(e);
- break;
- }
- await Task.Delay(100 * (int)Math.Pow(2, retry));
- }
- }
- }, TaskCreationOptions.LongRunning);
- }
- public override async Task ConnectAsync(Uri uri, CancellationToken cancellationToken)
- {
- var req = new HttpRequestMessage(HttpMethod.Get, uri);
- // if (_options.ExtraHeaders != null)
- // {
- // foreach (var item in _options.ExtraHeaders)
- // {
- // req.Headers.Add(item.Key, item.Value);
- // }
- // }
- _httpUri = uri.ToString();
- await _httpPollingHandler.SendAsync(req, new CancellationTokenSource(Options.ConnectionTimeout).Token).ConfigureAwait(false);
- if (_pollingTokenSource != null)
- {
- _pollingTokenSource.Cancel();
- }
- _pollingTokenSource = new CancellationTokenSource();
- StartPolling(_pollingTokenSource.Token);
- }
- public override Task DisconnectAsync(CancellationToken cancellationToken)
- {
- _pollingTokenSource.Cancel();
- if (PingTokenSource != null)
- {
- PingTokenSource.Cancel();
- }
- return Task.CompletedTask;
- }
- public override void AddHeader(string key, string val)
- {
- _http.DefaultRequestHeaders.Add(key, val);
- }
- public override void Dispose()
- {
- base.Dispose();
- _httpPollingHandler.Dispose();
- }
- public override async Task SendAsync(Payload payload, CancellationToken cancellationToken)
- {
- await _httpPollingHandler.PostAsync(_httpUri, payload.Text, cancellationToken);
- if (payload.Bytes != null && payload.Bytes.Count > 0)
- {
- await _httpPollingHandler.PostAsync(_httpUri, payload.Bytes, cancellationToken);
- }
- }
- protected override async Task OpenAsync(OpenedMessage msg)
- {
- //if (!_httpUri.Contains("&sid="))
- //{
- //}
- _httpUri += "&sid=" + msg.Sid;
- await base.OpenAsync(msg);
- }
- }
- }
|