// DynamicUploadStream.cs
  1. using System;
  2. using System.Collections.Concurrent;
  3. using System.Threading;
  4. using Best.HTTP.Hosts.Connections;
  5. using Best.HTTP.Shared.Extensions;
  6. using Best.HTTP.Shared.PlatformSupport.Memory;
  7. namespace Best.HTTP.Request.Upload
  8. {
  /// <summary>
  /// A specialized upload stream designed to handle data that's generated on-the-fly or periodically.
  /// </summary>
  /// <remarks>
  /// This implementation is designed to handle scenarios where data may not always be immediately available for upload.
  /// The request will remain active until the <see cref="Complete"/> method is invoked, ensuring that data can continue
  /// to be fed into the stream even if it's temporarily empty during a Read operation.
  /// </remarks>
  public sealed class DynamicUploadStream : UploadStreamBase
  {
      /// <summary>
      /// Gets the length of the upload stream.
      /// </summary>
      /// <remarks>
      /// This implementation always returns <c>BodyLengths.UnknownWithChunkedTransferEncoding</c>, indicating that the
      /// length of the data to be uploaded is unknown. When the processing connection encounters this value, it should
      /// utilize chunked uploading to handle the data transfer.
      /// </remarks>
      /// <value>The constant representing an unknown length.</value>
      public override long Length
          => BodyLengths.UnknownWithChunkedTransferEncoding;

      /// <summary>
      /// Gets the length of data currently buffered and ready for upload.
      /// </summary>
      /// <remarks>
      /// The counter is increased by <see cref="Write(BufferSegment)"/> and decreased by <see cref="Read"/>.
      /// It is read atomically because it is updated through <see cref="Interlocked"/> operations.
      /// </remarks>
      /// <value>The length of buffered data in bytes.</value>
      public long BufferedLength
          => Interlocked.Read(ref this._bufferedLength);

      // Number of bytes enqueued but not yet handed out by Read; only touched through Interlocked.
      private long _bufferedLength;

      // Set once by Complete(). Volatile so that Read observes it (and everything enqueued before it) promptly.
      private volatile bool _isCompleted;

      // Segments waiting to be uploaded, in the order they were written.
      private readonly ConcurrentQueue<BufferSegment> _chunks = new ConcurrentQueue<BufferSegment>();

      // The segment Read is currently consuming; BufferSegment.Empty when there is none.
      private BufferSegment _current;

      // Value sent in the "content-type" request header.
      private readonly string _contentType;

      /// <summary>
      /// Initializes a new instance of the DynamicUploadStream class with an optional content type.
      /// </summary>
      /// <param name="contentType">The MIME type of the content to be uploaded. Defaults to "<c>application/octet-stream</c>" if not specified.</param>
      /// <remarks>
      /// This constructor allows the caller to specify the content type of the data to be uploaded. If not provided, it defaults to a general binary data type.
      /// </remarks>
      public DynamicUploadStream(string contentType = "application/octet-stream")
          => this._contentType = contentType;

      /// <summary>
      /// Sets the <c>content-type</c> header on the request before its headers are sent.
      /// </summary>
      /// <param name="request">The request whose header is set.</param>
      public override void BeforeSendHeaders(HTTPRequest request)
          => request.SetHeader("content-type", this._contentType);

      /// <summary>
      /// Prepares the stream before the request body is sent. Delegates to the base implementation.
      /// </summary>
      /// <param name="request">The request that is about to send its body.</param>
      /// <param name="threadSignaler">The signaler passed on to the base implementation.</param>
      public override void BeforeSendBody(HTTPRequest request, IThreadSignaler threadSignaler)
          => base.BeforeSendBody(request, threadSignaler);

      /// <summary>
      /// Reads data from the stream to be uploaded.
      /// </summary>
      /// <remarks>
      /// The returned value indicates the state of the stream:
      /// <list type="bullet">
      ///     <item><term><c>UploadReadConstants.WaitForMore</c></term><description>More data is expected in the future, but isn't currently available. When new data is ready, the IThreadSignaler must be notified.</description></item>
      ///     <item><term>0</term><description>The stream has been completed and no more data will be provided.</description></item>
      ///     <item><description>Otherwise it returns with the number of bytes copied to the buffer.</description></item>
      /// </list>
      /// Note: A zero return value can come after a wait-for-more return, indicating a transition from waiting to completion.
      /// </remarks>
      /// <param name="buffer">The destination array.</param>
      /// <param name="offset">The zero-based index in <paramref name="buffer"/> at which to begin storing data.</param>
      /// <param name="count">The maximum number of bytes to copy.</param>
      /// <returns>The number of bytes copied, 0 on completion, or the wait-for-more constant.</returns>
      public override int Read(byte[] buffer, int offset, int count)
      {
          // Sample the completion flag BEFORE draining. If it were checked only afterwards, a Write followed by
          // Complete() that lands between the failed dequeue and the check would make us report end-of-stream
          // while data is still queued.
          bool completed = this._isCompleted;

          int readCount = DrainBufferedData(buffer, offset, count);

          if (readCount == 0 && !completed && this._isCompleted)
          {
              // Complete() was called while we were draining. Everything written before it is now visible in the
              // queue, so one more pass gives a definitive answer without relying on another wake-up signal.
              completed = true;
              readCount = DrainBufferedData(buffer, offset, count);
          }

          if (readCount == 0)
              return completed ? 0 : UploadReadConstants.WaitForMore;

          Interlocked.Add(ref this._bufferedLength, -readCount);

          return readCount;
      }

      /// <summary>
      /// Copies as many buffered bytes as fit into <paramref name="buffer"/>, consuming queued segments in the order they were written.
      /// </summary>
      /// <returns>The number of bytes copied; 0 when nothing is buffered.</returns>
      private int DrainBufferedData(byte[] buffer, int offset, int count)
      {
          int readCount = 0;

          while (readCount < count && (_current != BufferSegment.Empty || _chunks.TryDequeue(out _current)))
          {
              int copyCount = Math.Min(count - readCount, _current.Count);

              Array.Copy(_current.Data, _current.Offset, buffer, offset, copyCount);

              readCount += copyCount;
              offset += copyCount;

              // Count is the number of bytes remaining in the segment (see the Math.Min above), so the segment is
              // exhausted exactly when all of them were copied. Comparing Offset + copyCount against Count would
              // release a previously sliced segment too early and drop its tail.
              if (copyCount >= _current.Count)
              {
                  BufferPool.Release(_current);
                  _current = BufferSegment.Empty;
              }
              else
              {
                  _current = _current.Slice(_current.Offset + copyCount);
              }
          }

          return readCount;
      }

      /// <summary>
      /// Writes data to the stream, making it available for upload.
      /// </summary>
      /// <remarks>
      /// The requested range is copied into a buffer obtained from <c>BufferPool</c>, so the caller may reuse
      /// <paramref name="buffer"/> as soon as this method returns. After writing data to the stream using this method,
      /// the connection is signaled that data is available to send.
      /// </remarks>
      /// <param name="buffer">The array of unsigned bytes from which to copy count bytes to the current stream.</param>
      /// <param name="offset">The zero-based byte offset in buffer at which to begin copying bytes to the current stream.</param>
      /// <param name="count">The number of bytes to be written to the current stream.</param>
      /// <exception cref="ArgumentNullException">Thrown when <paramref name="buffer"/> is <c>null</c>.</exception>
      /// <exception cref="InvalidOperationException">Thrown when trying to write after the stream has been marked as complete.</exception>
      /// <exception cref="ArgumentOutOfRangeException">Thrown when <paramref name="offset"/> or <paramref name="count"/> is negative.</exception>
      /// <exception cref="ArgumentException">Thrown when <paramref name="offset"/> and <paramref name="count"/> describe a range outside of <paramref name="buffer"/>.</exception>
      public override void Write(byte[] buffer, int offset, int count)
      {
          if (buffer == null)
              throw new ArgumentNullException(nameof(buffer));

          if (this._isCompleted)
              throw new InvalidOperationException("Complete() already called on the stream!");

          // Validate the range before renting a buffer, otherwise a failing copy would strand the pooled buffer.
          if (offset < 0)
              throw new ArgumentOutOfRangeException(nameof(offset));

          if (count < 0)
              throw new ArgumentOutOfRangeException(nameof(count));

          if (buffer.Length - offset < count)
              throw new ArgumentException("The sum of offset and count is larger than the buffer length.");

          var localCopy = BufferPool.Get(count, true, base.Signaler?.Context);

          // Copy from the caller's offset to the START of the private buffer.
          Array.Copy(buffer, offset, localCopy, 0, count);

          Write(localCopy.AsBuffer(count));
      }

      /// <summary>
      /// Writes a segment of data to the stream, making it available for upload.
      /// </summary>
      /// <param name="segment">A segment of data to be written to the stream. A segment without data is ignored.</param>
      /// <exception cref="InvalidOperationException">Thrown when trying to write after the stream has been marked as complete.</exception>
      /// <remarks>
      /// The segment is enqueued as-is (no copy is made) and is passed to <c>BufferPool.Release</c> once <see cref="Read"/>
      /// has consumed it. After writing a segment to the stream using this method, the connection is signaled that data
      /// is available to send.
      /// </remarks>
      public void Write(BufferSegment segment)
      {
          if (this._isCompleted)
              throw new InvalidOperationException("Complete() already called on the stream!");

          if (segment.Data == null)
              return;

          this._chunks.Enqueue(segment);
          Interlocked.Add(ref this._bufferedLength, segment.Count);

          this.Signaler?.SignalThread();
      }

      /// <summary>
      /// Marks the stream as complete, signaling that no more data will be added.
      /// </summary>
      /// <remarks>
      /// All remaining buffered data will be sent to the server.
      /// </remarks>
      /// <exception cref="InvalidOperationException">Thrown when the stream has already been marked as complete.</exception>
      public void Complete()
      {
          if (this._isCompleted)
              throw new InvalidOperationException("Complete() already called on the stream!");

          this._isCompleted = true;

          // Wake the signaler (if any) so a pending wait-for-more state can turn into completion.
          base.Signaler?.SignalThread();
      }

      /// <summary>Always <c>true</c>.</summary>
      public override bool CanRead => true;

      /// <summary>Always <c>false</c>; the stream is forward-only.</summary>
      public override bool CanSeek => false;

      /// <summary>Always <c>true</c>.</summary>
      public override bool CanWrite => true;

      /// <summary>Not implemented; both accessors throw <see cref="System.NotImplementedException"/>.</summary>
      public override long Position { get => throw new System.NotImplementedException(); set => throw new System.NotImplementedException(); }

      /// <summary>Not implemented; always throws <see cref="System.NotImplementedException"/>.</summary>
      public override long Seek(long offset, System.IO.SeekOrigin origin) => throw new System.NotImplementedException();

      /// <summary>Not implemented; always throws <see cref="System.NotImplementedException"/>.</summary>
      public override void SetLength(long value) => throw new System.NotImplementedException();

      /// <summary>Does nothing.</summary>
      public override void Flush() { }
  }
  152. }