【问题标题】:Is it possible to use Rx "Using" operator with IAsyncDisposable?是否可以将 Rx“使用”运算符与 IAsyncDisposable 一起使用?
【发布时间】:2021-12-27 10:43:30
【问题描述】:

是否可以将 Rx.Net 中的 Using 运算符与实现 IAsyncDisposable 而不是 IDisposable 的资源一起使用?如果没有,我可以使用某种解决方法吗?

【问题讨论】:

    标签: c# .net reactive-programming system.reactive rx.net


    【解决方案1】:

    这是一个适用于IAsyncDisposable 对象的Using 方法:

    /// <summary>
    /// Constructs an observable sequence that depends on a resource object,
    /// whose lifetime is tied to the resulting observable sequence's lifetime.
    /// </summary>
    public static IObservable<TResult> Using<TResult, TResource>(
        Func<TResource> resourceFactory,
        Func<TResource, IObservable<TResult>> observableFactory)
        where TResource : IAsyncDisposable
    {
        return Observable.Defer(() =>
        {
            TResource resource = resourceFactory();
            IObservable<TResult> observable;
            try { observable = observableFactory(resource); }
            catch (Exception ex) { observable = Observable.Throw<TResult>(ex); }
    
            Lazy<Task> lazyDisposeTask = new(() => resource.DisposeAsync().AsTask());
            IObservable<TResult> disposer = Observable
                .FromAsync(() => lazyDisposeTask.Value)
                .Select(_ => default(TResult))
                .IgnoreElements();
    
            return observable
                .Catch((Exception ex) => disposer.Concat(Observable.Throw<TResult>(ex)))
                .Concat(disposer)
                .Finally(() => lazyDisposeTask.Value.GetAwaiter().GetResult());
        });
    }
    

    此方法与 Rx Observable.Using 方法具有相同的签名(where 子句除外),并且可以以相同的方式使用。

    此实现处理所有完成情况:

    1. 成功完成:IAsyncDisposable 资源由 Concat 运算符异步处理。
    2. 完成错误:IAsyncDisposable 资源由 Catch 运算符异步处理。
    3. 序列在完成之前被取消订阅:IAsyncDisposable 资源由Finally 运算符同步处理。在这种情况下,无法异步处理资源,原因已解释为 here

    带有异步工厂方法的变体:

    public static IObservable<TResult> Using<TResult, TResource>(
        Func<CancellationToken, Task<TResource>> resourceFactoryAsync,
        Func<TResource, CancellationToken, Task<IObservable<TResult>>> observableFactoryAsync)
        where TResource : IAsyncDisposable
    {
        return Observable.Create<TResult>(async (observer, cancellationToken) =>
        {
            TResource resource = await resourceFactoryAsync(cancellationToken);
            IObservable<TResult> observable;
            try { observable = await observableFactoryAsync(resource, cancellationToken); }
            catch { await resource.DisposeAsync(); throw; }
    
            Lazy<Task> lazyDisposeTask = new(() => resource.DisposeAsync().AsTask());
            IObservable<TResult> disposer = Observable
                .FromAsync(() => lazyDisposeTask.Value)
                .Select(_ => default(TResult))
                .IgnoreElements();
    
            return observable
                .Catch((Exception ex) => disposer.Concat(Observable.Throw<TResult>(ex)))
                .Concat(disposer)
                .Finally(() => lazyDisposeTask.Value.GetAwaiter().GetResult())
                .Subscribe(observer);
        });
    }
    

    【讨论】:

    • @jackdry 当然,我更新了答案。
    • 非常感谢。出于兴趣,是否有充分的理由为“lazyDisposeTask”使用Lazy&lt;Task&gt; 而不是Func&lt;Task&gt;
    • @jackdry 是的。 Lazy&lt;Task&gt; 确保 DisposeAsync 只会被调用一次。大多数现实世界的一次性用品都可以容忍多次处理,但最好是安全而不是抱歉。 :-)
    • @jackdry 仅供参考我修复了两个 Using 方法的实现中的错误。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2018-08-07
    • 1970-01-01
    • 2012-09-04
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多