【问题标题】:Execute multiple pairs of concurrent tasks in parallel并行执行多对并发任务
【发布时间】:2021-04-08 11:12:27
【问题描述】:

详情:

我有一个游戏,两个独立的 AI 互相对战。 每个 AI 都有自己的任务。 两个任务需要同时启动,需要带一些参数并返回一个值。 现在我想并行运行 100-200 个游戏(每两个任务)。

我现在遇到的问题是这两个任务没有一起启动。只要有一些免费资源,它们就会完全随机启动。

代码:

我目前的做法如下。

  • 我有一个输入对象列表,其中包含一些参数。
  • 使用 Parallel.ForEach,我为每个输入对象创建了一个游戏和两个 AI。
  • 哪个 AI 先完成游戏,另一个 AI 会停止另一个 AI,使用 CancellationToken 进行相同的游戏。
  • 所有返回值都保存在 ConcurrentBag 中。

因为每个游戏的两个 AI 任务没有一起启动,所以我添加了一个 AutoResetEvent。我希望我可以等待一个任务,直到第二个任务开始,但 AutoResetEvent.WaitOne 会阻止所有资源。所以 AutoResetEvent 的结果是第一个 AI 任务正在启动并等待第二个任务启动,但由于它们不再释放线程,所以它们永远等待。

        private ConcurrentBag<Individual> TrainKis(List<Individual> population) {
            ConcurrentBag<Individual> resultCollection = new ConcurrentBag<Individual>();
            ConcurrentBag<Individual> referenceCollection = new ConcurrentBag<Individual>();

            Parallel.ForEach(population, individual =>
            {
                GameManager gm = new GameManager();

                CancellationTokenSource c = new CancellationTokenSource();
                CancellationToken token = c.Token;
                AutoResetEvent waitHandle = new AutoResetEvent(false);

                KI_base eaKI = new KI_Stupid(gm, individual.number, "KI-" + individual.number, Color.FromArgb(255, 255, 255));
                KI_base referenceKI = new KI_Stupid(gm, 999, "REF-" + individual.number, Color.FromArgb(0, 0, 0));
                Individual referenceIndividual = CreateIndividual(individual.number, 400, 2000);

                var t1 = referenceKI.Start(token, waitHandle, referenceIndividual).ContinueWith(taskInfo => {
                    c.Cancel();
                    return taskInfo.Result;
                }).Result;
                var t2 = eaKI.Start(token, waitHandle, individual).ContinueWith(taskInfo => { 
                    c.Cancel(); 
                    return taskInfo.Result; 
                }).Result;

                referenceCollection.Add(t1);
                resultCollection.Add(t2);
            });

            return resultCollection;
        }

这是我等待第二个AI播放的AI的启动方法:

            public Task<Individual> Start(CancellationToken _ct, AutoResetEvent _are, Individual _i) {
                i = _i;
                gm.game.kis.Add(this);
                if (gm.game.kis.Count > 1) {
                    _are.Set();
                    return Task.Run(() => Play(_ct));
                }
                else {
                    _are.WaitOne();
                    return Task.Run(() => Play(_ct));
                }
            }

还有简化的玩法

public override Individual Play(CancellationToken ct) {
            Console.WriteLine($"{player.username} started.");
            while (Constants.TOWN_NUMBER*0.8 > player.towns.Count || player.towns.Count == 0) {
                try {
                    Thread.Sleep((int)(Constants.TOWN_GROTH_SECONDS * 1000 + 10));
                }
                catch (Exception _ex) {
                    Console.WriteLine($"{player.username} error: {_ex}");
                }
                
                //here are the actions of the AI (I removed them for better overview)

                if (ct.IsCancellationRequested) {
                    return i;
                }
            }
            if (Constants.TOWN_NUMBER * 0.8 <= player.towns.Count) {
                winner = true;
                return i;
            }
            return i;
        }

有没有更好的方法来做到这一点,保留所有东西但确保每个游戏中的两个 KI-Tasks 同时启动?

【问题讨论】:

  • 两位AI玩家如何互动?他们是否读写某些共享状态?这些读/写操作是使用lock 或其他同步原语同步的,还是无锁的?
  • 两个 AI 都与同一个游戏管理器交互。游戏管理器的某些部分(如包含游戏当前状态的四叉树)被锁定以防止错误。
  • 是否可以选择使用coroutines而不是Tasks来协调每对AI玩家?这个想法是将每个 AI 玩家公开为 iterator(一种返回 IEnumerable 并包含 yield 语句的方法)而不是 Task,并且对于每个“展开”两个迭代器的游戏都有一个 Task一步一个脚印。
  • 两位玩家会不会总是轮流出手?所以 KI1 做了一个动作,然后 KI2、KI1……等等?两个 KI 都需要完全免费玩...
  • 哦,我明白了。您能否在您的问题中包含一个 AI 播放器的异步 Start 方法的简化示例,以便我们能够提供替代建议? (而不是协程)

标签: c# .net multithreading task


【解决方案1】:

我的建议是更改Play 方法的签名,使其返回Task&lt;Individual&gt; 而不是Individual,并将对Thread.Sleep 的调用替换为await Task.Delay。这个小改动应该会对 AI 玩家的响应产生显着的积极影响,因为他们不会阻塞任何线程,ThreadPool 的小线程池将得到最佳利用。

public override async Task<Individual> Play(CancellationToken ct)
{
    Console.WriteLine($"{player.username} started.");
    while (Constants.TOWN_NUMBER * 0.8 > player.towns.Count || player.towns.Count == 0)
    {
        //...
        await Task.Delay((int)(Constants.TOWN_GROTH_SECONDS * 1000 + 10));
        //...
    }
}

您还可以考虑将方法名称从Play 更改为PlayAsync,以符合guidelines

然后你应该刮掉Parallel.ForEach方法,因为it is not async-friendly,而是将每个individual投影到Task,将所有任务捆绑在一个数组中,并等待它们全部使用Task.WaitAll方法完成(或者如果你想去async-all-the-way,可以使用await Task.WhenAll)。

Task[] tasks = population.Select(async individual =>
{
    GameManager gm = new GameManager();
    CancellationTokenSource c = new CancellationTokenSource();

    //...

    var task1 = referenceKI.Start(token, waitHandle, referenceIndividual);
    var task2 = eaKI.Start(token, waitHandle, individual);

    await Task.WhenAny(task1, task2);
    c.Cancel();
    await Task.WhenAll(task1, task2);
    var result1 = await task1;
    var result2 = await task2;
    referenceCollection.Add(result1);
    resultCollection.Add(result2);
}).ToArray();

Task.WaitAll(tasks);

这样您将获得最大并发,这可能并不理想,因为您的机器的 CPU 或 RAM 或 CPU RAM 带宽可能会饱和。您可以查看 here 以了解限制并发异步操作数量的方法。

【讨论】:

  • @theoretisch 简而言之,在程序中的任何位置定位所有.Wait().Result 调用,并尽可能多地消除它们(理想情况下是全部)。在大多数情况下,这些对程序的响应性或可扩展性是有害的。
  • 刚刚实现了它,它工作得很好。即使同时有 200 多个游戏。太感谢了。所有的链接也很有帮助!
  • @theoretisch 快速而肮脏的解决方法是通过在程序开始时调用ThreadPool.SetMinThreads(1000, 1000) 来增加ThreadPool 立即按需创建的线程数。尽管这可能会解决您的问题,但这会非常浪费,因为每个线程默认分配1 MB 的 RAM,仅用于其堆栈。因此,如果创建了约 400 个线程,您将浪费约 400MB 的 RAM,专门用于大部分时间都在休眠的线程。 :-)
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多