#if !UNITY_WEBGL || UNITY_EDITOR using System; using System.Threading; using Best.HTTP.Request.Timings; using Best.HTTP.Request.Upload; using Best.HTTP.Response; using Best.HTTP.Shared; using Best.HTTP.Shared.Extensions; using Best.HTTP.Shared.Logger; using Best.HTTP.Shared.PlatformSupport.Memory; using Best.HTTP.Shared.PlatformSupport.Network.Tcp; using Best.HTTP.Shared.PlatformSupport.Threading; using Best.HTTP.Shared.Streams; using static Best.HTTP.Hosts.Connections.HTTP1.Constants; namespace Best.HTTP.Hosts.Connections.HTTP1 { public sealed class HTTP1ContentConsumer : IHTTPRequestHandler, IContentConsumer, IDownloadContentBufferAvailable, IThreadSignaler { public ShutdownTypes ShutdownType { get; private set; } public KeepAliveHeader KeepAlive { get { return this._keepAlive; } } private KeepAliveHeader _keepAlive; public bool CanProcessMultiple { get { return false; } } /// /// Number of assigned requests to process. /// public int AssignedRequests => this.conn.CurrentRequest == null ? 0 : 1; /// /// Maximum number of assignable requests /// public int MaxAssignedRequests => 1; public LoggingContext Context { get; private set; } public PeekableContentProviderStream ContentProvider { get; private set; } private readonly HTTPOverTCPConnection conn; private PeekableHTTP1Response _response; private int _isAlreadyProcessingContent; private AutoResetEvent _are = new AutoResetEvent(false); public HTTP1ContentConsumer(HTTPOverTCPConnection conn) { this.Context = new LoggingContext(this); this.conn = conn; } public void RunHandler() { HTTPManager.Logger.Information(nameof(HTTP1ContentConsumer), "Started processing request", this.Context); ThreadedRunner.SetThreadName("Best.HTTP1 Write"); try { var now = DateTime.Now; if (this.conn.CurrentRequest.TimeoutSettings.IsTimedOut(now)) throw new TimeoutException(); if (this.conn.CurrentRequest.IsCancellationRequested) throw new Exception("Cancellation requested!"); // create the response before we would send out the request, because sending out might cause an exception // and the response is used for decision making in the FinishedProcessing call. if (this._response == null) this.conn.CurrentRequest.Response = this._response = new PeekableHTTP1Response(this.conn.CurrentRequest, false, this); // Write the request to the stream this.conn.CurrentRequest.TimeoutSettings.SetProcessing(now); this.conn.CurrentRequest.Timing.StartNext(TimingEventNames.Request_Sent); SendOutTo(this.conn.CurrentRequest, this.conn.TopStream); this.conn.CurrentRequest.OnCancellationRequested += OnCancellationRequested; } catch (Exception e) { if (this.ShutdownType == ShutdownTypes.Immediate) return; FinishedProcessing(e); } } private void SendOutTo(HTTPRequest request, System.IO.Stream stream) { request.Prepare(); string requestPathAndQuery = request.ProxySettings.HasProxyFor(request.CurrentUri) ? request.ProxySettings.Proxy.GetRequestPath(request.CurrentUri) : request.CurrentUri.GetRequestPathAndQueryURL(); string requestLine = string.Format("{0} {1} HTTP/1.1", HTTPRequest.MethodNames[(byte)request.MethodType], requestPathAndQuery); if (HTTPManager.Logger.Level <= Loglevels.Information) HTTPManager.Logger.Information("HTTPRequest", string.Format("Sending request: '{0}'", requestLine), request.Context); // Create a buffer stream that will not close 'stream' when disposed or closed. // buffersize should be larger than UploadChunkSize as it might be used for uploading user data and // it should have enough room for UploadChunkSize data and additional chunk information. using (WriteOnlyBufferedStream bufferStream = new WriteOnlyBufferedStream(stream, (int)(request.UploadSettings.UploadChunkSize * 1.5f), request.Context)) { var requestLineBytes = requestLine.GetASCIIBytes(); bufferStream.WriteBufferSegment(requestLineBytes); bufferStream.WriteArray(EOL); BufferPool.Release(requestLineBytes); // Write headers to the buffer request.EnumerateHeaders((header, values) => { if (string.IsNullOrEmpty(header) || values == null) return; //var headerName = string.Concat(header, ": ").GetASCIIBytes(); var headerName = header.GetASCIIBytes(); for (int i = 0; i < values.Count; ++i) { if (string.IsNullOrEmpty(values[i])) { HTTPManager.Logger.Warning("HTTPRequest", string.Format("Null/empty value for header: {0}", header), request.Context); continue; } if (HTTPManager.Logger.Level <= Loglevels.Information) HTTPManager.Logger.Verbose("HTTPRequest", $"Header - '{header}': '{values[i]}'", request.Context); var valueBytes = values[i].GetASCIIBytes(); bufferStream.WriteBufferSegment(headerName); bufferStream.WriteArray(HeaderValueSeparator); bufferStream.WriteBufferSegment(valueBytes); bufferStream.WriteArray(EOL); BufferPool.Release(valueBytes); } BufferPool.Release(headerName); }, /*callBeforeSendCallback:*/ true); bufferStream.WriteArray(EOL); // Send remaining data to the wire bufferStream.Flush(); this.conn.CurrentRequest.Timing.StartNext(TimingEventNames.Waiting_TTFB); } // bufferStream.Dispose //if (!request.UploadSettings.Expect100Continue) SendContent(); HTTPManager.Logger.Information("HTTPRequest", "Sent out '" + requestLine + "'", this.Context); } void SendContent() { System.IO.Stream uploadStream = this.conn.CurrentRequest.UploadSettings.UploadStream; if (uploadStream != null) { try { if (uploadStream is Request.Upload.UploadStreamBase upStream) upStream.BeforeSendBody(this.conn.CurrentRequest, this); using WriteOnlyBufferedStream bufferStream = new WriteOnlyBufferedStream(this.conn.TopStream, (int)(this.conn.CurrentRequest.UploadSettings.UploadChunkSize * 1.5f), this.conn.CurrentRequest.Context); long uploadLength = uploadStream.Length; bool isChunked = uploadLength == BodyLengths.UnknownWithChunkedTransferEncoding; // Initialize the progress report variables long Uploaded = 0; // Upload buffer. First we will read the data into this buffer from the UploadStream, then write this buffer to our outStream byte[] buffer = BufferPool.Get(this.conn.CurrentRequest.UploadSettings.UploadChunkSize, true); using var _ = new AutoReleaseBuffer(buffer); // How many bytes was read from the UploadStream int count = uploadStream.Read(buffer, 0, buffer.Length); while (count != 0) { if (count <= 0) { this._are.WaitOne(); count = uploadStream.Read(buffer, 0, buffer.Length); continue; } if (isChunked) { var countBytes = count.ToString("X").GetASCIIBytes(); bufferStream.WriteBufferSegment(countBytes); bufferStream.WriteArray(EOL); BufferPool.Release(countBytes); } // write out the buffer to the wire bufferStream.Write(buffer, 0, count); // chunk trailing EOL if (uploadLength < 0) bufferStream.WriteArray(EOL); // update how many bytes are uploaded Uploaded += count; // Write to the wire bufferStream.Flush(); if (this.conn.CurrentRequest.UploadSettings.OnUploadProgress != null) RequestEventHelper.EnqueueRequestEvent(new RequestEventInfo(this.conn.CurrentRequest, RequestEvents.UploadProgress, Uploaded, uploadLength)); if (this.conn.CurrentRequest.IsCancellationRequested) return; count = uploadStream.Read(buffer, 0, buffer.Length); } // All data from the stream are sent, write the 'end' chunk if necessary if (isChunked) { byte[] noMoreChunkBytes = BufferPool.Get(1, true); noMoreChunkBytes[0] = (byte)'0'; bufferStream.Write(noMoreChunkBytes, 0, 1); bufferStream.WriteArray(EOL); bufferStream.WriteArray(EOL); BufferPool.Release(noMoreChunkBytes); } // Make sure all remaining data will be on the wire bufferStream.Flush(); } finally { if (this.conn.CurrentRequest.UploadSettings.DisposeStream) uploadStream.Dispose(); } } } void IDownloadContentBufferAvailable.BufferAvailable(DownloadContentStream stream) { //HTTPManager.Logger.Verbose(nameof(HTTP1ContentConsumer), "IDownloadContentBufferAvailable.BufferAvailable", this.Context); // TODO: Do NOT call OnContent on the Unity main thread if (this._response != null) OnContent(); } public void SetBinding(PeekableContentProviderStream contentProvider) => this.ContentProvider = contentProvider; public void UnsetBinding() => this.ContentProvider = null; public void OnContent() { if (Interlocked.CompareExchange(ref this._isAlreadyProcessingContent, 1, 0) != 0) return; try { //HTTPManager.Logger.Information(nameof(HTTP1ContentConsumer), $"OnContent({peekable?.Length}, {this._response?.ReadState})", this.Context, this.conn.CurrentRequest.Context); try { if (this.conn.CurrentRequest.TimeoutSettings.IsTimedOut(DateTime.Now)) throw new TimeoutException(); if (this.conn.CurrentRequest.IsCancellationRequested) throw new Exception("Cancellation requested!"); this._response.ProcessPeekable(this.ContentProvider); } catch (Exception e) { if (this.ShutdownType == ShutdownTypes.Immediate) return; FinishedProcessing(e); } // After an exception, this._response will be null! if (this._response != null) { if (this._response.ReadState == PeekableHTTP1Response.PeekableReadState.Finished) FinishedProcessing(null); else if (this._response.ReadState == PeekableHTTP1Response.PeekableReadState.WaitForContentSent) { SendContent(); this.conn.CurrentRequest.Timing.StartNext(TimingEventNames.Waiting_TTFB); } } } finally { Interlocked.Exchange(ref this._isAlreadyProcessingContent, 0); } } public void OnConnectionClosed() { HTTPManager.Logger.Information(nameof(HTTP1ContentConsumer), $"OnConnectionClosed({this.ContentProvider?.Length}, {this._response?.ReadState})", this.Context); if (this._response != null && this._response.ReadState == PeekableHTTP1Response.PeekableReadState.Content && this._response.DeliveryMode == PeekableHTTP1Response.ContentDeliveryMode.RawUnknownLength && "close".Equals(this._response.GetFirstHeaderValue("connection"), StringComparison.OrdinalIgnoreCase)) { FinishedProcessing(null); } else if (this.ContentProvider.Length > 0 && this._response != null && this._response.ReadState == PeekableHTTP1Response.PeekableReadState.Content && this._response.DownStream != null) { // Let the stream comsume any buffered data first, and handle closure when the buffer depletes. // TODO: This require however that the PeekableResponse ping this HTTP1ContentConsumer when buffer space is available in the down-stream. // Or force-add all remaining data to the stream and see whether we finished downloading or not. // Problems: // 1.) OnContent might be already called and a call to it would be dropped. We could spin up a new thread waiting for its finish, then call it again. //throw new NotImplementedException(); ThreadedRunner.RunShortLiving(() => { SpinWait spinWait = new SpinWait(); while (Interlocked.CompareExchange(ref this._isAlreadyProcessingContent, 1, 0) == 1) spinWait.SpinOnce(); try { try { this._response.DownStream.EmergencyIncreaseMaxBuffered(); this._response.ProcessPeekable(this.ContentProvider); } catch (Exception e) { if (this.ShutdownType == ShutdownTypes.Immediate) return; FinishedProcessing(e); } finally { // After an exception, this._response will be null! if (this._response != null) { if (this._response.ReadState == PeekableHTTP1Response.PeekableReadState.Finished) FinishedProcessing(null); else FinishedProcessing(new Exception("Underlying TCP connection closed unexpectedly!")); } } } finally { Interlocked.Exchange(ref this._isAlreadyProcessingContent, 0); } }); return; } // If the consumer still have a request: error it and close the connection if (this.conn.CurrentRequest != null && this._response != null) { FinishedProcessing(new Exception("Underlying TCP connection closed unexpectedly!")); } else // If no current request: close the connection ConnectionEventHelper.EnqueueConnectionEvent(new ConnectionEventInfo(this.conn, HTTPConnectionStates.Closed)); } public void OnError(Exception e) { HTTPManager.Logger.Information(nameof(HTTP1ContentConsumer), $"OnError({this.ContentProvider?.Length}, {this._response?.ReadState}, {this.ShutdownType})", this.Context); if (this.ShutdownType == ShutdownTypes.Immediate) return; FinishedProcessing(e); } private void OnCancellationRequested(HTTPRequest req) { HTTPManager.Logger.Information(nameof(HTTP1ContentConsumer), "OnCancellationRequested()", this.Context); Interlocked.Exchange(ref this._response, null); req.OnCancellationRequested -= OnCancellationRequested; this.conn?.Streamer?.Dispose(); } void FinishedProcessing(Exception ex) { // Warning: FinishedProcessing might be called from different threads in parallel: // - send thread triggered by a write failure // - read thread oncontent/OnError/OnConnectionClosed var resp = Interlocked.Exchange(ref this._response, null); if (resp == null) return; HTTPManager.Logger.Verbose(nameof(HTTP1ContentConsumer), $"{nameof(FinishedProcessing)}({resp.ReadState}, {ex})", this.Context); // Unset the consumer, we no longer expect another OnContent call until further notice. //if (conn.TopStream is IPeekableContentProvider provider && provider?.Consumer == this) // provider.Consumer = null; this.ContentProvider.UnbindIf(this); var req = this.conn.CurrentRequest; req.OnCancellationRequested -= OnCancellationRequested; bool resendRequest = false; HTTPRequestStates requestState = HTTPRequestStates.Finished; HTTPConnectionStates connectionState = ex != null ? HTTPConnectionStates.Closed : HTTPConnectionStates.Recycle; // We could finish the request, ignore the error. if (resp.ReadState == PeekableHTTP1Response.PeekableReadState.Finished) ex = null; Exception error = ex; if (error != null) { // Timeout is a non-retryable error if (ex is TimeoutException) { error = null; requestState = HTTPRequestStates.TimedOut; } else { if (req.RetrySettings.Retries < req.RetrySettings.MaxRetries) { req.RetrySettings.Retries++; error = null; resendRequest = true; } else { requestState = HTTPRequestStates.Error; } } // Any exception means that the connection is in an unknown state, we shouldn't try to reuse it. connectionState = HTTPConnectionStates.Closed; resp.Dispose(); } else { // After HandleResponse connectionState can have the following values: // - Processing: nothing interesting, caller side can decide what happens with the connection (recycle connection). // - Closed: server sent an connection: close header. // - ClosedResendRequest: in this case resendRequest is true, and the connection must not be reused. // In this case we can send only one ConnectionEvent to handle both case and avoid concurrency issues. error = ConnectionHelper.HandleResponse(req, out resendRequest, out connectionState, ref this._keepAlive, this.Context); if (error != null) requestState = HTTPRequestStates.Error; else if (!resendRequest && resp.IsUpgraded) requestState = HTTPRequestStates.Processing; } req.Timing.StartNext(TimingEventNames.Queued); HTTPManager.Logger.Verbose(nameof(HTTP1ContentConsumer), $"{nameof(FinishedProcessing)} final decision. ResendRequest: {resendRequest}, RequestState: {requestState}, ConnectionState: {connectionState}", this.Context); // If HandleResponse returned with ClosedResendRequest or there were an error and we can retry the request if (connectionState == HTTPConnectionStates.ClosedResendRequest || (resendRequest && connectionState == HTTPConnectionStates.Closed)) { ConnectionHelper.ResendRequestAndCloseConnection(this.conn, req); } else if (resendRequest && requestState == HTTPRequestStates.Finished) { RequestEventHelper.EnqueueRequestEvent(new RequestEventInfo(req, RequestEvents.Resend)); ConnectionEventHelper.EnqueueConnectionEvent(new ConnectionEventInfo(this.conn, connectionState)); } else { // Otherwise set the request's then the connection's state ConnectionHelper.EnqueueEvents(this.conn, connectionState, req, requestState, error); } } public void Process(HTTPRequest request) { (conn.TopStream as IPeekableContentProvider).SetTwoWayBinding(this); // https://github.com/Benedicht/BestHTTP-Issues/issues/179 // Toughts: // - Many requests, especially if they are uploading slowly, can occupy all background threads. // Use short-living thread when: // - It's a GET request // - It's not an upgrade request bool isRequestWithoutBody = request.MethodType == HTTPMethods.Get || request.MethodType == HTTPMethods.Head || request.MethodType == HTTPMethods.Delete || request.MethodType == HTTPMethods.Options; bool isUpgrade = request.HasHeader("upgrade"); var useShortLivingThread = HTTPManager.PerHostSettings.Get(request.CurrentHostKey).HTTP1ConnectionSettings.ForceUseThreadPool || (isRequestWithoutBody && !isUpgrade); if (useShortLivingThread) ThreadedRunner.RunShortLiving(RunHandler); else ThreadedRunner.RunLongLiving(RunHandler); } public void Shutdown(ShutdownTypes type) { HTTPManager.Logger.Verbose(nameof(HTTP1ContentConsumer), string.Format($"Shutdown({type})"), this.Context); this.ShutdownType = type; } public void Dispose() { Dispose(true); GC.SuppressFinalize(this); } private void Dispose(bool disposing) { if (disposing) { this._are.Dispose(); this._are = null; } } void IThreadSignaler.SignalThread() { this._are?.Set(); } } } #endif