【问题标题】:Azure Service Fabric Message QueueAzure Service Fabric 消息队列
【发布时间】:2016-07-07 12:53:10
【问题描述】:

我正在尝试对一系列任务进行排队,并使用 Azure Service Fabric 异步运行它们。我目前正在将 CloudMessageQueue 与工作角色一起使用。我正在尝试迁移到 Service Fabric。从工人角色来看,这是我的代码:

    private void ExecuteTask()
    {
        CloudQueueMessage message = null;

        if (queue == null)
        {
            jmaLogProvider.WriteToLog(new ErrorMessage(MessageSeverity.Error, String.Format("Queue for WorkerRole2 is null. Exiting.")));
            return;
        }

        try
        {
            message = queue.GetMessage();
            if (message != null)
            {
                JMATask task = GetTask(message.AsString);
                string msg = (message == null) ? string.Empty : message.AsString;
                //jmaLogProvider.WriteToLog(new ErrorMessage(MessageSeverity.JMA, String.Format("Executing task {0}", msg)));
                queue.DeleteMessage(message);
                PerformTask(task);
            }
        }
        catch (Exception ex)
        {
            string msg = (message == null) ? string.Empty : message.AsString;
            jmaLogProvider.WriteToLog(new ErrorMessage(MessageSeverity.Error, String.Format("Message {0} Error removing message from queue {1}", msg, ex.ToString())));
        }
    }

我有一些问题:

  1. 如何异步运行执行任务方法?我想同时运行大约 30 - 40 个任务。
  2. 我有一个 JMATask 列表。如何将列表添加到队列中?
  3. 是否需要将列表添加到队列中?

    namespace Stateful1
    {
       public class JMATask
       {
         public string Name { get; set; }
       }
    
    /// <summary>
    /// An instance of this class is created for each service replica by the Service Fabric runtime.
    /// </summary>
    internal sealed class Stateful1 : StatefulService
    {
    public Stateful1(StatefulServiceContext context)
        : base(context)
    { }
    
    /// <summary>
    /// Optional override to create listeners (e.g., HTTP, Service Remoting, WCF, etc.) for this service replica to handle client or user requests.
    /// </summary>
    /// <remarks>
    /// For more information on service communication, see http://aka.ms/servicefabricservicecommunication
    /// </remarks>
    /// <returns>A collection of listeners.</returns>
    protected override IEnumerable<ServiceReplicaListener> CreateServiceReplicaListeners()
    {
        return new ServiceReplicaListener[0];
    }
    
    /// <summary>
    /// This is the main entry point for your service replica.
    /// This method executes when this replica of your service becomes primary and has write status.
    /// </summary>
    /// <param name="cancellationToken">Canceled when Service Fabric needs to shut down this service replica.</param>
    protected override async Task RunAsync(CancellationToken cancellationToken)
    {
        // TODO: Replace the following sample code with your own logic 
        //       or remove this RunAsync override if it's not needed in your service.
    
        IReliableQueue<JMATask> tasks = await this.StateManager.GetOrAddAsync<IReliableQueue<JMATask>>("JMATasks");
        //var myDictionary = await this.StateManager.GetOrAddAsync<IReliableDictionary<string, long>>("myDictionary");
    
        while (true)
        {
            cancellationToken.ThrowIfCancellationRequested();
    
            using (var tx = this.StateManager.CreateTransaction())
            {
                var result = await tasks.TryDequeueAsync(tx);
    
                //how do I execute this method async?
                PerformTask(result.Value);
    
                //Create list of JMA Tasks to queue up
                await tasks.EnqueueAsync(tx, new JMATask());
    
                //var result = await myDictionary.TryGetValueAsync(tx, "Counter");
    
                //ServiceEventSource.Current.ServiceMessage(this, "Current Counter Value: {0}",
                //    result.HasValue ? result.Value.ToString() : "Value does not exist.");
    
                //await myDictionary.AddOrUpdateAsync(tx, "Counter", 0, (key, value) => ++value);
    
                // If an exception is thrown before calling CommitAsync, the transaction aborts, all changes are 
                // discarded, and nothing is saved to the secondary replicas.
                await tx.CommitAsync();
            }
    
            await Task.Delay(TimeSpan.FromSeconds(1), cancellationToken);
        }
    }
    
    private async void PerformTask(JMATask task)
    {
        //execute task
    }
    

    }

【问题讨论】:

标签: c# azure azure-service-fabric


【解决方案1】:

RunAsync 方法不应有此行代码:await tasks.EnqueueAsync(tx, new JMATask());

创建要排队的 JMA 任务列表应该是有状态服务中的另一种方法,如下所示:

    public async Task AddJMATaskAsync(JMATask task)
    {
        var tasksQueue = await StateManager.GetOrAddAsync<IReliableQueue<JMATask>>("JMATasks");
        using (var tx = StateManager.CreateTransaction())
        {
            try
            {
                await tasksQueue.EnqueueAsync(tx, request);
                await tx.CommitAsync();
            }
            catch (Exception ex)
            {
                tx.Abort();
            }
        }
    }

然后您的 PerformTask 方法可以包含对无状态微服务的调用:

    public async Task PerformTask (JMATask task)
    {
        //1. resolve stateless microservice URI
        // statelessSvc

        //2. call method of the stateless microservice
        // statelessSvc.PerformTask(task);
    }

所以基本上,有状态服务只会对任务进行排队和出队。执行实际任务可以由集群中的所有节点都可以使用的微服务来完成。

【讨论】:

    【解决方案2】:

    您可以创建一个任务列表,然后执行 await Task.WhenAll(taskList);

    这可能是最简单的直接答案。

    但是 - 如果每个任务都略有不同,您是否考虑过为每个任务创建单独的微服务?

    【讨论】:

    • 是的,每个任务都不同。如何创建单独的微服务?
    • 理想情况下为每个任务创建一个服务(有状态、无状态)。如果没有更深入的了解,很难准确回答,但这将是我的直觉反应。每个服务都应该有一个简单的单一职责,并且非常擅长做它应该做的事情。
    • 我更新了我的帖子。这些任务将数据从一个系统移动到另一个系统。有数百个不同的任务,但其中许多执行相同类型的操作。
    • 您可能希望将 IReliableQueue 用于传入任务和 IReliableDictionary 组合以跟踪当前正在处理的任务。使用一项任务从队列中出列并启动处理,跟踪处理字典。在启动过程中,队列之前的字典中尚未完成的任何内容。记得在需要时取消,以防止挂起任务!
    猜你喜欢
    • 1970-01-01
    • 2017-01-16
    • 1970-01-01
    • 2019-11-30
    • 2017-03-28
    • 1970-01-01
    • 2017-07-25
    • 1970-01-01
    • 2020-03-20
    相关资源
    最近更新 更多