【问题标题】:Process Asynchronous Calls in Sequence按顺序处理异步调用
【发布时间】:2026-01-26 00:30:01
【问题描述】:

我正在为 Azure 表存储创建一堆或 asynchronous calls。由于显而易见的原因,这些记录的插入顺序与调用它们的顺序不同。

我打算介绍ConcurrentQueue以确保顺序。以下以 POC 形式编写的示例代码似乎可以达到预期的效果。

我想知道这是确保异步调用的最佳方式 会按顺序完成吗?

public class ProductService
{
    ConcurrentQueue<string> ordersQueue = new ConcurrentQueue<string>();
    //Place make calls here
    public void PlaceOrder()
    {
        Task.Run(() =>
        {
            Parallel.For(0, 100, (i) =>
            {
                string item = "Product " + i;
                ordersQueue.Enqueue(item);
                Console.WriteLine("Placed Order: " + item);
                Task.Delay(2000).Wait();
            });

        });

    }

    //Process calls in sequence, I am hoping concurrentQueue will be consistent.
    public void Deliver()
    {
        Task.Run(() =>
        {
            while(true)
            {
                string productId;
                ordersQueue.TryDequeue(out productId);
                if (!string.IsNullOrEmpty(productId))
                {
                    Console.WriteLine("Delivered: " + productId);
                }
            }
        });
    }
}

【问题讨论】:

  • 1.如果你想按顺序做事,那么使用并发 API,Parallel,执行永远不会是一个好的开始。 2. 当您避免阻塞操作时,异步方法是好的,但前提是您将实际的阻塞操作转换为异步:即。不要阻塞锁上的执行线程 (ConcurrentQueue&lt;T&gt;.Enqueue)。
  • First of All Task.Delay 不是一个好的选择.... 你的一百个项目将一个一个地去队列,这意味着你现在有 100 个项目要插入,每个项目都是异步的向 Azure 请求。假设你有 8 台核心机器,你有 100 个任务,你不能保证哪个会在什么时候完成
  • 您能否在稍微抽象一点的层面上描述您正在尝试做的事情?例如,您是否只是尝试异步尝试调用一系列有序的 azure 插入?

标签: c# .net azure asynchronous task-parallel-library


【解决方案1】:

如果您想以异步和顺序的方式处理记录,这听起来非常适合 TPL Dataflow 的ActionBlock。只需创建一个块,其中包含要执行的操作并将记录发布到它。它支持async动作并保持秩序:

var block = new ActionBlock<Product>(async product =>
{
    await product.ExecuteAsync();
});

block.Post(new Product());

如果需要,它还支持并行处理和有限容量。

【讨论】:

    【解决方案2】:

    尝试使用 Microsoft 的响应式框架。

    这对我有用:

    IObservable<Task<string>> query =
        from i in Observable.Range(0, 100, Scheduler.Default)
        let item = "Product " + i
        select AzureAsyncCall(item);
    
    query
        .Subscribe(async x =>
        {
            var result = await x;
            /* do something with result */
        });
    

    我使用的AzureAsyncCall调用签名是public Task&lt;string&gt; AzureAsyncCall(string x)

    我加入了一堆 Console.WriteLine(Thread.CurrentThread.ManagedThreadId); 调用,以确保我在测试代码中获得了正确的异步行为。效果很好。

    所有的调用都是异步的,一个接一个地序列化。

    【讨论】: