using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;
using SocketIOClient.JsonSerializer;
using SocketIOClient.Messages;
using SocketIOClient.UriConverters;

namespace SocketIOClient.Transport
{
    /// <summary>
    /// Base class for transports. It observes raw text and binary frames, turns them into
    /// messages via <see cref="MessageFactory"/>, and republishes them to subscribers through
    /// <see cref="MessageSubject"/>. It also sends the connect message after the open
    /// handshake and handles ping/pong.
    /// </summary>
    public abstract class BaseTransport : IObserver<string>, IObserver<byte[]>, IObservable<IMessage>, IDisposable
    {
        /// <summary>Number of times the connect message is attempted before the error is reported.</summary>
        private const int ConnectAttempts = 3;

        /// <summary>
        /// Creates the transport and initialises the message subject, the default
        /// <see cref="UriConverter"/> and the queue of messages awaiting binary attachments.
        /// </summary>
        /// <param name="options">Client options (EIO version, transport, query, auth).</param>
        /// <param name="jsonSerializer">Serializer used for the auth payload.</param>
        public BaseTransport(SocketIOOptions options, IJsonSerializer jsonSerializer)
        {
            Options = options;
            MessageSubject = new Subject<IMessage>();
            JsonSerializer = jsonSerializer;
            UriConverter = new UriConverter();
            _messageQueue = new Queue<IMessage>();
        }

        // UTC timestamp of the most recent ping that was sent (EIO 3) or received (EIO 4).
        DateTime _pingTime;

        // Messages that announced binary attachments and are still waiting for all of them.
        readonly Queue<IMessage> _messageQueue;

        protected SocketIOOptions Options { get; }
        protected Subject<IMessage> MessageSubject { get; }
        protected IJsonSerializer JsonSerializer { get; }
        protected CancellationTokenSource PingTokenSource { get; private set; }
        protected OpenedMessage OpenedMessage { get; private set; }

        public string Namespace { get; set; }
        public IUriConverter UriConverter { get; set; }

        /// <summary>
        /// Stamps the message with the configured EIO version and transport, serializes it
        /// into a <see cref="Payload"/> and sends it.
        /// </summary>
        /// <param name="msg">Message to send.</param>
        /// <param name="cancellationToken">Token forwarded to the payload send.</param>
        public async Task SendAsync(IMessage msg, CancellationToken cancellationToken)
        {
            msg.Eio = Options.EIO;
            msg.Protocol = Options.Transport;
            var payload = new Payload
            {
                Text = msg.Write()
            };
            if (msg.OutgoingBytes != null)
            {
                payload.Bytes = msg.OutgoingBytes;
            }
            await SendAsync(payload, cancellationToken).ConfigureAwait(false);
        }

        /// <summary>
        /// Records the opened message and sends the connect message, retrying with a growing
        /// delay (200 ms, then 400 ms). If the last attempt fails, the exception is passed to
        /// <see cref="OnError"/>. Nothing is sent for EIO 3 when no namespace is set.
        /// </summary>
        /// <param name="msg">The opened message received from the server.</param>
        protected virtual async Task OpenAsync(OpenedMessage msg)
        {
            OpenedMessage = msg;
            if (Options.EIO == 3 && string.IsNullOrEmpty(Namespace))
            {
                return;
            }

            var connectMsg = new ConnectedMessage
            {
                Namespace = Namespace,
                Eio = Options.EIO,
                Query = Options.Query,
            };

            if (Options.EIO == 4 && Options.Auth != null)
            {
                // The auth value is serialized wrapped in a one-element array; only that
                // wrapper is removed so brackets belonging to the value itself survive.
                var wrapped = JsonSerializer.Serialize(new[] { Options.Auth }).Json;
                connectMsg.AuthJsonStr = UnwrapSingleElementArray(wrapped);
            }

            for (int attempt = 1; attempt <= ConnectAttempts; attempt++)
            {
                try
                {
                    await SendAsync(connectMsg, CancellationToken.None).ConfigureAwait(false);
                    break;
                }
                catch (Exception e)
                {
                    if (attempt == ConnectAttempts)
                    {
                        OnError(e);
                    }
                    else
                    {
                        var backoff = TimeSpan.FromMilliseconds(Math.Pow(2, attempt) * 100);
                        await Task.Delay(backoff).ConfigureAwait(false);
                    }
                }
            }
        }

        /// <summary>
        /// Removes exactly one enclosing pair of square brackets from <paramref name="json"/>.
        /// Falls back to trimming bracket characters when the text is not bracket-enclosed.
        /// </summary>
        private static string UnwrapSingleElementArray(string json)
        {
            if (json.Length >= 2 && json[0] == '[' && json[json.Length - 1] == ']')
            {
                return json.Substring(1, json.Length - 2);
            }
            return json.TrimStart('[').TrimEnd(']');
        }

        /// <summary>
        /// Starts a background loop that sends a ping every <c>OpenedMessage.PingInterval</c>
        /// until <paramref name="cancellationToken"/> is cancelled or a send fails.
        /// Eio3 ping is sent by the client.
        /// Eio4 ping is sent by the server.
        /// </summary>
        /// <param name="cancellationToken">Token that stops the loop.</param>
        private void StartPing(CancellationToken cancellationToken)
        {
            Debug.WriteLine($"[Ping] Interval: {OpenedMessage.PingInterval}");
            Task.Run(async () =>
            {
                while (!cancellationToken.IsCancellationRequested)
                {
                    try
                    {
                        // Passing the token lets cancellation end the wait immediately.
                        await Task.Delay(OpenedMessage.PingInterval, cancellationToken).ConfigureAwait(false);
                    }
                    catch (OperationCanceledException)
                    {
                        break;
                    }

                    try
                    {
                        var ping = new PingMessage();
                        Debug.WriteLine($"[Ping] Sending");
                        await SendAsync(ping, CancellationToken.None).ConfigureAwait(false);
                        Debug.WriteLine($"[Ping] Has been sent");
                        _pingTime = DateTime.UtcNow;
                        MessageSubject.OnNext(ping);
                    }
                    catch (Exception e)
                    {
                        Debug.WriteLine($"[Ping] Failed to send, {e.Message}");
                        // After cancellation the subject may already be disposed, so the
                        // failure is only reported while the loop is still wanted.
                        if (!cancellationToken.IsCancellationRequested)
                        {
                            MessageSubject.OnError(e);
                        }
                        break;
                    }
                }
            });
        }

        /// <summary>
        /// Cancels and disposes the current ping token source, if any, then starts a new
        /// ping loop with a fresh token source.
        /// </summary>
        private void RestartPing()
        {
            var previous = PingTokenSource;
            if (previous != null)
            {
                previous.Cancel();
                previous.Dispose();
            }
            PingTokenSource = new CancellationTokenSource();
            StartPing(PingTokenSource.Token);
        }

        public abstract Task ConnectAsync(Uri uri, CancellationToken cancellationToken);

        public abstract Task DisconnectAsync(CancellationToken cancellationToken);

        public abstract void AddHeader(string key, string val);

        /// <summary>
        /// Stops the ping loop, disposes the message subject and clears queued messages.
        /// Calling it more than once is safe.
        /// </summary>
        public virtual void Dispose()
        {
            // Stop the ping loop first so it does not publish into a disposed subject.
            var pingSource = PingTokenSource;
            if (pingSource != null)
            {
                PingTokenSource = null;
                pingSource.Cancel();
                pingSource.Dispose();
            }
            MessageSubject.Dispose();
            _messageQueue.Clear();
        }

        public abstract Task SendAsync(Payload payload, CancellationToken cancellationToken);

        /// <summary>Not implemented; always throws <see cref="NotImplementedException"/>.</summary>
        public void OnCompleted()
        {
            throw new NotImplementedException();
        }

        /// <summary>Forwards <paramref name="error"/> to subscribers of this transport.</summary>
        public void OnError(Exception error)
        {
            MessageSubject.OnError(error);
        }

        /// <summary>
        /// Handles a received text frame: parses it, queues it if it announces binary
        /// attachments, runs the open/connect/ping handling and publishes the message.
        /// </summary>
        /// <param name="text">Raw text frame.</param>
        public void OnNext(string text)
        {
            Debug.WriteLine($"[Receive] {text}");
            var msg = MessageFactory.CreateMessage(Options.EIO, text);
            if (msg == null)
            {
                return;
            }

            if (msg.BinaryCount > 0)
            {
                // Held back until OnNext(byte[]) has collected every attachment.
                msg.IncomingBytes = new List<byte[]>(msg.BinaryCount);
                _messageQueue.Enqueue(msg);
                return;
            }

            if (msg.Type == MessageType.Opened)
            {
                // Runs synchronously up to its first await, so OpenedMessage is set before
                // the code below continues; failures are reported through OnError.
                ObserveOpenAsync((OpenedMessage)msg);
            }

            if (Options.EIO == 3)
            {
                if (msg.Type == MessageType.Connected)
                {
                    var connected = (ConnectedMessage)msg;
                    connected.Sid = OpenedMessage.Sid;

                    bool bothNamespacesEmpty = string.IsNullOrEmpty(Namespace)
                        && string.IsNullOrEmpty(connected.Namespace);
                    if (!bothNamespacesEmpty && connected.Namespace != Namespace)
                    {
                        // Connected to a different namespace: not published.
                        return;
                    }
                    RestartPing();
                }
                else if (msg.Type == MessageType.Pong)
                {
                    var pong = (PongMessage)msg;
                    pong.Duration = DateTime.UtcNow - _pingTime;
                }
            }

            MessageSubject.OnNext(msg);

            if (msg.Type == MessageType.Ping)
            {
                ReplyToPingAsync();
            }
        }

        /// <summary>
        /// Awaits <see cref="OpenAsync"/> and passes any exception it throws to
        /// <see cref="OnError"/> instead of leaving it unobserved.
        /// </summary>
        private async Task ObserveOpenAsync(OpenedMessage opened)
        {
            try
            {
                await OpenAsync(opened).ConfigureAwait(false);
            }
            catch (Exception e)
            {
                OnError(e);
            }
        }

        /// <summary>
        /// Sends a pong in reply to a ping and, once the send has completed, publishes a
        /// pong message whose duration is the time the reply took. Failures go to
        /// <see cref="OnError"/>.
        /// </summary>
        private async Task ReplyToPingAsync()
        {
            var receivedAt = DateTime.UtcNow;
            _pingTime = receivedAt;
            try
            {
                await SendAsync(new PongMessage(), CancellationToken.None).ConfigureAwait(false);
                MessageSubject.OnNext(new PongMessage
                {
                    Eio = Options.EIO,
                    Protocol = Options.Transport,
                    Duration = DateTime.UtcNow - receivedAt
                });
            }
            catch (Exception e)
            {
                OnError(e);
            }
        }

        /// <summary>
        /// Handles a received binary frame by attaching it to the oldest queued message;
        /// once that message has all its attachments it is dequeued and published.
        /// Frames that arrive while nothing is queued are ignored.
        /// </summary>
        /// <param name="bytes">Raw binary frame.</param>
        public void OnNext(byte[] bytes)
        {
            Debug.WriteLine($"[Receive] binary message");
            if (_messageQueue.Count == 0)
            {
                return;
            }

            var pending = _messageQueue.Peek();
            pending.IncomingBytes.Add(bytes);
            if (pending.IncomingBytes.Count == pending.BinaryCount)
            {
                // Dequeue before notifying so a throwing subscriber cannot leave the
                // completed message stuck at the head of the queue.
                _messageQueue.Dequeue();
                MessageSubject.OnNext(pending);
            }
        }

        /// <summary>Subscribes <paramref name="observer"/> to the messages published by this transport.</summary>
        /// <returns>A handle that ends the subscription when disposed.</returns>
        public IDisposable Subscribe(IObserver<IMessage> observer)
        {
            return MessageSubject.Subscribe(observer);
        }
    }
}