【问题标题】:Is there an await statement for threads?是否有线程的等待语句?
【发布时间】:2016-01-15 19:42:59
【问题描述】:

您好,我想知道是否有类似于 await 语句的东西,它用于任务,我可以在 c# 中使用线程来实现?

我想做的是:

启动线程 A,计算一些数据并将结果放入变量x。 在该变量 x 被转移到另一个线程 B 之后,同时 线程 A 再次开始计算一些数据,而线程 B 开始另一个计算,结果为 x

更新:好的,似乎有些混乱,所以我的描述会更准确:

我使用两个产生数据的传感器。需要以这样一种方式检索数据,即检索 SensorA 数据(这需要很长时间),然后必须立即在另一个线程中检索来自 SensorB 的数据,同时 SensorA 继续检索另一个数据块。问题是我无法将两个传感器的数据排列在同一个队列中,但我需要将两个传感器的数据存储在一个数据结构/对象中。

我的想法是这样的:

  1. 从线程 A 中的传感器 A 获取数据。
  2. 将结果交给线程 B 并重新启动线程 A。
  3. 当线程 A 再次运行时,线程 B 从传感器 B 获取数据并计算来自传感器 A 和 B 的数据

你可以假设线程 A 总是比线程 B 需要更长的时间

【问题讨论】:

  • 不清楚您希望await 做什么...毕竟,您不是在等待线程执行完成。也许您应该通过线程 A 知道的TaskCompletionSourcex 使用Task<T>?然后线程 B 可以等待该任务。不过,您可能想查看 Dataflow:msdn.microsoft.com/en-us/library/hh228603(v=vs.110).aspx
  • 听起来像经典的生产者/消费者。
  • 对我来说听起来像 TPL DataFlow :-)
  • @MatthewWatson 谁说该线程已被弃用?
  • @VMAtm 我故意夸大效果。我们不应该在没有充分理由的情况下使用线程。 Task 是一个更高级别的概念,它可能使用也可能不使用实际线程。 (换句话说,线程是一个实现细节。)

标签: c# multithreading async-await


【解决方案1】:

正如我在评论中所说。这看起来像经典的生产者/消费者,我们可以使用例如BlockingCollection

这是对该页面示例的轻微修改:

BlockingCollection<Data> dataItems = new BlockingCollection<Data>(100);

// "Thread B"
Task.Run(() => 
{
    while (!dataItems.IsCompleted)
    {
        Data dataA = null;
        try
        {
            dataA = dataItems.Take();
        }
        catch (InvalidOperationException) { }

        if (dataA != null)
        {
            var dataB = ReadSensorB();
            Process(dataA,dataB);
        }
    }
    Console.WriteLine("\r\nNo more items to take.");
});

// "Thread A"
Task.Run(() =>
{
    while (moreItemsToAdd)
    {
        Data dataA = ReadSensorA();
        dataItems.Add(dataA);
    }
    // Let consumer know we are done.
    dataItems.CompleteAdding();
});

然后moreItemsToAdd 就是您需要处理关闭此进程所需的任何代码。

【讨论】:

    【解决方案2】:

    我不确定您为什么要避免使用任务?也许您使用的是旧版本的 .net?如果是这样,Damien 建议的 BlockingCollection 也不是一种选择。如果您使用的是“普通”线程,则可以使用等待句柄在线程之间发出结果信号。例如,AutoResetEvent

    private int a;
    private AutoResetEvent newResult = new AutoResetEvent(false);
    
    private void ThreadA()
    {
        while (true)
        {
            a = GetSensorA();
            newResult.Set();
        }
    }
    
    private void ThreadB()
    {
        int b;
    
        while (true)
        {
            newResult.WaitOne();
            b = GetSensorB();         // or before "waitone"
            Console.WriteLine(a + b); // do something
        }
    }
    

    编辑:重置时有轻微错误,感谢您指出 Damien - 已更新

    【讨论】:

    • 自动ResetEvent - newResult.Reset();?此外,根据他们的叙述,他们希望传感器 B 的值与传感器 A 值可用的时间大致相同,而不是过去某个时间——这表明 GetSensorB() 调用应该在 WaitOne//do something.
    • 谢谢。更新为重置。我将把何时获取传感器 b 作为练习留给海报。顺便说一下,在您的解决方案上 +1,假设用户在 .net
    【解决方案3】:

    如果您可以使用 .Net 4.5 或更高版本,那么解决此问题的最佳方法是使用 TPL 的 DataFlow 组件。

    (您必须使用 NuGet 来安装 DataFlow;默认情况下它不是 CLR 的一部分。)

    这是一个示例可编译控制台应用程序,它演示了如何使用 DataFlow 来执行此操作:

    using System;
    using System.Threading;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;
    
    namespace SensorDemo
    {
        public sealed class SensorAData
        {
            public int Data;
        }
    
        public sealed class SensorBData
        {
            public double Data;
        }
    
        public sealed class SensorData
        {
            public SensorAData SensorAData;
            public SensorBData SensorBData;
    
            public override string ToString()
            {
                return $"SensorAData = {SensorAData.Data}, SensorBData = {SensorBData.Data}";
            }
        }
    
        class Program
        {
            static void Main()
            {
                var sensorADataSource = new TransformBlock<SensorAData, SensorData>(
                    sensorAData => addSensorBData(sensorAData), 
                    dataflowOptions());
    
                var combinedSensorProcessor = new ActionBlock<SensorData>(
                    data => process(data), 
                    dataflowOptions());
    
                sensorADataSource.LinkTo(combinedSensorProcessor, new DataflowLinkOptions { PropagateCompletion = true });
    
                // Create a cancellation source that will cancel after a few seconds.
                var cancellationSource = new CancellationTokenSource(delay:TimeSpan.FromSeconds(20));
    
                Task.Run(() => continuouslyReadFromSensorA(sensorADataSource, cancellationSource.Token));
    
                Console.WriteLine("Started reading from SensorA");
    
                sensorADataSource.Completion.Wait(); // Wait for reading from SensorA to complete.
                Console.WriteLine("Completed reading from SensorA.");
    
                combinedSensorProcessor.Completion.Wait();
                Console.WriteLine("Completed processing of combined sensor data.");   
            }
    
            static async Task continuouslyReadFromSensorA(TransformBlock<SensorAData, SensorData> queue, CancellationToken cancellation)
            {
                while (!cancellation.IsCancellationRequested)
                    await queue.SendAsync(readSensorAData());
    
                queue.Complete();
            }
    
            static SensorData addSensorBData(SensorAData sensorAData)
            {
                return new SensorData
                {
                    SensorAData = sensorAData,
                    SensorBData = readSensorBData()
                };
            }
    
            static SensorAData readSensorAData()
            {
                Console.WriteLine("Reading from Sensor A");
                Thread.Sleep(1000); // Simulate reading sensor A data taking some time.
                int value = Interlocked.Increment(ref sensorValue);
                Console.WriteLine("Read Sensor A value = " + value);
                return new SensorAData {Data = value}; 
            }
    
            static SensorBData readSensorBData()
            {
                Console.WriteLine("Reading from Sensor B");
                Thread.Sleep(100); // Simulate reading sensor B data being much quicker.
                int value = Interlocked.Increment(ref sensorValue);
                Console.WriteLine("Read Sensor B value = " + value);
                return new SensorBData {Data = value};
            }
    
            static void process(SensorData value)
            {
                Console.WriteLine("Processing sensor data: " + value);
                Thread.Sleep(1000); // Simulate slow processing of combined sensor values.
            }
    
            static ExecutionDataflowBlockOptions dataflowOptions()
            {
                return new ExecutionDataflowBlockOptions
                {
                    MaxDegreeOfParallelism = 1,
                    BoundedCapacity        = 1
                };
            }
    
            static int sensorValue;
        }
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2011-08-04
      • 2022-01-02
      • 1970-01-01
      • 1970-01-01
      • 2020-03-10
      相关资源
      最近更新 更多