【发布时间】:2020-12-05 18:17:46
【问题描述】:
我有一个IObservable,它会生成一次性物品,并且在其生命周期内可能会生成无限数量的物品。因此,我想在每次生成新项目时处理最后一个项目,因此 Using 运算符不适用于此。有没有不同的 Rx.NET 算子可以完成这个功能?
【问题讨论】:
标签: c# reactive-programming system.reactive
我有一个IObservable,它会生成一次性物品,并且在其生命周期内可能会生成无限数量的物品。因此,我想在每次生成新项目时处理最后一个项目,因此 Using 运算符不适用于此。有没有不同的 Rx.NET 算子可以完成这个功能?
【问题讨论】:
标签: c# reactive-programming system.reactive
如果您有IObservable<IDisposable> source,那么执行此操作以自动处理先前的值并在序列结束时进行清理:
IObservable<IDisposable> query =
Observable.Create<IDisposable>(o =>
{
var serial = new SerialDisposable();
return new CompositeDisposable(
source.Do(x => serial.Disposable = x).Subscribe(o),
serial);
})
【讨论】:
这是一个 DisposePrevious 运算符,基于 Enigmativity 的 solution 的略微修改版本。
/// <summary>Disposes the previous element of an observable sequence. The last
/// element is disposed when the observable sequence completes.</summary>
public static IObservable<T> DisposePrevious<T>(this IObservable<T> source)
where T : IDisposable
{
return Observable.Using(() => new SerialDisposable(), serial =>
source.Do(x => serial.Disposable = x));
}
SerialDisposable 类...
表示一个一次性资源,其底层一次性资源可以被另一个一次性资源替换,从而导致前一个底层一次性资源的自动处置。
【讨论】:
我遇到了 Shlomo 的 this answer 并将其改编为我自己的目的:
public class DisposerProperty<T> : IDisposable, IObservable<T> where T : IDisposable
{
private IDisposable Subscription { get; }
private IObservable<T> Source { get; }
public T Value { get; private set; }
public DisposerProperty(IObservable<T> source, T defaultValue = default)
{
Value = defaultValue;
Source = source;
Subscription = source.Subscribe(t =>
{
Value?.Dispose();
Value = t;
});
}
public void Dispose() => Subscription.Dispose();
/// <inheritdoc />
public IDisposable Subscribe(IObserver<T> observer) => Source.Subscribe(observer);
}
现在,每当我想要这个功能时,我只需使用 DisposerProperty<T> 而不是直接订阅 observable。
【讨论】: