【问题标题】:Processing requests at a maximum rate以最大速率处理请求
【发布时间】:2015-09-21 21:03:38
【问题描述】:

我正在使用 Rx 来确保我们的后端遵守某些第三方 API 的请求限制。

下面的实现使用一个简单的Subject<T> 作为输入队列,然后使用James World's custom Pace operator驯服

这有效,但前提是在ObserveOn(TaskPoolScheduler.Default) 强制执行的主线程上未观察到throttledRequests

只要我注释掉这一行(第 61 行),程序就会表现得好像根本没有使用 Pace 运算符,并且请求会在排队时再次得到处理。谁能解释这种行为?

using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication1
{
    public static class ObservableExtensions
    {
        /// <summary>
        /// James World's Pace operater (see https://stackoverflow.com/a/21589238/88513)
        /// </summary>
        public static IObservable<T> Pace<T>(this IObservable<T> source, TimeSpan interval)
        {
            return source.Select(i => Observable.Empty<T>()
                    .Delay(interval)
                    .StartWith(i))
                .Concat();
        }
    }

    class Program
    {
        ISubject<int> requests;
        IObservable<int> throttledRequests;

        private Task<T> QueueRequest<T>(int work, Func<int, Task<T>> doWork)
        {
            var task = throttledRequests
                .Where(x => x == work)
                .Take(1)
                .SelectMany(doWork)
                .ToTask();

            // queue it
            requests.OnNext(work);

            return task;
        }

        private Task<int> DoRequest(int x)
        {
            Console.WriteLine("{0:T}: DoRequest({1}) on TID {2}", DateTime.UtcNow, x, Thread.CurrentThread.ManagedThreadId);
            return Task.FromResult(x);
        }

        private void Run()
        {
            // initialize request queue
            requests = new Subject<int>();

            // create a derived rate-limited queue
            throttledRequests = requests
                .Pace(TimeSpan.FromMilliseconds(1000))
                .Publish()
                .RefCount()
                .ObserveOn(TaskPoolScheduler.Default);

            Console.WriteLine("Main TID: {0}", Thread.CurrentThread.ManagedThreadId);

            int i = 0;

            while (true)
            {
                // Queue a number of requests
                var tasks = Enumerable.Range(i * 10, 10)
                    .Select(x => QueueRequest(x, DoRequest))
                    .ToArray();

                Task.WaitAll(tasks);

                Console.ReadLine();
                i++;
            }
        }

        static void Main(string[] args)
        {
            new Program().Run();
        }
    }
}

【问题讨论】:

  • 注释掉.ObserveOn(TaskPoolScheduler.Default)时应该尝试找出具体使用了哪些调度程序。我怀疑它将是当前线程或即时调度程序。这应该会告诉你发生了什么。
  • 你有什么理由这么混合 Rx 和 TPL 吗?
  • @Enigmativity 当我注释掉该行时,DoRequest 将在主线程上执行(如预期的那样)。我已经修改了程序的控制台输出以使其更加明显。
  • @Enigmativity 这真的是 TPL 和 Rx 的奇怪组合吗?我认为 Rx 是小跑部分的正确技术,处理 Web 请求非常适合 TPL。
  • 建议坚持一个或另一个。 TPL 可以强制执行 Rx 查询,并且它们之间的更改会引入死锁。如有必要,请务必在最后一个可能的时间点切换。

标签: c# system.reactive reactive-programming


【解决方案1】:

我无法完整回答问题(不确定为什么它在 ThreadPoolScheduler 上运行时运行),但我会告诉你我的想法并展示如何修复它以使其在有或没有 ThreadPoolScheduler 的情况下按预期运行。

首先,您可能会注意到,即使在 ThreadPoolScheduler 上它也无法正常工作 - 通常前 1-3 个项目会在没有任何延迟的情况下得到处理。为什么在那之后他们开始延迟处理,但我仍然不清楚。现在说原因。考虑以下示例代码:

var result = Observable.Range(0, 10).Delay(TimeSpan.FromSeconds(10)).StartWith(1).Take(1).ToTask().Result;

在这里,不会有任何延迟,任务将立即完成。为什么?因为 StartWith 立即在序列的开头注入“1”,然后 Take(1) 获取该值并完成 - 没有理由继续序列,因此永远不会执行延迟。例如,如果您使用 Take(2) - 它会在完成前延迟 10 秒。

出于完全相同的原因,您的代码永远不会进入延迟(例如,您可以通过在延迟后选择并登录到控制台来使用调试器验证这一点)。要修复,只需删除 Take(1) (或将其更改为 Take(2) 例如) - 无论如何每个键总是只有一个项目。当你这样做时,无论有没有 ThreadPoolScheduler,代码都会正确运行。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-08-07
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多