【问题标题】:How to limit number of parallel tasks running? [closed]如何限制运行的并行任务数量? [关闭]
【发布时间】:2021-07-09 19:18:37
【问题描述】:

我的代码可以侦听文件夹并检测放入其中的新文件。 目前,每个删除的新文件都会调用WatcherOnCreated 方法,该方法会创建一个新任务来与其他文件并行加密文件。

我想限制并行文件加密的数量。

我尝试使用计数为 5 的信号量,这样无论放入多少文件,都只能同时进行 5 次加密。

但是,它不起作用,导致我的程序没有响应。

class Test
{
        private static LimitEncryptionSemaphore;

        public Test()
        {
            LimitEncryptionSemaphore = new SemaphoreSlim(5);
        }
        
        // some function which calls WatcherOnCreated
         
        private void WatcherOnCreated(string filePath, WatcherChangeTypes changeType)
        {
            string fileName = Path.GetFileName(filePath);

            Logger.Debug($"A created item {fileName} was detected in drop folder.");

            LimitEncryptionSemaphore.WaitAsync();
            // FireAndForget calls Task.Run(Func<Task>) with the callback function
            TaskFactory.FireAndForget(async () =>
            {
                try
                {
                    await ExponentialBackoffPolicy.ExecuteAsync(async () =>
                    {
                        using (DeviceData deviceData = ReadDeviceData(filePath))
                        {
                            // Raise the event.
                            await OnDeviceDataAvailable(FolderDevice, deviceData);
                        }
                    });

                    if (FileSystem.Exists(filePath))
                    {
                        // Delete the file once the handler is done.
                        FileSystem.DeleteFile(filePath);

                        Logger.Debug($"{filePath} was deleted.");
                    }
                }
                catch (Exception ex)
                {
                    Logger.Error(ex);
                }
                finally
                {
                    // Release semaphore
                    LimitEncryptionSemaphore.Release();
                }
            });

        }

【问题讨论】:

  • 我认为您的问题会更多地与codereview.stackexchange.com 的主题有关,您应该尝试在那里发帖。
  • 您的代码限制为 10 而不是 5。
  • 请注意,虽然您已经编辑了您的问题以尽量减少基于意见的问题(这就是前两个接近投票的原因),但它仍然缺少可靠地重现您的问题的 minimal reproducible example描述,因此是第三次也是最后一次近距离投票。

标签: c# .net multithreading task-parallel-library semaphore


【解决方案1】:

代码有几个问题:

  1. LimitEncryptionSemaphore.WaitAsync 返回的任务被忽略,而不是awaited。
  2. LimitEncryptionSemaphore.WaitAsync 的调用方法与 LimitEncryptionSemaphore.Release 不同。

修复:

private void WatcherOnCreated(string filePath, WatcherChangeTypes changeType)
{
  string fileName = Path.GetFileName(filePath);
  Logger.Debug($"A created item {fileName} was detected in drop folder.");

  TaskFactory.FireAndForget(async () =>
  {
    await LimitEncryptionSemaphore.WaitAsync();
    try
    {
      ...
    }
    catch (Exception ex)
    {
      ...
    }
    finally
    {
      LimitEncryptionSemaphore.Release();
    }
  });
}

【讨论】:

  • 谢谢!我在 try 块的更深层方法(在 OnDeviceDataAvailable 内)中结束了使用 WaitAsync 和 Release。我还需要处理信号量吗?
  • @FryingPan:如果您需要重新创建它所在的组件,那么可以。如果整个应用程序正在关闭,则无需这样做。
  • WaitAsyncRelease 在应用程序运行期间确实被多次调用的函数中(每次上传新文件时,都会调用 Task.Run 来加密文件)。我认为这就是“重新创建组件”的意思。因此,在这种情况下,我必须在发布后处理每个线程中的信号量,对吗?
  • @FryingPan: 不。SemaphoreSlim 需要共享,并且只有在使用它完成所有操作后才能处理。
【解决方案2】:

我认为,由于您无法控制丢弃的文件数量,因此最好使用并发队列和循环处理排队的条目。

在文件观察器事件队列中,队列中的项目并启动一个后台工作者来处理数据。如果队列太长,您可以启动一个新的后台进程,但是如果这样做,您可能会饿死服务器处理器。

您还可以使用 Parallel.For 进行循环,并带有一个参数,说明如果队列比您喜欢的长,在您的情况 5 中,您希望开始的最大值。

【讨论】:

    【解决方案3】:

    TaskScheduler 的 .Net 文档有一个实现具有最大并行度的调度程序的示例。请看一下 https://docs.microsoft.com/en-us/dotnet/api/system.threading.tasks.taskscheduler?view=net-5.0

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2022-10-15
      • 2016-04-22
      • 2015-08-23
      相关资源
      最近更新 更多