【Question Title】: Looking for clarity on MSDN TPL Dataflow sample, and parallelization
【Posted】: 2021-03-17 00:27:22
【Question Description】:

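I'm looking at this MSDN sample code and have made some tweaks to see what happens when the constrained resource (as defined in the demo), "memory", is created with one, two, four, or more instances.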

For example, Main() posts a fixed number of each resource, and that number can be increased or decreased:

networkResources.Post(new NetworkResource() { Name = "eth0" });
memoryResources.Post(new MemoryResource() { Name = "Memory01" });
fileResources.Post(new FileResource() { Name = "MFMHardDrive01" });

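I'm also trying to understand ActionBlock better, specifically the lines that "release the resources back to their respective pools". Is that just a placeholder for additional work that would need to be done? It seems to cycle the messages endlessly, like a while() loop, which makes it feel similar to a thread's SpinWait().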

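These are the lines that create this loop-back (perhaps meaning pulling from another queue). Maybe it is intended as a kind of thread heartbeat, to avoid inefficiently releasing and reallocating threads: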

networkResources.Post(data.Item1);
memoryResources.Post(data.Item2);
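For reference, a stripped-down sketch of what those two Post calls do (hypothetical names, not part of the MSDN sample): one resource circulates between a BufferBlock "pool" and an ActionBlock "worker". An ActionBlock only runs its delegate when a message is delivered, so the cycle is driven entirely by re-posting, not by polling; once the resource is no longer posted back, the worker simply goes idle.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stripped-down version of the sample's "release back to the pool" step.
static class ResourceRecyclingDemo
{
    // Circulates one resource between a pool and a worker, re-posting it until it
    // has been used maxUses times; returns how many times the worker actually ran.
    public static async Task<int> RunAsync(int maxUses)
    {
        var pool = new BufferBlock<string>();
        int uses = 0;
        var lastUse = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

        var worker = new ActionBlock<string>(resource =>
        {
            int n = Interlocked.Increment(ref uses);
            if (n < maxUses)
                pool.Post(resource);          // release: the resource becomes available again
            else
                lastUse.TrySetResult(true);   // not re-posted: the cycle stops here
        });

        pool.LinkTo(worker);
        pool.Post("Memory01");

        await lastUse.Task;
        await Task.Delay(200);  // if anything were still cycling, uses would keep growing
        return Volatile.Read(ref uses);
    }
}
```

Calling `ResourceRecyclingDemo.RunAsync(5).GetAwaiter().GetResult()` should return 5: the worker runs exactly as many times as the resource is handed back, and does nothing afterwards.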

To make this easier to follow, I added colors to the output so the individual resources are easier to identify.

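My understanding and expectation was that adding more MemoryResources should make the system run in parallel, similar to Windows NT administration, where I could assign network cards to individual CPUs for more efficient parallelization. However, when I add extra MemoryResource items, no parallelization occurs, either within the same domain (network) or across multiple source domains (network + disk).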

Where is the parallelization bottleneck defined?

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Demonstrates how to use non-greedy join blocks to distribute
// resources among a dataflow network.
class Program
{
    // Represents a resource. A derived class might represent
    // a limited resource such as a memory, network, or I/O
    // device.
    abstract class Resource
    {
        public object TeapotIsResource { get; set; }

        public int TouchCount { get; set; }
    }

    // Represents a memory resource. For brevity, the details of
    // this class are omitted.
    class MemoryResource : Resource
    {
        public MemoryResource()
        {
            NetworkSequenceID = ran.Next();
        }

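        // Shared instance: per-object DateTime.Now.Ticks seeds can repeat for objects
        // created in quick succession, which would give them the same ID (and color).
        static readonly Random ran = new Random();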
        public int NetworkSequenceID { get; set; }

        public object TeapotIsMemory { get; set; }

        public string Name { get; set; }
    }

    // Represents a network resource. For brevity, the details of
    // this class are omitted.
    class NetworkResource : Resource
    {
        public NetworkResource()
        {
            NetworkSequenceID = ran.Next();
        }

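        // Shared instance: per-object DateTime.Now.Ticks seeds can repeat for objects
        // created in quick succession, which would give them the same ID (and color).
        static readonly Random ran = new Random();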
        public int NetworkSequenceID { get; set; }

        public object TeapotIsNetwork { get; set; }

        public string Name { get; set; }
    }

    // Represents a file resource. For brevity, the details of
    // this class are omitted.
    class FileResource : Resource
    {
        public FileResource()
        {
            NetworkSequenceID = ran.Next();
        }

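        // Shared instance: per-object DateTime.Now.Ticks seeds can repeat for objects
        // created in quick succession, which would give them the same ID (and color).
        static readonly Random ran = new Random();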
        public int NetworkSequenceID { get; set; }

        public string Name { get; set; }
    }

    public static DateTime TimeForFile = DateTime.UtcNow;
    public static DateTime TimeForNetwork = DateTime.UtcNow;

    static int NetworkSequenceID = 0;
    static void Main(string[] args)
    {
        Console.ForegroundColor = ConsoleColor.White;
        // Dummy entry occupies index 0 (ConsoleColor.Black) so no real ID is printed in black.
        colorMap.Add(867);


        // Create three BufferBlock<T> objects. Each object holds a different
        // type of resource.
        var networkResources = new BufferBlock<NetworkResource>();
        var fileResources = new BufferBlock<FileResource>();
        var memoryResources = new BufferBlock<MemoryResource>();

        // Create two non-greedy JoinBlock<T1, T2> objects.
        // The first join works with network and memory resources;
        // the second pool works with file and memory resources.

        var joinNetworkAndMemoryResources =
           new JoinBlock<NetworkResource, MemoryResource>(
              new GroupingDataflowBlockOptions
              {
                  Greedy = false,
              });

        var joinFileAndMemoryResources =
           new JoinBlock<FileResource, MemoryResource>(
              new GroupingDataflowBlockOptions
              {
                  Greedy = false,
              });

        // Create two ActionBlock<T> objects.
        // The first block acts on a network resource and a memory resource.
        // The second block acts on a file resource and a memory resource.

        var networkMemoryAction =
           new ActionBlock<Tuple<NetworkResource, MemoryResource>>(
              data =>
              {
                  // Perform some action on the resources.

                  // Print a message.
                  PrintLine("Network worker", data.Item1.Name, data.Item1.NetworkSequenceID, networkResources.Count,
                  "using resources on", data.Item2.Name, data.Item2.NetworkSequenceID, memoryResources.Count);

                  // Simulate a lengthy operation that uses the resources.
                  Thread.Sleep(2000);

                  // Print a message.
                  PrintLine("Network worker", data.Item1.Name, data.Item1.NetworkSequenceID, networkResources.Count,
                  "finished using resources on", data.Item2.Name, data.Item2.NetworkSequenceID, memoryResources.Count);

                  data.Item1.TouchCount = data.Item1.TouchCount + 1;
                  data.Item2.TouchCount = data.Item2.TouchCount + 1;

                  if (data.Item2.TouchCount == 10)
                  {
                      Console.WriteLine(DateTime.UtcNow - TimeForNetwork);
                  }

                  // Release the resources back to their respective pools.
                  networkResources.Post(data.Item1);
                  memoryResources.Post(data.Item2);
              });

        var fileMemoryAction =
           new ActionBlock<Tuple<FileResource, MemoryResource>>(
              data =>
              {
                  // Perform some action on the resources.
                  // Print a message. 
                  PrintLine("File worker", data.Item1.Name, data.Item1.NetworkSequenceID, fileResources.Count,
                      "using resources on", data.Item2.Name, data.Item2.NetworkSequenceID, memoryResources.Count);

                  // Simulate a lengthy operation that uses the resources.
                  //Thread.Sleep(new Random().Next(500, 2000));
                  Thread.Sleep(2000);

                  // Print a message.
                  PrintLine("File worker", data.Item1.Name, data.Item1.NetworkSequenceID, fileResources.Count,
                      "finished using resources on", data.Item2.Name, data.Item2.NetworkSequenceID, memoryResources.Count);

                  data.Item1.TouchCount = data.Item1.TouchCount + 1;
                  data.Item2.TouchCount = data.Item2.TouchCount + 1;

                  if (data.Item2.TouchCount == 10 )
                  {
                      Console.WriteLine(DateTime.UtcNow - TimeForFile);
                  }
                  // Release the resources back to their respective pools.
                  fileResources.Post(data.Item1);
                  memoryResources.Post(data.Item2);
              });

        // Link the resource pools to the JoinBlock<T1, T2> objects.
        // Because these join blocks operate in non-greedy mode, they do not
        // take the resource from a pool until all resources are available from
        // all pools.

        networkResources.LinkTo(joinNetworkAndMemoryResources.Target1);
        memoryResources.LinkTo(joinNetworkAndMemoryResources.Target2);

        fileResources.LinkTo(joinFileAndMemoryResources.Target1);
        memoryResources.LinkTo(joinFileAndMemoryResources.Target2);

        // Link the JoinBlock<T1, T2> objects to the ActionBlock<T> objects.

        joinNetworkAndMemoryResources.LinkTo(networkMemoryAction);
        joinFileAndMemoryResources.LinkTo(fileMemoryAction);

        // Populate the resource pools. Change the number of each resource
        // (uncomment or add Post calls) to experiment with contention.

        Console.WriteLine("ADDING: Allocating 2 network interfaces");
        networkResources.Post(new NetworkResource() { Name = "eth0" });
        networkResources.Post(new NetworkResource() { Name = "eth1" });
        //networkResources.Post(new NetworkResource() { Name = "eth2" });
        //networkResources.Post(new NetworkResource() { Name = "eth3" });

        Console.WriteLine("ADDING: Allocating 2 memory resources");
        memoryResources.Post(new MemoryResource() { Name = "Memory01" });
        memoryResources.Post(new MemoryResource() { Name = "Memory02" });

        Console.WriteLine("ADDING: Old disk technology simulator");
        fileResources.Post(new FileResource() { Name = "MFMHardDrive01" });
        //fileResources.Post(new FileResource() { Name = "MFMHardDrive02" });

        // Allow data to flow through the network until Enter is pressed.
        Console.ReadLine();
    }

    public static void MessageProcessor(string from)
    {
    }

    private static void PrintLine(string workerType, string dataItem1Name, int networkSequenceID1, int count1, string onString, string item2Name, int networkSequenceID2, int count2)
    {
        // Many threads writing to console
        lock (colorMap)
        {
            int fixAPICountingAtZero = 1;

            Console.Write(workerType + " " + dataItem1Name + "/");
            SetConsoleColor(networkSequenceID1);
            Console.Write("(" + (count1 + fixAPICountingAtZero) + "): " + onString + " " + item2Name + "/");
            SetConsoleColor(networkSequenceID2);
            Console.WriteLine(" (" + (count2 + fixAPICountingAtZero) + ")");
        }
    }

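    static List<int> colorMap = new List<int>();
    static string SetConsoleColor(int anumber)
    {
        if (!colorMap.Contains(anumber))
        {
            colorMap.Add(anumber);
        }

        // ConsoleColor only defines 0-15; wrap indexes above 15 back into 1-15 so the
        // cast never yields an undefined value (the setter throws for those).
        int index = colorMap.IndexOf(anumber);
        var original = Console.ForegroundColor;
        Console.ForegroundColor = (ConsoleColor)(1 + (index - 1) % 15);
        Console.Write(anumber);
        Console.ForegroundColor = original;

        return "";
    }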
}

/* Sample output (from the original, unmodified MSDN sample):
File worker: using resources...
File worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
File worker: using resources...
File worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
File worker: using resources...
File worker: finished using resources...
File worker: using resources...
File worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
File worker: using resources...
*/

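【Question Discussion】:

  • Don't focus too much on how the individual threads get assigned to do this work. The TPL Dataflow library is built on the Task Parallel Library (TPL), so thread usage is driven by how tasks are used; TPL Dataflow works with tasks internally, not threads.

Tags: c# task-parallel-library msdn tpl-dataflow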


【Solution 1】:

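If I understand the question correctly:

The first issue I see is that the ActionBlock constructor accepts an options argument; if it is not supplied, the following defaults apply.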

ExecutionDataflowBlockOptions

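When no specific configuration options are set, the following default values are used:

Option: Default

  • TaskScheduler: TaskScheduler.Default
  • CancellationToken: CancellationToken.None
  • MaxMessagesPerTask: DataflowBlockOptions.Unbounded (-1)
  • BoundedCapacity: DataflowBlockOptions.Unbounded (-1)
  • MaxDegreeOfParallelism: 1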
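These defaults can be checked directly by constructing an options object rather than relying on the documentation table. A minimal sketch; the class and method names are hypothetical:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper: reports the defaults of a freshly constructed ExecutionDataflowBlockOptions.
static class DefaultOptionsDemo
{
    public static ExecutionDataflowBlockOptions Defaults() => new ExecutionDataflowBlockOptions();

    public static void Print()
    {
        var o = Defaults();
        Console.WriteLine($"TaskScheduler is Default:  {o.TaskScheduler == TaskScheduler.Default}");
        Console.WriteLine($"CancellationToken is None: {o.CancellationToken == CancellationToken.None}");
        Console.WriteLine($"MaxMessagesPerTask:        {o.MaxMessagesPerTask}");
        Console.WriteLine($"BoundedCapacity:           {o.BoundedCapacity}");
        Console.WriteLine($"MaxDegreeOfParallelism:    {o.MaxDegreeOfParallelism}");
    }
}
```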

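So unless you explicitly set options on the ActionBlock, it processes messages one at a time, because MaxDegreeOfParallelism defaults to 1. Adding more resources can at most let the network worker and the file worker overlap with each other; neither block will run two of its own tuples at once.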

示例

var block = new ActionBlock<T>(
     x => { /* process x */ },
     new ExecutionDataflowBlockOptions
     {
         // Example value only: pick whatever makes sense for your solution.
         MaxDegreeOfParallelism = Environment.ProcessorCount
     });
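Applied to the question's topology, a scaled-down sketch: two network and two memory resources, a non-greedy JoinBlock, and a worker whose MaxDegreeOfParallelism is a parameter. It records the peak number of worker invocations running at once. The class name, the use of plain strings as resources, and `Task.Delay` in place of the sample's `Thread.Sleep(2000)` are assumptions made to keep it short; the awaited delay also avoids depending on how quickly the thread pool injects threads.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class JoinParallelismDemo
{
    // Returns the highest number of worker invocations observed running at the same time.
    public static async Task<int> MaxConcurrentWorkersAsync(int maxDegreeOfParallelism, int totalJobs)
    {
        var networkResources = new BufferBlock<string>();
        var memoryResources = new BufferBlock<string>();
        var join = new JoinBlock<string, string>(new GroupingDataflowBlockOptions { Greedy = false });

        int running = 0, maxRunning = 0, completed = 0;
        var done = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);

        var worker = new ActionBlock<Tuple<string, string>>(async pair =>
        {
            int now = Interlocked.Increment(ref running);
            // Lock-free update of the peak concurrency seen so far.
            int seen;
            while (now > (seen = Volatile.Read(ref maxRunning)) &&
                   Interlocked.CompareExchange(ref maxRunning, now, seen) != seen) { }

            await Task.Delay(200);   // stands in for the sample's lengthy operation
            Interlocked.Decrement(ref running);

            if (Interlocked.Increment(ref completed) >= totalJobs)
            {
                done.TrySetResult(true);
            }
            else
            {
                // Release the resources back to their pools, as the sample does.
                networkResources.Post(pair.Item1);
                memoryResources.Post(pair.Item2);
            }
        },
        new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = maxDegreeOfParallelism });

        networkResources.LinkTo(join.Target1);
        memoryResources.LinkTo(join.Target2);
        join.LinkTo(worker);

        // Two of each resource, mirroring eth0/eth1 and Memory01/Memory02 in the question.
        networkResources.Post("eth0");
        networkResources.Post("eth1");
        memoryResources.Post("Memory01");
        memoryResources.Post("Memory02");

        await done.Task;
        return Volatile.Read(ref maxRunning);
    }
}
```

With `maxDegreeOfParallelism` set to 1 the peak should stay at 1 even though two complete resource pairs are available; with 2 it should reach 2. The block option, not the number of resources, is what caps concurrency.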

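Note: your current implementation uses a synchronous workflow. To really unlock the performance and scalability of a Dataflow pipeline, make sure you use the Func<TInput, Task> delegate overload and take advantage of async/await wherever possible for any I/O-bound workloads.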
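A minimal sketch of the Func<TInput, Task> overload the note refers to: the simulated lengthy operation becomes an awaited call, so no thread is blocked while it is pending, and `block.Completion` finishes only after every returned Task has completed. `FakeIoAsync` and the class name are hypothetical stand-ins for a real asynchronous I/O API.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class AsyncWorkerDemo
{
    // Hypothetical simulated I/O call; a real implementation would await a network or disk API.
    static async Task<string> FakeIoAsync(int id)
    {
        await Task.Delay(100);   // no thread is blocked while the "I/O" is pending
        return "item" + id;
    }

    public static async Task<int> ProcessAsync(int itemCount)
    {
        var results = new ConcurrentBag<string>();

        // Typing the delegate explicitly makes it obvious which ActionBlock constructor is used.
        Func<int, Task> work = async id => results.Add(await FakeIoAsync(id));

        var block = new ActionBlock<int>(work,
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });

        for (int i = 0; i < itemCount; i++)
            block.Post(i);

        block.Complete();          // signal that no more input will arrive
        await block.Completion;    // waits for every returned Task, not just the delegate calls
        return results.Count;
    }
}
```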
