using System; using System.Collections.Generic; using System.Net.Http; using System.Reactive.Linq; using System.Reactive.Subjects; using System.Text; using System.Threading; using System.Threading.Tasks; namespace SocketIOClient.Transport { public abstract class HttpPollingHandler : IHttpPollingHandler { public HttpPollingHandler(HttpClient httpClient) { HttpClient = httpClient; TextSubject = new Subject(); BytesSubject = new Subject(); TextObservable = TextSubject.AsObservable(); BytesObservable = BytesSubject.AsObservable(); } protected HttpClient HttpClient { get; } protected Subject TextSubject{get;} protected Subject BytesSubject{get;} public IObservable TextObservable { get; } public IObservable BytesObservable { get; } protected string AppendRandom(string uri) { return uri + "&t=" + DateTimeOffset.Now.ToUnixTimeSeconds(); } public async Task GetAsync(string uri, CancellationToken cancellationToken) { var req = new HttpRequestMessage(HttpMethod.Get, AppendRandom(uri)); var resMsg = await HttpClient.SendAsync(req, cancellationToken).ConfigureAwait(false); if (!resMsg.IsSuccessStatusCode) { throw new HttpRequestException($"Response status code does not indicate success: {resMsg.StatusCode}"); } await ProduceMessageAsync(resMsg).ConfigureAwait(false); } public async Task SendAsync(HttpRequestMessage req, CancellationToken cancellationToken) { var resMsg = await HttpClient.SendAsync(req, cancellationToken).ConfigureAwait(false); if (!resMsg.IsSuccessStatusCode) { throw new HttpRequestException($"Response status code does not indicate success: {resMsg.StatusCode}"); } await ProduceMessageAsync(resMsg).ConfigureAwait(false); } public async virtual Task PostAsync(string uri, string content, CancellationToken cancellationToken) { var httpContent = new StringContent(content); var resMsg = await HttpClient.PostAsync(AppendRandom(uri), httpContent, cancellationToken).ConfigureAwait(false); await ProduceMessageAsync(resMsg).ConfigureAwait(false); } public abstract Task PostAsync(string uri, IEnumerable bytes, CancellationToken cancellationToken); private async Task ProduceMessageAsync(HttpResponseMessage resMsg) { if (resMsg.Content.Headers.ContentType.MediaType == "application/octet-stream") { byte[] bytes = await resMsg.Content.ReadAsByteArrayAsync().ConfigureAwait(false); ProduceBytes(bytes); } else { string text = await resMsg.Content.ReadAsStringAsync().ConfigureAwait(false); ProduceText(text); } } protected abstract void ProduceText(string text); private void ProduceBytes(byte[] bytes) { int i = 0; while (bytes.Length > i + 4) { byte type = bytes[i]; var builder = new StringBuilder(); i++; while (bytes[i] != byte.MaxValue) { builder.Append(bytes[i]); i++; } i++; int length = int.Parse(builder.ToString()); if (type == 0) { var buffer = new byte[length]; Buffer.BlockCopy(bytes, i, buffer, 0, buffer.Length); TextSubject.OnNext(Encoding.UTF8.GetString(buffer)); } else if (type == 1) { var buffer = new byte[length - 1]; Buffer.BlockCopy(bytes, i + 1, buffer, 0, buffer.Length); BytesSubject.OnNext(buffer); } i += length; } } public void Dispose() { TextSubject.Dispose(); BytesSubject.Dispose(); } } }