【问题标题】:Add and remove Parallel.Invoke actions during runtime在运行时添加和删除 Parallel.Invoke 操作
【发布时间】:2021-10-23 00:39:57
【问题描述】:

我正在使用Parallel.Invoke 同时运行某些方法,并在所有方法完成后收集结果。

问题

正如您在“可怕的代码”部分看到的,动作列表被硬编码为三个元素,如果detectedDevicesList.Count != 3 将完全无用。

尝试过的解决方案

我尝试动态创建一个Actions[] 数组并将其作为Parallel.Invoke 的参数传递,但我无法将现有方法转换为Tasks,然后再转换为Actions。

非工作代码

public async Task<Task<String>> callWithEveryConnectedDevice(ListBox listBoxLog, Boolean sendAlarms)
{        
    String TEST_CALLS_COMPLETED = "All test calls completed.";
    String TEST_CALLS_FAILED = "One or more test cals failed";

    return Task.Run(async () =>
    {
        List<MobileEquipment> detectedDevicesList = await GetConnectedDevices.getAsync();

        if (detectedDevicesList.Count == 0)
        {
            UpdateGui.listboxAddItem(listBoxLog, "No devices are connected.", true);
            return TEST_CALLS_FAILED;
        }

        Console.WriteLine("Executing test calls...");

        List<Task<MobileEquipment>> results = new List<Task<MobileEquipment>>();

        //Horrible code begins...
        Parallel.Invoke(() =>
        {
            results.Add(new MakePhoneCall().call(detectedDevicesList[0], listBoxLog));
        },
        () =>
        {
            results.Add(new MakePhoneCall().call(detectedDevicesList[1], listBoxLog));
        },
        () =>
        {
            results.Add(new MakePhoneCall().call(detectedDevicesList[2], listBoxLog));
        });

        //Horrible code ends...
        foreach (Task<MobileEquipment> mobileEquipment in results)
        {
            UpdateGui.listboxAddItem(listBoxLog, "Test call result for " + mobileEquipment.Result.serial + " " + mobileEquipment.Result.operador + ": " + mobileEquipment.Result.callSuccess, true);

            if (!mobileEquipment.Result.callSuccess && sendAlarms)
            {                      
                await SendEmail.sendAlarmEmailsAsync(libreta, asunto, mensaje);
            }
        }
                      

        UpdateGui.listboxAddItem(listBoxLog, TEST_CALLS_COMPLETED, true);

        return TEST_CALLS_COMPLETED;
    });
}

编辑:给读者的有用信息和学到的经验

根据收到的出色答案和 cmets,我添加了一些原本缺失的代码,这些代码可以帮助您从并行任务中安全地与 Windows 窗体对象进行交互。

public static void ListboxAddItem(ListBox listBox, String argText, Boolean useTimestamp)
    {
        String timeStamp = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss");

        if (useTimestamp)
        {
            argText = timeStamp + ": " + argText;
        }

        if (Thread.CurrentThread.IsBackground)
        {
            listBox.Invoke(new Action(() =>
            {
                listBox.Items.Add(argText);
                listBox.SelectedIndex = listBox.Items.Count - 1;
            }));
        }
        else
        {
            listBox.Items.Add(argText);
            listBox.SelectedIndex = listBox.Items.Count - 1;
        }
    }

此外,不要盲目遵循 IntelliSense 建议,以防止像 Task 这样的恶作剧,或在 C# 上使用类似 Java 的大小写。

很难选择最佳建议的答案,因为它们都可以正常工作并且没有任何明显的性能差异(MakePhoneCall().call 通过 ADB 连接的 Android 设备自动拨打电话)。检查哪个答案最适合您的特定应用。

【问题讨论】:

  • 也许使用Parallel.For?但要小心并行结果。添加。我怀疑 List 可以处理并行添加。为了安全起见,请使用并发 docs.microsoft.com/de-de/dotnet/api/…
  • 每当您必须编写这样的返回类型时:Task&lt;Task&lt;String&gt;&gt; 这是一个好兆头,表明您需要修改您的解决方案。
  • MakePhoneCall().call 方法是否受 I/O 或 CPU 限制? Parallel.XYZ 专为 CPU 绑定操作而设计。
  • 驼峰式方法名称在 C# 中不是标准的。帕斯卡大小写是常态。
  • MakePhoneCall.call() 是做什么的?如果它使任何类型的 IO Parallel.Invokewrong 方法。事实上,即使是 CPU 密集型代码,也有更好的选择,例如使用 ActionBlock 或 Channel

标签: c# parallel-processing task-parallel-library parallel.invoke


【解决方案1】:

你应该使用微软的响应式框架(又名 Rx)——NuGet System.Reactive 并添加 using System.Reactive.Linq;——然后你所有丑陋的代码变成这样:

IObservable<MobileEquipment> query =
    from detectedDevicesList in Observable.FromAsync(() => GetConnectedDevices.getAsync())
    from detectedDevice in detectedDevicesList.ToObservable()
    from mobileEquipment in Observable.FromAsync(() => new MakePhoneCall().call(detectedDevice, listBoxLog))
    select mobileEquipment;

完整方法现在正确返回Task&lt;String&gt;,而不是Task&lt;Task&lt;String&gt;&gt;

这里是:

public async Task<String> callWithEveryConnectedDevice(ListBox listBoxLog, Boolean sendAlarms)
{
    String TEST_CALLS_COMPLETED = "All test calls completed.";
    String TEST_CALLS_FAILED = "One or more test cals failed";

    IObservable<MobileEquipment> query =
        from detectedDevicesList in Observable.FromAsync(() => GetConnectedDevices.getAsync())
        from detectedDevice in detectedDevicesList.ToObservable()
        from mobileEquipment in Observable.FromAsync(() => new MakePhoneCall().call(detectedDevice, listBoxLog))
        select mobileEquipment;
        
    IList<MobileEquipment> results = await query.ToList();

    if (results.Count == 0)
    {
        UpdateGui.listboxAddItem(listBoxLog, "No devices are connected.", true);
        return TEST_CALLS_FAILED;
    }

    foreach (MobileEquipment mobileEquipment in results)
    {
        UpdateGui.listboxAddItem(listBoxLog, "Test call result for " + mobileEquipment.serial + " " + mobileEquipment.operador + ": " + mobileEquipment.callSuccess, true);

        if (!mobileEquipment.callSuccess && sendAlarms)
        {
            await SendEmail.sendAlarmEmailsAsync(libreta, asunto, mensaje);
        }
    }

    UpdateGui.listboxAddItem(listBoxLog, TEST_CALLS_COMPLETED, true);

    return TEST_CALLS_COMPLETED;
}

【讨论】:

  • 反应式框架看起来很棒的谜,非常感谢。我只是想知道如果检测到的设备列表返回的大小为 0,我怎么能阻止查询执行。我的平庸方法是添加 if(GetConnectedDevices.Conunt>0) 但我很确定 Reactive 有一个更体面的方法。
  • @FelipeLaRotta - 如果detectedDevicesList 为空,则查询立即停止。没必要检查。这就是我从代码中删除它的原因。
【解决方案2】:

Parallel.ForParallel.Foreachconcurrent collection。应该更合适:

ConcurrentStack<Task<MobileEquipment>> results = new ();
Parallel.Foreach(detectedDevicesList, d => results.Add(new MakePhoneCall().call(d, listBoxLog));

另一种选择是parallel Linq

var result = detectedDevicesList.AsParallel(
    d => results.Add(new MakePhoneCall().call(d, listBoxLog).ToList();

但是,Call 似乎返回了一个任务,所以您确定这是一个缓慢的阻塞调用吗?如果不是,最好使用常规循环来启动调用,并使用Task.WaitAll 来(a)等待它们完成。看起来您当前的解决方案可能会阻止 mobileEquipment.Result

还要注意listBoxLog 看起来像一个 UI 对象,并且不允许从工作线程访问 UI 对象。如果方法是“纯”且对象是不可变的,则使用后台线程进行处理会容易得多。 IE。避免可能不是线程安全的副作用。作为一般规则,我建议避免使用多线程编程,除非,a) 有充分的理由期待一些改进,b) 你很清楚 thread safety 的危险。

您也可以考虑使用Dataflow 设置一个管道,以并行和异步方式执行处理的每个步骤。

【讨论】:

  • DataFlow 库看起来很棒。它可能需要重写代码,但对于未来的项目来说是值得的,
【解决方案3】:

Parallel.Invoke 不是在这种情况下使用的正确工具,因为您的工作负载是异步的,而 Parallel.Invoke 不是异步友好的。您的问题可以通过一次创建所有CallAsync 任务来解决,然后await 全部使用Task.WhenAll 方法完成。在await 之后,您将返回 UI 线程,您可以使用结果安全地更新 UI。

Select LINQ 运算符是用于将检测到的设备投射到任务的便捷工具。

public static async Task CallWithEveryConnectedDevice(ListBox listBoxLog, Boolean sendAlarms)
{
    List<MobileEquipment> detectedDevicesList = await GetConnectedDevices.GetAsync();

    Task<MobileEquipment>[] tasks = detectedDevicesList
        .Select(device => new MakePhoneCall().CallAsync(device))
        .ToArray();

    MobileEquipment[] results = await Task.WhenAll(tasks);

    foreach (var mobileEquipment in results)
    {
        UpdateGui.ListboxAddItem(listBoxLog,
            $"Test call result for {mobileEquipment.Serial} {mobileEquipment.Operador}: {mobileEquipment.CallSuccess}", true);
    }

    foreach (var mobileEquipment in results)
    {
        if (!mobileEquipment.CallSuccess && sendAlarms)
        {
            await SendEmail.SendAlarmEmailsAsync(libreta, asunto, mensaje);
        }
    }
}

【讨论】:

    猜你喜欢
    • 2012-02-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-01-08
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多