【问题标题】:BroadcastBlock missing items广播块丢失项目
【发布时间】:2017-06-06 20:54:04
【问题描述】:

我有一个需要处理的项目编号列表。一个项目可能有大约 8000 个项目,我需要获取项目中每个项目的数据,然后将这些数据推送到服务器列表中。谁能告诉我以下..

1) 我在 iR 中有 1000 个项目,但只有 998 个项目被写入服务器。我是否通过使用 broadCastBlock 丢失了物品? 2) 我是否正确地等待所有 actionBlocks? 3) 如何使数据库调用异步?

这是数据库代码

    public  MemcachedDTO GetIR(MemcachedDTO dtoItem)
    {

        string[] Tables = new string[] { "iowa", "la" };
        using (SqlConnection connection = new SqlConnection(ConfigurationManager.ConnectionStrings["test"].ConnectionString))
        {
            using (SqlCommand command = new SqlCommand("test", connection))
            {
                DataSet Result = new DataSet();
                command.CommandType = CommandType.StoredProcedure;

                command.Parameters.Add("@ProjectId", SqlDbType.VarChar);
                command.Parameters["@ProjectId"].Value = dtoItem.ProjectId;


                connection.Open();
                Result.EnforceConstraints = false;
                Result.Load(command.ExecuteReader(CommandBehavior.CloseConnection), LoadOption.OverwriteChanges, Tables);
                dtoItem.test = Result;
            }
        }
        return dtoItem;
    }

更新: 我已将代码更新为以下内容。当我运行它时它只是挂起并且只将 1/4 的数据写入服务器?你能告诉我我做错了什么吗?

      public static ITargetBlock<T> CreateGuaranteedBroadcastBlock<T>(IEnumerable<ITargetBlock<T>> targets, DataflowBlockOptions options)
    {
        var targetsList = targets.ToList();

        var block = new ActionBlock<T>(
            async item =>
            {
                foreach (var target in targetsList)
                {
                    await target.SendAsync(item);
                }
            }, new ExecutionDataflowBlockOptions
            {
                CancellationToken = options.CancellationToken
            });

        block.Completion.ContinueWith(task =>
        {
            foreach (var target in targetsList)
            {
                if (task.Exception != null)
                    target.Fault(task.Exception);
                else
                    target.Complete();
            }
        });

        return block;
    }

    [HttpGet]
    public async Task< HttpResponseMessage> ReloadItem(string projectQuery)
    {
        try
        {

            var linkCompletion = new ExecutionDataflowBlockOptions
            {
                 MaxDegreeOfParallelism = 2
            };
             var cts = new CancellationTokenSource();
            var dbOptions = new DataflowBlockOptions { CancellationToken = cts.Token };


            IList<string> projectIds = projectQuery.Split(',').ToList();
            IEnumerable<string> serverList = ConfigurationManager.AppSettings["ServerList"].Split(',').Cast<string>();

            var iR = new TransformBlock<MemcachedDTO, MemcachedDTO>(
                dto => dto.GetIR(dto), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3 });

            List<ActionBlock<MemcachedDTO>> actionList = new List<ActionBlock<MemcachedDTO>>();


            List<MemcachedDTO> dtoList = new List<MemcachedDTO>();

            foreach (string pid in projectIds)
            {
                IList<MemcachedDTO> dtoTemp = new List<MemcachedDTO>();
                dtoTemp = MemcachedDTO.GetItemIdsByProject(pid);
                dtoList.AddRange(dtoTemp);
            }
            foreach (string s in serverList)
            {
                var action = new ActionBlock<MemcachedDTO>(
                async dto => await PostEachServerAsync(dto, s, "setitemcache"));
                actionList.Add(action);
            }
            var bBlock = CreateGuaranteedBroadcastBlock(actionList, dbOptions);

            foreach (MemcachedDTO d in dtoList)
            {
                await iR.SendAsync(d);
            }

            iR.Complete();
            iR.LinkTo(bBlock);
            await Task.WhenAll(actionList.Select(action => action.Completion).ToList());

            return Request.CreateResponse(HttpStatusCode.OK, new { message = projectIds.ToString() + " reload success" });
        }
        catch (Exception ex)
        {
            return Request.CreateResponse(HttpStatusCode.InternalServerError, new { message = ex.Message.ToString() });
        }
    }

【问题讨论】:

标签: tpl-dataflow


【解决方案1】:

1) 我在 iR 中有 1000 个项目,但只有 998 个项目被写入服务器。我是否通过使用 broadCastBlock 丢失了物品?

是的,在下面的代码中,您将 BoundedCapacity 设置为 1,如果您的 BroadcastBlock 在任何时候都无法传递一个项目,它将丢弃它。此外,BroadcastBlock 只会将Completion 传播到TargetBlock,请勿在此处使用PropagateCompletion=true。如果您希望所有块都完成,您需要手动处理Completion。这可以通过在BroadcastBlock 上设置ContinueWith 以将Completion 传递给所有连接的目标来完成。

var action = new ActionBlock<MemcachedDTO>(dto => PostEachServerAsync(dto, s, "set"), new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3, BoundedCapacity = 1 });
broadcast.LinkTo(action, linkCompletion);
actionList.Add(action);

选项: 使用有界的BufferBlock 代替BroadcastBlock。当您的下游块绑定到一个项目时,他们无法接收其他项目,直到他们完成处理他们拥有的东西。这将允许BufferBlock 将其物品提供给另一个可能空闲的ActionBlock

当您将项目添加到受限制的流中时,即BoundedCapacity 小于 Unbounded 的流。您需要使用SendAsync 方法或至少处理Post 的返回。我建议只使用SendAsync:

foreach (MemcachedDTO d in dtoList)
{
    await iR.SendAsync(d);
}

这将强制您的方法签名变为:

public async Task<HttpResponseMessage> ReloadItem(string projectQuery)

2) 我是否正确地等待所有 actionBlocks?

之前的更改将允许您放弃阻塞的 Wait 调用,转而使用 await Task.WhenAlll

iR.Complete();
actionList.ForEach(x => x.Completion.Wait());

To:

iR.Complete();
await bufferBlock.Completion.ContinueWith(tsk => actionList.ForEach(x => x.Complete());
await Task.WhenAll(actionList.Select(action => action.Completion).ToList());

3) 如何使数据库调用异步?

我将保持开放状态,因为它应该是一个与TPL-Dataflow 无关的单独问题,但简而言之,使用async Api 访问您的数据库,async 将自然地在您的代码库中增长。 This should get you started.

BufferBlock 与 BroadcastBlock

重新阅读您的previous question 和@VMAtm 的答案后。您似乎希望将每个项目发送到全部五台服务器,在这种情况下,您需要BroadcastBlock。您可以使用BufferBlock 将消息相对均匀地分配到一个灵活的服务器池中,每个服务器都可以处理一条消息。尽管如此,您仍然需要通过等待BroadcastBlock 的完成来控制向所有连接的ActionBlocks 传播完成和故障。

防止 BroadcastBlock 丢弃消息

一般你有两个选项,将你的ActionBlocks设置为未绑定,这是它们的默认值:

new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 3, BoundedCapacity = Unbounded });

从你自己的任何结构中广播你自己的信息。 Here is an example implementation 来自 @i3arnon。 And another 来自@svick

【讨论】:

  • 感谢您的详细回复@JSteward。我已经在上面的帖子中更新了我的代码,但仍然有问题......应用程序没有完成(我删除了 PropagateCompletion)并且只写入了 1/4 的记录。你能指出我正确的方向吗?谢谢
  • 首先跳出来的是:在所有数据都发送到流中之前,您不会将您的TransformBlock 链接到您的BroadcastBlock。当你修复那部分时会发生什么?
  • 另一种可能性,PostEachServerAsync 不等待,如果它正在运行 async,那么它被视为火灾并忘记,并且在您的流程完成时可能无法完成。
  • 我已经更新了@JSteward 上面的代码——请查看更新后的代码。我已将转换块链接到广播块并等待 PostEachServerAsync。现在我在 System.Data.dll 中抛出一个异常:'System.OutOfMemoryException'。您对如何解决此问题有任何想法吗?
  • 我实际上没有看到您的代码有任何更新,但您确实有一个未绑定的流程。这意味着每个MemcachedDTO 都在内存中。此时,您需要分析您的代码并找出占用如此多内存的原因。您可以尝试限制您的流程,以在给定时间仅允许 x 个项目通过。如果您需要进一步的帮助,您需要minimal reproducible example 并可以发布另一个问题。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2014-10-23
  • 2018-07-02
  • 1970-01-01
  • 1970-01-01
  • 2023-03-04
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多