【问题标题】:Observer OnError delegate is not called multiple times观察者 OnError 委托未被多次调用
【发布时间】:2019-01-06 15:41:28
【问题描述】:

您好,我想了解为什么 OnError 不会每次都被 Observer 调用。 当尝试调试在我的IOBservable 实现中调用的OnError 时,调试器会通过。

可观察实施

public class Reactive:IStorage,IObservable<SquareResult>
        {
            private object @lock = new object();
            private List<IObserver<SquareResult>> observers = new List<IObserver<SquareResult>>();


            public static Reactive Create()
            {
                return new Reactive();
            }

            public void Enqueue(SquareResult square)
            {
                lock (@lock)
                {
                    foreach (var item in observers)
                    {
                        if (square.Result < 0)
                        {
                            item.OnError(new InvalidSquareException());
                        }
                        else
                        item.OnNext(square);
                    }
                }
            }
            public void EndStoring()
            {
                this.observers.ForEach(obs => obs.OnCompleted());
            }

            public IDisposable Subscribe(IObserver<SquareResult> observer)
            {
                if (!this.observers.Contains(observer))
                {
                    this.observers.Add(observer);
                }
                return new Unsubscriber(observer, this.observers);
            }

            public class Unsubscriber : IDisposable
            {
                private List<IObserver<SquareResult>> observers;
                private IObserver<SquareResult> observer;

                public Unsubscriber(IObserver<SquareResult>obs,List<IObserver<SquareResult>>observers)
                {
                    this.observer = obs;
                    this.observers = observers;
                }

                public void Dispose()
                {
                    if (this.observer == null)
                    {
                        return;
                    }
                    this.observers.Remove(this.observer);
                }
            }
        }

生产者调用的唯一方法是Enqueue

然后我们有一个服务,其中consumerIObservable 获取数据并通过套接字发送:

消费者

class ProgressService
    {
        private IStorage observable;
        public ProgressService(IStorage storage)
        {
            this.observable = storage;
        }

        public async Task NotifyAsync(WebSocket socket)
        {

            byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
            CancellationTokenSource cancelSignal = new CancellationTokenSource();
            this.observable.Subscribe(async (next) =>
            {

                try
                {
                    ReadOnlyMemory<byte> data = next.Encode();
                    await socket.SendAsync(data, WebSocketMessageType.Text, true, CancellationToken.None);
                }
                catch (Exception)
                {
                    if (socket.State == WebSocketState.Open)
                    {

                        await socket.CloseAsync(WebSocketCloseStatus.InternalServerError, "Threw on data receive", CancellationToken.None);
                    }
                    return;
                }
            }, async(ex) =>
            {
                 //!! this delegate does not get called everytime the observable calls OnError(exception)
                await socket.SendAsync($"Exception :{ex.Message}".Encode().ToArray(), WebSocketMessageType.Text, true, CancellationToken.None);
            }, async () =>
            {
                await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Finished test", CancellationToken.None);
            }, cancelSignal.Token);

            await socket.ReceiveAsync(buffer, CancellationToken.None);
            cancelSignal.Cancel();
        }


    }

所以基本上发生的情况是,如果我从某个地方调用 myobservable.OnError(new exception()) 几次,consumer 的回调只被调用一次,我不明白为什么。

示例

observable.OnError(new Exception()); observable.OnError(new Exception());

Observer.OnError 实现

public void OnError(Exception ex)
{
  Console.WriteLine(ex.Message);
 }

我的代码中的上述示例将只打印一次异常消息。

【问题讨论】:

    标签: .net-core observable system.reactive


    【解决方案1】:

    'Observable contract' 允许 0 个或多个 OnNext 通知,然后是一个 OnCompleted 或一个 OnError 通知,它们会终止 observable。一旦 observable 终止(在您的情况下是第一个 OnError),就不会发出更多通知,这意味着不会再调用任何委托。

    【讨论】:

    • 哦,好吧,但是如果管道中有一个错误(OnNext 被调用),有什么方法可以不终止 observable 吗?
    • 不是真的。有像CatchRetry 这样的运算符,可以让您对OnError 做出反应并有效地重新连接。我建议阅读introtorx.com,特别是this section 了解更多信息。
    • 允许出现多个错误的最简单方法是以IObservable&lt;IObservable&lt;T&gt;&gt; 开头并执行以下操作:source.Select(inner =&gt; inner.Materialize()).Merge()。然后你就可以处理多个错误了。
    猜你喜欢
    • 2012-05-23
    • 1970-01-01
    • 2015-11-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多