using System; using System.Threading; using Best.HTTP.Hosts.Connections; using Best.HTTP.Shared.PlatformSupport.Memory; namespace Best.HTTP.Response { /// /// A blocking variant of the that allows clients to wait for downloaded data when the buffer is empty but not completed. /// /// /// /// The BlockingDownloadContentStream is a specialized variant of the designed to provide a blocking mechanism for clients waiting for downloaded data. /// This class is particularly useful when clients need to read from the stream, but the buffer is temporarily empty due to ongoing downloads. /// /// /// Key Features: /// /// /// Blocking Data Retrieval /// Provides a blocking method that allows clients to wait for data if the buffer is empty but not yet completed. /// /// /// Timeout Support /// The method accepts a timeout parameter, allowing clients to set a maximum wait time for data availability. /// /// /// Exception Handling /// Handles exceptions and errors that occur during download, ensuring that clients receive any relevant exception information. /// /// /// /// /// Clients can use the method to retrieve data from the stream, and if the buffer is empty, the method will block until new data is downloaded or a timeout occurs. /// This blocking behavior is particularly useful in scenarios where clients need to consume data sequentially but can't proceed until data is available. /// /// /// When the download is completed or if an error occurs during download, this stream allows clients to inspect the completion status and any associated exceptions, just like the base . /// /// public sealed class BlockingDownloadContentStream : DownloadContentStream { private AutoResetEvent _are = new AutoResetEvent(false); /// /// Initializes a new instance of the class. /// /// The HTTP response associated with this download stream. /// The maximum size of the internal buffer. /// Handler for notifying when buffer space becomes available. public BlockingDownloadContentStream(HTTPResponse response, long maxBuffered, IDownloadContentBufferAvailable bufferAvailableHandler) : base(response, maxBuffered, bufferAvailableHandler) { } /// /// Attempts to retrieve a downloaded content-segment from the stream, blocking if necessary until a segment is available. /// /// When this method returns, contains the instance representing the data, if available; otherwise, contains the value of . This parameter is passed uninitialized. /// true if a segment could be retrieved; otherwise, false. /// /// /// The TryTake function provides a blocking approach to retrieve data from the stream. /// If the stream has data available, it immediately returns the data. /// If there's no data available, the method will block until new data is downloaded or the buffer is marked as completed. /// /// /// This method is designed for scenarios where clients need to read from the stream sequentially and are willing to wait until data is available. /// It ensures that clients receive data as soon as it becomes available, without having to repeatedly check or poll the stream. /// /// public override bool TryTake(out BufferSegment segment) { segment = BufferSegment.Empty; while (!base.IsCompleted && segment == BufferSegment.Empty) segment = Take(); return segment != BufferSegment.Empty; } /// /// Returns with a download content-segment. If the stream is currently empty but not completed the execution is blocked until new data downloaded. /// A segment is an arbitrary length array of bytes the plugin could read in one operation, it can range from couple of bytes to kilobytes. /// /// A BufferSegment holding a reference to the byte[] containing the downloaded data, offset and count of bytes in the array. /// The stream is disposed. /// The stream is empty and marked as completed. public BufferSegment Take() => Take(TimeSpan.FromMilliseconds(-1)); /// /// Returns with a download content-segment. If the stream is currently empty but not completed the execution is blocked until new data downloaded or the timeout is reached. /// A segment is an arbitrary length array of bytes the plugin could read in one operation, it can range from couple of bytes to kilobytes. /// /// A TimeSpan that represents the number of milliseconds to wait, or a TimeSpan that represents -1 milliseconds to wait indefinitely. /// A BufferSegment holding a reference to the byte[] containing the downloaded data, offset and count of bytes in the array. In case of a timeout, BufferSegment.Empty returned. /// The stream is disposed. /// The stream is empty and marked as completed. public BufferSegment Take(TimeSpan timeout) { this.IsDetached = true; while (!base.IsCompleted) { if (this._isDisposed) throw new ObjectDisposedException(GetType().FullName); if (this._exceptionInfo != null) this._exceptionInfo.Throw(); if (this._segments.TryDequeue(out var segment) && segment.Count > 0) { #pragma warning disable 0197 Interlocked.Add(ref base._length, -segment.Count); #pragma warning restore this._bufferAvailableHandler?.BufferAvailable(this); return segment; } if (base._isCompleted) throw new InvalidOperationException("The stream is empty and marked as completed!"); if (WaitForEvent(timeout)) /*!this._are.WaitOne(timeout)*/ return BufferSegment.Empty; } return BufferSegment.Empty; } /// /// Reads a sequence of bytes from the current stream and advances the position within the stream by the number of bytes read. /// /// /// /// This override of the method provides blocking behavior, meaning if there are no bytes available in the stream, the method will block until new data is downloaded or until the stream completes. Once data is available, or if the stream completes, the method will return with the number of bytes read. /// /// /// This behavior ensures that consumers of the stream can continue reading data sequentially, even if the stream's internal buffer is temporarily empty due to ongoing downloads. /// /// /// An array of bytes. When this method returns, the buffer contains the specified byte array with the values between and ( + - 1) replaced by the bytes read from the current source. /// The zero-based byte offset in at which to begin storing the data read from the current stream. /// The maximum number of bytes to be read from the current stream. /// /// The total number of bytes read into the buffer. This can be less than the number of bytes requested if that many bytes are not currently available, or zero if the end of the stream is reached. /// public override int Read(byte[] buffer, int offset, int count) { if (this._isDisposed) throw new ObjectDisposedException(GetType().FullName); int readCount = base.Read(buffer, offset, count); while (readCount == 0 && !IsCompleted) { //this._are?.WaitOne(); WaitForEvent(TimeSpan.FromMilliseconds(-1)); readCount = base.Read(buffer, offset, count); } return readCount; } public override void Write(BufferSegment segment) { if (this._isDisposed) throw new ObjectDisposedException(GetType().FullName); base.Write(segment); this._are?.Set(); } internal override void CompleteAdding(Exception error) { if (this._isDisposed) throw new ObjectDisposedException(GetType().FullName); base.CompleteAdding(error); this._are?.Set(); } /// /// Instead of calling WaitOne once for the total duration of the timeout, /// periodically check whether we are disposed or not. /// private bool WaitForEvent(TimeSpan timeoutTS) { const int CHECK_PERIOD = 100; var resetEvent = this._are; if (this._isDisposed || resetEvent == null) throw new ObjectDisposedException(GetType().FullName); var timeout = (int)timeoutTS.TotalMilliseconds; if (timeout < 0) timeout = int.MaxValue; while (!this._isDisposed && timeout > 0) { int waitTime = Math.Min((int)timeout, CHECK_PERIOD); // there's a race condition between checking _isDisposed and using resetEvent. // There are two cases: // 1.) resetEvent is already disposed when calling WaitOne in case it will throw an exception // 2.) resetEvent will be disposed while blocking in WaitOne and in this case it will unblock and exit from the while cycle because of _isDisposed is true if (resetEvent.WaitOne(waitTime)) return true; timeout -= CHECK_PERIOD; } return false; } protected override void Dispose(bool disposing) { base.Dispose(disposing); this._are?.Dispose(); this._are = null; } } }