【发布时间】:2021-12-27 10:43:30
【问题描述】:
是否可以将 Rx.Net 中的 Using 运算符与实现 IAsyncDisposable 而不是 IDisposable 的资源一起使用?如果没有,我可以使用某种解决方法吗?
【问题讨论】:
标签: c# .net reactive-programming system.reactive rx.net
是否可以将 Rx.Net 中的 Using 运算符与实现 IAsyncDisposable 而不是 IDisposable 的资源一起使用?如果没有,我可以使用某种解决方法吗?
【问题讨论】:
标签: c# .net reactive-programming system.reactive rx.net
这是一个适用于IAsyncDisposable 对象的Using 方法:
/// <summary>
/// Constructs an observable sequence that depends on a resource object,
/// whose lifetime is tied to the resulting observable sequence's lifetime.
/// </summary>
public static IObservable<TResult> Using<TResult, TResource>(
Func<TResource> resourceFactory,
Func<TResource, IObservable<TResult>> observableFactory)
where TResource : IAsyncDisposable
{
return Observable.Defer(() =>
{
TResource resource = resourceFactory();
IObservable<TResult> observable;
try { observable = observableFactory(resource); }
catch (Exception ex) { observable = Observable.Throw<TResult>(ex); }
Lazy<Task> lazyDisposeTask = new(() => resource.DisposeAsync().AsTask());
IObservable<TResult> disposer = Observable
.FromAsync(() => lazyDisposeTask.Value)
.Select(_ => default(TResult))
.IgnoreElements();
return observable
.Catch((Exception ex) => disposer.Concat(Observable.Throw<TResult>(ex)))
.Concat(disposer)
.Finally(() => lazyDisposeTask.Value.GetAwaiter().GetResult());
});
}
此方法与 Rx Observable.Using 方法具有相同的签名(where 子句除外),并且可以以相同的方式使用。
此实现处理所有完成情况:
IAsyncDisposable 资源由 Concat 运算符异步处理。IAsyncDisposable 资源由 Catch 运算符异步处理。IAsyncDisposable 资源由Finally 运算符同步处理。在这种情况下,无法异步处理资源,原因已解释为 here。带有异步工厂方法的变体:
public static IObservable<TResult> Using<TResult, TResource>(
Func<CancellationToken, Task<TResource>> resourceFactoryAsync,
Func<TResource, CancellationToken, Task<IObservable<TResult>>> observableFactoryAsync)
where TResource : IAsyncDisposable
{
return Observable.Create<TResult>(async (observer, cancellationToken) =>
{
TResource resource = await resourceFactoryAsync(cancellationToken);
IObservable<TResult> observable;
try { observable = await observableFactoryAsync(resource, cancellationToken); }
catch { await resource.DisposeAsync(); throw; }
Lazy<Task> lazyDisposeTask = new(() => resource.DisposeAsync().AsTask());
IObservable<TResult> disposer = Observable
.FromAsync(() => lazyDisposeTask.Value)
.Select(_ => default(TResult))
.IgnoreElements();
return observable
.Catch((Exception ex) => disposer.Concat(Observable.Throw<TResult>(ex)))
.Concat(disposer)
.Finally(() => lazyDisposeTask.Value.GetAwaiter().GetResult())
.Subscribe(observer);
});
}
【讨论】:
Lazy<Task> 而不是Func<Task>?
Lazy<Task> 确保 DisposeAsync 只会被调用一次。大多数现实世界的一次性用品都可以容忍多次处理,但最好是安全而不是抱歉。 :-)
Using 方法的实现中的错误。