BufferedReadNetworkStream.cs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375
  1. using System;
  2. using System.IO;
  3. using BestHTTP.Extensions;
  4. namespace BestHTTP.Connections
  5. {
  6. public sealed class BufferedReadNetworkStream : Stream
  7. {
  8. #region Network Stats
  9. public static long TotalNetworkBytesReceived { get => _totalNetworkBytesReceived; }
  10. private static long _totalNetworkBytesReceived;
  11. internal static void IncrementTotalNetworkBytesReceived(int amount) => System.Threading.Interlocked.Add(ref _totalNetworkBytesReceived, amount);
  12. public static long TotalNetworkBytesSent { get => _totalNetworkBytesSent; }
  13. private static long _totalNetworkBytesSent;
  14. internal static void IncrementTotalNetworkBytesSent(int amount) => System.Threading.Interlocked.Add(ref _totalNetworkBytesSent, amount);
  15. public static int TotalConnections { get => _totalConnections; }
  16. private static int _totalConnections;
  17. public static int OpenConnections { get => _openConnections; }
  18. private static int _openConnections;
  19. internal static void IncrementCurrentConnections()
  20. {
  21. System.Threading.Interlocked.Increment(ref _totalConnections);
  22. System.Threading.Interlocked.Increment(ref _openConnections);
  23. }
  24. internal static void DecrementCurrentConnections() => System.Threading.Interlocked.Decrement(ref _openConnections);
  25. internal static void ResetNetworkStats()
  26. {
  27. System.Threading.Interlocked.Exchange(ref _totalNetworkBytesReceived, 0);
  28. System.Threading.Interlocked.Exchange(ref _totalNetworkBytesSent, 0);
  29. System.Threading.Interlocked.Exchange(ref _totalConnections, 0);
  30. System.Threading.Interlocked.Exchange(ref _openConnections, 0);
  31. }
  32. #endregion
  33. public override bool CanRead { get { throw new NotImplementedException(); } }
  34. public override bool CanSeek { get { throw new NotImplementedException(); } }
  35. public override bool CanWrite { get { throw new NotImplementedException(); } }
  36. public override long Length { get { throw new NotImplementedException(); } }
  37. public override long Position { get { throw new NotImplementedException(); } set { throw new NotImplementedException(); } }
  38. private ReadOnlyBufferedStream readStream;
  39. private Stream innerStream;
  40. public BufferedReadNetworkStream(Stream stream, int bufferSize)
  41. {
  42. this.innerStream = stream;
  43. this.readStream = new ReadOnlyBufferedStream(stream, bufferSize);
  44. IncrementCurrentConnections();
  45. }
  46. public override void Flush()
  47. {
  48. }
  49. public override int Read(byte[] buffer, int offset, int count)
  50. {
  51. int read = this.readStream.Read(buffer, offset, count);
  52. IncrementTotalNetworkBytesReceived(read);
  53. return read;
  54. }
  55. public override long Seek(long offset, SeekOrigin origin)
  56. {
  57. throw new NotImplementedException();
  58. }
  59. public override void SetLength(long value)
  60. {
  61. throw new NotImplementedException();
  62. }
  63. public override void Write(byte[] buffer, int offset, int count)
  64. {
  65. IncrementTotalNetworkBytesSent(count);
  66. this.innerStream.Write(buffer, offset, count);
  67. }
  68. public override void Close()
  69. {
  70. base.Close();
  71. if (this.innerStream != null)
  72. {
  73. this.innerStream.Close();
  74. this.innerStream = null;
  75. DecrementCurrentConnections();
  76. }
  77. }
  78. }
  79. // Non-used experimental stream. Reading from the inner stream is done parallel and Read is blocked if no data is buffered.
  80. // Additionally BC reads 5 bytes for the TLS header, than the payload. Buffering data from the network could save at least one context switch per TLS message.
  81. // In theory it, could help as reading from the network could be done parallel with TLS decryption.
  82. // However, if decrypting data is done faster than data is coming on the network, waiting for data longer and letting SpinWait to go deep-sleep it's going to
  83. // resume the thread milliseconds after new data is available. Those little afters are adding up and actually slowing down the download.
  84. // Not using locking just calling TryDequeue until there's data would solve the slow-down, but with the price of using 100% CPU of a core.
  85. // The whole struggle might worth it if Unity would implement SocketAsyncEventArgs properly.
  86. //sealed class BufferedReadNetworkStream : Stream
  87. //{
  88. // public override bool CanRead => throw new NotImplementedException();
  89. //
  90. // public override bool CanSeek => throw new NotImplementedException();
  91. //
  92. // public override bool CanWrite => throw new NotImplementedException();
  93. //
  94. // public override long Length => throw new NotImplementedException();
  95. //
  96. // public override long Position { get => throw new NotImplementedException(); set => throw new NotImplementedException(); }
  97. //
  98. // byte[] buf;
  99. // int available = 0;
  100. // int pos = 0;
  101. //
  102. // private System.Net.Sockets.Socket client;
  103. // int readBufferSize;
  104. // int bufferSize;
  105. // private System.Threading.SpinWait spinWait = new System.Threading.SpinWait();
  106. //
  107. // System.Collections.Concurrent.ConcurrentQueue<BufferSegment> downloadedData = new System.Collections.Concurrent.ConcurrentQueue<BufferSegment>();
  108. // private int downloadedBytes;
  109. // private System.Threading.SpinWait downWait = new System.Threading.SpinWait();
  110. // private int closed = 0;
  111. //
  112. // //System.Net.Sockets.SocketAsyncEventArgs socketAsyncEventArgs = new System.Net.Sockets.SocketAsyncEventArgs();
  113. //
  114. // //DateTime started;
  115. //
  116. // public BufferedReadNetworkStream(System.Net.Sockets.Socket socket, int readBufferSize, int bufferSize)
  117. // {
  118. // this.client = socket;
  119. // this.readBufferSize = readBufferSize;
  120. // this.bufferSize = bufferSize;
  121. //
  122. // //this.socketAsyncEventArgs.AcceptSocket = this.client;
  123. // //
  124. // //var buffer = BufferPool.Get(this.readBufferSize, true);
  125. // //this.socketAsyncEventArgs.SetBuffer(buffer, 0, buffer.Length);
  126. // //
  127. // ////var bufferList = new List<ArraySegment<byte>>();
  128. // ////for (int i = 0; i < 1; i++)
  129. // ////{
  130. // //// var buffer = BufferPool.Get(this.readBufferSize, true);
  131. // //// bufferList.Add(new ArraySegment<byte>(buffer));
  132. // ////}
  133. // ////this.socketAsyncEventArgs.BufferList = bufferList;
  134. // //
  135. // //this.socketAsyncEventArgs.Completed += SocketAsyncEventArgs_Completed;
  136. // //
  137. // //this.started = DateTime.Now;
  138. // //if (!this.client.ReceiveAsync(this.socketAsyncEventArgs))
  139. // // SocketAsyncEventArgs_Completed(null, this.socketAsyncEventArgs);
  140. //
  141. // BestHTTP.PlatformSupport.Threading.ThreadedRunner.RunShortLiving(() =>
  142. // {
  143. // DateTime started = DateTime.Now;
  144. // try
  145. // {
  146. // while (closed == 0)
  147. // {
  148. // var buffer = BufferPool.Get(this.readBufferSize, true);
  149. //
  150. // int count = this.client.Receive(buffer, 0, buffer.Length, System.Net.Sockets.SocketFlags.None);
  151. // //int count = 0;
  152. // //unsafe {
  153. // // fixed (byte* pBuffer = buffer)
  154. // // {
  155. // // int zero = 0;
  156. // // count = recvfrom(this.client.Handle, pBuffer, buffer.Length, SocketFlags.None, null, ref zero);
  157. // // }
  158. // //}
  159. //
  160. // this.downloadedData.Enqueue(new BufferSegment(buffer, 0, count));
  161. // System.Threading.Interlocked.Add(ref downloadedBytes, count);
  162. //
  163. // if (HTTPManager.Logger.Level <= Logger.Loglevels.Warning)
  164. // HTTPManager.Logger.Warning(nameof(BufferedReadNetworkStream), $"read count: {count:N0} downloadedBytes: {downloadedBytes:N0} / {this.bufferSize:N0}");
  165. //
  166. // if (count <= 0)
  167. // {
  168. // System.Threading.Interlocked.Exchange(ref closed, 1);
  169. // return;
  170. // }
  171. //
  172. // while (downloadedBytes >= this.bufferSize)
  173. // {
  174. // downWait.SpinOnce();
  175. // }
  176. // }
  177. // }
  178. // catch (Exception ex)
  179. // {
  180. // UnityEngine.Debug.LogException(ex);
  181. // }
  182. // finally
  183. // {
  184. // UnityEngine.Debug.Log($"Reading finished in {(DateTime.Now - started)}");
  185. // }
  186. // });
  187. // }
  188. //
  189. // //private void SocketAsyncEventArgs_Completed(object sender, System.Net.Sockets.SocketAsyncEventArgs e)
  190. // //{
  191. // // this.downloadedData.Enqueue(new BufferSegment(e.Buffer, 0, e.BytesTransferred));
  192. // //
  193. // // if (e.BytesTransferred == 0)
  194. // // {
  195. // // UnityEngine.Debug.Log($"Reading finished in {(DateTime.Now - started)}");
  196. // // return;
  197. // // }
  198. // //
  199. // // int down = System.Threading.Interlocked.Add(ref downloadedBytes, e.BytesTransferred);
  200. // //
  201. // // if (HTTPManager.Logger.Level <= Logger.Loglevels.Warning)
  202. // // HTTPManager.Logger.Warning(nameof(BufferedReadNetworkStream), $"SocketAsyncEventArgs_Completed - read count: {e.BytesTransferred:N0} downloadedBytes: {down:N0} / {this.bufferSize:N0}");
  203. // //
  204. // // var buffer = BufferPool.Get(this.readBufferSize, true);
  205. // // this.socketAsyncEventArgs.SetBuffer(buffer, 0, buffer.Length);
  206. // //
  207. // // if (!this.client.ReceiveAsync(this.socketAsyncEventArgs))
  208. // // SocketAsyncEventArgs_Completed(null, this.socketAsyncEventArgs);
  209. // //}
  210. //
  211. // private void SwitchBuffers(bool waitForData)
  212. // {
  213. // //HTTPManager.Logger.Error("Read", $"{this.downloadedData.Count}");
  214. // BufferSegment segment;
  215. // while (!this.downloadedData.TryDequeue(out segment))
  216. // {
  217. // if (waitForData && closed == 0)
  218. // {
  219. // if (HTTPManager.Logger.Level <= Logger.Loglevels.Error)
  220. // HTTPManager.Logger.Error(nameof(BufferedReadNetworkStream), $"SpinOnce");
  221. // this.spinWait.SpinOnce();
  222. // }
  223. // else
  224. // return;
  225. // }
  226. //
  227. // //if (segment.Count <= 0)
  228. // // throw new Exception("Connection closed!");
  229. //
  230. // if (buf != null)
  231. // BufferPool.Release(buf);
  232. //
  233. // System.Threading.Interlocked.Add(ref downloadedBytes, -segment.Count);
  234. //
  235. // buf = segment.Data;
  236. // available = segment.Count;
  237. // pos = 0;
  238. // }
  239. //
  240. // public override int Read(byte[] buffer, int offset, int size)
  241. // {
  242. // if (this.buf == null)
  243. // {
  244. // SwitchBuffers(true);
  245. // }
  246. //
  247. // if (size <= available)
  248. // {
  249. // Array.Copy(buf, pos, buffer, offset, size);
  250. // available -= size;
  251. // pos += size;
  252. //
  253. // if (available == 0)
  254. // {
  255. // SwitchBuffers(false);
  256. // }
  257. //
  258. // return size;
  259. // }
  260. // else
  261. // {
  262. // int readcount = 0;
  263. // if (available > 0)
  264. // {
  265. // Array.Copy(buf, pos, buffer, offset, available);
  266. // offset += available;
  267. // readcount += available;
  268. // available = 0;
  269. // pos = 0;
  270. // }
  271. //
  272. // while (true)
  273. // {
  274. // try
  275. // {
  276. // SwitchBuffers(true);
  277. // }
  278. // catch (Exception ex)
  279. // {
  280. // if (readcount > 0)
  281. // {
  282. // return readcount;
  283. // }
  284. //
  285. // throw (ex);
  286. // }
  287. //
  288. // if (available < 1)
  289. // {
  290. // if (readcount > 0)
  291. // {
  292. // return readcount;
  293. // }
  294. //
  295. // return available;
  296. // }
  297. // else
  298. // {
  299. // int toread = size - readcount;
  300. // if (toread <= available)
  301. // {
  302. // Array.Copy(buf, pos, buffer, offset, toread);
  303. // available -= toread;
  304. // pos += toread;
  305. // readcount += toread;
  306. // return readcount;
  307. // }
  308. // else
  309. // {
  310. // Array.Copy(buf, pos, buffer, offset, available);
  311. // offset += available;
  312. // readcount += available;
  313. // pos = 0;
  314. // available = 0;
  315. // }
  316. // }
  317. // }
  318. // }
  319. // }
  320. //
  321. // public override long Seek(long offset, SeekOrigin origin)
  322. // {
  323. // throw new NotImplementedException();
  324. // }
  325. //
  326. // public override void SetLength(long value)
  327. // {
  328. // throw new NotImplementedException();
  329. // }
  330. //
  331. // public override void Write(byte[] buffer, int offset, int count)
  332. // {
  333. // this.client.Send(buffer, offset, count, System.Net.Sockets.SocketFlags.None);
  334. //
  335. // HTTPManager.Logger.Warning(nameof(BufferedReadNetworkStream), $"Wrote: {count}");
  336. // }
  337. //
  338. // public override void Close()
  339. // {
  340. // base.Close();
  341. //
  342. // //socketAsyncEventArgs.Dispose();
  343. // //socketAsyncEventArgs = null;
  344. // }
  345. //
  346. // public override void Flush()
  347. // {
  348. // }
  349. //}
  350. }