【问题标题】:Alternative to using async () => in Rx Finally最后在 Rx 中使用 async () => 的替代方案
【发布时间】:2017-07-20 04:22:30
【问题描述】:

我的理解是async voidshould be avoidedasync () => 只是async voidAction 一起使用时的伪装。

因此,应避免将the Rx.NET Finally operatorasync () => 异步使用,因为Finally 接受Action 作为参数:

IObservable<T>.Finally(async () =>
{
    await SomeCleanUpCodeAsync();
};

但是,如果这是不好的做法,那么在我需要异步在 OnCompleted 上关闭网络连接或我的可观察以 OnError 结束的情况下使用的最佳做法是什么?

【问题讨论】:

  • 您在此处显示的异步 lambda 是 async void 方法,但并非所有异步 lambda 都将是 void 方法。如果您在预期返回 Task 方法的上下文中提供异步 lambda,这就是您将得到的。

标签: c# asynchronous async-await system.reactive


【解决方案1】:

我的理解是应该避免异步无效,async () => 只是伪装的async void

这部分是错误的。 async () =&gt; 可以匹配 Func&lt;Task&gt;(好)或 Action(坏)。好/坏的主要原因是async void 调用中发生的异常使进程崩溃,而async Task 异常是可捕获的。

所以我们只需要编写一个AsyncFinally 运算符来接收Func&lt;Task&gt;,而不是像Observable.Finally 这样的Action

public static class X
{
    public static IObservable<T> AsyncFinally<T>(this IObservable<T> source, Func<Task> action)
    {
        return source
            .Materialize()
            .SelectMany(async n =>
            {
                switch (n.Kind)
                {
                    case NotificationKind.OnCompleted:
                    case NotificationKind.OnError:
                        await action();
                        return n;
                    case NotificationKind.OnNext:
                        return n;
                    default:
                        throw new NotImplementedException();
                }
            })
            .Dematerialize()
        ;
    }
}

下面是一个用法演示:

try
{
    Observable.Interval(TimeSpan.FromMilliseconds(100))
        .Take(10)
        .AsyncFinally(async () =>
        {
            await Task.Delay(1000);
            throw new NotImplementedException();
        })
        .Subscribe(i => Console.WriteLine(i));
}
catch(Exception e)
{
    Console.WriteLine("Exception caught, no problem");
}

如果您将AsyncFinally 换成Finally,则会导致进程崩溃。

【讨论】:

  • 我真的很喜欢这种方法。我不熟悉 Materialize/Dematerialize。使用它们的好方法。但是为什么要使用 SelectMany 而不仅仅是 Select?
  • 选择在那里不起作用。带有 select 的异步选择器函数会将其从 IObservable&lt;Notification&lt;T&gt;&gt; 转换为 IObservable&lt;Task&lt;Notification&lt;T&gt;&gt;。 SelectMany 版本扁平化了其中的任务部分。
  • 如果我处理订阅 OnCompleted 或 OnError 将不会发生,因此 SomeCleanUpCodeAsync 将不会运行。我以为会的。我做错了什么还是有解决方法?
  • 不,这是预期的功能。解决方法是在您处置订阅时显式调用清理。或者,您可以使用 Observable.Using.
  • 好的,但是我如何以异步方式运行清理? Disposable.Create 类似于 Final,因为它需要 Action 而不是 Func。有没有办法创建一个 Disposable.AsyncCreate 来调用 DisposeAsync,或者类似的东西。也许我应该为此创建一个单独的问题?
【解决方案2】:

它在 Rx 中就像在其他地方一样;像瘟疫一样避免async void。除了文章中列出的问题之外,在同步运算符中使用异步代码会“破坏”Rx。

我会考虑使用OnErrorResumeNext() 来异步清理资源。 OnErrorResumeNext() 让我们指定一个 observable,它将在第一个之后运行,无论它结束的原因是什么:

var myObservable = ...

myObservable
    .Subscribe( /* Business as usual */ );

Observable.OnErrorResumeNext(
        myObservable.Select(_ => Unit.Default),
        Observable.FromAsync(() => SomeCleanUpCodeAsync()))
    .Subscribe();

myObservable 最好是ConnectableObservable(例如Publish())以防止多次订阅。

【讨论】:

  • 这会处理 OnError 场景。但是,我还需要在 OnCompleted 之后进行清理。但是,有趣的是您正在使用 Observable.FromAsync()。这是解决async void的方法吗?
  • 与名称可能暗示的相反 OnErrorResumeNext() 实际上将第二个可观察对象连接到第一个可观察对象,而不管第一个可观察对象是否存在错误。 OnErrorResumeNext() 的行为很像 Concat(),但在 OnError OnCompleted 上移动到下一个 observable
  • 感谢您的澄清。会调查的。
  • Finally 在这种情况下实际上是更好的语义选择。
  • 嗨@Asti! Finally 调用 Action,而 OnErrorResumeNext 将 observable 作为延续。所以后者可能更适合替代async using
【解决方案3】:

Finally 的方法签名是

public static IObservable<TSource> Finally<TSource>(
    this IObservable<TSource> source,
    Action finallyAction
)

它需要一个动作,而不是一个任务。

作为附录,如果您想以异步方式运行某些东西,而不是 async void,请在方法内使用 Task.Factory 方法,这样意图就很明确了。

【讨论】:

    【解决方案4】:

    引用Intro to Rx:

    Finally 扩展方法接受 Action 作为参数。如果序列正常或错误终止,或者如果订阅已被处置,则将调用此 Action

    (强调)

    由于IObservable&lt;T&gt; 接口的定义方式,接受Func&lt;Task&gt; 参数的Finally 运算符无法复制此行为。通过调用Dispose 订阅的Dispose 方法可以取消订阅可观察序列。这种方法是同步的。整个 Rx 库都建立在这个接口之上。因此,即使您为IDisposables 创建了扩展方法DisposeAsync,内置的Rx 运算符(例如SelectSelectManyWhereTake 等)也不会意识到它的存在,并且在他们取消订阅源序列时不会调用它。与往常一样,调用上一个链接的同步Dispose 方法将自动取消链接运算符的订阅链。

    顺便说一句,已经尝试实现一个异步版本的 Rx (AsyncRx),它建立在如下所示的全新接口之上。这个库has not been released yet.

    public interface IAsyncObserver<in T>
    {
        ValueTask OnNextAsync(T value);
        ValueTask OnErrorAsync(Exception error);
        ValueTask OnCompletedAsync();
    }
    
    public interface IAsyncObservable<out T>
    {
        ValueTask<IAsyncDisposable> SubscribeAsync(IAsyncObserver<T> observer);
    }
    
    public interface IAsyncDisposable
    {
        public ValueTask DisposeAsync();
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2011-09-25
      • 2014-03-03
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-10-15
      相关资源
      最近更新 更多