using System; using System.Collections.Generic; using System.Threading; using Best.HTTP.Request.Timings; using Best.HTTP.Response; using Best.HTTP.Response.Decompression; using Best.HTTP.Shared; using Best.HTTP.Shared.Extensions; using Best.HTTP.Shared.PlatformSupport.Memory; using Best.HTTP.Shared.Streams; using static Best.HTTP.Hosts.Connections.HTTP1.Constants; using static Best.HTTP.Response.HTTPStatusCodes; namespace Best.HTTP.Hosts.Connections.HTTP1 { /// /// An HTTP 1.1 response implementation that can utilize a peekable stream. /// Its main entry point is the ProcessPeekable method that should be called after every chunk of data downloaded. /// public class PeekableHTTP1Response : HTTPResponse { public PeekableReadState ReadState { get => this._readState; private set { if (this._readState != value) HTTPManager.Logger.Information(nameof(PeekableHTTP1Response), $"{this._readState} => {value}", this.Context); this._readState = value; } } private PeekableReadState _readState; public bool ForceDepleteContent; public enum ContentDeliveryMode { Raw, RawUnknownLength, Chunked, } public enum PeekableReadState { StatusLine, Headers, WaitForContentSent, // when received a 100-continue PrepareForContent, ContentSetup, Content, Finished } public ContentDeliveryMode DeliveryMode => this._deliveryMode; private ContentDeliveryMode _deliveryMode; private long _expectedLength; private Dictionary> _newHeaders; long _downloaded = 0; IDecompressor _decompressor = null; bool _compressed = false; bool sendProgressChanged; int _chunkLength = -1; enum ReadChunkedStates { ReadChunkLength, ReadChunk, ReadTrailingCRLF, ReadTrailingHeaders } ReadChunkedStates _readChunkedState = ReadChunkedStates.ReadChunkLength; IDownloadContentBufferAvailable _bufferAvailableHandler; public PeekableHTTP1Response(HTTPRequest request, bool isFromCache, IDownloadContentBufferAvailable bufferAvailableHandler) : base(request, isFromCache) { this._bufferAvailableHandler = bufferAvailableHandler; } private int _isProccessing; public void ProcessPeekable(PeekableContentProviderStream peekable) { // To avoid executing ProcessPeekable in parallel on two threads, do an atomic CompareExchange and return if the old value wasn't 0. if (Interlocked.CompareExchange(ref this._isProccessing, 1, 0) != 0) return; if (HTTPManager.Logger.IsDiagnostic) HTTPManager.Logger.Verbose(nameof(PeekableHTTP1Response), $"ProcessPeekable({this.ReadState}, {peekable.Length})", this.Context); try { // The first call after setting it to PeekableReadState.WaitForContentSent is after the the client could send its content. // This also works when "If the request did not contain an Expect header field containing the 100-continue expectation, // the client can simply discard this interim response." // (https://www.rfc-editor.org/rfc/rfc9110#section-15.2.1-3) if (this._readState == PeekableReadState.WaitForContentSent) { this._newHeaders?.Clear(); this.Headers?.Clear(); this._readState = PeekableReadState.StatusLine; } // It's an unexpected network closure, except when we reading the content in the RawUnknownLength delivery mode. if (peekable == null && ReadState != PeekableReadState.Content && this._deliveryMode != ContentDeliveryMode.RawUnknownLength) throw new Exception("Server closed the connection unexpectedly!"); switch (ReadState) { case PeekableReadState.StatusLine: if (!IsNewLinePresent(peekable)) return; Request.Timing.StartNext(TimingEventNames.Headers); var statusLine = HTTPResponse.ReadTo(peekable, (byte)' '); string[] versions = statusLine.Split(new char[] { '/', '.' }); this.HTTPVersion = new Version(int.Parse(versions[1]), int.Parse(versions[2])); int statusCode; string statusCodeStr = NoTrimReadTo(peekable, (byte)' ', LF); if (!int.TryParse(statusCodeStr, out statusCode)) throw new Exception($"Couldn't parse '{statusCodeStr}' as a status code!"); this.StatusCode = statusCode; if (statusCodeStr.Length > 0 && (byte)statusCodeStr[statusCodeStr.Length - 1] != LF && (byte)statusCodeStr[statusCodeStr.Length - 1] != CR) this.Message = ReadTo(peekable, LF); else { HTTPManager.Logger.Warning(nameof(PeekableHTTP1Response), "Skipping Status Message reading!", this.Context); this.Message = string.Empty; } if (HTTPManager.Logger.IsDiagnostic) HTTPManager.Logger.Verbose(nameof(PeekableHTTP1Response), $"HTTP/'{this.HTTPVersion}' '{this.StatusCode}' '{this.Message}'", this.Context); if (this.Request?.DownloadSettings?.OnHeadersReceived != null) this._newHeaders = new Dictionary>(StringComparer.OrdinalIgnoreCase); this.ReadState = PeekableReadState.Headers; goto case PeekableReadState.Headers; case PeekableReadState.Headers: ProcessReadHeaders(peekable, PeekableReadState.PrepareForContent); if (this.ReadState == PeekableReadState.PrepareForContent) { #if !UNITY_WEBGL || UNITY_EDITOR // When upgraded, we don't want to read the content here, so set the state to Finished. if (this.StatusCode == 101 && (HasHeaderWithValue("connection", "upgrade") || HasHeader("upgrade")) && this.Request?.DownloadSettings?.OnUpgraded != null) { HTTPManager.Logger.Information(nameof(PeekableHTTP1Response), "Request Upgraded!", this.Context); this.IsUpgraded = this.Request.DownloadSettings.OnUpgraded(this.Request, this, peekable); if (this.IsUpgraded) { this._readState = PeekableReadState.Finished; goto case PeekableReadState.Finished; } } #endif // If it's a 100-continue, restart reading the response after the client could send its content. if (this.StatusCode == Continue) { this._readState = PeekableReadState.WaitForContentSent; break; } // https://www.rfc-editor.org/rfc/rfc9110#name-informational-1xx // A 1xx response is terminated by the end of the header section; it cannot contain content or trailers. if ((this.StatusCode >= Continue && this.StatusCode < OK) || // https://www.rfc-editor.org/rfc/rfc9110#name-204-no-content // A 204 response is terminated by the end of the header section; it cannot contain content or trailers. this.StatusCode == NoContent || // https://www.rfc-editor.org/rfc/rfc9110#name-304-not-modified // A 304 response is terminated by the end of the header section; it cannot contain content or trailers. this.StatusCode == NotModified) { this._readState = PeekableReadState.Finished; goto case PeekableReadState.Finished; } Request.Timing.StartNext(TimingEventNames.Response_Received); // if not an upgraded response, or OnUpgraded returned false, go for the content too. goto case PeekableReadState.PrepareForContent; } break; case PeekableReadState.PrepareForContent: BeginReceiveContent(); // A content-length header might come with chunked transfer-encoding too. var contentLengthHeader = GetFirstHeaderValue("content-length"); long.TryParse(contentLengthHeader, out this._expectedLength); if (HasHeaderWithValue("transfer-encoding", "chunked") && string.IsNullOrEmpty(contentLengthHeader)) { this._deliveryMode = ContentDeliveryMode.Chunked; this.ReadState = PeekableReadState.ContentSetup; } else { this._deliveryMode = ContentDeliveryMode.Raw; this.ReadState = PeekableReadState.ContentSetup; var contentRangeHeaders = GetHeaderValues("content-range"); if (contentLengthHeader == null && contentRangeHeaders == null) { this._deliveryMode = ContentDeliveryMode.RawUnknownLength; } else if (contentLengthHeader == null && contentRangeHeaders != null) { HTTPRange range = GetRange(); this._expectedLength = (range.LastBytePos - range.FirstBytePos) + 1; } } HTTPManager.Logger.Information(nameof(PeekableHTTP1Response), $"PrepareForContent - delivery mode selected: {this._deliveryMode}, {this._expectedLength}!", this.Context); CreateDownloadStream(this._bufferAvailableHandler); string encoding = IsFromCache ? null : GetFirstHeaderValue("content-encoding"); #if !UNITY_WEBGL || UNITY_EDITOR this._compressed = !string.IsNullOrEmpty(encoding); // https://github.com/Benedicht/BestHTTP-Issues/issues/183 // If _decompressor is still null, remove the compressed flag and serve the content as-is. if ((this._decompressor = DecompressorFactory.GetDecompressor(encoding, this.Context)) == null) this._compressed = false; #endif this.sendProgressChanged = this.Request.DownloadSettings.OnDownloadProgress != null && this.IsSuccess; this.ReadState = PeekableReadState.Content; goto case PeekableReadState.Content; case PeekableReadState.Content: var downStream = this.DownStream; if (downStream != null && downStream.MaxBuffered <= downStream.Length) return; switch (this._deliveryMode) { case ContentDeliveryMode.Raw: ProcessReadRaw(peekable); break; case ContentDeliveryMode.RawUnknownLength: ProcessReadRawUnknownLength(peekable); break; case ContentDeliveryMode.Chunked: ProcessReadChunked(peekable); break; } if (this.ReadState == PeekableReadState.Finished) goto case PeekableReadState.Finished; break; case PeekableReadState.Finished: //baseRequest.Timing.StartNext(TimingEventNames.Queued_For_Disptach); break; } } finally { Interlocked.Exchange(ref this._isProccessing, 0); } } bool IsNewLinePresent(PeekableStream peekable) { peekable.BeginPeek(); int nextByte = peekable.PeekByte(); while (nextByte >= 0 && nextByte != 0x0A) nextByte = peekable.PeekByte(); return nextByte == 0x0A; } private void ProcessReadHeaders(PeekableStream peekable, PeekableReadState targetState) { if (!IsNewLinePresent(peekable)) return; do { string headerName = ReadTo(peekable, (byte)':', LF); if (headerName == string.Empty) { this.ReadState = targetState; if (this.Request?.DownloadSettings?.OnHeadersReceived != null) RequestEventHelper.EnqueueRequestEvent(new RequestEventInfo(this.Request, this._newHeaders)); return; } string value = ReadTo(peekable, LF); if (HTTPManager.Logger.IsDiagnostic) HTTPManager.Logger.Verbose(nameof(PeekableHTTP1Response), $"Header - '{headerName}': '{value}'", this.Context); AddHeader(headerName, value); if (this._newHeaders != null) { List values; if (!this._newHeaders.TryGetValue(headerName, out values)) this._newHeaders.Add(headerName, values = new List(1)); values.Add(value); } } while (IsNewLinePresent(peekable)); } private void ProcessReadRawUnknownLength(PeekableStream peekable) { if (peekable == null) { if (sendProgressChanged) RequestEventHelper.EnqueueRequestEvent(new RequestEventInfo(this.Request, RequestEvents.DownloadProgress, this._downloaded, this._expectedLength)); PostProcessContent(); this.ReadState = PeekableReadState.Finished; return; } while (peekable.Length > 0) { var buffer = BufferPool.Get(64 * 1024, true, this.Context); var readCount = peekable.Read(buffer, 0, buffer.Length); ProcessChunk(buffer.AsBuffer(readCount)); } if (sendProgressChanged) RequestEventHelper.EnqueueRequestEvent(new RequestEventInfo(this.Request, RequestEvents.DownloadProgress, this._downloaded, this._expectedLength)); } private bool TryReadChunkLength(PeekableStream peekable, out int result) { result = -1; if (!IsNewLinePresent(peekable)) return false; // Read until the end of line, then split the string so we will discard any optional chunk extensions string line = ReadTo(peekable, LF); string[] splits = line.Split(';'); string num = splits[0]; return int.TryParse(num, System.Globalization.NumberStyles.AllowHexSpecifier, null, out result); } void ProcessReadChunked(PeekableStream peekable) { switch(this._readChunkedState) { case ReadChunkedStates.ReadChunkLength: this._readChunkedState = ReadChunkedStates.ReadChunkLength; if (TryReadChunkLength(peekable, out this._chunkLength)) { if (this._chunkLength == 0) { PostProcessContent(); if (this.Request?.DownloadSettings?.OnHeadersReceived != null) this._newHeaders = new Dictionary>(StringComparer.OrdinalIgnoreCase); goto case ReadChunkedStates.ReadTrailingHeaders; } goto case ReadChunkedStates.ReadChunk; } break; case ReadChunkedStates.ReadChunk: this._readChunkedState = ReadChunkedStates.ReadChunk; while (this._chunkLength > 0 && peekable.Length > 0) { int targetReadCount = Math.Min(Math.Min(64 * 1024, this._chunkLength), (int)peekable.Length); var buffer = BufferPool.Get(targetReadCount, true, this.Context); var readCount = peekable.Read(buffer, 0, targetReadCount); if (readCount < 0) { BufferPool.Release(buffer); throw ExceptionHelper.ServerClosedTCPStream(); } this._chunkLength -= readCount; ProcessChunk(buffer.AsBuffer(readCount)); } if (sendProgressChanged) RequestEventHelper.EnqueueRequestEvent(new RequestEventInfo(this.Request, RequestEvents.DownloadProgress, this._downloaded, this._expectedLength)); // Every chunk data has a trailing CRLF if (this._chunkLength == 0) goto case ReadChunkedStates.ReadTrailingCRLF; break; case ReadChunkedStates.ReadTrailingCRLF: this._readChunkedState = ReadChunkedStates.ReadTrailingCRLF; if (IsNewLinePresent(peekable)) { ReadTo(peekable, LF); goto case ReadChunkedStates.ReadChunkLength; } break; case ReadChunkedStates.ReadTrailingHeaders: this._readChunkedState = ReadChunkedStates.ReadTrailingHeaders; ProcessReadHeaders(peekable, PeekableReadState.Finished); break; } } void ProcessReadRaw(PeekableStream peekable) { if (this.DownStream == null) throw new ArgumentNullException(nameof(this.DownStream)); if (peekable == null) throw new ArgumentNullException(nameof(peekable)); while (peekable.Length > 0 && !this.DownStream.IsFull) { var buffer = BufferPool.Get(64 * 1024, true, this.Context); var readCount = peekable.Read(buffer, 0, buffer.Length); if (readCount < 0) { BufferPool.Release(buffer); throw ExceptionHelper.ServerClosedTCPStream(); } ProcessChunk(buffer.AsBuffer(readCount)); } if (sendProgressChanged) RequestEventHelper.EnqueueRequestEvent(new RequestEventInfo(this.Request, RequestEvents.DownloadProgress, this._downloaded, this._expectedLength)); if (this._downloaded >= this._expectedLength) { PostProcessContent(); this.ReadState = PeekableReadState.Finished; } } void ProcessChunk(BufferSegment chunk) { this._downloaded += chunk.Count; if (this._compressed) { var (decompressed, release) = this._decompressor.Decompress(chunk, false, true, this.Context); if (decompressed != BufferSegment.Empty) FeedDownloadedContentChunk(decompressed); //if (decompressed.Data != chunk.Data) if (release) BufferPool.Release(chunk); } else { FeedDownloadedContentChunk(chunk); } } void PostProcessContent() { if (this._compressed) { var (decompressed, release) = this._decompressor.Decompress(BufferSegment.Empty, true, true, this.Context); if (decompressed != BufferSegment.Empty) FeedDownloadedContentChunk(decompressed); } FinishedContentReceiving(); if (this._decompressor != null) { this._decompressor.Dispose(); this._decompressor = null; } } protected override void Dispose(bool disposing) { base.Dispose(disposing); this._decompressor?.Dispose(); } } }