【发布时间】:2017-11-17 20:56:00
【问题描述】:
《Intro to Rx》一书将 Subscribe 的返回值描述为 IDisposable,并指出在 OnError 或 OnCompleted 被调用之后,仍应释放(Dispose)该订阅。
值得注意的一点是:即使一个序列已经完成或出错,您仍然应该释放(Dispose)您的订阅。
这是为什么?
作为参考,这是我目前正在编写的类。我可能会在某个时候将其提交到 Code Review。
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
/// <summary>
/// Provides a timeout mechanism that will not time out as long as it is signalled often enough.
/// </summary>
/// <remarks>
/// The countdown starts when the instance is constructed and restarts on every call to
/// <see cref="Signal"/>. The timeout action is invoked at most once; signals received after
/// the timeout has fired are ignored.
/// </remarks>
internal class TrafficTimeout
{
    private readonly Action onTimeout;
    private readonly object signalLock = new object();
    private readonly IObserver<Unit> signals;

    /// <summary>
    /// Initialises a new instance of the <see cref="TrafficTimeout"/> class.
    /// </summary>
    /// <param name="timeout">The duration to wait after receiving signals before timing out.</param>
    /// <param name="onTimeout">The <see cref="Action"/> to perform when the timeout duration expires.</param>
    /// <exception cref="ArgumentNullException"><paramref name="onTimeout"/> is <c>null</c>.</exception>
    public TrafficTimeout(TimeSpan timeout, Action onTimeout)
    {
        if (onTimeout == null)
        {
            throw new ArgumentNullException(nameof(onTimeout));
        }

        // Assign before subscribing: the expiry callback runs on another thread and reads
        // this field, so it must be set before the timer can possibly fire.
        this.onTimeout = onTimeout;

        // The initial value of the BehaviorSubject acts as the first signal, so the countdown
        // starts at construction even if Signal() is never called.
        var messageQueue = new BehaviorSubject<Unit>(Unit.Default);
        this.signals = messageQueue.AsObserver();

        // Throttle only lets a value through once no newer value has arrived for `timeout`,
        // so the first value to emerge marks the expiry. Take(1) completes the subscription
        // and detaches it from the subject after that first value, which means:
        //   - the timeout action fires at most once, and
        //   - later Signal() calls push into a subject with no observers (a harmless no-op)
        //     instead of a disposed subject (which would throw ObjectDisposedException).
        // Because the pipeline terminates and unsubscribes itself, the IDisposable returned by
        // Subscribe does not need to be retained here.
        messageQueue
            .Throttle(timeout)
            .Take(1)
            .Subscribe(_ => this.onTimeout());
    }

    /// <summary>
    /// Signals that traffic has been received, restarting the timeout countdown.
    /// </summary>
    public void Signal()
    {
        // Serialise producers so notifications reach the subject one at a time.
        lock (this.signalLock)
        {
            this.signals.OnNext(Unit.Default);
        }
    }
}
【问题讨论】:
标签: system.reactive reactive-programming idisposable reactive object-lifetime