123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274 |
- using System;
- using System.Threading;
- using Best.HTTP.Caching;
- using Best.HTTP.Hosts.Connections.HTTP1;
- using Best.HTTP.HostSetting;
- using Best.HTTP.Request.Timings;
- using Best.HTTP.Response;
- using Best.HTTP.Shared;
- using Best.HTTP.Shared.Extensions;
- using Best.HTTP.Shared.PlatformSupport.FileSystem;
- using Best.HTTP.Shared.PlatformSupport.Network.Tcp;
- using Best.HTTP.Shared.PlatformSupport.Network.Tcp.Streams;
- using Best.HTTP.Shared.Streams;
- namespace Best.HTTP.Hosts.Connections.File
- {
- internal sealed class FileConnection : ConnectionBase, IContentConsumer, IDownloadContentBufferAvailable
- {
- public PeekableContentProviderStream ContentProvider { get; private set; }
- PeekableHTTP1Response _response;
- NonblockingUnderlyingStream _stream;
- UnityEngine.Hash128 _cacheHash;
- public FileConnection(HostKey hostKey)
- : base(hostKey)
- { }
- protected override void ThreadFunc()
- {
- this.CurrentRequest.Timing.StartNext(TimingEventNames.Waiting_TTFB);
- this.Context.Remove("Request");
- this.Context.Add("Request", this.CurrentRequest.Context);
- bool isFromLocalCache = this.CurrentRequest.CurrentUri.Host.Equals(HTTPCache.CacheHostName, StringComparison.OrdinalIgnoreCase);
- if (this._response == null)
- this.CurrentRequest.Response = this._response = new PeekableHTTP1Response(this.CurrentRequest, isFromLocalCache, this);
- this._response.Context.Add(nameof(FileConnection), this.Context);
- StreamList stream = new StreamList();
- try
- {
- var headers = new BufferPoolMemoryStream();
- stream.AppendStream(headers);
-
- headers.WriteLine("HTTP/1.1 200 Ok");
- System.IO.Stream contentStream = null;
- if (isFromLocalCache)
- {
- var hashStr = this.CurrentRequest.CurrentUri.AbsolutePath.Substring(1);
- var hash = UnityEngine.Hash128.Parse(hashStr);
- // BeginReadContent tries to acquire a read lock on the content and returns null if couldn't.
- contentStream = HTTPManager.LocalCache?.BeginReadContent(hash, this.Context);
- if (contentStream == null)
- throw new HTTPCacheAcquireLockException($"Coulnd't acquire read lock on cached entity.");
- this._cacheHash = hash;
- headers.WriteLine($"BestHTTP-Origin: cachefile({hashStr})");
- stream.AppendStream(HTTPManager.IOService.CreateFileStream(HTTPManager.LocalCache.GetHeaderPathFromHash(hash), FileStreamModes.OpenRead));
- }
- else
- {
- headers.WriteLine($"BestHTTP-Origin: file");
- headers.WriteLine("Content-Type: application/octet-stream");
- contentStream = HTTPManager.IOService.CreateFileStream(this.CurrentRequest.CurrentUri.LocalPath, FileStreamModes.OpenRead);
- }
- headers.WriteLine($"Content-Length: {contentStream.Length.ToString()}");
- if (!isFromLocalCache)
- headers.WriteLine();
- headers.Seek(0, System.IO.SeekOrigin.Begin);
- stream.AppendStream(contentStream);
- this.CurrentRequest.TimeoutSettings.SetProcessing(DateTime.Now);
-
- this._stream = new NonblockingUnderlyingStream(stream, 1024 * 1024, this.Context);
- this._stream.SetTwoWayBinding(this);
- this._stream.BeginReceive();
- this.CurrentRequest.OnCancellationRequested += OnCancellationRequested;
- }
- catch(Exception ex)
- {
- FinishedProcessing(ex);
- stream?.Dispose();
- }
- }
- void IDownloadContentBufferAvailable.BufferAvailable(DownloadContentStream stream)
- {
- //HTTPManager.Logger.Verbose(nameof(FileConnection), "IDownloadContentBufferAvailable.BufferAvailable", this.Context);
- // Here we should trigger somehow the read stream and that should call OnContent(IPeekableContentProvider provider, PeekableStream peekable)
- // to go the regular route.
- if (this._response != null)
- OnContent();
- }
- public void SetBinding(PeekableContentProviderStream contentProvider)
- {
- this.ContentProvider = contentProvider;
- }
- public void UnsetBinding() => this.ContentProvider = null;
- public void OnContent()
- {
- try
- {
- if (this.CurrentRequest.TimeoutSettings.IsTimedOut(DateTime.Now))
- throw new TimeoutException();
- if (this.CurrentRequest.IsCancellationRequested)
- throw new Exception("Cancellation requested!");
- this._response.ProcessPeekable(this.ContentProvider);
- }
- catch (Exception e)
- {
- if (this.ShutdownType == ShutdownTypes.Immediate)
- return;
- FinishedProcessing(e);
- }
- // After an exception, this._response will be null!
- if (this._response != null && this._response.ReadState == PeekableHTTP1Response.PeekableReadState.Finished)
- FinishedProcessing(null);
- }
- public void OnConnectionClosed()
- {
- HTTPManager.Logger.Information(nameof(FileConnection), $"OnConnectionClosed({this.ContentProvider?.Length}, {this._response?.ReadState})", this.Context);
- // If the consumer still have a request: error it and close the connection
- if (this.CurrentRequest != null && this._response != null)
- {
- FinishedProcessing(new Exception("Underlying TCP connection closed unexpectedly!"));
- }
- else // If no current request: close the connection
- ConnectionEventHelper.EnqueueConnectionEvent(new ConnectionEventInfo(this, HTTPConnectionStates.Closed));
- }
- public void OnError(Exception e)
- {
- HTTPManager.Logger.Information(nameof(FileConnection), $"OnError({this.ContentProvider?.Length}, {this._response?.ReadState}, {this.ShutdownType})", this.Context);
- if (this.ShutdownType == ShutdownTypes.Immediate)
- return;
- FinishedProcessing(e);
- }
- private void OnCancellationRequested(HTTPRequest req)
- {
- HTTPManager.Logger.Information(nameof(FileConnection), "OnCancellationRequested()", this.Context);
- Interlocked.Exchange(ref this._response, null);
- req.OnCancellationRequested -= OnCancellationRequested;
- this._stream.Dispose();
- }
- void FinishedProcessing(Exception ex)
- {
- // Warning: FinishedProcessing might be called from different threads in parallel:
- // - send thread triggered by a write failure
- // - read thread oncontent/OnError/OnConnectionClosed
- var resp = Interlocked.Exchange(ref this._response, null);
- if (resp == null)
- return;
- HTTPManager.Logger.Verbose(nameof(FileConnection), $"{nameof(FinishedProcessing)}({resp}, {ex})", this.Context);
- HTTPManager.LocalCache?.EndReadContent(this._cacheHash, this.Context);
- this._cacheHash = new UnityEngine.Hash128();
- // Unset the consumer, we no longer expect another OnContent call until further notice.
- this._stream?.Unbind();
- this._stream?.Dispose();
- this._stream = null;
- var req = this.CurrentRequest;
- req.OnCancellationRequested -= OnCancellationRequested;
- bool resendRequest = false;
- HTTPRequestStates requestState = HTTPRequestStates.Finished;
- HTTPConnectionStates connectionState = HTTPConnectionStates.Recycle;
- Exception error = ex;
- if (error != null)
- {
- // Timeout is a non-retryable error
- if (ex is TimeoutException)
- {
- error = null;
- requestState = HTTPRequestStates.TimedOut;
- }
- else if (ex is HTTPCacheAcquireLockException)
- {
- error = null;
- resendRequest = true;
- }
- else
- {
- if (req.RetrySettings.Retries < req.RetrySettings.MaxRetries)
- {
- req.RetrySettings.Retries++;
- error = null;
- resendRequest = true;
- }
- else
- {
- requestState = HTTPRequestStates.Error;
- }
- }
- // Any exception means that the connection is in an unknown state, we shouldn't try to reuse it.
- connectionState = HTTPConnectionStates.Closed;
- resp.Dispose();
- }
- else
- {
- // After HandleResponse connectionState can have the following values:
- // - Processing: nothing interesting, caller side can decide what happens with the connection (recycle connection).
- // - Closed: server sent an connection: close header.
- // - ClosedResendRequest: in this case resendRequest is true, and the connection must not be reused.
- // In this case we can send only one ConnectionEvent to handle both case and avoid concurrency issues.
- KeepAliveHeader keepAlive = null;
- error = ConnectionHelper.HandleResponse(req, out resendRequest, out connectionState, ref keepAlive, this.Context);
- connectionState = HTTPConnectionStates.Recycle;
- if (!resendRequest && resp.IsUpgraded)
- requestState = HTTPRequestStates.Processing;
- }
- req.Timing.StartNext(TimingEventNames.Queued);
- HTTPManager.Logger.Verbose(nameof(FileConnection), $"{nameof(FinishedProcessing)} final decision. ResendRequest: {resendRequest}, RequestState: {requestState}, ConnectionState: {connectionState}", this.Context);
- // If HandleResponse returned with ClosedResendRequest or there were an error and we can retry the request
- if (connectionState == HTTPConnectionStates.ClosedResendRequest || (resendRequest && connectionState == HTTPConnectionStates.Closed))
- {
- ConnectionHelper.ResendRequestAndCloseConnection(this, req);
- }
- else if (resendRequest && requestState == HTTPRequestStates.Finished)
- {
- RequestEventHelper.EnqueueRequestEvent(new RequestEventInfo(req, RequestEvents.Resend));
- ConnectionEventHelper.EnqueueConnectionEvent(new ConnectionEventInfo(this, connectionState));
- }
- else
- {
- // Otherwise set the request's then the connection's state
- ConnectionHelper.EnqueueEvents(this, connectionState, req, requestState, error);
- }
- }
- }
- }
|