using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Net.Http;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;
using SocketIOClient.JsonSerializer;
using SocketIOClient.Messages;
using SocketIOClient.Transport;
using SocketIOClient.UriConverters;

namespace SocketIOClient
{
    /// <summary>
    /// socket.io client class
    /// </summary>
    public class SocketIO : IDisposable
    {
        /// <summary>
        /// Create SocketIO object with default options
        /// </summary>
        /// <param name="uri">Server address; converted with <c>new Uri(uri)</c>.</param>
        public SocketIO(string uri) : this(new Uri(uri)) { }

        /// <summary>
        /// Create SocketIO object with default options
        /// </summary>
        /// <param name="uri">Server address.</param>
        public SocketIO(Uri uri) : this(uri, new SocketIOOptions()) { }

        /// <summary>
        /// Create SocketIO object with options
        /// </summary>
        /// <param name="uri">Server address; converted with <c>new Uri(uri)</c>.</param>
        /// <param name="options">Client options.</param>
        public SocketIO(string uri, SocketIOOptions options) : this(new Uri(uri), options) { }

        /// <summary>
        /// Create SocketIO object with options
        /// </summary>
        /// <param name="uri">Server address.</param>
        /// <param name="options">Client options.</param>
        /// <exception cref="ArgumentNullException">
        /// <paramref name="uri"/> or <paramref name="options"/> is null.
        /// </exception>
        public SocketIO(Uri uri, SocketIOOptions options)
        {
            ServerUri = uri ?? throw new ArgumentNullException(nameof(uri));
            Options = options ?? throw new ArgumentNullException(nameof(options));
            Initialize();
        }

        Uri _serverUri;

        /// <summary>
        /// Server address. Assigning a new value whose <see cref="Uri.AbsolutePath"/> is not "/"
        /// also stores that path in <see cref="Namespace"/>.
        /// </summary>
        public Uri ServerUri
        {
            get => _serverUri;
            set
            {
                if (_serverUri != value)
                {
                    _serverUri = value;
                    if (value != null && value.AbsolutePath != "/")
                    {
                        Namespace = value.AbsolutePath;
                    }
                }
            }
        }

        /// <summary>
        /// A unique identifier for the socket session. Set after the connect event is triggered, and updated after the reconnect event.
        /// </summary>
        public string Id { get; set; }

        /// <summary>
        /// Namespace taken from the path of <see cref="ServerUri"/>; stays null while only root paths have been assigned.
        /// </summary>
        public string Namespace { get; private set; }

        /// <summary>
        /// Whether or not the socket is connected to the server.
        /// </summary>
        public bool Connected { get; private set; }

        /// <summary>
        /// Gets current attempt of reconnection.
        /// </summary>
        public int Attempts { get; private set; }

        /// <summary>
        /// Whether or not the socket is disconnected from the server.
        /// </summary>
        public bool Disconnected => !Connected;

        public SocketIOOptions Options { get; }

        public IJsonSerializer JsonSerializer { get; set; }

        public IUriConverter UriConverter { get; set; }

        public HttpClient HttpClient { get; set; }

        /// <summary>
        /// Factory invoked for every connection attempt that uses the WebSocket transport.
        /// </summary>
        public Func<IClientWebSocket> ClientWebSocketProvider { get; set; }

        private IClientWebSocket _clientWebsocket;
        public BaseTransport _transport;

        List<Type> _expectedExceptions;

        int _packetId;
        bool _isConnectCoreRunning;
        Uri _realServerUri;
        Exception _connectCoreException;
        Dictionary<int, Action<SocketIOResponse>> _ackHandlers;
        List<OnAnyHandler> _onAnyHandlers;
        Dictionary<string, Action<SocketIOResponse>> _eventHandlers;
        CancellationTokenSource _connectionTokenSource;
        double _reconnectionDelay;

        #region Socket.IO event
        public event EventHandler OnConnected;
        public event EventHandler<string> OnError;
        public event EventHandler<string> OnDisconnected;

        /// <summary>
        /// Fired upon a successful reconnection.
        /// </summary>
        public event EventHandler<int> OnReconnected;

        /// <summary>
        /// Fired upon an attempt to reconnect.
        /// </summary>
        public event EventHandler<int> OnReconnectAttempt;

        /// <summary>
        /// Fired upon a reconnection attempt error.
        /// </summary>
        public event EventHandler<Exception> OnReconnectError;

        /// <summary>
        /// Fired when couldn't reconnect within reconnectionAttempts
        /// </summary>
        public event EventHandler OnReconnectFailed;
        public event EventHandler OnPing;
        public event EventHandler<TimeSpan> OnPong;
        #endregion

        /// <summary>
        /// Creates the handler tables and installs the default serializer, URI converter,
        /// HTTP client, WebSocket factory and the list of exception types treated as
        /// recoverable connection failures.
        /// </summary>
        private void Initialize()
        {
            _packetId = -1;
            _ackHandlers = new Dictionary<int, Action<SocketIOResponse>>();
            _eventHandlers = new Dictionary<string, Action<SocketIOResponse>>();
            _onAnyHandlers = new List<OnAnyHandler>();

            JsonSerializer = new SystemTextJsonSerializer();
            UriConverter = new UriConverter();
            HttpClient = new HttpClient();
            ClientWebSocketProvider = () => new SystemNetWebSocketsClientWebSocket(Options.EIO);
            _expectedExceptions = new List<Type>
            {
                typeof(TimeoutException),
                typeof(WebSocketException),
                typeof(HttpRequestException),
                typeof(OperationCanceledException),
                typeof(TaskCanceledException)
            };
        }

        /// <summary>
        /// Resolves the transport protocol (writing the result back to <c>Options.Transport</c>),
        /// builds the matching transport and applies the namespace and extra headers to it.
        /// </summary>
        private async Task CreateTransportAsync()
        {
            Options.Transport = await GetProtocolAsync().ConfigureAwait(false);
            if (Options.Transport == TransportProtocol.Polling)
            {
                HttpPollingHandler handler;
                if (Options.EIO == 3)
                {
                    handler = new Eio3HttpPollingHandler(HttpClient);
                }
                else
                {
                    handler = new Eio4HttpPollingHandler(HttpClient);
                }
                _transport = new HttpTransport(HttpClient, handler, Options, JsonSerializer);
            }
            else
            {
                _clientWebsocket = ClientWebSocketProvider();
                _transport = new WebSocketTransport(_clientWebsocket, Options, JsonSerializer);
            }
            _transport.Namespace = Namespace;
            SetHeaders();
        }

        /// <summary>
        /// Copies every entry of <c>Options.ExtraHeaders</c> (when set) onto the current transport.
        /// </summary>
        private void SetHeaders()
        {
            if (Options.ExtraHeaders != null)
            {
                foreach (var item in Options.ExtraHeaders)
                {
                    _transport.AddHeader(item.Key, item.Value);
                }
            }
        }

        /// <summary>
        /// Records a connect failure for <see cref="ConnectAsync"/> to rethrow.
        /// The exception is stored before the running flag is cleared so the waiter sees it.
        /// </summary>
        private void SyncExceptionToMain(Exception e)
        {
            _connectCoreException = e;
            _isConnectCoreRunning = false;
        }

        /// <summary>
        /// Cancels any previous connect loop, resets the per-connection state and starts a new
        /// connect loop in the background. Completion is signalled through
        /// <c>_isConnectCoreRunning</c> and <c>_connectCoreException</c>.
        /// </summary>
        private void ConnectCore()
        {
            DisposeForReconnect();
            _reconnectionDelay = Options.ReconnectionDelay;
            _connectionTokenSource = new CancellationTokenSource();
            var cct = _connectionTokenSource.Token;
            _isConnectCoreRunning = true;
            _connectCoreException = null;
            // Task.Run unwraps the async delegate and always uses the default scheduler.
            Task.Run(() => ConnectLoopAsync(cct));
        }

        /// <summary>
        /// Tries to connect until it succeeds, the token is cancelled, an unrecoverable error
        /// occurs or the reconnection attempts are exhausted.
        /// </summary>
        /// <param name="cct">Token cancelled when this loop is superseded or the client is disposed.</param>
        private async Task ConnectLoopAsync(CancellationToken cct)
        {
            try
            {
                while (true)
                {
                    try
                    {
                        // Check before touching the shared transport: a superseded loop must not
                        // dispose the transport that belongs to its successor.
                        if (cct.IsCancellationRequested)
                        {
                            break;
                        }

                        _clientWebsocket?.Dispose();
                        _transport?.Dispose();
                        await CreateTransportAsync().ConfigureAwait(false);
                        _realServerUri = UriConverter.GetServerUri(Options.Transport == TransportProtocol.WebSocket, ServerUri, Options.EIO, Options.Path, Options.Query);

                        if (cct.IsCancellationRequested)
                        {
                            break;
                        }
                        if (Attempts > 0)
                        {
                            OnReconnectAttempt?.Invoke(this, Attempts);
                        }

                        // NOTE(review): not disposed here because the transport may keep using the
                        // token after ConnectAsync returns - confirm against the transport code.
                        var timeoutCts = new CancellationTokenSource(Options.ConnectionTimeout);
                        _transport.Subscribe(OnMessageReceived, OnErrorReceived);
                        await _transport.ConnectAsync(_realServerUri, timeoutCts.Token).ConfigureAwait(false);
                        break;
                    }
                    catch (Exception e)
                    {
                        bool recoverable = _expectedExceptions.Contains(e.GetType()) && Options.Reconnection;
                        if (!recoverable)
                        {
                            // Hand the failure to ConnectAsync instead of faulting an unobserved task.
                            SyncExceptionToMain(e);
                            return;
                        }

                        if (Attempts > 0)
                        {
                            OnReconnectError?.Invoke(this, e);
                        }
                        Attempts++;
                        if (Attempts > Options.ReconnectionAttempts)
                        {
                            OnReconnectFailed?.Invoke(this, EventArgs.Empty);
                            break;
                        }

                        if (_reconnectionDelay < Options.ReconnectionDelayMax)
                        {
                            _reconnectionDelay += 2 * Options.RandomizationFactor;
                        }
                        if (_reconnectionDelay > Options.ReconnectionDelayMax)
                        {
                            _reconnectionDelay = Options.ReconnectionDelayMax;
                        }
                        await Task.Delay((int)_reconnectionDelay).ConfigureAwait(false);
                    }
                }
            }
            catch (Exception e)
            {
                // Reached when an event subscriber throws from the retry path above.
                SyncExceptionToMain(e);
            }
            finally
            {
                // Always release ConnectAsync, whatever path ended the loop.
                _isConnectCoreRunning = false;
            }
        }

        /// <summary>
        /// Decides which transport to use. When polling is configured together with
        /// <c>Options.AutoUpgrade</c>, the polling endpoint is queried and WebSocket is chosen
        /// if the response text contains "websocket"; any failure of that probe keeps the
        /// configured transport.
        /// </summary>
        /// <returns>The transport protocol to use for the next connection attempt.</returns>
        private async Task<TransportProtocol> GetProtocolAsync()
        {
            if (Options.Transport == TransportProtocol.Polling && Options.AutoUpgrade)
            {
                Uri uri = UriConverter.GetServerUri(false, ServerUri, Options.EIO, Options.Path, Options.Query);
                try
                {
                    string text = await HttpClient.GetStringAsync(uri).ConfigureAwait(false);
                    if (text.Contains("websocket"))
                    {
                        return TransportProtocol.WebSocket;
                    }
                }
                catch (Exception e)
                {
                    // Best effort: fall back to the configured transport.
                    Debug.WriteLine(e);
                }
            }
            return Options.Transport;
        }

        /// <summary>
        /// Starts connecting and waits until the background connect loop has finished.
        /// </summary>
        /// <remarks>
        /// Rethrows the exception recorded by the connect loop, if any.
        /// </remarks>
        public async Task ConnectAsync()
        {
            ConnectCore();
            while (_isConnectCoreRunning)
            {
                await Task.Delay(20).ConfigureAwait(false);
            }
            if (_connectCoreException != null)
            {
                throw _connectCoreException;
            }
        }

        private void PingHandler()
        {
            OnPing?.Invoke(this, EventArgs.Empty);
        }

        private void PongHandler(PongMessage msg)
        {
            OnPong?.Invoke(this, msg.Duration);
        }

        /// <summary>
        /// Stores the session id, marks the socket connected, raises <see cref="OnConnected"/>
        /// (and <see cref="OnReconnected"/> after a retry) and resets <see cref="Attempts"/>.
        /// </summary>
        private void ConnectedHandler(ConnectedMessage msg)
        {
            Id = msg.Sid;
            Connected = true;
            OnConnected?.Invoke(this, EventArgs.Empty);
            if (Attempts > 0)
            {
                OnReconnected?.Invoke(this, Attempts);
            }
            Attempts = 0;
        }

        private void DisconnectedHandler()
        {
            InvokeDisconnect(DisconnectReason.IOServerDisconnect);
        }

        /// <summary>
        /// Dispatches an event message to all "any" handlers and then to the handler registered
        /// for the event name. Exceptions thrown by handlers are logged and swallowed.
        /// </summary>
        private void EventMessageHandler(EventMessage m)
        {
            var res = new SocketIOResponse(m.JsonElements, this)
            {
                PacketId = m.Id
            };
            // Iterate a snapshot so a handler may call OnAny/OffAny without breaking enumeration.
            foreach (var item in _onAnyHandlers.ToArray())
            {
                try
                {
                    item(m.Event, res);
                }
                catch (Exception e)
                {
                    Debug.WriteLine(e);
                }
            }
            if (_eventHandlers.TryGetValue(m.Event, out var handler))
            {
                try
                {
                    handler(res);
                }
                catch (Exception e)
                {
                    Debug.WriteLine(e);
                }
            }
        }

        /// <summary>
        /// Invokes the ack callback registered for the message id and removes it afterwards.
        /// </summary>
        private void AckMessageHandler(ClientAckMessage m)
        {
            if (_ackHandlers.TryGetValue(m.Id, out var handler))
            {
                var res = new SocketIOResponse(m.JsonElements, this);
                try
                {
                    handler(res);
                }
                finally
                {
                    _ackHandlers.Remove(m.Id);
                }
            }
        }

        private void ErrorMessageHandler(ErrorMessage msg)
        {
            OnError?.Invoke(this, msg.Message);
        }

        /// <summary>
        /// Dispatches a binary event message to the handler registered for the event name.
        /// Exceptions thrown while building the response or by the handler are logged and swallowed.
        /// </summary>
        private void BinaryMessageHandler(BinaryMessage msg)
        {
            if (_eventHandlers.TryGetValue(msg.Event, out var handler))
            {
                try
                {
                    var response = new SocketIOResponse(msg.JsonElements, this)
                    {
                        PacketId = msg.Id
                    };
                    response.InComingBytes.AddRange(msg.IncomingBytes);
                    handler(response);
                }
                catch (Exception e)
                {
                    Debug.WriteLine(e);
                }
            }
        }

        /// <summary>
        /// Invokes the ack callback registered for the binary ack's id and removes it afterwards,
        /// matching <see cref="AckMessageHandler"/>. Exceptions are logged and swallowed.
        /// </summary>
        private void BinaryAckMessageHandler(ClientBinaryAckMessage msg)
        {
            if (_ackHandlers.TryGetValue(msg.Id, out var handler))
            {
                try
                {
                    var response = new SocketIOResponse(msg.JsonElements, this)
                    {
                        PacketId = msg.Id,
                    };
                    response.InComingBytes.AddRange(msg.IncomingBytes);
                    handler(response);
                }
                catch (Exception e)
                {
                    Debug.WriteLine(e);
                }
                finally
                {
                    // Drop the one-shot callback so it does not accumulate in the table.
                    _ackHandlers.Remove(msg.Id);
                }
            }
        }

        private void OnErrorReceived(Exception ex)
        {
            InvokeDisconnect(DisconnectReason.TransportClose);
        }

        /// <summary>
        /// Routes a message from the transport to the handler for its <c>Type</c>.
        /// Any exception escaping a handler is logged and swallowed.
        /// </summary>
        private void OnMessageReceived(IMessage msg)
        {
            try
            {
                switch (msg.Type)
                {
                    case MessageType.Ping:
                        PingHandler();
                        break;
                    case MessageType.Pong:
                        PongHandler((PongMessage)msg);
                        break;
                    case MessageType.Connected:
                        ConnectedHandler((ConnectedMessage)msg);
                        break;
                    case MessageType.Disconnected:
                        DisconnectedHandler();
                        break;
                    case MessageType.EventMessage:
                        EventMessageHandler((EventMessage)msg);
                        break;
                    case MessageType.AckMessage:
                        AckMessageHandler((ClientAckMessage)msg);
                        break;
                    case MessageType.ErrorMessage:
                        ErrorMessageHandler((ErrorMessage)msg);
                        break;
                    case MessageType.BinaryMessage:
                        BinaryMessageHandler((BinaryMessage)msg);
                        break;
                    case MessageType.BinaryAckMessage:
                        BinaryAckMessageHandler((ClientBinaryAckMessage)msg);
                        break;
                }
            }
            catch (Exception e)
            {
                Debug.WriteLine(e);
            }
        }

        /// <summary>
        /// When connected, sends a disconnect message for the current namespace (send failures
        /// are logged and ignored) and then runs the client-side disconnect handling.
        /// </summary>
        public async Task DisconnectAsync()
        {
            if (Connected)
            {
                var msg = new DisconnectedMessage
                {
                    Namespace = Namespace
                };
                try
                {
                    await _transport.SendAsync(msg, CancellationToken.None).ConfigureAwait(false);
                }
                catch (Exception e)
                {
                    Debug.WriteLine(e);
                }
                InvokeDisconnect(DisconnectReason.IOClientDisconnect);
            }
        }

        /// <summary>
        /// Register a new handler for the given event. A handler already registered for the
        /// same event name is replaced.
        /// </summary>
        /// <param name="eventName">Event name.</param>
        /// <param name="callback">Handler invoked with the server response.</param>
        public void On(string eventName, Action<SocketIOResponse> callback)
        {
            _eventHandlers[eventName] = callback;
        }

        /// <summary>
        /// Unregister the handler for the given event.
        /// </summary>
        /// <param name="eventName">Event name.</param>
        public void Off(string eventName)
        {
            _eventHandlers.Remove(eventName);
        }

        /// <summary>
        /// Appends a handler that is invoked for every event message; null is ignored.
        /// </summary>
        public void OnAny(OnAnyHandler handler)
        {
            if (handler != null)
            {
                _onAnyHandlers.Add(handler);
            }
        }

        /// <summary>
        /// Inserts a handler at the front of the "any" handler list; null is ignored.
        /// </summary>
        public void PrependAny(OnAnyHandler handler)
        {
            if (handler != null)
            {
                _onAnyHandlers.Insert(0, handler);
            }
        }

        /// <summary>
        /// Removes a previously added "any" handler; null is ignored.
        /// </summary>
        public void OffAny(OnAnyHandler handler)
        {
            if (handler != null)
            {
                _onAnyHandlers.Remove(handler);
            }
        }

        /// <summary>
        /// Returns a copy of the registered "any" handlers.
        /// </summary>
        public OnAnyHandler[] ListenersAny() => _onAnyHandlers.ToArray();

        /// <summary>
        /// Sends an acknowledgement for a packet received from the server. A binary ack is used
        /// when serializing <paramref name="data"/> produces byte payloads.
        /// </summary>
        /// <param name="packetId">Id of the packet being acknowledged.</param>
        /// <param name="cancellationToken">Token passed to the transport send.</param>
        /// <param name="data">Optional payload.</param>
        internal async Task ClientAckAsync(int packetId, CancellationToken cancellationToken, params object[] data)
        {
            IMessage msg;
            if (data != null && data.Length > 0)
            {
                var result = JsonSerializer.Serialize(data);
                if (result.Bytes.Count > 0)
                {
                    msg = new ServerBinaryAckMessage
                    {
                        Id = packetId,
                        Namespace = Namespace,
                        Json = result.Json
                    };
                    msg.OutgoingBytes = new List<byte[]>(result.Bytes);
                }
                else
                {
                    msg = new ServerAckMessage
                    {
                        Namespace = Namespace,
                        Id = packetId,
                        Json = result.Json
                    };
                }
            }
            else
            {
                msg = new ServerAckMessage
                {
                    Namespace = Namespace,
                    Id = packetId
                };
            }
            await _transport.SendAsync(msg, cancellationToken).ConfigureAwait(false);
        }

        /// <summary>
        /// Emits an event to the socket
        /// </summary>
        /// <param name="eventName">Event name.</param>
        /// <param name="data">Any other parameters can be included. All serializable datastructures are supported, including byte[]</param>
        public async Task EmitAsync(string eventName, params object[] data)
        {
            await EmitAsync(eventName, CancellationToken.None, data).ConfigureAwait(false);
        }

        /// <summary>
        /// Emits an event to the socket
        /// </summary>
        /// <param name="eventName">Event name.</param>
        /// <param name="cancellationToken">Token passed to the transport send.</param>
        /// <param name="data">Any other parameters can be included. All serializable datastructures are supported, including byte[]</param>
        public async Task EmitAsync(string eventName, CancellationToken cancellationToken, params object[] data)
        {
            IMessage msg;
            if (data != null && data.Length > 0)
            {
                var result = JsonSerializer.Serialize(data);
                if (result.Bytes.Count > 0)
                {
                    msg = new BinaryMessage
                    {
                        Namespace = Namespace,
                        OutgoingBytes = new List<byte[]>(result.Bytes),
                        Event = eventName,
                        Json = result.Json
                    };
                }
                else
                {
                    msg = new EventMessage
                    {
                        Namespace = Namespace,
                        Event = eventName,
                        Json = result.Json
                    };
                }
            }
            else
            {
                msg = new EventMessage
                {
                    Namespace = Namespace,
                    Event = eventName
                };
            }
            await _transport.SendAsync(msg, cancellationToken).ConfigureAwait(false);
        }

        /// <summary>
        /// Emits an event to the socket
        /// </summary>
        /// <param name="eventName">Event name.</param>
        /// <param name="ack">will be called with the server answer.</param>
        /// <param name="data">Any other parameters can be included. All serializable datastructures are supported, including byte[]</param>
        public async Task EmitAsync(string eventName, Action<SocketIOResponse> ack, params object[] data)
        {
            await EmitAsync(eventName, CancellationToken.None, ack, data).ConfigureAwait(false);
        }

        /// <summary>
        /// Emits an event to the socket
        /// </summary>
        /// <param name="eventName">Event name.</param>
        /// <param name="cancellationToken">Token passed to the transport send.</param>
        /// <param name="ack">will be called with the server answer.</param>
        /// <param name="data">Any other parameters can be included. All serializable datastructures are supported, including byte[]</param>
        public async Task EmitAsync(string eventName, CancellationToken cancellationToken, Action<SocketIOResponse> ack, params object[] data)
        {
            // Capture the id once so the registered callback and the outgoing message always agree.
            int packetId = ++_packetId;
            _ackHandlers.Add(packetId, ack);

            IMessage msg;
            if (data != null && data.Length > 0)
            {
                var result = JsonSerializer.Serialize(data);
                if (result.Bytes.Count > 0)
                {
                    msg = new ClientBinaryAckMessage
                    {
                        Event = eventName,
                        Namespace = Namespace,
                        Json = result.Json,
                        Id = packetId,
                        OutgoingBytes = new List<byte[]>(result.Bytes)
                    };
                }
                else
                {
                    msg = new ClientAckMessage
                    {
                        Event = eventName,
                        Namespace = Namespace,
                        Id = packetId,
                        Json = result.Json
                    };
                }
            }
            else
            {
                msg = new ClientAckMessage
                {
                    Event = eventName,
                    Namespace = Namespace,
                    Id = packetId
                };
            }
            await _transport.SendAsync(msg, cancellationToken).ConfigureAwait(false);
        }

        /// <summary>
        /// Marks the socket disconnected, raises <see cref="OnDisconnected"/>, asks the transport
        /// to disconnect and, for reasons other than an explicit server or client disconnect,
        /// starts reconnecting when <c>Options.Reconnection</c> is enabled.
        /// </summary>
        /// <param name="reason">One of the <c>DisconnectReason</c> values.</param>
        private async void InvokeDisconnect(string reason)
        {
            if (Connected)
            {
                Connected = false;
                OnDisconnected?.Invoke(this, reason);
                try
                {
                    await _transport.DisconnectAsync(CancellationToken.None).ConfigureAwait(false);
                }
                catch (Exception e)
                {
                    // Best effort: the connection is being torn down anyway.
                    Debug.WriteLine(e);
                }
                if (reason != DisconnectReason.IOServerDisconnect && reason != DisconnectReason.IOClientDisconnect)
                {
                    // After an explicit disconnection the client does not try to reconnect;
                    // the caller has to connect again manually.
                    if (Options.Reconnection)
                    {
                        ConnectCore();
                    }
                }
            }
        }

        /// <summary>
        /// Adds an exception type that the connect loop treats as a recoverable connection
        /// failure. Types are matched exactly, not by inheritance.
        /// </summary>
        public void AddExpectedException(Type type)
        {
            if (!_expectedExceptions.Contains(type))
            {
                _expectedExceptions.Add(type);
            }
        }

        /// <summary>
        /// Resets the packet counter, drops pending ack callbacks and cancels the previous
        /// connect loop, if there is one.
        /// </summary>
        private void DisposeForReconnect()
        {
            _packetId = -1;
            _ackHandlers.Clear();
            var previous = _connectionTokenSource;
            _connectionTokenSource = null;
            if (previous != null)
            {
                previous.Cancel();
                previous.Dispose();
            }
        }

        /// <summary>
        /// Releases the HTTP client and the transport, clears all handlers and cancels the
        /// connect loop. Safe to call before connecting and safe to call more than once.
        /// </summary>
        public void Dispose()
        {
            HttpClient?.Dispose();
            _transport?.Dispose();
            _ackHandlers.Clear();
            _onAnyHandlers.Clear();
            _eventHandlers.Clear();

            // Clear the field first so a repeated Dispose never touches a disposed source.
            var cts = _connectionTokenSource;
            _connectionTokenSource = null;
            if (cts != null)
            {
                cts.Cancel();
                cts.Dispose();
            }
        }
    }
}