【问题标题】:TPL Dataflow Transform block post to two action block in parallelTPL 数据流转换块并行发布到两个操作块
【发布时间】:2022-01-26 03:26:05
【问题描述】:

我有一个 TPL 数据流,它只使用一个转换块,然后是一个动作块就可以正常工作。 我添加了一个新的动作块,与现有的动作块同时执行,但我的新动作块永远不会被击中。没有错误或异常被抛出。 我需要在代码中添加一个步骤吗?

var ListDocId = new ConcurrentBag<string>(ConvertDataSetToList(IdDocDataSet));

if (ListDocId.Any())
{
    var num_thread = GetThreadNumber();

    //Initialize the pipeline of actions
    var downloadBlock = new TransformBlock<string, RequestObject>(docId =>
        new RequestObject
        {
            DownloadedFile = ListDownloadocId),
            IdDoc = docId
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 }
    );

    var uploadInS3Block = new ActionBlock<S3RequestUpload>(requestS3Upload =>
        UploadFileAsync(RequestObject.DownloadedFile, RequestObject.IdDoc),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 }
    );

    var InsertdocIdIntoDbBlock = new ActionBlock<RequestObject>(s3Request =>
        InsertIntotDataBase(s3Request.IdDoc, InsertDate),
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 }
    );

    var options = new DataflowLinkOptions { PropagateCompletion = true };

    downloadBlock.LinkTo(uploadInS3Block, options);
    downloadBlock.LinkTo(InsertdocIdIntoDbBlock, options);

    foreach (var idDoc in ListDocId)
        downloadInAsterionBlock.Post(idDoc);

    downloadBlock.Complete();
    //uploadInS3Block.Completion.Wait();
    //InsertdocIdIntoDbBlock.Completion.Wait();
    Task.WhenAll(uploadInS3Block.Completion,
        InsertdocIdIntoDbBlock.Completion).Wait();

【问题讨论】:

  • downloadBlock 和 uploadInS3Block 工作正常,但 InsertdocIdIntoDbBlock 永远不会受到打击!
  • 您的代码不是真正的交易,是吗? downloadInAsterionBlockdownloadBlock 一样吗?此外,您将RequestObject 类型的变换块链接到S3RequestUpload 类型的动作块
  • 谢谢彼得,我用相同的方式编辑了代码(downloadBlock)
  • 我使用 RequestObject 因为我必须传递多个参数并且我不能使用元组(C# 5)
  • 这一行有语法错误:DownloadedFile = ListDownloadocId),

标签: c# parallel-processing task-parallel-library tpl-dataflow


【解决方案1】:

您不能将TransformBlock 链接到多个其他块。所以只考虑第一次调用downloadInAsterionBlock.LinkTo()

您需要在downloadBlock 和两个ActionBlock 块之间添加一个BroadcastBlock

downloadBlock -> broadcastBlock -> uploadInS3Block
                                -> InsertdocIdIntoDbBlock 

在代码中它看起来像这样:

var bc = new BroadcastBlock<RequestObject>(ro => ro);

downloadBlock.LinkTo(bc);
bc.LinkTo(uploadInS3Block);
bc.LinkTo(InsertdocIdIntoDbBlock);

【讨论】:

  • 谢谢彼得,现在处理很好地到达了最后一个方法(InsertIntotDataBase),但是在执行将数据插入数据库的最后一个方法时我有一个 AccessViolationException,我添加了 HandleProcessCorruptedStateExceptionsAttribute 但我可以不处理。
  • @NHARIMed 我认为最好为此创建新问题,因为它与使用 Dataflow 块无关。
  • 好的,谢谢彼得!
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多