【发布时间】:2015-09-21 21:03:38
【问题描述】:
我正在使用 Rx 来确保我们的后端遵守某些第三方 API 的请求限制。
下面的实现使用一个简单的Subject<T> 作为输入队列,然后使用James World's custom Pace operator驯服。
这有效,但前提是在ObserveOn(TaskPoolScheduler.Default) 强制执行的主线程上未观察到throttledRequests。
只要我注释掉这一行(第 61 行),程序就会表现得好像根本没有使用 Pace 运算符,并且请求会在排队时再次得到处理。谁能解释这种行为?
using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication1
{
public static class ObservableExtensions
{
/// <summary>
/// James World's Pace operater (see https://stackoverflow.com/a/21589238/88513)
/// </summary>
public static IObservable<T> Pace<T>(this IObservable<T> source, TimeSpan interval)
{
return source.Select(i => Observable.Empty<T>()
.Delay(interval)
.StartWith(i))
.Concat();
}
}
class Program
{
ISubject<int> requests;
IObservable<int> throttledRequests;
private Task<T> QueueRequest<T>(int work, Func<int, Task<T>> doWork)
{
var task = throttledRequests
.Where(x => x == work)
.Take(1)
.SelectMany(doWork)
.ToTask();
// queue it
requests.OnNext(work);
return task;
}
private Task<int> DoRequest(int x)
{
Console.WriteLine("{0:T}: DoRequest({1}) on TID {2}", DateTime.UtcNow, x, Thread.CurrentThread.ManagedThreadId);
return Task.FromResult(x);
}
private void Run()
{
// initialize request queue
requests = new Subject<int>();
// create a derived rate-limited queue
throttledRequests = requests
.Pace(TimeSpan.FromMilliseconds(1000))
.Publish()
.RefCount()
.ObserveOn(TaskPoolScheduler.Default);
Console.WriteLine("Main TID: {0}", Thread.CurrentThread.ManagedThreadId);
int i = 0;
while (true)
{
// Queue a number of requests
var tasks = Enumerable.Range(i * 10, 10)
.Select(x => QueueRequest(x, DoRequest))
.ToArray();
Task.WaitAll(tasks);
Console.ReadLine();
i++;
}
}
static void Main(string[] args)
{
new Program().Run();
}
}
}
【问题讨论】:
-
注释掉
.ObserveOn(TaskPoolScheduler.Default)时应该尝试找出具体使用了哪些调度程序。我怀疑它将是当前线程或即时调度程序。这应该会告诉你发生了什么。 -
你有什么理由这么混合 Rx 和 TPL 吗?
-
@Enigmativity 当我注释掉该行时,DoRequest 将在主线程上执行(如预期的那样)。我已经修改了程序的控制台输出以使其更加明显。
-
@Enigmativity 这真的是 TPL 和 Rx 的奇怪组合吗?我认为 Rx 是小跑部分的正确技术,处理 Web 请求非常适合 TPL。
-
建议坚持一个或另一个。 TPL 可以强制执行 Rx 查询,并且它们之间的更改会引入死锁。如有必要,请务必在最后一个可能的时间点切换。
标签: c# system.reactive reactive-programming