【问题标题】:TPL Dataflow - how to call action item multiple itemsTPL Dataflow - 如何调用多个项目的操作项
【发布时间】:2017-06-02 22:55:30
【问题描述】:

我是TPL Dataflow 的新手。我有一个需要处理的项目编号列表。一个项目可能有大约8000 个项目,我需要获取项目中每个项目的数据,然后将这些数据推送到 5 个单独的服务器中。

这是我迄今为止编写的代码。我被困在如何将这些数据加载到 5 个服务器中的步骤上。我不确定这是否编码正确。非常感谢任何建议。

public  static bool PushData(string projectId)
{
    var linkCompletion = new DataflowLinkOptions
    {
        PropagateCompletion = true
    };

    var projectItems = new TransformBlock<ProjectDTO, ProjectDTO>(
        dto => dto.GetItemData(dto), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

    var itemData = new ActionBlock<ProjectDTO>(
         dto =>  PostEachServerAsync(dto, "server1", "setmemcache"), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });


    projectItems.LinkTo(projectRules, linkCompletion);

    IList<ProjectDTO> dtoList = new List<ProjectDTO>();
    dtoList = MemcachedDTO.GetDataByProject(projectId);

    foreach (ProjectDTOd in dtoList)
    {
        projectItems.Post(d);
    }

    projectItems.Complete();
    projectItems.Completion.Wait();
    return false;
}

这是我现在的代码 - 但它没有正确完成 - 谁能告诉我我做错了什么?

             [HttpGet]
    public HttpResponseMessage ReloadItem(string projectQuery)
    {
        try
        {

            var linkCompletion = new DataflowLinkOptions
            {
                PropagateCompletion = true
            };

            IList<string> projectIds = projectQuery.Split(',').ToList();
            IEnumerable<string> serverList = ConfigurationManager.AppSettings["ServerList"].Split(',').Cast<string>();

            var iR = new TransformBlock<MemcachedDTO, MemcachedDTO>(
                dto => dto.GetIR(dto), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 });

            var pR = serverList.Select(
                    i => new { Id = i, Action = new ActionBlock<MemcachedDTO>(dto => PostEachServerAsync(dto, i, "set"), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 }) });

            List<MemcachedDTO> dtoList = new List<MemcachedDTO>();

            foreach (string pid in projectIds)
            {
                IList<MemcachedDTO> dtoTemp = new List<MemcachedDTO>();
                dtoTemp = MemcachedDTO.GetItemIdsByProject(pid);
                dtoList.AddRange(dtoTemp);
            }


            foreach (var action in pR)
            {
                iR.LinkTo(action.Action, linkCompletion);
            }

            foreach (MemcachedDTO d in dtoList)
            {
                iR.Post(d);
            }
            iR.Complete();
            foreach (var action in pR)
            {
                action.Action.Completion.Wait();
            }


            return Request.CreateResponse(HttpStatusCode.OK, new { message = projectIds.ToString() + " reload success" });
        }
        catch (Exception ex)
        {
            return Request.CreateResponse(HttpStatusCode.InternalServerError, new { message = ex.Message.ToString() });
        }
    }

【问题讨论】:

  • 如果您从 1000 条消息中只收到 998 条消息,请检查错误,可能存在您不知道的异常

标签: c# .net parallel-processing task-parallel-library tpl-dataflow


【解决方案1】:

您的代码根本无法编译,您如何运行它?

首先,不要用.Wait() 阻塞你的线程,这里使用async/await pattern。其次,您需要一个BroadcastBlock 来通知超过 1 个块与您的数据。第三,您需要 5 个不同的 ActionBlocks,而不是 1 个并行度为 5。第四,您正在等待错误的 Completion 任务 - 等待最后一个块完成,而不是第一个,所以在您的情况下,您需要使用 WhenAll method 等待 5 个块完成。

所以你的代码可能是这样的(我假设projectRulesitemsData 是同一个块):

public static async Task<bool> PushData(string projectId)
{
    var linkCompletion = new DataflowLinkOptions
    {
        PropagateCompletion = true
    };

    var projectItems = new TransformBlock<ProjectDTO, ProjectDTO>(
        dto => dto.GetItemData(dto), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

    var broadcast = new BroadcastBlock<ProjectDTO>();
    projectItems.LinkTo(broadcast, linkCompletion);

    var pR = serverList.Select(
            i => new { Id = i, Action = new ActionBlock<MemcachedDTO>(dto => PostEachServerAsync(dto, i, "set"), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 }) });

    foreach (var action in pR)
    {
        broadcast.LinkTo(action.Action, linkCompletion);
    }

    var dtoList = MemcachedDTO.GetDataByProject(projectId);

    foreach (var d in dtoList)
    {
        projectItems.Post(d);
    }
    projectItems.Complete();

    // wait all the action blocks to finish
    await Task.WhenAll(projectRules1.Completion, projectRules2.Completion, projectRules3.Completion, projectRules4.Completion, projectRules5.Completion);
    return false;
}

【讨论】:

  • 感谢您的回复 - 项目项目与 itemdata 不在同一个块上 - 这是因为 1)我将首先获得与每个项目编号相对应的列表项目 2)获取基于来自 step1) 的项目 ID 的项目数据。之后,我需要将该数据推送到 5 个服务器。我有点迷失如何迭代项目 ID 以获取项目 ID,然后获取与每个项目 ID 对应的数据
  • 这是我的代码 - 但它没有正确完成 - 我做错了什么?
  • 1.你没有使用广播块,所以只有第一个块会收到消息。 2.你用Wait阻塞你的线程,这很糟糕,使用asyncawait`。 3. SO 不是为您编译代码的地方。你明白了,你就让它发挥作用。
  • 很抱歉给您带来不便。这是我第一次在 SO 上发帖。我已经编辑了我的代码,以便现在可以编译它。我不确定我得到广播块。你能给我举个例子吗?所以我基本上想将数据推送到所有服务器,比如这里 var pR = serverList.Select( i => new { Id = i, Action = new ActionBlock(dto => PostEachServerAsync(dto, i, " set"), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 }) });
  • 我已经为您提供了一个广播示例。它将消息传播到所有个链接块。在您的示例中,只有第一个会收到消息
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-06-08
  • 2014-01-25
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多