// DownloadContentStream.cs
  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.IO;
  4. using System.Runtime.ExceptionServices;
  5. using System.Threading;
  6. using Best.HTTP.Hosts.Connections;
  7. using Best.HTTP.Shared;
  8. using Best.HTTP.Shared.Extensions;
  9. using Best.HTTP.Shared.PlatformSupport.Memory;
  namespace Best.HTTP.Response
  {
      /// <summary>
      /// A read-only stream that the plugin uses to store the downloaded content. This stream is designed to buffer downloaded data efficiently and provide it to consumers.
      /// </summary>
      /// <remarks>
      /// <para>
      /// The DownloadContentStream serves as a storage medium for content downloaded during HTTP requests.
      /// It buffers the downloaded data in segments and allows clients to read from the buffer as needed.
      /// This buffering mechanism is essential for optimizing download performance, especially in scenarios where the download rate may vary or be faster than the rate at which data is consumed.
      /// </para>
      /// <para>
      /// The stream operates in conjunction with the <see cref="IDownloadContentBufferAvailable"/> interface, which is used to signal connections when buffer space becomes available.
      /// Connections can then transfer additional data into the buffer for processing.
      /// </para>
      /// <para>
      /// <list type="bullet">
      ///     <item>
      ///         <term>Efficient Buffering</term>
      ///         <description>The stream efficiently buffers downloaded content, ensuring that data is readily available for reading without extensive delays.</description>
      ///     </item>
      ///     <item>
      ///         <term>Segmented Storage</term>
      ///         <description>Downloaded data is kept as a queue of segments; each segment is released back to the <see cref="BufferPool"/> once it has been fully read.</description>
      ///     </item>
      ///     <item>
      ///         <term>Buffer Availability Signaling</term>
      ///         <description>The registered <see cref="IDownloadContentBufferAvailable"/> handler is notified whenever data is taken out of the stream, so connections can transfer more data.</description>
      ///     </item>
      ///     <item>
      ///         <term>Error Handling</term>
      ///         <description>The stream captures and propagates errors that occur during download, allowing clients to handle exceptions gracefully.</description>
      ///     </item>
      ///     <item>
      ///         <term>Blocking Variant</term>
      ///         <description>A blocking variant, <see cref="BlockingDownloadContentStream"/>, allows clients to wait for data when the buffer is empty but not completed.</description>
      ///     </item>
      /// </list>
      /// </para>
      /// <para>
      /// Clients can read from this stream using standard stream reading methods, and the stream will release memory segments as data is read.
      /// When the download is completed or if an error occurs during download, this stream allows clients to inspect the completion status and any associated exceptions.
      /// </para>
      /// </remarks>
      public class DownloadContentStream : Stream
      {
          // Profiler markers are created once and reused; constructing a marker per call is unnecessary work on hot paths.
          private static readonly Unity.Profiling.ProfilerMarker ReadProfilerMarker =
              new Unity.Profiling.ProfilerMarker($"{nameof(DownloadContentStream)}.{nameof(Read)}");

          private static readonly Unity.Profiling.ProfilerMarker DisposeProfilerMarker =
              new Unity.Profiling.ProfilerMarker("DownloadContentStream.Dispose");

          /// <summary>
          /// Gets the HTTP response from which this download stream originated.
          /// </summary>
          public HTTPResponse Response { get; protected set; }

          /// <summary>
          /// Gets a value indicating whether the download is completed, and there's no more data buffered in the stream to read.
          /// </summary>
          public virtual bool IsCompleted { get => this._isCompleted && this._segments.Count == 0 && this._currentSegment == BufferSegment.Empty; }

          /// <summary>
          /// Gets a reference to an exception if the download completed with an error; otherwise <c>null</c>.
          /// </summary>
          public Exception CompletedWith { get => this._exceptionInfo?.SourceException; }

          /// <summary>
          /// Gets the length of the buffered data. Because downloads happen in parallel, a <see cref="Read(byte[], int, int)"/> call can return with more data after checking Length.
          /// </summary>
          public override long Length => Interlocked.Read(ref this._length);

          /// <summary>
          /// Number of bytes currently buffered. Only modified through <see cref="Interlocked"/> operations.
          /// </summary>
          protected long _length;

          /// <summary>
          /// Gets the maximum size of the internal buffer of this stream.
          /// </summary>
          /// <remarks>In some cases, the plugin may put more data into the stream than the specified size.</remarks>
          public long MaxBuffered { get; protected set; }

          /// <summary>
          /// Gets a value indicating whether the internal buffer holds at least the <see cref="MaxBuffered"/> amount of data.
          /// </summary>
          public bool IsFull { get => this.Length >= this.MaxBuffered; }

          /// <summary>
          /// Gets or sets whether the stream is detached from the <see cref="HTTPRequest"/>/<see cref="HTTPResponse"/> when <see cref="Read(byte[], int, int)"/> is used before the request is finished.
          /// When the stream is detached from the response object, their lifetimes are not bound together,
          /// meaning that the stream isn't disposed automatically, and the client code is responsible for calling the stream's <see cref="System.IO.Stream.Dispose()"/> function.
          /// </summary>
          public bool IsDetached
          {
              get => this._isDetached;
              set
              {
                  // Only log and store on an actual state change.
                  if (this._isDetached != value)
                  {
                      HTTPManager.Logger.Verbose(nameof(DownloadContentStream), $"IsDetached {this._isDetached} => {value}", this.Response?.Context);
                      this._isDetached = value;
                  }
              }
          }
          private bool _isDetached;

          /// <summary>
          /// There are cases where the plugin has to put more data into the buffer than its previously set maximum.
          /// For example, when the underlying connection is closed but the content provider still has buffered data,
          /// in which case all processed data has to be pushed to the user-facing download stream.
          /// </summary>
          public void EmergencyIncreaseMaxBuffered() => this.MaxBuffered = long.MaxValue;

          /// <summary>
          /// Handler notified when data is taken out of the stream. Cleared by <see cref="CompleteAdding(Exception)"/>.
          /// </summary>
          protected IDownloadContentBufferAvailable _bufferAvailableHandler;

          /// <summary>
          /// Queue of downloaded segments that haven't been handed out yet.
          /// </summary>
          protected ConcurrentQueue<BufferSegment> _segments = new ConcurrentQueue<BufferSegment>();

          /// <summary>
          /// The segment <see cref="Read(byte[], int, int)"/> is currently consuming, or <see cref="BufferSegment.Empty"/> if there's none.
          /// </summary>
          protected BufferSegment _currentSegment = BufferSegment.Empty;

          /// <summary>
          /// Set to <c>true</c> by <see cref="CompleteAdding(Exception)"/>.
          /// </summary>
          protected bool _isCompleted;

          /// <summary>
          /// Captured download error, if any. Rethrown by <see cref="Read(byte[], int, int)"/>.
          /// </summary>
          protected ExceptionDispatchInfo _exceptionInfo;

          /// <summary>
          /// Count of consecutive calls with DoFullCheck that found the stream fully buffered.
          /// </summary>
          private int _isFullCheckCount;

          /// <summary>
          /// Set to <c>true</c> once <see cref="Dispose(bool)"/> has run.
          /// </summary>
          protected bool _isDisposed;

          /// <summary>
          /// Initializes a new instance of the DownloadContentStream class.
          /// </summary>
          /// <param name="response">The HTTP response associated with this download stream.</param>
          /// <param name="maxBuffered">The maximum size of the internal buffer.</param>
          /// <param name="bufferAvailableHandler">Handler for notifying when buffer space becomes available.</param>
          public DownloadContentStream(HTTPResponse response, long maxBuffered, IDownloadContentBufferAvailable bufferAvailableHandler)
          {
              this.Response = response;
              this.MaxBuffered = maxBuffered;
              this._bufferAvailableHandler = bufferAvailableHandler;
          }

          /// <summary>
          /// Completes the download stream with an optional error. Called when the download is finished.
          /// </summary>
          /// <param name="error">The exception that occurred during download, if any.</param>
          internal virtual void CompleteAdding(Exception error)
          {
              HTTPManager.Logger.Information(nameof(DownloadContentStream), $"CompleteAdding({error})", this.Response?.Context);

              this._isCompleted = true;
              this._exceptionInfo = error != null ? ExceptionDispatchInfo.Capture(error) : null;

              // No more data will arrive, so there's nobody left to notify about free buffer space.
              this._bufferAvailableHandler = null;
          }

          /// <summary>
          /// Tries to remove a downloaded segment from the stream. If the stream is empty, it returns immediately with false.
          /// </summary>
          /// <remarks>
          /// Calling this function marks the stream as detached (see <see cref="IsDetached"/>).
          /// The returned segment is not released to the <see cref="BufferPool"/> by this function.
          /// Only the segment queue is consulted; a segment partially consumed by <see cref="Read(byte[], int, int)"/> is not returned here.
          /// </remarks>
          /// <param name="segment">A <see cref="BufferSegment"/> containing the reference to a byte[] and the offset and count of the data in the array.</param>
          /// <returns><c>true</c> if a downloaded segment was available and could return with, otherwise <c>false</c></returns>
          /// <exception cref="ObjectDisposedException">If the stream is already disposed.</exception>
          public virtual bool TryTake(out BufferSegment segment)
          {
              if (this._isDisposed)
                  throw new ObjectDisposedException(GetType().FullName);

              this.IsDetached = true;

              if (this._segments.TryDequeue(out segment) && segment.Count > 0)
              {
                  Interlocked.Add(ref this._length, -segment.Count);

                  // Space was freed up, let the connection know it can push more data.
                  this._bufferAvailableHandler?.BufferAvailable(this);

                  return true;
              }

              return false;
          }

          /// <summary>
          /// A non-blocking Read function. When it returns <c>0</c>, it doesn't mean the download is complete. If the download interrupted before completing, the next Read call can throw an exception.
          /// </summary>
          /// <param name="buffer">The buffer to read data into.</param>
          /// <param name="offset">The zero-based byte offset in the buffer at which to begin copying bytes.</param>
          /// <param name="count">The maximum number of bytes to read.</param>
          /// <returns>The number of bytes copied to the buffer, or zero if no downloaded data is available at the time of the call.</returns>
          /// <exception cref="ObjectDisposedException">If the stream is already disposed.</exception>
          /// <exception cref="ArgumentNullException">If <paramref name="buffer"/> is <c>null</c>.</exception>
          /// <exception cref="ArgumentOutOfRangeException">If <paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
          /// <exception cref="ArgumentException">If <paramref name="offset"/> and <paramref name="count"/> describe a range beyond the end of <paramref name="buffer"/>.</exception>
          public override int Read(byte[] buffer, int offset, int count)
          {
              using var _ = ReadProfilerMarker.Auto();

              if (this._isDisposed)
                  throw new ObjectDisposedException(GetType().FullName);

              // Rethrow the download error (if any) with its original stack trace.
              this._exceptionInfo?.Throw();

              // Validate the target range before touching any segment. Otherwise a too-small buffer could make
              // Array.Copy throw after earlier segments were already consumed and released, losing data and
              // leaving _length out of sync with the real buffered amount.
              if (buffer == null)
                  throw new ArgumentNullException(nameof(buffer));
              if (offset < 0)
                  throw new ArgumentOutOfRangeException(nameof(offset), "Non-negative number required.");
              if (count < 0)
                  throw new ArgumentOutOfRangeException(nameof(count), "Non-negative number required.");
              if (buffer.Length - offset < count)
                  throw new ArgumentException("Offset and count describe a range that extends beyond the end of the buffer.");

              this.IsDetached = true;

              if (this._currentSegment == BufferSegment.Empty)
                  this._segments.TryDequeue(out this._currentSegment);

              int sumReadCount = 0;

              while (sumReadCount < count && this._currentSegment != BufferSegment.Empty)
              {
                  int readCount = Math.Min(count - sumReadCount, this._currentSegment.Count);

                  Array.Copy(this._currentSegment.Data, this._currentSegment.Offset, buffer, offset, readCount);

                  offset += readCount;
                  sumReadCount += readCount;

                  if (this._currentSegment.Count == readCount)
                  {
                      // The segment is fully consumed: give its memory back and move on to the next one.
                      BufferPool.Release(this._currentSegment);

                      if (!this._segments.TryDequeue(out this._currentSegment))
                          this._currentSegment = BufferSegment.Empty;
                  }
                  else
                  {
                      // Partially consumed: keep the unread remainder for the next iteration or call.
                      this._currentSegment = this._currentSegment.Slice(this._currentSegment.Offset + readCount);
                  }
              }

              Interlocked.Add(ref this._length, -sumReadCount);

              this._bufferAvailableHandler?.BufferAvailable(this);

              return sumReadCount;
          }

          /// <summary>
          /// Writes a downloaded data segment to the stream. Segments with no data are ignored.
          /// </summary>
          /// <param name="segment">The downloaded data segment to write.</param>
          /// <exception cref="ObjectDisposedException">If the stream is already disposed.</exception>
          public virtual void Write(BufferSegment segment)
          {
              if (this._isDisposed)
                  throw new ObjectDisposedException(GetType().FullName);

              if (segment.Count <= 0)
                  return;

              this._segments.Enqueue(segment);
              Interlocked.Add(ref this._length, segment.Count);

              // New data arrived, restart counting the consecutive "full" checks.
              this._isFullCheckCount = 0;
          }

          /// <summary>
          /// Checks whether the stream is fully buffered and increases a counter if it's full, resetting it otherwise.
          /// </summary>
          /// <param name="limit">The limit for the full check counter.</param>
          /// <returns><c>true</c> if the counter is equal to or larger than the limit parameter; otherwise <c>false</c>.</returns>
          internal bool DoFullCheck(int limit)
          {
              if (IsFull)
                  _isFullCheckCount++;
              else
                  _isFullCheckCount = 0;

              return _isFullCheckCount >= limit;
          }

          /// <summary>
          /// Disposes of the stream, releasing any resources held by it.
          /// </summary>
          /// <param name="disposing"><c>true</c> to release both managed and unmanaged resources; <c>false</c> to release only unmanaged resources.</param>
          protected override void Dispose(bool disposing)
          {
              base.Dispose(disposing);

              // Make repeated Dispose calls harmless: buffers must be released only once.
              if (this._isDisposed)
                  return;
              this._isDisposed = true;

              using (var _ = DisposeProfilerMarker.Auto())
              {
                  BufferPool.Release(this._currentSegment);
                  this._currentSegment = BufferSegment.Empty;

                  BufferPool.ReleaseBulk(this._segments);
                  this._segments.Clear();
              }
          }

          /// <summary>Always <c>true</c>: the stream supports reading.</summary>
          public override bool CanRead => true;

          /// <summary>Always <c>false</c>: the stream doesn't support seeking.</summary>
          public override bool CanSeek => false;

          /// <summary>Always <c>false</c>: the stream can't be written through the <see cref="Stream"/> API.</summary>
          public override bool CanWrite => false;

          /// <summary>Not supported, the stream isn't seekable.</summary>
          /// <exception cref="NotSupportedException">Always thrown.</exception>
          public override long Position { get => throw new NotSupportedException(); set => throw new NotSupportedException(); }

          /// <summary>
          /// Does nothing. Flushing a read-only stream is valid, so this is a no-op instead of throwing.
          /// </summary>
          public override void Flush() { }

          /// <summary>Not supported, the stream isn't seekable.</summary>
          /// <exception cref="NotSupportedException">Always thrown.</exception>
          public override long Seek(long offset, SeekOrigin origin) => throw new NotSupportedException();

          /// <summary>Not supported, the stream's length can't be set.</summary>
          /// <exception cref="NotSupportedException">Always thrown.</exception>
          public override void SetLength(long value) => throw new NotSupportedException();

          /// <summary>Not supported, use <see cref="Write(BufferSegment)"/> to add downloaded data.</summary>
          /// <exception cref="NotSupportedException">Always thrown.</exception>
          public override void Write(byte[] buffer, int offset, int count) => throw new NotSupportedException();
      }
  }