#if !BESTHTTP_DISABLE_SIGNALR_CORE using System; using BestHTTP; using BestHTTP.Futures; using BestHTTP.SignalRCore.Messages; namespace BestHTTP.SignalRCore { public interface IUPloadItemController : IDisposable { string[] StreamingIDs { get; } HubConnection Hub { get; } void UploadParam(string streamId, T item); void Cancel(); } public sealed class DownStreamItemController : IFuture, IDisposable { public readonly long invocationId; public readonly HubConnection hubConnection; public readonly IFuture future; public FutureState state { get { return this.future.state; } } public TResult value { get { return this.future.value; } } public Exception error { get { return this.future.error; } } public bool IsCanceled { get; private set; } public DownStreamItemController(HubConnection hub, long iId, IFuture future) { this.hubConnection = hub; this.invocationId = iId; this.future = future; } public void Cancel() { if (this.IsCanceled) return; this.IsCanceled = true; Message message = new Message { type = MessageTypes.CancelInvocation, invocationId = this.invocationId.ToString() }; this.hubConnection.SendMessage(message); } public void Dispose() { GC.SuppressFinalize(this); Cancel(); } public IFuture OnItem(FutureValueCallback callback) { return this.future.OnItem(callback); } public IFuture OnSuccess(FutureValueCallback callback) { return this.future.OnSuccess(callback); } public IFuture OnError(FutureErrorCallback callback) { return this.future.OnError(callback); } public IFuture OnComplete(FutureCallback callback) { return this.future.OnComplete(callback); } } public sealed class UpStreamItemController : IUPloadItemController, IFuture { public readonly long invocationId; public readonly string[] streamingIds; public readonly HubConnection hubConnection; public readonly Futures.IFuture future; public string[] StreamingIDs { get { return this.streamingIds; } } public HubConnection Hub { get { return this.hubConnection; } } public FutureState state { get { return this.future.state; } } public TResult value { get { return this.future.value; } } public Exception error { get { return this.future.error; } } public bool IsFinished { get; private set; } public bool IsCanceled { get; private set; } private object[] streams; public UpStreamItemController(HubConnection hub, long iId, string[] sIds, IFuture future) { this.hubConnection = hub; this.invocationId = iId; this.streamingIds = sIds; this.streams = new object[this.streamingIds.Length]; this.future = future; } public UploadChannel GetUploadChannel(int paramIdx) { var stream = this.streams[paramIdx] as UploadChannel; if (stream == null) this.streams[paramIdx] = stream = new UploadChannel(this, paramIdx); return stream; } public void UploadParam(string streamId, T item) { if (streamId == null) return; var message = new Message { type = MessageTypes.StreamItem, invocationId = streamId.ToString(), item = item, }; this.hubConnection.SendMessage(message); } public void Finish() { if (!this.IsFinished) { this.IsFinished = true; for (int i = 0; i < this.streamingIds.Length; ++i) if (this.streamingIds[i] != null) { var message = new Message { type = MessageTypes.Completion, invocationId = this.streamingIds[i].ToString() }; this.hubConnection.SendMessage(message); } } } public void Cancel() { if (!this.IsFinished && !this.IsCanceled) { this.IsCanceled = true; var message = new Message { type = MessageTypes.CancelInvocation, invocationId = this.invocationId.ToString(), }; this.hubConnection.SendMessage(message); // Zero out the streaming ids, disabling any future message sending Array.Clear(this.streamingIds, 0, this.streamingIds.Length); // If it's also a down-stream, set it canceled. var itemContainer = (this.future.value as StreamItemContainer); if (itemContainer != null) itemContainer.IsCanceled = true; } } void IDisposable.Dispose() { GC.SuppressFinalize(this); Finish(); } public IFuture OnItem(FutureValueCallback callback) { return this.future.OnItem(callback); } public IFuture OnSuccess(FutureValueCallback callback) { return this.future.OnSuccess(callback); } public IFuture OnError(FutureErrorCallback callback) { return this.future.OnError(callback); } public IFuture OnComplete(FutureCallback callback) { return this.future.OnComplete(callback); } } /// /// An upload channel that represents one prameter of a client callable function. It implements the IDisposable /// interface and calls Finish from the Dispose method. /// public sealed class UploadChannel : IDisposable { /// /// The associated upload controller /// public IUPloadItemController Controller { get; private set; } /// /// What parameter is bound to. /// public int ParamIdx { get; private set; } /// /// Returns true if Finish() or Cancel() is already called. /// public bool IsFinished { get { return this.Controller.StreamingIDs[this.ParamIdx] == null; } private set { if (value) this.Controller.StreamingIDs[this.ParamIdx] = null; } } /// /// The unique generated id of this parameter channel. /// public string StreamingId { get { return this.Controller.StreamingIDs[this.ParamIdx]; } } internal UploadChannel(IUPloadItemController ctrl, int paramIdx) { this.Controller = ctrl; this.ParamIdx = paramIdx; } /// /// Uploads a parameter value to the server. /// public void Upload(T item) { string streamId = this.StreamingId; if (streamId != null) this.Controller.UploadParam(streamId, item); } /// /// Calling this function cancels the call itself, not just a parameter upload channel. /// public void Cancel() { if (!this.IsFinished) { // Cancel all upload stream, cancel will also set streaming ids to 0. this.Controller.Cancel(); } } /// /// Finishes the channel by telling the server that no more uplode items will follow. /// public void Finish() { if (!this.IsFinished) { string streamId = this.StreamingId; if (streamId != null) { // this will set the streaming id to 0 this.IsFinished = true; var message = new Message { type = MessageTypes.Completion, invocationId = streamId.ToString() }; this.Controller.Hub.SendMessage(message); } } } void IDisposable.Dispose() { if (!this.IsFinished) Finish(); GC.SuppressFinalize(this); } } } #endif