【问题标题】:How can I check whether this interval is even running?如何检查此间隔是否正在运行?
【发布时间】:2016-11-24 08:36:50
【问题描述】:

我设置了一堆Console.WriteLines,据我所知,当我在 .NET Fiddle 中运行以下命令时,它们都没有被调用。

using System;
using System.Net;
using System.Linq.Expressions;
using System.Linq;  
using System.Threading;
using System.Threading.Tasks;
using System.Timers;
using System.Collections.Generic;

public class Program
{
    private static readonly object locker = new object();
    private static readonly string pageFormat = "http://www.letsrun.com/forum/forum.php?board=1&page={0}";

    public static void Main()
    {
        var client = new WebClient();

        // Queue up the requests we are going to make
        var tasks = new Queue<Task<string>>(
            Enumerable
            .Repeat(0,50)
            .Select(i => new Task<string>(() => client.DownloadString(string.Format(pageFormat,i))))
        );

        // Create set of 5 tasks which will be the at most 5
        // requests we wait on
        var runningTasks = new HashSet<Task<string>>();
        for(int i = 0; i < 5; ++i)
        {
            runningTasks.Add(tasks.Dequeue());
        }

        var timer = new System.Timers.Timer
        {
            AutoReset = true,
            Interval = 2000 
        };

        // On each tick, go through the tasks that are supposed
        // to have started running and if they have completed
        // without error then store their result and run the
        // next queued task if there is one. When we run out of 
        // any more tasks to run or wait for, stop the ticks.
        timer.Elapsed += delegate
        {
            lock(locker)
            {
                foreach(var task in runningTasks)
                {
                    if(task.IsCompleted)
                    {
                        if(!task.IsFaulted)
                        {
                            Console.WriteLine("Got a document: {0}", 
                                task.Result.Substring(Math.Min(30, task.Result.Length)));

                            runningTasks.Remove(task);

                            if(tasks.Any())
                            {
                                runningTasks.Add(tasks.Dequeue());
                            }
                        }
                        else
                        {
                            Console.WriteLine("Uh-oh, task faulted, apparently");
                        }
                    }
                    else if(!task.Status.Equals(TaskStatus.Running)) // task not started
                    {
                        Console.WriteLine("About to start a task.");
                        task.Start();
                    }
                    else
                    {
                        Console.WriteLine("Apparently a task is running.");
                    }
                }   

                if(!runningTasks.Any())
                {
                    timer.Stop();
                }
            }

        };
    }
}

我也很感激有关如何简化或修复其中的任何错误逻辑的建议。我想要做的模式就像

(1) 创建一个包含 N 个任务的队列

(2)创建一组M个任务,前M个出列项来自(1)

(3)启动M个任务运行

(4) X 秒后,检查已完成的任务。

(5) 对于任何已完成的任务,对结果执行某些操作,从集合中移除该任务并用队列中的另一个任务替换它(如果队列中还有其他任务)。

(6) 无限期地重复 (4)-(5)。

(7) 如果集合中没有剩余任务,我们就完成了。

但也许有更好的方法来实现它,或者也许有一些 .NET 函数可以轻松封装我正在尝试做的事情(以指定的最大并行度并行的 Web 请求)。

【问题讨论】:

  • 您选择这种方法而不是仅仅将任务推入线程池有什么原因吗?不是说你这样做是错的,我只是好奇。
  • 带有特定 DOP 的简单 ActionBlock 可以解决问题

标签: c# .net multithreading asynchronous parallel-processing


【解决方案1】:

您的代码中有几个问题,但由于您正在寻找更好的方法来实现它 - 您可以使用 Parallel.ForParallel.ForEach

Parallel.For(0, 50, new ParallelOptions() { MaxDegreeOfParallelism = 5 }, (i) =>
{
     // surround with try-catch
     string result;
     using (var client = new WebClient()) {
          result = client.DownloadString(string.Format(pageFormat, i));
     }
     // do something with result
     Console.WriteLine("Got a document: {0}", result.Substring(Math.Min(30, result.Length)));
});

它将并行执行主体(在任何给定时间不超过 5 个任务)。当一项任务完成时 - 下一项开始,直到它们全部完成,就像你想要的那样。

更新。使用这种方法有几个等待来限制任务,但最直接的就是休眠:

Parallel.For(0, 50, new ParallelOptions() { MaxDegreeOfParallelism = 5 },  
(i) =>
{
    // surround with try-catch
    var watch = Stopwatch.StartNew();
    string result;
    using (var client = new WebClient()) {
         result = client.DownloadString(string.Format(pageFormat, i));
    }
    // do something with result
    Console.WriteLine("Got a document: {0}", result.Substring(Math.Min(30, result.Length)));
    watch.Stop();
    var sleep = 2000 - watch.ElapsedMilliseconds;
    if (sleep > 0)
          Thread.Sleep((int)sleep);
});

【讨论】:

  • 我不想这样做的一个原因是它有可能使网站充满请求。如果我很快收到前 5 个响应,我还想在发出下一批请求之前等待一两秒。
  • 你知道站点究竟是如何限制并发请求的吗?我的意思是,类似于“在 Y 秒内不超过 X 个请求”。
  • 不,我不知道。但是假设我做了:那么 X 和 Y 的过程是什么?
  • 你真的不应该做new WebClient().DownloadString(,因为WebClientIDisposable
  • @Enigmativity 虽然一般来说是正确的,但 WebClient 的 Dispose 没有任何用处(它是 IDisposable 只是因为它继承自 IComponent),因此不释放它没有任何害处(不会留下任何打开的流,待处理的 web请求或类似的东西)。
【解决方案2】:

这不是您问题的直接答案。我只是想提出一种替代方法。

我建议您考虑使用 Microsoft 的响应式框架 (NuGet "System.Reactive") 来执行此类操作。

然后你可以这样做:

var query =
    Observable
        .Range(0, 50)
        .Select(i => string.Format(pageFormat, i))
        .Select(u => Observable.Using(
            () => new WebClient(),
            wc => Observable.Start(() => new { url = u, content = wc.DownloadString(u) })))
        .Merge(5);

IDisposable subscription = query.Subscribe(x =>
{
    Console.WriteLine(x.url);
    Console.WriteLine(x.content);
});

都是异步的,可以随时调用subscription.Dispose()停止进程;

【讨论】:

  • 在这种情况下添加如何限制请求不会有什么坏处(例如,作者需要在每批 5 个请求之间至少经过两秒)。
  • @Evk Rx 使用 Rx 控制要容易得多,它已经有操作员来控制或缓冲事件
  • @PanagiotisKanavos 是的,但是因为它是一个很好的答案,可以描述在这种特殊情况下如何准确地做到这一点。
猜你喜欢
  • 2022-12-03
  • 2017-10-14
  • 2016-10-21
  • 1970-01-01
  • 2016-02-07
  • 2021-10-27
  • 2017-08-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多