HttpPollingHandler.cs 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Net.Http;
  4. using System.Reactive.Linq;
  5. using System.Reactive.Subjects;
  6. using System.Text;
  7. using System.Threading;
  8. using System.Threading.Tasks;
  9. namespace SocketIOClient.Transport
  10. {
  11. public abstract class HttpPollingHandler : IHttpPollingHandler
  12. {
  13. public HttpPollingHandler(HttpClient httpClient)
  14. {
  15. HttpClient = httpClient;
  16. TextSubject = new Subject<string>();
  17. BytesSubject = new Subject<byte[]>();
  18. TextObservable = TextSubject.AsObservable();
  19. BytesObservable = BytesSubject.AsObservable();
  20. }
  21. protected HttpClient HttpClient { get; }
  22. protected Subject<string> TextSubject{get;}
  23. protected Subject<byte[]> BytesSubject{get;}
  24. public IObservable<string> TextObservable { get; }
  25. public IObservable<byte[]> BytesObservable { get; }
  26. protected string AppendRandom(string uri)
  27. {
  28. return uri + "&t=" + DateTimeOffset.Now.ToUnixTimeSeconds();
  29. }
  30. public async Task GetAsync(string uri, CancellationToken cancellationToken)
  31. {
  32. var req = new HttpRequestMessage(HttpMethod.Get, AppendRandom(uri));
  33. var resMsg = await HttpClient.SendAsync(req, cancellationToken).ConfigureAwait(false);
  34. if (!resMsg.IsSuccessStatusCode)
  35. {
  36. throw new HttpRequestException($"Response status code does not indicate success: {resMsg.StatusCode}");
  37. }
  38. await ProduceMessageAsync(resMsg).ConfigureAwait(false);
  39. }
  40. public async Task SendAsync(HttpRequestMessage req, CancellationToken cancellationToken)
  41. {
  42. var resMsg = await HttpClient.SendAsync(req, cancellationToken).ConfigureAwait(false);
  43. if (!resMsg.IsSuccessStatusCode)
  44. {
  45. throw new HttpRequestException($"Response status code does not indicate success: {resMsg.StatusCode}");
  46. }
  47. await ProduceMessageAsync(resMsg).ConfigureAwait(false);
  48. }
  49. public async virtual Task PostAsync(string uri, string content, CancellationToken cancellationToken)
  50. {
  51. var httpContent = new StringContent(content);
  52. var resMsg = await HttpClient.PostAsync(AppendRandom(uri), httpContent, cancellationToken).ConfigureAwait(false);
  53. await ProduceMessageAsync(resMsg).ConfigureAwait(false);
  54. }
  55. public abstract Task PostAsync(string uri, IEnumerable<byte[]> bytes, CancellationToken cancellationToken);
  56. private async Task ProduceMessageAsync(HttpResponseMessage resMsg)
  57. {
  58. if (resMsg.Content.Headers.ContentType.MediaType == "application/octet-stream")
  59. {
  60. byte[] bytes = await resMsg.Content.ReadAsByteArrayAsync().ConfigureAwait(false);
  61. ProduceBytes(bytes);
  62. }
  63. else
  64. {
  65. string text = await resMsg.Content.ReadAsStringAsync().ConfigureAwait(false);
  66. ProduceText(text);
  67. }
  68. }
  69. protected abstract void ProduceText(string text);
  70. private void ProduceBytes(byte[] bytes)
  71. {
  72. int i = 0;
  73. while (bytes.Length > i + 4)
  74. {
  75. byte type = bytes[i];
  76. var builder = new StringBuilder();
  77. i++;
  78. while (bytes[i] != byte.MaxValue)
  79. {
  80. builder.Append(bytes[i]);
  81. i++;
  82. }
  83. i++;
  84. int length = int.Parse(builder.ToString());
  85. if (type == 0)
  86. {
  87. var buffer = new byte[length];
  88. Buffer.BlockCopy(bytes, i, buffer, 0, buffer.Length);
  89. TextSubject.OnNext(Encoding.UTF8.GetString(buffer));
  90. }
  91. else if (type == 1)
  92. {
  93. var buffer = new byte[length - 1];
  94. Buffer.BlockCopy(bytes, i + 1, buffer, 0, buffer.Length);
  95. BytesSubject.OnNext(buffer);
  96. }
  97. i += length;
  98. }
  99. }
  100. public void Dispose()
  101. {
  102. TextSubject.Dispose();
  103. BytesSubject.Dispose();
  104. }
  105. }
  106. }