HTTP2ContentConsumer.cs 35 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695
  1. #if (!UNITY_WEBGL || UNITY_EDITOR) && !BESTHTTP_DISABLE_ALTERNATE_SSL
  2. #define ENABLE_LOGGING
  3. using System;
  4. using System.Collections.Concurrent;
  5. using System.Collections.Generic;
  6. using System.Threading;
  7. using Best.HTTP.Shared.Extensions;
  8. using Best.HTTP.Shared.PlatformSupport.Memory;
  9. using Best.HTTP.Shared.PlatformSupport.Threading;
  10. using Best.HTTP.Shared;
  11. using Best.HTTP.Shared.Logger;
  12. using Best.HTTP.Shared.PlatformSupport.Network.Tcp;
  13. using Best.HTTP.Shared.Streams;
  14. namespace Best.HTTP.Hosts.Connections.HTTP2
  15. {
  /// <summary>
  /// Factory signature for creating a custom <see cref="HTTP2Stream"/>. When a request's Tag is an instance
  /// of this delegate, the consumer's sender loop invokes it instead of constructing a plain HTTP2Stream.
  /// </summary>
  /// <param name="request">The request the new stream is created for.</param>
  /// <param name="streamId">The id allocated for the new client-initiated stream.</param>
  /// <param name="parentHandler">The consumer that will own the stream.</param>
  /// <param name="registry">The settings manager of the parent consumer.</param>
  /// <param name="hpackEncoder">The HPACK encoder of the parent consumer.</param>
  /// <returns>The stream that gets the request assigned and is added to the consumer's stream list.</returns>
  public delegate HTTP2Stream CustomHTTP2StreamFactory(HTTPRequest request, uint streamId, HTTP2ContentConsumer parentHandler, HTTP2SettingsManager registry, HPACKEncoder hpackEncoder);
  17. public sealed class HTTP2ContentConsumer : IHTTPRequestHandler, IContentConsumer, IThreadSignaler
  18. {
  /// <summary>Always null for this handler.</summary>
  public KeepAliveHeader KeepAlive => null;

  /// <summary>
  /// True while no GOAWAY frame has been sent, the handler is running and more than one request
  /// may be assigned at the same time.
  /// </summary>
  public bool CanProcessMultiple => !this.SentGoAwayFrame && this.isRunning && this._maxAssignedRequests > 1;

  /// <summary>Number of requests currently assigned to this handler.</summary>
  public int AssignedRequests { get { return this._assignedRequest; } }

  // Incremented in Process(), decremented when a closed stream is removed in RunHandler().
  private int _assignedRequest;

  /// <summary>Upper limit of requests that may be assigned to this handler.</summary>
  public int MaxAssignedRequests { get { return this._maxAssignedRequests; } }

  // Starts at 1; recalculated in RunHandler() whenever a SETTINGS frame is processed.
  private int _maxAssignedRequests = 1;

  /// <summary>Largest value that fits into 31 bits (2^31 - 1).</summary>
  public const UInt32 MaxValueFor31Bits = 0x7FFFFFFF;

  /// <summary>Average of the stored ping round-trip times, in milliseconds.</summary>
  public double Latency { get; private set; }

  /// <summary>Settings manager created in the constructor.</summary>
  public HTTP2SettingsManager settings;

  /// <summary>HPACK encoder, created when RunHandler() starts.</summary>
  public HPACKEncoder HPACKEncoder;

  /// <summary>Logging context of this handler; the connection's context is added to it as "Parent".</summary>
  public LoggingContext Context { get; private set; }

  /// <summary>Stream that incoming frames are read from; set by SetBinding(), cleared by UnsetBinding().</summary>
  public PeekableContentProviderStream ContentProvider { get; private set; }

  // Time the last PING frame was created.
  private DateTime lastPingSent = DateTime.MinValue;

  // 1 while a sent PING has not been acknowledged yet, otherwise 0. Changed through Interlocked.CompareExchange.
  private int waitingForPingAck = 0;

  /// <summary>Capacity used when the round-trip time buffer is created.</summary>
  public static int RTTBufferCapacity = 5;

  // Round-trip times (ms) measured from acknowledged pings.
  private CircularBuffer<double> rtts = new CircularBuffer<double>(RTTBufferCapacity);

  // False once the handler has to stop (GOAWAY received, connection closed, error, etc.).
  private volatile bool isRunning;

  // Signaled to wake up the waiting loop in RunHandler().
  private AutoResetEvent newFrameSignal = new AutoResetEvent(false);

  // Requests waiting to be assigned to a stream.
  private ConcurrentQueue<HTTPRequest> requestQueue = new ConcurrentQueue<HTTPRequest>();

  // Streams created by RunHandler() for dequeued requests.
  private List<HTTP2Stream> clientInitiatedStreams = new List<HTTP2Stream>();

  // Frames enqueued by OnContent() and dequeued by RunHandler().
  private ConcurrentQueue<HTTP2FrameHeaderAndPayload> newFrames = new ConcurrentQueue<HTTP2FrameHeaderAndPayload>();

  // Frames collected for sending in RunHandler().
  private List<HTTP2FrameHeaderAndPayload> outgoingFrames = new List<HTTP2FrameHeaderAndPayload>();

  // Tracked remote window: reduced by every sent DATA payload, increased by WINDOW_UPDATE frames received on stream 0.
  private UInt32 remoteWindow;

  // Time of the last request/stream activity, used for the idle-timeout check.
  private DateTime lastInteraction;

  // DateTime.MaxValue means that no GOAWAY frame has been sent yet.
  private DateTime goAwaySentAt = DateTime.MaxValue;

  private bool SentGoAwayFrame => this.goAwaySentAt != DateTime.MaxValue;

  private HTTPOverTCPConnection conn;

  // How long to keep running after our GOAWAY frame has been sent: 2.5 times the latency, but at least 1500 ms.
  private TimeSpan MaxGoAwayWaitTime
  {
      get
      {
          if (this.SentGoAwayFrame)
              return TimeSpan.FromMilliseconds(Math.Max(this.Latency * 2.5, 1500));

          return TimeSpan.MaxValue;
      }
  }

  // https://httpwg.org/specs/rfc7540.html#StreamIdentifiers
  // Streams initiated by a client MUST use odd-numbered stream identifiers.
  // The id is advanced by 2 for every new stream, so with an initial value of -1
  // the first client-initiated stream's id is going to be 1.
  private long LastStreamId = -1;

  private HTTP2ConnectionSettings _connectionSettings;
  /// <summary>
  /// Creates the consumer for the given connection, loads the host's HTTP/2 connection settings
  /// and queues the connection's current request for processing.
  /// </summary>
  /// <param name="conn">The TCP connection this handler sends and receives HTTP/2 frames on.</param>
  public HTTP2ContentConsumer(HTTPOverTCPConnection conn)
  {
      // Logging context first; the connection's context becomes its "Parent".
      this.Context = new LoggingContext(this);
      this.Context.Add("Parent", conn.Context);

      this.conn = conn;
      this.isRunning = true;

      // Per-host HTTP/2 settings drive the settings manager.
      var hostSettings = HTTPManager.PerHostSettings.Get(conn.HostKey);
      this._connectionSettings = hostSettings.HTTP2ConnectionSettings;
      this.settings = new HTTP2SettingsManager(this.Context, this._connectionSettings);

      // The request the connection was opened for is the first one to process.
      var firstRequest = this.conn.CurrentRequest;
      Process(firstRequest);
  }
  /// <summary>
  /// Queues a request for this handler: marks it as processing, increases the assigned request
  /// counter, enqueues it and wakes up the sender loop.
  /// </summary>
  /// <param name="request">The request to send over this HTTP/2 connection.</param>
  public void Process(HTTPRequest request)
  {
#if ENABLE_LOGGING
      HTTPManager.Logger.Information(nameof(HTTP2ContentConsumer), "Process request called", this.Context);
#endif

      // The same timestamp is stored as the last interaction and passed to the request's timeout settings.
      request.TimeoutSettings.SetProcessing(this.lastInteraction = DateTime.Now);

      Interlocked.Increment(ref this._assignedRequest);
      this.requestQueue.Enqueue(request);

      // Wake up the sender loop (the signal may already be gone after Dispose).
      this.newFrameSignal?.Set();
  }
  /// <summary>Sets the wake-up signal, if it still exists, so the sender loop stops waiting.</summary>
  public void SignalThread() => this.newFrameSignal?.Set();
  /// <summary>
  /// Sender loop of the connection. Writes the client connection preface and the initial SETTINGS,
  /// then, while the handler is running, repeatedly:
  /// waits for work, sends pings, processes the frames queued by <see cref="OnContent"/>,
  /// creates streams for queued requests, lets every stream produce its frames and writes the
  /// collected frames to the connection's stream.
  /// On exit (normal or by exception) all remaining streams are aborted and a Closed connection
  /// event is enqueued.
  /// </summary>
  public void RunHandler()
  {
#if ENABLE_LOGGING
      HTTPManager.Logger.Information(nameof(HTTP2ContentConsumer), "Processing thread up and running!", this.Context);
#endif
      ThreadedRunner.SetThreadName("Best.HTTP2 Process");

      string abortWithMessage = string.Empty;
      try
      {
          bool atLeastOneStreamHasAFrameToSend = true;

          this.HPACKEncoder = new HPACKEncoder(this.Context, this.settings);

          // https://httpwg.org/specs/rfc7540.html#InitialWindowSize
          // The connection flow-control window is also 65,535 octets.
          this.remoteWindow = this.settings.RemoteSettings[HTTP2Settings.INITIAL_WINDOW_SIZE];

          // We want to pack as much data as we can in one tcp segment, but setting the buffer's size too high
          // we might keep data too long and send them in bursts instead of in a steady stream.
          // Keeping it too low might result in a full tcp segment and one with very low payload.
          // Is it possible that one full tcp segment sized buffer would be the best, or a multiple of it?
          // It would keep the network busy without any fragments. The ethernet layer has a maximum of 1500 bytes,
          // but there are two layers of 20 byte headers each, so as a theoretical maximum it's 1500-20-20 bytes.
          // On the other hand, if the buffer is small (1-2), that means that for larger data we have to do a lot
          // of system calls, in that case a larger buffer might be better. Still, if we are not cpu bound,
          // a well saturated network might serve us better.
          using (WriteOnlyBufferedStream bufferedStream = new WriteOnlyBufferedStream(this.conn.TopStream, 1024 * 1024 /*1500 - 20 - 20*/, this.Context))
          {
              // The client connection preface starts with a sequence of 24 octets:
              // the string "PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n".
              ReadOnlySpan<byte> MAGIC = stackalloc byte[24] { 0x50, 0x52, 0x49, 0x20, 0x2a, 0x20, 0x48, 0x54, 0x54, 0x50, 0x2f, 0x32, 0x2e, 0x30, 0x0d, 0x0a, 0x0d, 0x0a, 0x53, 0x4d, 0x0d, 0x0a, 0x0d, 0x0a };
              bufferedStream.Write(MAGIC);

              // This sequence MUST be followed by a SETTINGS frame (Section 6.5), which MAY be empty.
              // The client sends the client connection preface immediately upon receipt of a
              // 101 (Switching Protocols) response (indicating a successful upgrade)
              // or as the first application data octets of a TLS connection.
              this.settings.InitiatedMySettings[HTTP2Settings.INITIAL_WINDOW_SIZE] = this._connectionSettings.InitialStreamWindowSize;
              this.settings.InitiatedMySettings[HTTP2Settings.MAX_CONCURRENT_STREAMS] = this._connectionSettings.MaxConcurrentStreams;
              this.settings.InitiatedMySettings[HTTP2Settings.ENABLE_CONNECT_PROTOCOL] = (uint)(this._connectionSettings.EnableConnectProtocol ? 1 : 0);
              this.settings.InitiatedMySettings[HTTP2Settings.ENABLE_PUSH] = 0;
              this.settings.SendChanges(this.outgoingFrames);
              this.settings.RemoteSettings.OnSettingChangedEvent += OnRemoteSettingChanged;

              // The default window size for the whole connection is 65535 bytes,
              // but we want to set it to the configured value.
              Int64 initialConnectionWindowSize = this._connectionSettings.InitialConnectionWindowSize;

              // yandex.ru returns with an FLOW_CONTROL_ERROR (3) error when the plugin tries to set the connection window to 2^31 - 1
              // and works only with a maximum value of 2^31 - 10Mib (10 * 1024 * 1024).
              if (initialConnectionWindowSize == HTTP2ContentConsumer.MaxValueFor31Bits)
                  initialConnectionWindowSize -= 10 * 1024 * 1024;

              if (initialConnectionWindowSize > 65535)
              {
                  Int64 initialConnectionWindowSizeDiff = initialConnectionWindowSize - 65535;
                  if (initialConnectionWindowSizeDiff > 0)
                      this.outgoingFrames.Add(HTTP2FrameHelper.CreateWindowUpdateFrame(0, (UInt32)initialConnectionWindowSizeDiff, this.Context));
              }

              initialConnectionWindowSize -= 65535;

              // local, per-connection window
              long localConnectionWindow = initialConnectionWindowSize;
              UInt32 updateConnectionWindowAt = (UInt32)(localConnectionWindow / 2);

              while (this.isRunning)
              {
                  DateTime now = DateTime.Now;

                  if (!atLeastOneStreamHasAFrameToSend)
                  {
                      // The buffered stream will call flush automatically if its internal buffer is full,
                      // but we have to make sure that we flush the remaining data before we go to sleep.
                      bufferedStream.Flush();

                      // Wait until the next due event, OR until a new frame is received on the read thread.
                      // Due events: the next ping (lastPingSent + PingFrequency), the ping timeout
                      // (lastPingSent + Timeout, only while waiting for an ack) and the idle disconnect
                      // (lastInteraction + MaxIdleTime).
                      var sendPingAt = this.lastPingSent + this._connectionSettings.PingFrequency;
                      var timeoutAt = this.waitingForPingAck != 0 ? this.lastPingSent + this._connectionSettings.Timeout : DateTime.MaxValue;

                      // sendPingAt can be in the past if Timeout is larger than PingFrequency
                      var nextPingInteraction = sendPingAt < timeoutAt && sendPingAt >= now ? sendPingAt : timeoutAt;

                      var disconnectByIdleAt = this.lastInteraction + this._connectionSettings.MaxIdleTime;

                      var nextDueClientInteractionAt = nextPingInteraction < disconnectByIdleAt ? nextPingInteraction : disconnectByIdleAt;
                      int wait = (int)(nextDueClientInteractionAt - now).TotalMilliseconds;

                      wait = (int)Math.Min(wait, this.MaxGoAwayWaitTime.TotalMilliseconds);

                      // Streams can ask for an earlier wake-up too.
                      TimeSpan nextStreamInteraction = TimeSpan.MaxValue;
                      for (int i = 0; i < this.clientInitiatedStreams.Count; i++)
                      {
                          var streamInteraction = this.clientInitiatedStreams[i].NextInteraction;
                          if (streamInteraction < nextStreamInteraction)
                              nextStreamInteraction = streamInteraction;
                      }

                      wait = (int)Math.Min(wait, nextStreamInteraction.TotalMilliseconds);

                      // Never sleep longer than a second.
                      wait = (int)Math.Min(wait, 1000);

                      if (wait >= 1)
                      {
                          //if (HTTPManager.Logger.Level <= Logger.Loglevels.All)
                          //    HTTPManager.Logger.Information(nameof(HTTP2ContentConsumer), string.Format("Sleeping for {0:N0}ms", wait), this.Context);
                          this.newFrameSignal.WaitOne(wait);

                          now = DateTime.Now;
                      }
                  }

                  // Don't send a new ping until a pong is received for the last one
                  if (now - this.lastPingSent >= this._connectionSettings.PingFrequency && Interlocked.CompareExchange(ref this.waitingForPingAck, 1, 0) == 0)
                  {
                      this.lastPingSent = now;

                      // The payload of the ping is the time of sending; it's read back from the ack to measure the round-trip time.
                      var frame = HTTP2FrameHelper.CreatePingFrame(HTTP2PingFlags.None, this.Context);
                      BufferHelper.SetLong(frame.Payload.Data, 0, now.Ticks);
#if ENABLE_LOGGING
                      HTTPManager.Logger.Information(nameof(HTTP2ContentConsumer), $"PING frame created with payload: {frame.Payload.Slice(0, 8)}", this.Context);
#endif
                      this.outgoingFrames.Add(frame);
                  }

                  // Process received frames
                  HTTP2FrameHeaderAndPayload header;
                  while (this.newFrames.TryDequeue(out header))
                  {
                      if (header.StreamId > 0)
                      {
                          // Received DATA payloads consume the local connection window too.
                          switch (header.Type)
                          {
                              case HTTP2FrameTypes.DATA:
                                  localConnectionWindow -= header.Payload.Count;
                                  break;
                          }

                          HTTP2Stream http2Stream = FindStreamById(header.StreamId);

                          // Add frame to the stream, so it can process it when its Process function is called
                          if (http2Stream != null)
                          {
                              http2Stream.AddFrame(header, this.outgoingFrames);
                          }
                          else
                          {
                              // Error? It's possible that we closed and removed the stream while the server was in the middle of sending frames
#if ENABLE_LOGGING
                              if (HTTPManager.Logger.Level == Loglevels.All)
                                  HTTPManager.Logger.Warning(nameof(HTTP2ContentConsumer), $"Can't deliver frame: {header}, because no stream could be found for its Id!", this.Context);
#endif
                              BufferPool.Release(header.Payload);
                          }
                      }
                      else
                      {
                          // Frames with stream id 0 are handled here.
                          switch (header.Type)
                          {
                              case HTTP2FrameTypes.SETTINGS:
                                  this.settings.Process(header, this.outgoingFrames);

                                  // The number of parallel requests is limited by both our and the remote peer's setting.
                                  Interlocked.Exchange(ref this._maxAssignedRequests,
                                      (int)Math.Min(this._connectionSettings.MaxConcurrentStreams,
                                                    this.settings.RemoteSettings[HTTP2Settings.MAX_CONCURRENT_STREAMS]));
                                  /*
                                  PluginEventHelper.EnqueuePluginEvent(
                                      new PluginEventInfo(PluginEvents.HTTP2ConnectProtocol,
                                          new HTTP2ConnectProtocolInfo(this.conn.HostKey,
                                              this.settings.MySettings[HTTP2Settings.ENABLE_CONNECT_PROTOCOL] == 1 && this.settings.RemoteSettings[HTTP2Settings.ENABLE_CONNECT_PROTOCOL] == 1)));
                                  */
                                  break;

                              case HTTP2FrameTypes.PING:
                                  var pingFrame = HTTP2FrameHelper.ReadPingFrame(header);

                                  if ((pingFrame.Flags & HTTP2PingFlags.ACK) != 0)
                                  {
                                      // If waitingForPingAck was 0 we aren't expecting a ping ack: ignore the frame,
                                      // but still fall through to the release below so its opaque data isn't lost.
                                      if (Interlocked.CompareExchange(ref this.waitingForPingAck, 0, 1) != 0)
                                      {
                                          // it was an ack, payload must contain what we sent
                                          var ticks = BufferHelper.ReadLong(pingFrame.OpaqueData, 0);

                                          // the difference between the current time and the time when the ping message is sent
                                          TimeSpan diff = TimeSpan.FromTicks(now.Ticks - ticks);
#if ENABLE_LOGGING
                                          if (diff.TotalSeconds > 10 || diff.TotalSeconds < 0)
                                              HTTPManager.Logger.Warning(nameof(HTTP2ContentConsumer), $"Pong received with weird diff: {diff}! Payload: {pingFrame.OpaqueData}", this.Context);
#endif
                                          // add it to the buffer
                                          this.rtts.Add(diff.TotalMilliseconds);

                                          // and calculate the new latency
                                          this.Latency = CalculateLatency();
#if ENABLE_LOGGING
                                          HTTPManager.Logger.Verbose(nameof(HTTP2ContentConsumer), string.Format("Latency: {0:F2}ms, RTT buffer: {1}", this.Latency, this.rtts.ToString()), this.Context);
#endif
                                      }
                                  }
                                  else
                                  {
                                      // https://httpwg.org/specs/rfc7540.html#PING
                                      // if it wasn't an ack for our ping, we have to send one with the same opaque data
                                      var frame = HTTP2FrameHelper.CreatePingFrame(HTTP2PingFlags.ACK, this.Context);
                                      Array.Copy(pingFrame.OpaqueData.Data, 0, frame.Payload.Data, 0, pingFrame.OpaqueData.Count);

                                      this.outgoingFrames.Add(frame);
                                  }

                                  // Released on every path, including the unexpected-ack one.
                                  BufferPool.Release(pingFrame.OpaqueData);
                                  break;

                              case HTTP2FrameTypes.WINDOW_UPDATE:
                                  var windowUpdateFrame = HTTP2FrameHelper.ReadWindowUpdateFrame(header);
                                  this.remoteWindow += windowUpdateFrame.WindowSizeIncrement;
                                  break;

                              case HTTP2FrameTypes.GOAWAY:
                                  // parse the frame, so we can print out detailed information
                                  HTTP2GoAwayFrame goAwayFrame = HTTP2FrameHelper.ReadGoAwayFrame(header);
#if ENABLE_LOGGING
                                  HTTPManager.Logger.Information(nameof(HTTP2ContentConsumer), "Received GOAWAY frame: " + goAwayFrame.ToString(), this.Context);
#endif
                                  abortWithMessage = string.Format("Server closing the connection! Error code: {0} ({1}) Additonal Debug Data: {2}",
                                      goAwayFrame.Error, goAwayFrame.ErrorCode, goAwayFrame.AdditionalDebugData);

                                  for (int i = 0; i < this.clientInitiatedStreams.Count; ++i)
                                      this.clientInitiatedStreams[i].Abort(abortWithMessage);
                                  this.clientInitiatedStreams.Clear();

                                  // set the running flag to false, so the thread can exit
                                  this.isRunning = false;

                                  BufferPool.Release(goAwayFrame.AdditionalDebugData);

                                  //this.conn.State = HTTPConnectionStates.Closed;
                                  break;

                              case HTTP2FrameTypes.ALT_SVC:
                                  //HTTP2AltSVCFrame altSvcFrame = HTTP2FrameHelper.ReadAltSvcFrame(header);
                                  // Implement
                                  //HTTPManager.EnqueuePluginEvent(new PluginEventInfo(PluginEvents.AltSvcHeader, new AltSvcEventInfo(altSvcFrame.Origin, ))
                                  break;
                          }

                          if (header.Payload != null)
                              BufferPool.Release(header.Payload);
                      }
                  }

                  // If no pong received in a (configurable) reasonable time, treat the connection broken
                  if (this.waitingForPingAck != 0 && now - this.lastPingSent >= this._connectionSettings.Timeout)
                      throw new TimeoutException("Ping ACK isn't received in time!");

                  // pre-test stream count before touching the request queue.
                  if (this.clientInitiatedStreams.Count < _maxAssignedRequests && this.isRunning)
                  {
                      // grab requests from queue
                      HTTPRequest request;
                      while (this.clientInitiatedStreams.Count < _maxAssignedRequests && this.requestQueue.TryDequeue(out request))
                      {
                          HTTP2Stream newStream = null;

                          // A request can bring its own stream factory through its Tag; stream ids advance by 2.
                          if (request.Tag is CustomHTTP2StreamFactory factory)
                          {
                              newStream = factory(request, (UInt32)Interlocked.Add(ref LastStreamId, 2), this, this.settings, this.HPACKEncoder);
                          }
                          else
                          {
                              newStream = new HTTP2Stream((UInt32)Interlocked.Add(ref LastStreamId, 2), this, this.settings, this.HPACKEncoder);
                          }

                          newStream.Assign(request);
                          this.clientInitiatedStreams.Add(newStream);
                      }
                  }

                  // send any settings changes
                  this.settings.SendChanges(this.outgoingFrames);

                  atLeastOneStreamHasAFrameToSend = false;

                  // process other streams
                  for (int i = 0; i < this.clientInitiatedStreams.Count; ++i)
                  {
                      var stream = this.clientInitiatedStreams[i];
                      stream.Process(this.outgoingFrames);

                      // remove closed, empty streams (not enough to check the closed flag, a closed stream still can contain frames to send)
                      if (stream.State == HTTP2StreamStates.Closed && !stream.HasFrameToSend)
                      {
                          this.clientInitiatedStreams.RemoveAt(i--);
                          stream.Removed();

                          Interlocked.Decrement(ref this._assignedRequest);
                      }

                      atLeastOneStreamHasAFrameToSend |= stream.HasFrameToSend;

                      // Having at least one stream counts as an interaction.
                      this.lastInteraction = now;
                  }

                  // If we encounter a data frame that is too large for the current remote window, we have to stop
                  // sending all data frames as we could send smaller data frames before the large ones.
                  // Room for improvement: stop data frame sending per-stream instead.
                  bool haltDataSending = false;

                  if (this.ShutdownType == ShutdownTypes.Running && !this.SentGoAwayFrame && now - this.lastInteraction >= this._connectionSettings.MaxIdleTime)
                  {
                      this.lastInteraction = now;
#if ENABLE_LOGGING
                      HTTPManager.Logger.Information(nameof(HTTP2ContentConsumer), "Reached idle time, sending GoAway frame!", this.Context);
#endif
                      this.outgoingFrames.Add(HTTP2FrameHelper.CreateGoAwayFrame(0, HTTP2ErrorCodes.NO_ERROR, this.Context));
                      this.goAwaySentAt = now;
                  }

                  // https://httpwg.org/specs/rfc7540.html#GOAWAY
                  // Endpoints SHOULD always send a GOAWAY frame before closing a connection so that the remote peer can know whether a stream has been partially processed or not.
                  if (this.ShutdownType == ShutdownTypes.Gentle && !this.SentGoAwayFrame)
                  {
#if ENABLE_LOGGING
                      HTTPManager.Logger.Information(nameof(HTTP2ContentConsumer), "Connection abort requested, sending GoAway frame!", this.Context);
#endif
                      // Everything else that was waiting to be sent is dropped, only the GOAWAY goes out.
                      this.outgoingFrames.Clear();
                      this.outgoingFrames.Add(HTTP2FrameHelper.CreateGoAwayFrame(0, HTTP2ErrorCodes.NO_ERROR, this.Context));

                      this.goAwaySentAt = now;
                  }

                  if (this.isRunning && this.SentGoAwayFrame && now - goAwaySentAt >= this.MaxGoAwayWaitTime)
                  {
#if ENABLE_LOGGING
                      HTTPManager.Logger.Information(nameof(HTTP2ContentConsumer), "No GoAway frame received back. Really quitting now!", this.Context);
#endif
                      this.isRunning = false;

                      // Re-evaluates the loop condition, which is false now.
                      continue;
                  }

                  // Give back the consumed part of the connection window when it drops below the threshold.
                  if (localConnectionWindow < updateConnectionWindowAt)
                  {
                      UInt32 diff = (UInt32)(initialConnectionWindowSize - localConnectionWindow);
#if ENABLE_LOGGING
                      if (HTTPManager.Logger.IsDiagnostic)
                          HTTPManager.Logger.Information(nameof(HTTP2ContentConsumer), $"Updating local connection window by {diff:N0} ({initialConnectionWindowSize:N0} - {localConnectionWindow:N0})", this.Context);
#endif
                      this.outgoingFrames.Add(HTTP2FrameHelper.CreateWindowUpdateFrame(0, diff, this.Context));
                      localConnectionWindow = initialConnectionWindowSize;
                  }

                  // Go through all the collected frames and send them.
                  for (int i = 0; i < this.outgoingFrames.Count; ++i)
                  {
                      var frame = this.outgoingFrames[i];
#if ENABLE_LOGGING
                      if (HTTPManager.Logger.IsDiagnostic && frame.Type != HTTP2FrameTypes.DATA /*&& frame.Type != HTTP2FrameTypes.PING*/)
                          HTTPManager.Logger.Information(nameof(HTTP2ContentConsumer), "Sending frame: " + frame.ToString(), this.Context);
#endif
                      // post process frames
                      switch (frame.Type)
                      {
                          case HTTP2FrameTypes.DATA:
                              // Skipped DATA frames stay in the list for a later round.
                              if (haltDataSending)
                                  continue;

                              // if the tracked remoteWindow is smaller than the frame's payload, we stop sending
                              // data frames until we receive window-update frames
                              if (frame.Payload.Count > this.remoteWindow)
                              {
                                  haltDataSending = true;
#if ENABLE_LOGGING
                                  HTTPManager.Logger.Warning(nameof(HTTP2ContentConsumer), string.Format("Data sending halted for this round. Remote Window: {0:N0}, frame: {1}", this.remoteWindow, frame.ToString()), this.Context);
#endif
                                  continue;
                              }

                              break;
                      }

                      this.outgoingFrames.RemoveAt(i--);

                      using (var buffer = HTTP2FrameHelper.HeaderAsBinary(frame))
                          bufferedStream.Write(buffer.Data, 0, buffer.Count);

                      if (frame.Payload.Count > 0)
                      {
                          bufferedStream.Write(frame.Payload.Data, frame.Payload.Offset, frame.Payload.Count);

                          if (!frame.DontUseMemPool)
                              BufferPool.Release(frame.Payload);
                      }

                      if (frame.Type == HTTP2FrameTypes.DATA)
                          this.remoteWindow -= (uint)frame.Payload.Count;
                  }

                  bufferedStream.Flush();
              } // while (this.isRunning)

              bufferedStream.Flush();
          }
      }
      catch (Exception ex)
      {
          abortWithMessage = ex.ToString();

          // Log out the exception if it's a non-expected one.
          if (this.ShutdownType == ShutdownTypes.Running && this.isRunning && !this.SentGoAwayFrame && !HTTPManager.IsQuitting)
              HTTPManager.Logger.Exception(nameof(HTTP2ContentConsumer), "Sender thread", ex, this.Context);
      }
      finally
      {
          this.isRunning = false;
#if ENABLE_LOGGING
          HTTPManager.Logger.Information(nameof(HTTP2ContentConsumer), $"Sender thread closing - cleaning up remaining requests({this.clientInitiatedStreams.Count})...", this.Context);
#endif
          if (string.IsNullOrEmpty(abortWithMessage))
              abortWithMessage = "Connection closed unexpectedly";

          for (int i = 0; i < this.clientInitiatedStreams.Count; ++i)
              this.clientInitiatedStreams[i].Abort(abortWithMessage);
          this.clientInitiatedStreams.Clear();
#if ENABLE_LOGGING
          HTTPManager.Logger.Information(nameof(HTTP2ContentConsumer), "Sender thread closing", this.Context);
#endif
          ConnectionEventHelper.EnqueueConnectionEvent(new ConnectionEventInfo(this.conn, HTTPConnectionStates.Closed));
      }
  }
  /// <summary>
  /// Handler of the remote settings' change event. When the initial window size changes, the tracked
  /// remote window is re-based on the new value, keeping the amount already used up from the old one.
  /// </summary>
  /// <param name="registry">The registry raising the event (not used).</param>
  /// <param name="setting">The setting that changed.</param>
  /// <param name="oldValue">Value of the setting before the change.</param>
  /// <param name="newValue">Value of the setting after the change.</param>
  private void OnRemoteSettingChanged(HTTP2SettingsRegistry registry, HTTP2Settings setting, uint oldValue, uint newValue)
  {
      // Only the initial window size is of interest here.
      if (setting != HTTP2Settings.INITIAL_WINDOW_SIZE)
          return;

      uint alreadyUsed = oldValue - this.remoteWindow;
      this.remoteWindow = newValue - alreadyUsed;
  }
  /// <summary>Stores the stream that incoming frames will be read from.</summary>
  /// <param name="contentProvider">The content provider to bind to this consumer.</param>
  public void SetBinding(PeekableContentProviderStream contentProvider)
  {
      this.ContentProvider = contentProvider;
  }
  /// <summary>Clears the bound content provider.</summary>
  public void UnsetBinding()
  {
      this.ContentProvider = null;
  }
  /// <summary>
  /// Reads every complete frame available in the content provider and queues them for the sender loop,
  /// then signals the loop. Exceptions are logged and not rethrown.
  /// </summary>
  public void OnContent()
  {
      try
      {
          for (; ; )
          {
              // Stop when the handler is shutting down or there's no complete frame to read.
              if (!this.isRunning || !HTTP2FrameHelper.CanReadFullFrame(this.ContentProvider))
                  break;

              HTTP2FrameHeaderAndPayload received = HTTP2FrameHelper.ReadHeader(this.ContentProvider, this.Context);
#if ENABLE_LOGGING
              if (HTTPManager.Logger.IsDiagnostic)
                  HTTPManager.Logger.Information(nameof(HTTP2ContentConsumer), "New frame received: " + received.ToString(), this.Context);
#endif
              // Frames are processed on the write thread, so there's not much locking to deal with.
              this.newFrames.Enqueue(received);
          }
      }
      catch (Exception ex)
      {
          HTTPManager.Logger.Exception(nameof(HTTP2ContentConsumer), "", ex, this.Context);
      }
      finally
      {
          // Wake up the write thread to process the new frame(s).
          var signal = this.newFrameSignal;
          if (signal != null)
              signal.Set();
      }
  }
  /// <summary>Stops the handler after the connection has been closed and wakes up the sender loop.</summary>
  public void OnConnectionClosed()
  {
#if ENABLE_LOGGING
      HTTPManager.Logger.Verbose(nameof(HTTP2ContentConsumer), $"{nameof(OnConnectionClosed)}({this.isRunning})", this.Context);
#endif
      this.isRunning = false;

      // The signal is null after Dispose.
      var signal = this.newFrameSignal;
      if (signal != null)
          signal.Set();
  }
  /// <summary>Stops the handler after an error and wakes up the sender loop.</summary>
  /// <param name="ex">The exception that was reported.</param>
  public void OnError(Exception ex)
  {
#if ENABLE_LOGGING
      HTTPManager.Logger.Exception(nameof(HTTP2ContentConsumer), $"{nameof(OnError)}({this.isRunning}, {ex})", ex, this.Context);
#endif
      this.isRunning = false;

      // The signal is null after Dispose.
      var signal = this.newFrameSignal;
      if (signal != null)
          signal.Set();
  }
  /// <summary>Calculates the arithmetic mean of the stored round-trip times.</summary>
  /// <returns>The average of the values in the buffer, or 0 when the buffer is empty.</returns>
  private double CalculateLatency()
  {
      double total = 0;
      int index = 0;

      while (index < this.rtts.Count)
      {
          total += this.rtts[index];
          index++;
      }

      // index stays 0 only when there was nothing to sum up.
      return index == 0 ? 0 : total / this.rtts.Count;
  }
  /// <summary>Searches the client-initiated streams for the one with the given id.</summary>
  /// <param name="streamId">Id of the stream to look for.</param>
  /// <returns>The first stream with a matching id, or null if there's none.</returns>
  HTTP2Stream FindStreamById(UInt32 streamId)
  {
      foreach (HTTP2Stream candidate in this.clientInitiatedStreams)
      {
          if (candidate.Id == streamId)
              return candidate;
      }

      return null;
  }
  /// <summary>The shutdown mode last requested through <see cref="Shutdown"/>.</summary>
  public ShutdownTypes ShutdownType { get; private set; }

  /// <summary>
  /// Requests the handler to shut down. A gentle shutdown wakes up the sender loop, which checks
  /// <see cref="ShutdownType"/> on its own; an immediate one disposes the connection's top stream.
  /// </summary>
  /// <param name="type">The kind of shutdown to perform.</param>
  public void Shutdown(ShutdownTypes type)
  {
      this.ShutdownType = type;

      switch (type)
      {
          case ShutdownTypes.Gentle:
              // Dispose() sets the signal to null, so it must be accessed null-conditionally
              // like at every other place that sets it.
              this.newFrameSignal?.Set();
              break;

          case ShutdownTypes.Immediate:
              this.conn?.TopStream?.Dispose();
              break;
      }
  }
  /// <summary>
  /// Releases what the handler still holds: payloads of unprocessed received frames and of unsent
  /// outgoing frames go back to the buffer pool, requests that never got a stream are reported as
  /// aborted (when cancellation was requested) or queued for resend, and the wake-up signal is closed.
  /// </summary>
  public void Dispose()
  {
#if ENABLE_LOGGING
      HTTPManager.Logger.Information(nameof(HTTP2ContentConsumer), "Dispose", this.Context);
#endif
      // Received, but never processed frames.
      while (this.newFrames.TryDequeue(out var received))
          BufferPool.Release(received.Payload);

      // Collected, but never sent frames. Payloads flagged with DontUseMemPool must not be
      // given to the pool, the same way as the send loop treats them.
      foreach (var pending in this.outgoingFrames)
      {
          if (!pending.DontUseMemPool)
              BufferPool.Release(pending.Payload);
      }
      this.outgoingFrames.Clear();

      // Requests that are still waiting for a stream.
      HTTPRequest request = null;
      while (this.requestQueue.TryDequeue(out request))
      {
#if ENABLE_LOGGING
          HTTPManager.Logger.Information(nameof(HTTP2ContentConsumer), string.Format("Dispose - Request '{0}' IsCancellationRequested: {1}", request.CurrentUri.ToString(), request.IsCancellationRequested.ToString()), this.Context);
#endif
          RequestEventHelper.EnqueueRequestEvent(request.IsCancellationRequested ? new RequestEventInfo(request, HTTPRequestStates.Aborted, null) : new RequestEventInfo(request, RequestEvents.Resend));
      }

      this.newFrameSignal?.Close();
      this.newFrameSignal = null;
  }
  543. }
  544. }
  545. #endif