【发布时间】:2021-03-17 00:27:22
【问题描述】:
我正在查看 this MSDN sample code 并做了一些调整,想弄清楚:如果受限资源(即演示中定义的“内存”)被创建了一个、两个、四个或更多实例,会发生什么。
例如,Main() 中为每种资源定义了数量,可以增加或减少:
networkResources.Post(new NetworkResource() { Name = "eth0" });
memoryResources.Post(new MemoryResource() { Name = "Memory01" });
fileResources.Post(new FileResource() { Name = "MFMHardDrive01" });
我还想更深入地了解 ActionBlock,特别是“将资源释放回各自的池”的那几行。这些行是否只是表示还有额外的工作需要完成?因为它看起来只是像 while() 循环一样不停地循环处理消息,没有终点,这让我感觉它类似于线程的 SpinWait()。
形成这种循环(可能意味着从另一个队列中提取)的代码行如下。也许这是充当某种线程心跳,以防止线程被低效地释放/重新分配:
networkResources.Post(data.Item1);
memoryResources.Post(data.Item2);
为了更容易弄清楚这一点,我在输出中添加了颜色以使线程更容易识别。
我的理解和期望是:添加更多 MemoryResource 后,系统应该并行运行,类似于 Windows NT 的管理方式——我可以把网卡分配给各个 CPU 以实现更高效的并行化。但当我添加额外的 MemoryResource 项时,无论是在同一个域(网络)内,还是跨多个源域(网络 + 磁盘),都没有发生并行化。
并行化的瓶颈在哪里定义?
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks.Dataflow;
// Demonstrates how to use non-greedy join blocks to distribute
// resources among a dataflow network.
class Program
{
// Common base for everything that is handed out from a resource pool.
// Derived classes in this file model memory, network and file resources.
abstract class Resource
{
    /// <summary>
    /// Number of completed uses of this resource. The worker delegates in
    /// Main add one to it each time they finish with the resource.
    /// </summary>
    public int TouchCount { get; set; }

    /// <summary>
    /// Untyped placeholder slot. Nothing in this file reads or writes it.
    /// </summary>
    public object TeapotIsResource { get; set; }
}
// Represents a memory resource that workers borrow from the memory pool.
class MemoryResource : Resource
{
    /// <summary>
    /// Creates the resource and assigns it a non-negative identifier.
    /// </summary>
    public MemoryResource()
    {
        // The ID is what the console output uses to tell instances apart, so
        // it must differ between instances created back-to-back. A per-instance
        // Random seeded from the clock repeats its first value whenever two
        // instances are constructed within one clock tick; a fresh Guid does not
        // depend on the clock. The mask clears the sign bit so the value stays
        // in the same non-negative range that Random.Next() produced.
        NetworkSequenceID = Guid.NewGuid().GetHashCode() & int.MaxValue;
    }

    /// <summary>Identifier printed (and color-coded) in the console output.</summary>
    public int NetworkSequenceID { get; set; }

    /// <summary>Untyped placeholder slot. Nothing in this file reads or writes it.</summary>
    public object TeapotIsMemory { get; set; }

    /// <summary>Display name of the resource, e.g. "Memory01".</summary>
    public string Name { get; set; }
}
// Represents a network resource that workers borrow from the network pool.
class NetworkResource : Resource
{
    /// <summary>
    /// Creates the resource and assigns it a non-negative identifier.
    /// </summary>
    public NetworkResource()
    {
        // The ID is what the console output uses to tell instances apart, so
        // it must differ between instances created back-to-back. A per-instance
        // Random seeded from the clock repeats its first value whenever two
        // instances are constructed within one clock tick; a fresh Guid does not
        // depend on the clock. The mask clears the sign bit so the value stays
        // in the same non-negative range that Random.Next() produced.
        NetworkSequenceID = Guid.NewGuid().GetHashCode() & int.MaxValue;
    }

    /// <summary>Identifier printed (and color-coded) in the console output.</summary>
    public int NetworkSequenceID { get; set; }

    /// <summary>Untyped placeholder slot. Nothing in this file reads or writes it.</summary>
    public object TeapotIsNetwork { get; set; }

    /// <summary>Display name of the resource, e.g. "eth0".</summary>
    public string Name { get; set; }
}
// Represents a file resource that workers borrow from the file pool.
class FileResource : Resource
{
    /// <summary>
    /// Creates the resource and assigns it a non-negative identifier.
    /// </summary>
    public FileResource()
    {
        // The ID is what the console output uses to tell instances apart, so
        // it must differ between instances created back-to-back. A per-instance
        // Random seeded from the clock repeats its first value whenever two
        // instances are constructed within one clock tick; a fresh Guid does not
        // depend on the clock. The mask clears the sign bit so the value stays
        // in the same non-negative range that Random.Next() produced.
        NetworkSequenceID = Guid.NewGuid().GetHashCode() & int.MaxValue;
    }

    /// <summary>Identifier printed (and color-coded) in the console output.</summary>
    public int NetworkSequenceID { get; set; }

    /// <summary>Display name of the resource, e.g. "MFMHardDrive01".</summary>
    public string Name { get; set; }
}
// Start-of-run timestamp (UTC, captured when the static fields are initialized).
// The file worker subtracts it from the current time to print elapsed time once
// a memory resource has been used 10 times.
public static DateTime TimeForFile = DateTime.UtcNow;
// Same as TimeForFile, but read by the network worker.
public static DateTime TimeForNetwork = DateTime.UtcNow;
// NOTE(review): not referenced anywhere in the visible code; the
// NetworkSequenceID values that get printed are the per-resource properties.
static int NetworkSequenceID = 0;
/// <summary>
/// Builds the dataflow network (three resource pools, two non-greedy joins,
/// two workers), seeds the pools, and then keeps the process alive while the
/// workers repeatedly borrow and return resources.
/// </summary>
/// <param name="args">Command-line arguments; not used.</param>
static void Main(string[] args)
{
    Console.ForegroundColor = ConsoleColor.White;

    // SetConsoleColor turns an ID's position in colorMap into a ConsoleColor.
    // Occupying position 0 with a placeholder presumably keeps real IDs off
    // ConsoleColor 0 (Black), which would be unreadable on a dark console.
    colorMap.Add(867);

    // Create three BufferBlock<T> objects. Each object holds a different
    // type of resource.
    var networkResources = new BufferBlock<NetworkResource>();
    var fileResources = new BufferBlock<FileResource>();
    var memoryResources = new BufferBlock<MemoryResource>();

    // Create two non-greedy JoinBlock<T1, T2> objects.
    // The first join works with network and memory resources;
    // the second join works with file and memory resources.
    var joinNetworkAndMemoryResources =
        new JoinBlock<NetworkResource, MemoryResource>(
            new GroupingDataflowBlockOptions
            {
                Greedy = false,
            });
    var joinFileAndMemoryResources =
        new JoinBlock<FileResource, MemoryResource>(
            new GroupingDataflowBlockOptions
            {
                Greedy = false,
            });

    // An ActionBlock processes ONE message at a time unless told otherwise
    // (MaxDegreeOfParallelism defaults to 1). With the default, adding more
    // resources to the pools only makes tuples queue up in front of each
    // worker. Lifting the limit lets every available resource pair be worked
    // on concurrently; the real ceiling is then the number of pooled
    // resources, because a tuple can only form when both halves are free.
    var workerOptions = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    };

    // Create two ActionBlock<T> objects.
    // The first block acts on a network resource and a memory resource.
    // The second block acts on a file resource and a memory resource.
    var networkMemoryAction =
        new ActionBlock<Tuple<NetworkResource, MemoryResource>>(
            data =>
            {
                NetworkResource network = data.Item1;
                MemoryResource memory = data.Item2;

                PrintLine("Network worker", network.Name, network.NetworkSequenceID, networkResources.Count,
                    "using resources on", memory.Name, memory.NetworkSequenceID, memoryResources.Count);

                // Simulate a lengthy operation that uses the resources.
                Thread.Sleep(2000);

                PrintLine("Network worker", network.Name, network.NetworkSequenceID, networkResources.Count,
                    "finished using resources on", memory.Name, memory.NetworkSequenceID, memoryResources.Count);

                // Each resource is owned by exactly one tuple until it is posted
                // back below, so these increments do not race.
                network.TouchCount += 1;
                memory.TouchCount += 1;
                if (memory.TouchCount == 10)
                {
                    Console.WriteLine(DateTime.UtcNow - TimeForNetwork);
                }

                // Release the resources back to their respective pools so the
                // joins can pair them up again. This is what keeps the demo
                // cycling; it is message re-posting, not a spin-wait.
                networkResources.Post(network);
                memoryResources.Post(memory);
            },
            workerOptions);
    var fileMemoryAction =
        new ActionBlock<Tuple<FileResource, MemoryResource>>(
            data =>
            {
                FileResource file = data.Item1;
                MemoryResource memory = data.Item2;

                PrintLine("File worker", file.Name, file.NetworkSequenceID, fileResources.Count,
                    "using resources on", memory.Name, memory.NetworkSequenceID, memoryResources.Count);

                // Simulate a lengthy operation that uses the resources.
                Thread.Sleep(2000);

                PrintLine("File worker", file.Name, file.NetworkSequenceID, fileResources.Count,
                    "finished using resources on", memory.Name, memory.NetworkSequenceID, memoryResources.Count);

                file.TouchCount += 1;
                memory.TouchCount += 1;
                if (memory.TouchCount == 10)
                {
                    Console.WriteLine(DateTime.UtcNow - TimeForFile);
                }

                // Release the resources back to their respective pools.
                fileResources.Post(file);
                memoryResources.Post(memory);
            },
            workerOptions);

    // Link the resource pools to the JoinBlock<T1, T2> objects.
    // Because these join blocks operate in non-greedy mode, they do not
    // take the resource from a pool until all resources are available from
    // all pools.
    networkResources.LinkTo(joinNetworkAndMemoryResources.Target1);
    memoryResources.LinkTo(joinNetworkAndMemoryResources.Target2);
    fileResources.LinkTo(joinFileAndMemoryResources.Target1);
    memoryResources.LinkTo(joinFileAndMemoryResources.Target2);

    // Link the JoinBlock<T1, T2> objects to the ActionBlock<T> objects.
    joinNetworkAndMemoryResources.LinkTo(networkMemoryAction);
    joinFileAndMemoryResources.LinkTo(fileMemoryAction);

    // Populate the resource pools. The commented-out Post calls are
    // experiment toggles: enable them to grow a pool. (The banner text is
    // fixed and does not track how many Post calls are enabled.)
    Console.WriteLine("ADDING: Allocating 4 Network interfaces");
    networkResources.Post(new NetworkResource() { Name = "eth0" });
    networkResources.Post(new NetworkResource() { Name = "eth1" });
    //networkResources.Post(new NetworkResource() { Name = "eth2" });
    //networkResources.Post(new NetworkResource() { Name = "eth3" });
    Console.WriteLine("ADDING: Allocate a small memory resource");
    memoryResources.Post(new MemoryResource() { Name = "Memory01" });
    memoryResources.Post(new MemoryResource() { Name = "Memory02" });
    Console.WriteLine("ADDING: Old disk technology simulator ");
    fileResources.Post(new FileResource() { Name = "MFMHardDrive01" });
    //fileResources.Post(new FileResource() { Name = "MFMHardDrive02" });

    // Keep the process alive while data flows through the network.
    // 10,000,000 ms is roughly 2 hours 47 minutes, i.e. "until the user
    // stops the demo" rather than a few seconds.
    Thread.Sleep(10000000);
}
// Empty stub: the body does nothing and the 'from' argument is ignored.
// NOTE(review): no caller is visible in this file; looks like a leftover
// from experimentation — confirm before relying on it.
public static void MessageProcessor(string from)
{
}
/// <summary>
/// Writes one status line of the form
/// "&lt;worker&gt; &lt;name1&gt;/&lt;id1&gt;(&lt;count1+1&gt;): &lt;action&gt; &lt;name2&gt;/&lt;id2&gt; (&lt;count2+1&gt;)",
/// with each ID printed in its own color.
/// </summary>
/// <param name="workerType">Label of the worker producing the line.</param>
/// <param name="dataItem1Name">Name of the first resource in the tuple.</param>
/// <param name="networkSequenceID1">ID of the first resource; selects its color.</param>
/// <param name="count1">Item count of the first resource's pool as sampled by the caller.</param>
/// <param name="onString">Action text, e.g. "using resources on".</param>
/// <param name="item2Name">Name of the second resource in the tuple.</param>
/// <param name="networkSequenceID2">ID of the second resource; selects its color.</param>
/// <param name="count2">Item count of the second resource's pool as sampled by the caller.</param>
private static void PrintLine(string workerType, string dataItem1Name, int networkSequenceID1, int count1, string onString, string item2Name, int networkSequenceID2, int count2)
{
    // Several worker threads print concurrently. One lock keeps the pieces
    // of a line (and the temporary color changes) from interleaving, and it
    // also guards colorMap, which SetConsoleColor mutates.
    lock (colorMap)
    {
        // The printed counts are the sampled pool counts plus one.
        int fixAPICountingAtZero = 1;

        Console.Write(workerType + " " + dataItem1Name + "/");
        SetConsoleColor(networkSequenceID1);

        // A space separates the action text from the resource name; without
        // it the output ran together as "...resources onMemory01/".
        Console.Write("(" + (count1 + fixAPICountingAtZero) + "): " + onString + " " + item2Name + "/");
        SetConsoleColor(networkSequenceID2);

        Console.WriteLine(" (" + (count2 + fixAPICountingAtZero) + ")");
    }
}
// Distinct IDs in first-seen order. An ID's position in this list selects the
// console color it is printed in. PrintLine also uses this object as its lock.
static List<int> colorMap = new List<int>();

/// <summary>
/// Writes <paramref name="anumber"/> to the console in a color tied to that
/// number, then restores the previous foreground color.
/// </summary>
/// <param name="anumber">The ID to print; registered in colorMap on first use.</param>
/// <returns>Always the empty string.</returns>
/// <remarks>
/// Not synchronized on its own: it mutates colorMap and the console color, so
/// callers are expected to hold the colorMap lock, as PrintLine does.
/// </remarks>
static string SetConsoleColor(int anumber)
{
    // Single lookup; register the ID at the end of the list when it is new.
    int slot = colorMap.IndexOf(anumber);
    if (slot < 0)
    {
        slot = colorMap.Count;
        colorMap.Add(anumber);
    }

    // ConsoleColor only defines values 0..15, and casting a larger index is
    // rejected by Console.ForegroundColor. Positions 0..15 keep the color they
    // always had; later positions wrap around onto 1..15 so that any number of
    // distinct IDs can be printed (colors repeat once 15 are in use).
    const int lastColor = (int)ConsoleColor.White;
    int colorIndex = slot == 0 ? 0 : ((slot - 1) % lastColor) + 1;

    ConsoleColor original = Console.ForegroundColor;
    Console.ForegroundColor = (ConsoleColor)colorIndex;
    Console.Write(anumber);
    Console.ForegroundColor = original;
    return "";
}
}
/* Sample output:
File worker: using resources...
File worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
File worker: using resources...
File worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
File worker: using resources...
File worker: finished using resources...
File worker: using resources...
File worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
Network worker: using resources...
Network worker: finished using resources...
File worker: using resources...
*/
【问题讨论】:
-
不要过多关注各个线程是如何被分配来完成这项工作的。TPL Dataflow 库基于 Task Parallel Library (TPL) 构建,线程的使用取决于任务的使用。TPL Dataflow 在内部操作的是任务,而不是线程。
标签: c# task-parallel-library msdn tpl-dataflow