【问题标题】:how can i implement a concurrent execution queue class?如何实现并发执行队列类?
【发布时间】:2019-08-05 00:58:15
【问题描述】:

我需要一个在线程池中执行动作的类,但这些动作应该排队。例如:

方法一 方法二 方法三

当有人从他的线程中调用方法 1 时,他也可以调用方法 2 或方法 3,并且这 3 个方法可以同时执行,但是当用户对方法 1 或 2 或 3 进行另一个调用时,这一次是线程池应该阻止这些调用,直到旧的执行完成。

如下图所示:

我应该使用频道吗?

【问题讨论】:

  • 已经有一个通用的 ConcurrectQueue 类。您只需要选择 T 必须是哪种类型。 docs.microsoft.com/en-us/dotnet/api/…
  • 在该图像中,来自主线程的 1-4 是什么?为什么它们会被发送到函数中?
  • @Christopher 我认为它们是调用序列号,类似于第一次调用第二次调用等等......以可视化流程和架构:(
  • @Christopher 正如你提到的 CocurrectQueue 对这个解决方案有好处。但是类如何确定应该将哪个调用方法添加到队列中?
  • 您按照元素应该出现的确切顺序将元素添加到队列中。 |很难说你的目标是什么。现在阅读您的描述时存在语言障碍。你的画也没有真正告诉我你想做什么。我需要知道你正在拍摄的模式,才能从这些信息中猜出你的意思。我需要更多信息,否则您必须希望其他人可以帮助您。

标签: c# .net concurrency queue system.threading.channels


【解决方案1】:

should i use channels?,答案是肯定的,但也有其他可用的功能。

数据流

.NET 已经通过 TPL 数据流类提供了此功能。您可以使用ActionBlock 类将消息(即数据)传递给在后台以保证顺序和可配置的并行度执行的工作方法。频道是一项新功能,其工作基本相同。

您所描述的实际上是使用 ActionBlock 的最简单方法 - 只需向其发布数据消息并让它一一处理:

void Method1(MyDataObject1 data){...}

var block=new ActionBlock<MyDataObject1>(Method1);

//Start sending data to the block

for(var msg in someListOfItems)
{
    block.PostAsync(msg);
}

默认情况下,ActionBlock 有一个无限的输入队列。它将仅使用一项任务按照消息发布的顺序异步处理消息。

当你完成它时,你可以告诉它Complete() 并异步等待所有剩余的项目完成处理:

block.Complete();
await block.Completion;

要处理不同的方法,您可以简单地使用多个块,例如:

var block1=new ActionBlock<MyDataObject1>(Method1);
var block2=new ActionBlock<MyDataObject1>(Method2);

频道

通道是比块低级的功能。这意味着您必须编写更多代码,但您可以更好地控制“处理块”的工作方式。事实上,您可能可以使用通道重写 TPL Dataflow 库。

您可以使用以下(有点幼稚)方法创建类似于 ActionBlock 的处理块:

ChannelWriter<TIn> Work(Action<TIn> action)
{
    var channel=Channel.CreateUnbounded<TIn>();
    var workerTask=Task.Run(async ()=>{
        await foreach(var msg in channel.Reader.ReadAllAsync())
        {
            action(msg);
        }
    })

    var writer=channel.Writer;

    return writer;
}

此方法创建一个通道并在后台运行一个任务以异步读取数据并处理它们。我在这里使用 C#8 和 .NET Core 3.0 中可用的 await foreachChannelReader.ReadAllAsync() 作弊。

这个方法可以像块一样使用:

ChannelWriter<DataObject1> writer1 = Work(Method1);

foreach(var msg in someListOfItems)
{
    writer1.WriteAsync(msg);
}

writer1.Complete();

不过,频道还有很多其他功能。 SignalR 例如使用它们来允许streaming of notifications to the clients

【讨论】:

  • 很好的解释。非常感谢。你可能知道我正在开发一个多人游戏,但这是非常混乱的工作。
  • @aidinjalalvandi 块,尤其是通道在这种情况下更加有用 - 您可以为每个活跃的玩家/NPC/事物创建一个“代理”,并通过它自己的通道向它发送消息。您可以在管道中组合工作人员功能,将消息从一个工作人员路由到另一个工作人员等。
  • @aidinjalalvandi 或者您可以创建监听多个频道的“工人”,并首先处理更高优先级的消息。他们可以先阅读killedshutdown 频道,然后再继续处理其他消息。
  • 有没有办法在任务被取消时撤消操作,比如在游戏中设置动画然后取消它。这叫缓存吗?
【解决方案2】:

这是我的建议。对于每个同步方法,都应该添加一个异步方法。例如方法FireTheGun是同步的:

private static void FireTheGun(int bulletsCount)
{
    var ratata = Enumerable.Repeat("Ta", bulletsCount).Prepend("Ra");
    Console.WriteLine(String.Join("-", ratata));
}

异步对应的FireTheGunAsync 非常简单,因为将同步操作排队的复杂性委托给了辅助方法QueueAsync

public static Task FireTheGunAsync(int bulletsCount)
{
    return QueueAsync(FireTheGun, bulletsCount);
}

这里是QueueAsync的实现。每个动作都有其专用的SemaphoreSlim,以防止多个并发执行:

private static ConcurrentDictionary<MethodInfo, SemaphoreSlim> semaphores =
    new ConcurrentDictionary<MethodInfo, SemaphoreSlim>();

public static Task QueueAsync<T1>(Action<T1> action, T1 param1)
{
    return Task.Run(async () =>
    {
        var semaphore = semaphores
            .GetOrAdd(action.Method, key => new SemaphoreSlim(1));
        await semaphore.WaitAsync();
        try
        {
            action(param1);
        }
        finally
        {
            semaphore.Release();
        }
    });
}

使用示例:

FireTheGunAsync(5);
FireTheGunAsync(8);

输出:

Ra-Ta-Ta-Ta-Ta-Ta
Ra-Ta-Ta-Ta-Ta-Ta-Ta-Ta-Ta

使用不同数量的参数实现QueueAsync 的版本应该很简单。


更新:我之前的QueueAsync 实现可能具有不受欢迎的行为,即以随机顺序执行操作。发生这种情况是因为第二个任务可能是第一个获取信号量的任务。下面是一个保证正确执行顺序的实现。在高竞争的情况下性能可能会很差,因为每个任务都会进入一个循环,直到它以正确的顺序获取信号量。

private class QueueInfo
{
    public SemaphoreSlim Semaphore = new SemaphoreSlim(1);
    public int TicketToRide = 0;
    public int Current = 0;
}

private static ConcurrentDictionary<MethodInfo, QueueInfo> queues =
    new ConcurrentDictionary<MethodInfo, QueueInfo>();

public static Task QueueAsync<T1>(Action<T1> action, T1 param1)
{
    var queue = queues.GetOrAdd(action.Method, key => new QueueInfo());
    var ticket = Interlocked.Increment(ref queue.TicketToRide);
    return Task.Run(async () =>
    {
        while (true) // Loop until our ticket becomes current
        {
            await queue.Semaphore.WaitAsync();
            try
            {
                if (Interlocked.CompareExchange(ref queue.Current,
                    ticket, ticket - 1) == ticket - 1)
                {
                    action(param1);
                    break;
                }
            }
            finally
            {
                queue.Semaphore.Release();
            }
        }
    });
}

【讨论】:

    【解决方案3】:

    这个解决方案怎么样?

    public class ConcurrentQueue
    {
        private Dictionary<byte, PoolFiber> Actionsfiber;
        public ConcurrentQueue()
        {
            Actionsfiber = new Dictionary<byte, PoolFiber>()
            {
                { 1, new PoolFiber() },
                { 2, new PoolFiber() },
                { 3, new PoolFiber() },
            };
            foreach (var fiber in Actionsfiber.Values)
            {
                fiber.Start();
            }
        }
            
        public void ExecuteAction(Action Action , byte Code)
        {
            if (Actionsfiber.ContainsKey(Code))
                Actionsfiber[Code].Enqueue(() => { Action.Invoke(); });
            else
                Console.WriteLine($"invalid byte code");
        }
    
    }
    
    public static void SomeAction1()
    {
        Console.WriteLine($"{DateTime.Now} Action 1 is working");
        for (long i = 0; i < 2400000000; i++)
        {
    
        }
        Console.WriteLine($"{DateTime.Now} Action 1 stopped");
    }
                
    public static void SomeAction2()
    {
        Console.WriteLine($"{DateTime.Now} Action 2 is working");
        for (long i = 0; i < 5000000000; i++)
        {
    
        }
        Console.WriteLine($"{DateTime.Now} Action 2 stopped");
    }
                
    public static void SomeAction3()
    {
        Console.WriteLine($"{DateTime.Now} Action 3 is working");
        for (long i = 0; i < 5000000000; i++)
        {
    
        }
        Console.WriteLine($"{DateTime.Now} Action 3 stopped");
    }
    
    
    public static void Main(string[] args)
    {
        ConcurrentQueue concurrentQueue = new ConcurrentQueue();
    
        concurrentQueue.ExecuteAction(SomeAction1, 1);
        concurrentQueue.ExecuteAction(SomeAction2, 2);
        concurrentQueue.ExecuteAction(SomeAction3, 3);
        concurrentQueue.ExecuteAction(SomeAction1, 1);
        concurrentQueue.ExecuteAction(SomeAction2, 2);
        concurrentQueue.ExecuteAction(SomeAction3, 3);
    
        Console.WriteLine($"press any key to exit the program");
        Console.ReadKey();
    }
    

    输出:

    2019 年 8 月 5 日上午 7:56:57 操作 1 正在运行

    2019 年 8 月 5 日上午 7:56:57 操作 3 正在运行

    2019 年 8 月 5 日上午 7:56:57 操作 2 正在运行

    2019 年 8 月 5 日上午 7:57:08 操作 1 已停止

    2019 年 8 月 5 日上午 7:57:08 操作 1 正在运行

    2019 年 8 月 5 日上午 7:57:15 动作 2 停止

    2019 年 8 月 5 日上午 7:57:15 操作 2 正在运行

    2019 年 8 月 5 日上午 7:57:16 动作 3 已停止

    2019 年 8 月 5 日上午 7:57:16 操作 3 正在运行

    2019 年 8 月 5 日上午 7:57:18 动作 1 已停止

    2019 年 8 月 5 日上午 7:57:33 动作 2 停止

    2019 年 8 月 5 日上午 7:57:33 动作 3 停止

    poolFiber 是 ExitGames.Concurrency.Fibers 命名空间中的一个类。 更多信息:

    How To Avoid Race Conditions And Other Multithreading Issues?

    【讨论】:

    • 这是否按照问题的要求将操作排入线程池中以执行?
    • @TheodorZoulias ofcourse。我现在已经以多线程方式对其进行了测试,至少对我有用。
    • 您可能需要在控制台输出中添加Thread.CurrentThread.ManagedThreadId,以便更好地了解正在发生的事情。
    • @TheodorZoulias right.i 现在正在输出线程 id。实际上,当我创建一个线程和线程启动并为每个线程调用这些队列时,来自 ConcurrentQueue 类的线程 id 等于线程created.but 在“SomeActions”方法中线程是不同的,因为 poolfiber 根据上面的链接创建了一个在不同线程中工作的任务列表。但我不认为它会影响我的游戏。因为我只需要处理用户输入并行。但是这段代码会影响任何 CPU 问题吗?我的意思是,我是否为每个客户端使用了过多的 CPU?
    • @TheodorZoulias 看看这张照片。 ibb.co/ZVPVnvV 。我在每个“SomeActions”方法上放了一个 thread.sleep。所以执行“SomeActions”方法的最大成本时间是 4 秒。程序在 9 秒内结束。有 2 个函数调用。2 * 4 = 8 秒。是这个好消息与否?
    猜你喜欢
    • 2014-09-21
    • 2014-08-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2015-08-26
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多