123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118 |
- using System;
- using System.Collections.Generic;
- using System.Net.Http;
- using System.Reactive.Linq;
- using System.Reactive.Subjects;
- using System.Text;
- using System.Threading;
- using System.Threading.Tasks;
- namespace SocketIOClient.Transport
- {
- public abstract class HttpPollingHandler : IHttpPollingHandler
- {
- public HttpPollingHandler(HttpClient httpClient)
- {
- HttpClient = httpClient;
- TextSubject = new Subject<string>();
- BytesSubject = new Subject<byte[]>();
- TextObservable = TextSubject.AsObservable();
- BytesObservable = BytesSubject.AsObservable();
- }
- protected HttpClient HttpClient { get; }
- protected Subject<string> TextSubject{get;}
- protected Subject<byte[]> BytesSubject{get;}
- public IObservable<string> TextObservable { get; }
- public IObservable<byte[]> BytesObservable { get; }
- protected string AppendRandom(string uri)
- {
- return uri + "&t=" + DateTimeOffset.Now.ToUnixTimeSeconds();
- }
- public async Task GetAsync(string uri, CancellationToken cancellationToken)
- {
- var req = new HttpRequestMessage(HttpMethod.Get, AppendRandom(uri));
- var resMsg = await HttpClient.SendAsync(req, cancellationToken).ConfigureAwait(false);
- if (!resMsg.IsSuccessStatusCode)
- {
- throw new HttpRequestException($"Response status code does not indicate success: {resMsg.StatusCode}");
- }
- await ProduceMessageAsync(resMsg).ConfigureAwait(false);
- }
- public async Task SendAsync(HttpRequestMessage req, CancellationToken cancellationToken)
- {
- var resMsg = await HttpClient.SendAsync(req, cancellationToken).ConfigureAwait(false);
- if (!resMsg.IsSuccessStatusCode)
- {
- throw new HttpRequestException($"Response status code does not indicate success: {resMsg.StatusCode}");
- }
- await ProduceMessageAsync(resMsg).ConfigureAwait(false);
- }
- public async virtual Task PostAsync(string uri, string content, CancellationToken cancellationToken)
- {
- var httpContent = new StringContent(content);
- var resMsg = await HttpClient.PostAsync(AppendRandom(uri), httpContent, cancellationToken).ConfigureAwait(false);
- await ProduceMessageAsync(resMsg).ConfigureAwait(false);
- }
- public abstract Task PostAsync(string uri, IEnumerable<byte[]> bytes, CancellationToken cancellationToken);
- private async Task ProduceMessageAsync(HttpResponseMessage resMsg)
- {
- if (resMsg.Content.Headers.ContentType.MediaType == "application/octet-stream")
- {
- byte[] bytes = await resMsg.Content.ReadAsByteArrayAsync().ConfigureAwait(false);
- ProduceBytes(bytes);
- }
- else
- {
- string text = await resMsg.Content.ReadAsStringAsync().ConfigureAwait(false);
- ProduceText(text);
- }
- }
- protected abstract void ProduceText(string text);
- private void ProduceBytes(byte[] bytes)
- {
- int i = 0;
- while (bytes.Length > i + 4)
- {
- byte type = bytes[i];
- var builder = new StringBuilder();
- i++;
- while (bytes[i] != byte.MaxValue)
- {
- builder.Append(bytes[i]);
- i++;
- }
- i++;
- int length = int.Parse(builder.ToString());
- if (type == 0)
- {
- var buffer = new byte[length];
- Buffer.BlockCopy(bytes, i, buffer, 0, buffer.Length);
- TextSubject.OnNext(Encoding.UTF8.GetString(buffer));
- }
- else if (type == 1)
- {
- var buffer = new byte[length - 1];
- Buffer.BlockCopy(bytes, i + 1, buffer, 0, buffer.Length);
- BytesSubject.OnNext(buffer);
- }
- i += length;
- }
- }
- public void Dispose()
- {
- TextSubject.Dispose();
- BytesSubject.Dispose();
- }
- }
- }
|