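[Posted]: 2019-01-06 15:41:28
[Question]:
Hello, I would like to understand why OnError is not invoked on the observer every time.
When I try to debug the OnError call inside my IObservable implementation, the debugger steps right through it.
Observable implementation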
public class Reactive : IStorage, IObservable<SquareResult>
{
    private object @lock = new object();
    private List<IObserver<SquareResult>> observers = new List<IObserver<SquareResult>>();
    public static Reactive Create()
    {
        return new Reactive();
    }
    public void Enqueue(SquareResult square)
    {
        lock (@lock)
        {
            foreach (var item in observers)
            {
                if (square.Result < 0)
                {
                    item.OnError(new InvalidSquareException());
                }
                else
                    item.OnNext(square);
            }
        }
    }
    public void EndStoring()
    {
        this.observers.ForEach(obs => obs.OnCompleted());
    }
    public IDisposable Subscribe(IObserver<SquareResult> observer)
    {
        if (!this.observers.Contains(observer))
        {
            this.observers.Add(observer);
        }
        return new Unsubscriber(observer, this.observers);
    }
    public class Unsubscriber : IDisposable
    {
        private List<IObserver<SquareResult>> observers;
        private IObserver<SquareResult> observer;
        public Unsubscriber(IObserver<SquareResult> obs, List<IObserver<SquareResult>> observers)
        {
            this.observer = obs;
            this.observers = observers;
        }
        public void Dispose()
        {
            if (this.observer == null)
            {
                return;
            }
            this.observers.Remove(this.observer);
        }
    }
}
The only method the producer calls is Enqueue.
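To check that the dispatch in Enqueue itself forwards every error, here is a minimal sketch that uses a hand-written observer with no Rx wrapper in between. It is a simplification under stated assumptions: `SquareResult` is reduced to a single `Result` property, `InvalidOperationException` stands in for `InvalidSquareException`, and `MiniReactive`, `RecordingObserver` and `EnqueueDemo` are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for the real SquareResult type (only Result is assumed).
public class SquareResult
{
    public int Result { get; set; }
}

// Trimmed copy of the dispatch logic in Reactive.Enqueue.
public class MiniReactive : IObservable<SquareResult>
{
    private readonly object gate = new object();
    private readonly List<IObserver<SquareResult>> observers = new List<IObserver<SquareResult>>();

    public void Enqueue(SquareResult square)
    {
        lock (gate)
        {
            foreach (var item in observers)
            {
                if (square.Result < 0)
                    item.OnError(new InvalidOperationException("negative square"));
                else
                    item.OnNext(square);
            }
        }
    }

    public IDisposable Subscribe(IObserver<SquareResult> observer)
    {
        lock (gate) { observers.Add(observer); }
        return new NoopDisposable(); // unsubscription is omitted in this sketch
    }

    private sealed class NoopDisposable : IDisposable
    {
        public void Dispose() { }
    }
}

// Plain observer that only counts the notifications it receives.
public class RecordingObserver : IObserver<SquareResult>
{
    public int Values { get; private set; }
    public int Errors { get; private set; }
    public void OnNext(SquareResult value) => Values++;
    public void OnError(Exception error) => Errors++;
    public void OnCompleted() { }
}

public static class EnqueueDemo
{
    public static (int Values, int Errors) Run()
    {
        var source = new MiniReactive();
        var observer = new RecordingObserver();
        source.Subscribe(observer);
        source.Enqueue(new SquareResult { Result = 4 });
        source.Enqueue(new SquareResult { Result = -1 });
        source.Enqueue(new SquareResult { Result = -9 });
        return (observer.Values, observer.Errors);
    }
}
```

With a raw observer like this, `EnqueueDemo.Run()` counts one value and two errors, so any filtering of repeated errors would have to happen on the subscriber side rather than in the dispatch loop.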
Then there is a service in which a consumer takes the data from the IObservable and sends it over a socket:
Consumer
class ProgressService
{
    private IStorage observable;
    public ProgressService(IStorage storage)
    {
        this.observable = storage;
    }
    public async Task NotifyAsync(WebSocket socket)
    {
        byte[] buffer = ArrayPool<byte>.Shared.Rent(1024);
        CancellationTokenSource cancelSignal = new CancellationTokenSource();
        this.observable.Subscribe(async (next) =>
        {
            try
            {
                ReadOnlyMemory<byte> data = next.Encode();
                await socket.SendAsync(data, WebSocketMessageType.Text, true, CancellationToken.None);
            }
            catch (Exception)
            {
                if (socket.State == WebSocketState.Open)
                {
                    await socket.CloseAsync(WebSocketCloseStatus.InternalServerError, "Threw on data receive", CancellationToken.None);
                }
                return;
            }
        }, async (ex) =>
        {
            //!! this delegate does not get called every time the observable calls OnError(exception)
            await socket.SendAsync($"Exception :{ex.Message}".Encode().ToArray(), WebSocketMessageType.Text, true, CancellationToken.None);
        }, async () =>
        {
            await socket.CloseAsync(WebSocketCloseStatus.NormalClosure, "Finished test", CancellationToken.None);
        }, cancelSignal.Token);
        await socket.ReceiveAsync(buffer, CancellationToken.None);
        cancelSignal.Cancel();
    }
}
So basically, if I call myobservable.OnError(new Exception()) several times from somewhere, the consumer's callback is invoked only once, and I do not understand why.
Example:
observable.OnError(new Exception());
observable.OnError(new Exception());
Observer.OnError implementation
public void OnError(Exception ex)
{
    Console.WriteLine(ex.Message);
}
In my code, the example above prints the exception message only once.
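One possible explanation, stated here as an assumption rather than a confirmed fact about my setup: the delegate-based Subscribe overload from System.Reactive wraps the callbacks in an observer that follows the Rx contract `OnNext* (OnError | OnCompleted)?` and ignores everything after the first terminal notification. The sketch below imitates that behaviour with a hypothetical `StoppingObserver<T>` wrapper. It is not the library's actual code, and `StoppingObserverDemo` is likewise a name invented for illustration.

```csharp
using System;
using System.Threading;

// Hypothetical wrapper that imitates a "stop after the first terminal call" observer.
public sealed class StoppingObserver<T> : IObserver<T>
{
    private readonly Action<T> onNext;
    private readonly Action<Exception> onError;
    private readonly Action onCompleted;
    private int stopped; // 0 = active, 1 = terminated

    public StoppingObserver(Action<T> onNext, Action<Exception> onError, Action onCompleted)
    {
        this.onNext = onNext;
        this.onError = onError;
        this.onCompleted = onCompleted;
    }

    public void OnNext(T value)
    {
        // Values are dropped once a terminal notification has been seen.
        if (Volatile.Read(ref stopped) == 0) onNext(value);
    }

    public void OnError(Exception error)
    {
        // Only the first terminal notification gets through.
        if (Interlocked.Exchange(ref stopped, 1) == 0) onError(error);
    }

    public void OnCompleted()
    {
        if (Interlocked.Exchange(ref stopped, 1) == 0) onCompleted();
    }
}

public static class StoppingObserverDemo
{
    // Returns how many times the error callback actually ran.
    public static int Run()
    {
        int errorCalls = 0;
        var observer = new StoppingObserver<int>(
            _ => { },
            _ => errorCalls++,
            () => { });

        observer.OnError(new Exception("first"));
        observer.OnError(new Exception("second")); // ignored by the wrapper
        return errorCalls;
    }
}
```

Here `StoppingObserverDemo.Run()` returns 1 even though OnError is called twice, which matches what I see. If the real wrapper behaves this way, repeated failures would probably have to travel through OnNext (for example as a result value that carries the error) instead of OnError.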
[Discussion]:
Tags: .net-core observable system.reactive