【问题标题】:Task synchronization without a UI thread没有 UI 线程的任务同步
【发布时间】:2011-07-01 16:19:44
【问题描述】:

在下面的代码中,我想同步报告任务列表的结果。这现在有效,因为 task.Result 阻塞,直到任务完成。但是,任务 id = 3 需要很长时间才能完成,并阻止所有其他已完成的任务报告其状态。

我认为我可以通过将报告 (Console.Write) 移动到 .ContinueWith 指令中来做到这一点,但我没有 UI 线程,那么如何让 TaskScheduler 同步 .ContinueWith 任务?

我现在拥有的:

static void Main(string[] args)
{
    Console.WriteLine("Starting on {0}", Thread.CurrentThread.ManagedThreadId);

    var tasks = new List<Task<int>>();

    for (var i = 0; i < 10; i++)
    {
        var num = i;
        var t = Task<int>.Factory.StartNew(() =>
        {
           if (num == 3)
           {
               Thread.Sleep(20000);
           }
           Thread.Sleep(new Random(num).Next(1000, 5000));
           Console.WriteLine("Done {0} on {1}", num, Thread.CurrentThread.ManagedThreadId);
           return num;
        });
        tasks.Add(t);
    }

    foreach (var task in tasks)
    {
        Console.WriteLine("Completed {0} on {1}", task.Result, Thread.CurrentThread.ManagedThreadId);
    }

    Console.WriteLine("End of Main");
    Console.ReadKey();
}

我想转移到这个或类似的东西,但我需要 Console.Write("Completed...") 都发生在同一个线程上:

static void Main(string[] args)
{
    Console.WriteLine("Starting on {0}", Thread.CurrentThread.ManagedThreadId);

    for (var i = 0; i < 10; i++)
    {
        var num = i;
        Task<int>.Factory.StartNew(() =>
        {
           if (num == 3)
           {
               Thread.Sleep(20000);
           }
           Thread.Sleep(new Random(num).Next(1000, 10000));
           Console.WriteLine("Done {0} on {1}", num, Thread.CurrentThread.ManagedThreadId);
           return num;
       }).ContinueWith(value =>
       {
           Console.WriteLine("Completed {0} on {1}", value.Result, Thread.CurrentThread.ManagedThreadId);
       } 

     /* need syncronization context */);
    }

    Console.WriteLine("End of Main");
    Console.ReadKey();
}

-- 解决方案-- 在获得了一些 cmets 并阅读了一些解决方案之后,这就是我想要的完整解决方案。这里的目标是尽可能快地处理多个长时间运行的任务,然后一次处理每个任务的结果。

static void Main(string[] args)
{
    Console.WriteLine("Starting on {0}", Thread.CurrentThread.ManagedThreadId);

    var results = new BlockingCollection<int>();

    Task.Factory.StartNew(() =>
    {
        while (!results.IsCompleted)
        {
            try
            {
                var x = results.Take();
                Console.WriteLine("Completed {0} on {1}", x, Thread.CurrentThread.ManagedThreadId);
            }
            catch (InvalidOperationException)
            {
            }
        }
        Console.WriteLine("\r\nNo more items to take.");
    });

    var tasks = new List<Task>();

    for (var i = 0; i < 10; i++)
    {
        var num = i;
        var t = Task.Factory.StartNew(() =>
        {
            if (num == 3)
            {
                Thread.Sleep(20000);
            }
            Thread.Sleep(new Random(num).Next(1000, 10000));
            Console.WriteLine("Done {0} on {1}", num, Thread.CurrentThread.ManagedThreadId);
            results.Add(num);
        });

        tasks.Add(t);
    }

    Task.Factory.ContinueWhenAll(tasks.ToArray(), _ => results.CompleteAdding());

    Console.WriteLine("End of Main");
    Console.ReadKey();
}

【问题讨论】:

  • 我假设控制台线程是 GUI (WinForms/WPF) 的替身。这不是一个好主意,Dispatcher/Messageloop 的存在会产生(很大)差异。
  • 否则,想想你所说的“发生在同一个线程上”是什么意思。除非该线程正在轮询,否则不能这样做。
  • 我需要更改它,以便一次只运行一个 ContinueWith 任务。它们是否在同一个线程上运行并不重要,但我不能让它们中的两个并行运行。在现实生活中,我的 ContinueWith 部分是将大量数据写入数据库。
  • 所以你实际上有一个 n-Producer/1-Consumer 模式。
  • 这与在特定线程上写入控制台无关......我修改后的答案是合适的,可以扩展到“将大量数据写入数据库”。

标签: c# task task-parallel-library


【解决方案1】:

您必须创建某种类型的编写器任务,但是请记住,即使 任务也可以重新安排到另一个本机或托管线程上!使用 TPL 中的默认调度程序,您无法控制哪个托管线程接收工作。

public class ConcurrentConsole
{
    private static BlockingCollection<string> output
        = new BlockingCollection<string>();

    public static Task CreateWriterTask(CancellationToken token)
    {
        return new Task(
            () =>
            {
                while (!token.IsCancellationRequested)
                {
                    string nextLine = output.Take(token);
                    Console.WriteLine(nextLine);
                }
            },
            token);
    }

    public static void WriteLine(Func<string> writeLine)
    {
        output.Add(writeLine());
    }
}

当我将您的代码切换为使用它时,我收到以下输出:

End of Main
Done 1 on 6
Completed 1 on 6
Done 5 on 9
Completed 5 on 9
Done 0 on 4
Completed 0 on 4
Done 2 on 5
Completed 2 on 13
Done 7 on 10
Completed 7 on 10
Done 4 on 8
Completed 4 on 5
Done 9 on 12
Completed 9 on 9
Done 6 on 6
Completed 6 on 5
Done 8 on 11
Completed 8 on 4
Done 3 on 7
Completed 3 on 7

即使您的代码将() =&gt; String.Format("Completed {0} on {1}"... 发送到ConcurrentConsole.WriteLine,确保ManagedThreadId 将在ConcurrentConsole 任务上被拾取,它仍然会改变它运行在哪个线程上。虽然比执行任务的可变性要小。

【讨论】:

  • 这实际上比我的 foreach 任务循环更糟糕。这会等待所有任务完成,然后再报告其中任何一项的结果。我正在寻找的是一种在任务完成后立即报告任务完成并将该报告同步到一个线程的方法。我不希望一项长期运行的任务阻止其他任务进行报告。
  • @Ryan:我已将报告添加到特定任务中,这是您在 TPL 中可以保证的最佳状态。
  • 这并不是我实现系统的方式,但使用 BlockingCollection 的想法是实现的核心。
【解决方案2】:

您可以使用OrderedTaskScheduler 确保一次只运行一个任务完成;但是,它们将在线程池线程上运行(不一定都在同一个线程上)。

如果你真的需要它们都在同一个线程上(而不是一次一个),那么你可以使用Nito.Async library 中的ActionThread。它为其代码提供了SynchronizationContext,可以通过FromCurrentSynchronizationContext获取。

【讨论】:

  • 这是我要走的方向。我想将 ContinueWith 操作带到单个线程上,以确保一次只运行其中一个。不确定我是否可以在生产中使用 OrderedTaskScheduler。我想使用 FromCurrentSynchronizationContext 但我不能,因为我在服务环境中(也就是没有 UI 线程)并且 SyncronizationContext.Current 返回 null。
  • 我认为OrderedTaskSchedulerActionThread 都是生产质量。听起来他们中的任何一个都可以解决您的问题。
【解决方案3】:

我建议:

1) 创建锁对象
2) 创建要写入的字符串列表
3) 生成一个循环的线程,休眠一会儿,然后锁定字符串列表,如果它不为空,则写入所有字符串并清空列表
4)其他线程然后锁定列表,添加他们的状态,解锁并继续。

object writeListLocker = new object();
List<string> linesToWrite = new List<string>();

// Main thread loop
for (; ; )
{
    lock (writerListLocker)
    {
        foreach (string nextLine in linesToWrite)
            Console.WriteLine(nextLine);
        linesToWrite.Clear();
    }
    Thread.Sleep(500);
}

// Reporting threads
lock (writerListLocker)
{
    linesToWrite.Add("Completed (etc.)");
}

【讨论】:

  • 如果可以的话,我想继续使用 TPL。看起来上面的内容不允许在一系列线程上进行处理,而结果的写入在一个线程上同步。
  • 但这正是它的作用。主线程循环完成所有的写作。其他线程都添加到linesToWrite对象而不是写入。
【解决方案4】:

我认为您希望得到如下结果。

Starting on 8
Done 1 on 11
Completed 1 on 9
Done 5 on 11
Completed 5 on 9
Done 0 on 10
Completed 0 on 9
Done 2 on 12
Completed 2 on 9
Done 7 on 16
Completed 7 on 9
Done 4 on 14
Completed 4 on 9
Done 9 on 18
Completed 9 on 9
Done 6 on 15
Completed 6 on 9
Done 8 on 17
Completed 8 on 9
Done 3 on 13
Completed 3 on 9

如下所示,我在the Understanding SynchronizationContext 的代码中使用了StaSynchronizationContext,其中一个线程中的同步调用得到了很好的解释。请参考。

我的代码 sn-p 是:

static void Main(string[] args)
{
    StaSynchronizationContext context = new StaSynchronizationContext();
    StaSynchronizationContext.SetSynchronizationContext(context);
    Console.WriteLine("Starting on {0}", Thread.CurrentThread.ManagedThreadId);
    for (var i = 0; i < 10; i++)
    {
        var num = i;
        Task<int>.Factory.StartNew(() =>
        {
            if (num == 3)
            {
                Thread.Sleep(20000);
            }
            Thread.Sleep(new Random(num).Next(1000, 10000));
            Console.WriteLine("Done {0} on {1}", num, Thread.CurrentThread.ManagedThreadId);
            return num;
        }).ContinueWith(
        value =>
        {
            Console.WriteLine("Completed {0} on {1}", value.Result, Thread.CurrentThread.ManagedThreadId);
        }
       ,TaskScheduler.FromCurrentSynchronizationContext());
    }
    Console.WriteLine("End of Main");
    Console.ReadKey();
}

【讨论】:

    猜你喜欢
    • 2011-01-28
    • 2017-07-07
    • 2014-01-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-10-06
    • 1970-01-01
    相关资源
    最近更新 更多