【问题标题】:Azure Worker Role multithread queue processingAzure Worker Role 多线程队列处理
【发布时间】:2015-04-15 08:18:45
【问题描述】:

我有一个带有经典配置 WebRole + Worker 角色的天蓝色云服务。 Worker 从队列中获取消息,处理它而不是一次删除它。

我的代码是这样的:

public override void Run()
    {
        Trace.TraceInformation("Worker is running");
            try
            {
                this.RunAsync(this.cancellationTokenSource.Token).Wait();
            }
            finally
            {
                this.runCompleteEvent.Set();
            }
    }

public override bool OnStart()
        {
            ServicePointManager.DefaultConnectionLimit = 500;
            bool result = base.OnStart();
            Trace.TraceInformation("WorkerAnalytics has been started");
            return result;
        }



private async Task RunAsync(CancellationToken cancellationToken)
        {
            var queue = ....//omitted info for brevity
            CloudQueueMessage retrievedMessage = null;

            while (!cancellationToken.IsCancellationRequested)
            {
                 try
                    {
                        retrievedMessage = await queue.GetMessageAsync();
                        if (retrievedMessage != null)
                        {
                            await ProcessMessage(retrievedMessage);
                        }
                        else
                        {
                            System.Threading.Thread.Sleep(500);
                        }
                    }
                    catch (Exception e)
                    {
                        System.Threading.Thread.Sleep(500);
                    }
                }
            }

        }

现在这可以完美运行,但 CPU 非常低,只有 3%,一次只处理一个元素(每个大约 1 秒),但队列每秒大约有 1000 个新元素,这还不够。

如何使用机器拥有的所有 CPU 能力一次处理更多队列消息,而不会使代码过于复杂?

ServicePointManager.DefaultConnectionLimit 还有什么用途?

我花了几个小时寻找有效的 Worker Roles 多线程解决方案,但现在都是 WebJobs 或旧框架使事情变得复杂。

谢谢

【问题讨论】:

    标签: azure cloud azure-worker-roles azure-cloud-services


    【解决方案1】:

    您可以尝试运行RunAsync() 的多个任务。

    var tasks = new List<Task>();
    tasks.Add(this.RunAsync(this.cancellationTokenSource.Token));
    tasks.Add(this.RunAsync(this.cancellationTokenSource.Token));
    tasks.Add(this.RunAsync(this.cancellationTokenSource.Token));
    Task.WaitAll(tasks.ToArray());
    

    【讨论】:

    • 谢谢,这似乎有效!您建议创建多少个任务?天蓝色的机器是双核的,3GB RAM 队列的填充速度非常快,比如 1000 条新记录/秒,我m afraid that Ill 需要很多实例。
    • 那么尝试逐渐增加任务数并找到合适的任务数。如果您开始看到高 CPU 或性能下降升级到更大的虚拟机或运行多个辅助角色实例
    • @pauliusnrk 我需要在哪里编写这段代码?而不是这一行“this.RunAsync(this.cancellationTokenSource.Token).Wait();” ?
    • 我得到了答案,但现在我有另一个问题:@pauliusnrk 有什么方法可以在运行时分配新任务?因为我不想在达到某个阈值时一直使用 15-20 个任务......有办法做到这一点吗?
    • 如果您想根据您的工作量启动/停止新任务,您可以启动单独的任务任务,该任务有时会根据您的工作量启动/停止任务
    猜你喜欢
    • 2011-11-19
    • 1970-01-01
    • 2012-11-12
    • 2011-05-04
    • 2016-05-02
    • 2012-12-06
    • 2018-06-27
    • 2013-11-23
    • 1970-01-01
    相关资源
    最近更新 更多