让我从一些背景开始。
.NET 框架有许多特殊类型 - 适当的类或接口 - Task<T>、IObservable<T>、Nullable<T>、IEnumerable<T>、Lazy<T> 等 - 为底层类型提供特殊的权力T.
TPL 使用Task<T> 表示T 的单个值的异步计算。
Rx 使用IObservable<T> 表示对T 的零个或多个值的异步计算。
正是这两者的“异步计算”方面将 TPL 和 Rx 结合在一起。
现在,TPL 还使用 Task 类型来表示 Action lambda 的异步执行,但这可以被认为是 Task<T> 的特殊情况,其中 T 是 void。非常像 c# 中的标准方法返回 void,如下所示:
public void MyMethod() { }
Rx 还允许使用称为Unit 的特殊类型来实现相同的特殊情况。
TPL 和 Rx 之间的区别在于返回值的数量。 TPL 是一且只有一,而 Rx 是零或更多。
因此,如果您通过仅使用返回单个值的可观察序列来以特殊方式处理 Rx,您可以以类似于 TPL 的方式进行一些计算。
例如,在 TPL 中我可以这样写:
Task.Factory
.StartNew(() => "Hello")
.ContinueWith(t => Console.WriteLine(t.Result));
在 Rx 中,等价物是:
Observable
.Start(() => "Hello")
.Subscribe(x => Console.WriteLine(x));
我可以在 Rx 中更进一步,指定应该使用 TPL 来执行计算,如下所示:
Observable
.Start(() => "Hello", Scheduler.TaskPool)
.Subscribe(x => Console.WriteLine(x));
(默认使用线程池。)
现在我可以做一些“混合和匹配”了。如果我添加对 System.Reactive.Threading.Tasks 命名空间的引用,我可以很容易地在任务和可观察对象之间移动。
Task.Factory
.StartNew(() => "Hello")
.ToObservable()
.Subscribe(x => Console.WriteLine(x));
Observable
.Start(() => "Hello")
.ToTask()
.ContinueWith(t => Console.WriteLine(t.Result));
注意 ToObservable() 和 .ToTask() 调用以及由此产生的从一个库到另一个库的翻转。
如果我有一个 observable 返回多个值,我可以使用 observable .ToArray() 扩展方法将多个序列值转换为一个可以转换为任务的数组值。像这样:
Observable
.Interval(TimeSpan.FromSeconds(1.0))
.Take(5) // is IObservable<long>
.ToArray()
.ToTask() // is Task<long[]>
.ContinueWith(t => Console.WriteLine(t.Result.Length));
我认为这是对您问题的一个相当基本的答案。是你所期待的吗?