【问题标题】:Observable.Using with async TaskObservable.Using with async Task
【发布时间】:2018-03-08 15:27:26
【问题描述】:

我已经使用 Observable.Using 与返回 IDisposable 的方法一起使用:

Observable.Using(() => new Stream(), s => DoSomething(s));

但是当流被异步创建时,我们将如何进行呢?像这样:

Observable.Using(async () => await CreateStream(), s => DoSomething(s));

async Task<Stream> CreateStream() 
{
    ...
}

DoSomething(Stream s)
{
    ...
}

这不会编译,因为它说 sTask&lt;Stream&gt; 而不是 Stream

怎么了?

【问题讨论】:

  • Observable.Using(async () =&gt; await CreateStream(), s =&gt; DoSomething(s.Result)); 确实有效,但似乎非常错误。这是一个很好的问题。期待答案。
  • 啊,你可以试试这个:Observable.Using(async () =&gt; await CreateStream(), s =&gt; Observable.FromAsync(() =&gt; s).SelectMany(x =&gt; DoSomething(x)))
  • @Enigmativity RE: s.Result 我同意这感觉非常错误。我认为这个想法是第二个函数应该异步使用资源。你可以写Observable.Using(async (ct) =&gt; await CreateStream(), async (s, ct) =&gt; await DoSomething(s))ctCancellationToken,它是解决过载所必需的。非常令人困惑的 IMO。

标签: c# .net observable system.reactive


【解决方案1】:

我们来看看Observable.Using的异步重载的来源:

Observable.FromAsync(resourceFactoryAsync)
    .SelectMany(resource => Observable.Using(() => resource, r =>
        Observable.FromAsync(ct => observableFactoryAsync(r, ct)).Merge()));

知道它只是在后台使用同步版本,您可以执行以下操作以使其适应您的使用:

Observable.FromAsync(CreateStream)
    .SelectMany(stream => Observable.Using(() => stream, DoSomething));

不幸的是,没有包含这个重载,但您始终可以创建自己的:

public static class ObservableEx
{
    public static IObservable<TSource> Using<TSource, TResource>(
        Func<Task<TResource>> resourceFactoryAsync,
        Func<TResource, IObservable<TSource>> observableFactory)
        where TResource : IDisposable =>
        Observable.FromAsync(resourceFactoryAsync).SelectMany(
            resource => Observable.Using(() => resource, observableFactory));
}

然后就这么简单:

ObservableEx.Using(CreateStream, DoSomething);

所有这一切都假设DoSomething 返回一个可观察对象,这在您的问题中未提及,但Observable.Using 的合同要求。

【讨论】:

  • 为什么这个问题有 5 个赞,而这个答案只有 1 个?这是一个很好的答案。
【解决方案2】:

所以这里的问题是有两个类型签名非常尴尬的重载。

当您将Task&lt;TResource&gt; 返回函数作为第一个参数传递时,它似乎推断出第二个参数“可观察工厂”也将返回Task,但推断随后失败,因为在您的示例中,任何一个函数都没有为取消令牌声明参数,并且都 需要该参数。推理在很多方面都失败了,而且会让人感到困惑。

我会尝试用一个现实的例子来说明它。

注意:编写如下代码可能是个坏主意,但它至少是真实世界类型的用例。

var connectionString = @"Data Source=.\SQLEXPRESS;Integrated Security=SSPI;app=LINQPad";

Observable.Using(
        resourceFactoryAsync: async ct => await ConnectAsync(connectionString, ct),
        observableFactoryAsync: async (connection, ct) => await QueryAsync(connection, ct)
    )
    .Subscribe(
        onNext: Console.WriteLiner,
        onError: Console.Error.WriteLine
    );

async Task<IObservable<string>> QueryAsync(SqlConnection c, CancellationToken ct = default)
{
    var command = c.CreateCommand();
    command.CommandText = "select * from Categories as c order by c.CategoryId";
    var enumerableQuery = 
        from IDataRecord record in await command.ExecuteReaderAsync(ct)
        select (string)record["CategoryName"];
    return enumerableQuery.ToObservable();
}

async Task<SqlConnection> ConnectAsync(string cs, CancellationToken ct = default)
{
    var connection = new SqlConnection(cs);
    await connection.OpenAsync(ct);
    return connection;
}

注意:使用命名参数只是为了记录选择了哪个重载。

这是我能想到的最好的,但请随时使用更好的示例对其进行编辑。

值得注意的是

  • 如果为 resourceFactoryAsyncobservableFactoryAsync 提供了一个空函数,所有这些都会完全崩溃,编译失败。您必须将CancellationToken 声明为资源工厂和可观察工厂函数的形式参数。为了简洁起见,它们在上面的代码中都被命名为ct。虽然我们只是传递它们,但重要的是它们已声明

  • 请注意,observableFactoryAsyncconnection 参数不是可等待资源,而是实际资源。您收到的错误提示您自然地假设应该等待资源,这是由于类型推断选择了同步重载,因此将 TResource 作为 Task&lt;X&gt; 转发到下一个也被假定为同步的函数。

  • 这些抽象的组合方式感觉相当尴尬,至少对我来说是这样。我们混合了多种异步和同步编程风格,并且将IDisposables 混合到其中,这样我们就可以显式地异步创建它们,但谁知道它们将如何处理。

  • QueryAsync 函数实际上创建了一个辅助IDisposable,一个SqlCommand,我通常会将它包装在一个using 块中,但不能因为我需要对保留的查询进行惰性求值,所以我当调用返回的闭包时,需要该命令尚未被释放。

  • 可能有更好的方法来做到这一点,但我想知道它是否涉及多次调用Observable.Using 然后合并它们。我现在基本上只是在谈论 Observable monad 感觉有多复杂(假设我是 Rx 新手),但它真的感觉它希望你的整个应用程序都被包裹在其中。

【讨论】:

    猜你喜欢
    • 2018-06-05
    • 2019-10-08
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-12-19
    • 2020-11-03
    • 1970-01-01
    • 2018-12-28
    相关资源
    最近更新 更多