【问题标题】:TPL Dataflow and Rx Combined example [closed]TPL 数据流和 Rx 组合示例 [关闭]
【发布时间】:2012-07-08 14:41:18
【问题描述】:

我只想学习两者以及如何一起使用它们。我知道它们可以相互补充我只是找不到一个实际这样做的例子。

【问题讨论】:

  • SO 最适合解决如何做某事的具体问题。 “给我一个 X 的例子”不太合适。
  • 好的,你对问题的形式是正确的,但我认为这个问题仍然有效。也许“如何结合使用它们来有效地利用两者的功能?”更合适。
  • @naeron84 看看 C#-async + Rx 的组合示例就足够了吗?
  • @GregC 实际上我想使用 f# 但异步与 TPL 数据流块无关,至少没有明确地。
  • @svick 涉及并行和异步的问题。如果我将它们混合在一起会发生什么。我开始认为这个问题是微不足道的或困难的或毫无意义的,我没有得到一些非常基本的东西:)

标签: .net task-parallel-library system.reactive c#-5.0 tpl-dataflow


【解决方案1】:

让我从一些背景开始。

.NET 框架有许多特殊类型 - 适当的类或接口 - Task<T>IObservable<T>Nullable<T>IEnumerable<T>Lazy<T> 等 - 为底层类型提供特殊的权力T.

TPL 使用Task<T> 表示T 的单个值的异步计算。

Rx 使用IObservable<T> 表示对T 的零个或多个值的异步计算。

正是这两者的“异步计算”方面将 TPL 和 Rx 结合在一起。

现在,TPL 还使用 Task 类型来表示 Action lambda 的异步执行,但这可以被认为是 Task<T> 的特殊情况,其中 Tvoid。非常像 c# 中的标准方法返回 void,如下所示:

public void MyMethod() { }

Rx 还允许使用称为Unit 的特殊类型来实现相同的特殊情况。

TPL 和 Rx 之间的区别在于返回值的数量。 TPL 是一且只有一,而 Rx 是零或更多。

因此,如果您通过仅使用返回单个值的可观察序列来以特殊方式处理 Rx,您可以以类似于 TPL 的方式进行一些计算。

例如,在 TPL 中我可以这样写:

Task.Factory
    .StartNew(() => "Hello")
    .ContinueWith(t => Console.WriteLine(t.Result));

在 Rx 中,等价物是:

Observable
    .Start(() => "Hello")
    .Subscribe(x => Console.WriteLine(x));

我可以在 Rx 中更进一步,指定应该使用 TPL 来执行计算,如下所示:

Observable
    .Start(() => "Hello", Scheduler.TaskPool)
    .Subscribe(x => Console.WriteLine(x));

(默认使用线程池。)

现在我可以做一些“混合和匹配”了。如果我添加对 System.Reactive.Threading.Tasks 命名空间的引用,我可以很容易地在任务和可观察对象之间移动。

Task.Factory
    .StartNew(() => "Hello")
    .ToObservable()
    .Subscribe(x => Console.WriteLine(x));

Observable
    .Start(() => "Hello")
    .ToTask()
    .ContinueWith(t => Console.WriteLine(t.Result));

注意 ToObservable().ToTask() 调用以及由此产生的从一个库到另一个库的翻转。

如果我有一个 observable 返回多个值,我可以使用 observable .ToArray() 扩展方法将多个序列值转换为一个可以转换为任务的数组值。像这样:

Observable
    .Interval(TimeSpan.FromSeconds(1.0))
    .Take(5) // is IObservable<long>
    .ToArray()
    .ToTask() // is Task<long[]>
    .ContinueWith(t => Console.WriteLine(t.Result.Length));

我认为这是对您问题的一个相当基本的答案。是你所期待的吗?

【讨论】:

  • TPL Dataflow 是一个与 TPL 非常不同的库,所以我认为答案并不能准确解决问题。然而,讨论值得注意,所以 +1。
  • 太糟糕了,.Net 类型系统实际上不允许Task&lt;void&gt;
  • 对不起,但正如 GregC 之前所说,我需要一个涉及 TPL 数据流的示例,而不是“只是”TPL。我想要的是结合 TPL Dataflow 块和 Rx。
  • 我很抱歉 - 我不知道 DataFlow。抱歉,我没有回答您的问题。
  • @svick - 缺少Task&lt;void&gt;IObservable&lt;Unit&gt; 的意思。你不能用Task&lt;Unit&gt;Task&lt;void&gt; 做同样的事情吗?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-03-17
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多