【问题标题】:Using Rx to limit to X concurrent Tasks使用 Rx 限制 X 个并发任务
【发布时间】:2017-08-02 11:01:50
【问题描述】:

我对 Rx 非常陌生,我正在尝试使用它来处理多达 X 个并发订阅任务。数据源实际上来自数据库,所以我不得不轮询数据库。我意识到 Rx 背后的想法是它是 push 而不是 pull - 所以民意调查不太适合,但从概念上讲,进入数据库的数据是我想订阅并做某事的事件流。

我遇到的主要问题是LimitedConcurrencyLevelTaskScheduler 似乎没有成功限制指定数量的任务。它比我指定的 8 个并发运行更多。

我也不确定以下两个解决方案中哪一个更好(或者可能都是错误的?!)。

这是我尝试过的一种方法,它使用 Observable.Timer ...

public static void Main()
{
    var taskFactory = new TaskFactory (new LimitedConcurrencyLevelTaskScheduler (8));
    var scheduler = new TaskPoolScheduler (taskFactory);

    Observable.Timer (TimeSpan.FromMilliseconds (10), scheduler)
        .SelectMany (x => Observable.FromAsync (GetItemsSource))
        .Repeat ()
        .ObserveOn (scheduler)
        .Subscribe (x => Observable.FromAsync(y => DoSomethingAsync (x.ToList())));

    Console.ReadKey ();
}

private static async Task<IEnumerable<Guid>> GetItemsSource()
{
    return await _myRepo.GetMoreAsync(10);
}

private static async Task DoSomethingAsync(IEnumerable<Guid> items)
{
    // Do something with the data
}

我也尝试过这样做......

public static void Main()
{
    GetItemsSource()
        .ObserveOn(scheduler)
        .Select (async x => await DoSomethingAsync(x))
        .Subscribe();

    Console.ReadKey ();
}

public static IObservable<Guid> GetItemsSource()
{
    return Observable.Create<Guid>(
        async obs =>
        {
            while (true)
            {
                var item = (await _myRepo.GetMoreAsync(1)).FirstOrDefault();

                if(item != null)
                {
                    obs.OnNext(item);
                }

                await Task.Delay(TimeSpan.FromMilliseconds(10))
            }
        });
}

private static async Task DoSomethingAsync(IEnumerable<Guid> items)
{
    // Do something with the data
}

显然非常简单的示例,没有错误处理或取消支持。

两者似乎都有效,但都不限于 8 个并发任务。

正如我所说,我对 Rx 很陌生,可能缺少很多基本的东西。我当然打算做大量阅读以完全理解 Rx,因为它看起来非常强大,但现在我想让一些东西快速工作。

更新

从 Enigmativity 的答案和 cmets 开始,这里有一些记录并发计数的代码......

void Main()
{
    var taskFactory = new TaskFactory(new LimitedConcurrencyLevelTaskScheduler(8));
    var scheduler = new TaskPoolScheduler(taskFactory);

    using (
        (
            from n in Observable.Interval(TimeSpan.FromMilliseconds(10), scheduler)
            from g in Observable.FromAsync(GetItemsSource, scheduler)
            from u in Observable.FromAsync(() => DoSomethingAsync(g), scheduler)
            select u)
        .ObserveOn(scheduler)
        .Subscribe())
    {
        Console.ReadLine();
    }
}

private static volatile int _numIn = 0;
private static volatile int _numOut = 0;

public static async Task<IEnumerable<Guid>> GetItemsSource()
{
    try
    {
        _numIn++;

        $"Concurrent tasks (in): {_numIn}".Dump();

        // Simulate async API call
        await Task.Delay(TimeSpan.FromMilliseconds(10));

        return new List<Guid> { Guid.NewGuid() };
    }
    finally
    {
        _numIn--;
    }
}

private static async Task DoSomethingAsync(IEnumerable<Guid> deliveryIds)
{
    try
    {
        _numOut++;

        // Simulate async calls required to process the event
        await Task.Delay(TimeSpan.FromMilliseconds(1000));

        $"Concurrent tasks (out): {_numOut}".Dump();
    }
    finally
    {
        _numOut--;
    }
}

这显示了大约 64 个并发任务正在运行。

更新 2

看起来确实是因为订阅者是异步的。如果我使用非异步订阅者进行测试,它可以正常工作。不幸的是,我需要一个异步订阅者,因为它需要调用其他异步方法。

看起来我可以通过这样做来做类似的事情......

GetItemsSource2()
    .Select(x => Observable.FromAsync(() => DoSomethingAsync(x)))
    .Merge(64)
    .Subscribe();

所以使用Merge 而不是LimitedConcurrencyLevelTaskScheduler

【问题讨论】:

  • 你怎么知道它不限于8个并发任务?你的 CPU 有多少个内核?
  • 你真的在这里混合了 Rx、TPL 和 Linq-for-objects。你不应该那样做。这里的大部分事情都可以用 Rx 来完成。保持纯洁。这要容易得多。
  • 我不认为您的更新显示了有多少任务正在运行 - 它显示了在调用 GetItemsSourceDoSomethingAsync 期间运行了多少任务。这就像一条只能容纳 8 辆汽车的道路,但您计算的是前面的汽车和紧随其后的 7 辆汽车。
  • 我不太明白。 _numOut 仅在 DoSomethingAsync 方法中使用。开始时递增,结束时递减。这肯定表明其中有多少是同时运行的?
  • 看起来它与异步订阅者有关。我已经通过使用Merge 来解决它(请参阅我的Update 2。感谢@Enigmativity 的帮助!:)

标签: c# .net concurrency task-parallel-library system.reactive


【解决方案1】:

我让你的代码通过这样做:

void Main()
{
    var taskFactory = new TaskFactory(new LimitedConcurrencyLevelTaskScheduler(8));
    var scheduler = new TaskPoolScheduler(taskFactory);

    using (Observable.Timer(TimeSpan.FromMilliseconds(10), scheduler)
        .SelectMany(x => Observable.FromAsync(GetItemsSource))
        .Repeat()
        .ObserveOn(scheduler)
        .Subscribe(async x => await DoSomethingAsync(x.ToList())))
    {
        Console.ReadLine();
    };
}

private static async Task<IEnumerable<Guid>> GetItemsSource()
{
    return await Task.Run(() => Enumerable.Range(0, 10).Select(x => Guid.NewGuid()).ToArray());
}

private static async Task DoSomethingAsync(IEnumerable<Guid> items)
{
    await Task.Run(() => Console.WriteLine(String.Join("|", items.Select(x => x.ToString()))));
}

我还修改了LimitedConcurrencyLevelTaskSchedulerQueueTask 方法,以包含正在运行的代表数量的跟踪行:

protected sealed override void QueueTask(Task task)
{
    // Add the task to the list of tasks to be processed.  If there aren't enough 
    // delegates currently queued or running to process tasks, schedule another. 
    lock (_tasks)
    {
        Console.WriteLine(_delegatesQueuedOrRunning);
        _tasks.AddLast(task);
        if (_delegatesQueuedOrRunning < _maxDegreeOfParallelism)
        {
            ++_delegatesQueuedOrRunning;
            NotifyThreadPoolOfPendingWork();
        }
    }
}

当我运行你的代码时,我得到了这个输出:

0 1 1 1 874695ca-e9a8-4688-a4d2-d7d2446e7924 | e3cbadf1-c5cb-4339-ab59-33d873db98f2 | 8dd710f5-0b21-4b20-a547-6ccfe7c29af4 | 42739c5a-4602-4f76-8aed-40f1c0ffccdc | c599e879-06ca-459f-B27E-95ac15dc4cc1 | 562b5f7c-dcb2-47d8-aa4a-503499654139 | b78e5fc5-d152-4380-9799-2713c6f71c19 | e47669a3-a399-4891-91b6-3a28b52a941a | f6483f2f-f8d7-47e8-9f88-bf9bc5f61f3a | c8e75203-bc55-4e00-9f8b-ecf248f81454 2 2 1 2 3 92d6f76e-6a3d-475d-8c1c-bd3e59aaf2a1 | 1b2bd0d6-c439-4b3c-a1f1-9af1f5132afc | c140e0ec-6741-4310-9edf-547fdf390b01 | 5b29dcd3-21c9-46a2-ae98-cd7a7b1a003c | 5a808def-a09f-41a7-ACFC-d11cfbbb4faa | 28f47d0c-7762-4949-ae33-427c82756874 | 13087b1c-c4eb-4f0f-bf5f-665a2298ac13 | 50c00907-668d-44f3-9e2a-c790348fb715 | 34f8602a-18ef-45b6-b069-0fb30718a45b | 46d6616d-0e89-49cd-8905-dff72bb63add 2 1 1 2 7c6cbe5a-43d1-4aad-8eee-33a0d7f44276 | 2164e3e0-4e8d-4a57-99b7-0aa5e42281da | e2cf26ed-501F-4032-9761-53a301e16bb9 | a3e9171a-f490-4135-a930-017dc706293e | b9f43f5c-d652-4205-b857-724699f178c5 | 5d8e6149-f9ad-424D-9ee2-07afa344ab80 | 418d2526-adf6-4c26-ac84-636400fce547 | 3b8c3b14-9e91-4bb8-9cfa-c1d36f12ccc0 | be3c8c84-5112-4c85-ba7f-d1b41a2ec03a | bdc34ccb-a3d9-45fa-bf54-d42bcd791081 2 1 1 2 3d5e0d0a-ddb3-4595-b960-4d2050d4fa1a | 1b5a39c9-652c-4872-8d1c-6e1212cd4043 | fd9aff7b-9c77-47e4-a4c6-2ede38472b92 | ff6145d3-2dc2-45b6-bc40-2cc1f270572d | 5ba0e441-a9f1-4b2a - 巴卡,127cce622993 | e2650bb9-f2e4-4a89-8c2c-69859f5f381a | a86a1f4e-7ea7-465c-9730-7ac735c5616e | 5cf7135b - 费夫 - 4725-938e-5e7ea55f4c3e | dd26bda7-d86b-4bb9-977f-a29ef5cf6d76 | 6e77c1c1-4e8d-4b48-b6f4-d54f0ec9d269 1 1

...等等...

您的代码执行的并发任务永远不会超过 3 个。

现在我建议您尝试以更接近 Rx 的方式编写代码。试试这个:

using (
(
    from n in Observable.Interval(TimeSpan.FromSeconds(1.0), scheduler)
    from g in Observable.FromAsync(() => GetItemsSource(), scheduler)
    from u in Observable.FromAsync(() => DoSomethingAsync(g), scheduler)
    select u)
    .ObserveOn(scheduler)
    .Subscribe())
{
    Console.ReadLine();
};

它不会改变使用的任务数量,但它更干净。

【讨论】:

  • 那么我肯定做错了其他事情-因为您的后一个示例也使用了远远超过 8 个。我只是通过在各种方法中使用 refcount 来监视它。我的 DoSomethingAsync 调用一个 API 端点,因此我也在监视该端的并发请求数 - 这也显示超过 8 个。我现在将尝试您的第一个示例。我只知道这将是我正在做的非常明显和愚蠢的事情! :)
  • @Dan - 我认为存在测量问题。我已经证明并发任务不超过 3 个。你怎么看多了?请发布显示它的代码,以便我自己运行它。
  • 我刚刚用一些显示问题的代码更新了我的答案。顺便说一句,谢谢您的帮助-非常感谢:)
  • 你是怎么修改QueueTask的?这只是你自己对TaskScheduler 的实现吗?看LimitedConcurrencyLevelTaskScheduler,它的QueueTask是密封的。
  • 没关系 - 我可以看到LimitedConcurrencyLevelTaskScheduler 的来源在线,所以我也这样做了。而且_delegatesQueuedOrRunning 比我的_numOut 低很多。我真的不明白为什么 - 除非我的 DoSomethingAsync 从未被等待,所以它不知道有多少正在运行?
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-06-16
  • 1970-01-01
  • 2012-02-28
  • 2011-02-23
  • 1970-01-01
相关资源
最近更新 更多