【问题标题】:Prevent task from starting when added to a concurrent queue添加到并发队列时阻止任务启动
【发布时间】:2026-01-17 04:50:01
【问题描述】:

我正在尝试将我的任务添加到自定义并发队列,但它一直在启动。我怎样才能将任务对象添加到队列而不启动它,以便稍后在代码中启动它?基本上它应该为每个片段做的是获取请求流,将其流式传输到一个文件,同时开始下一个片段。

我的自定义并发队列:

public sealed class EventfulConcurrentQueue<T> : ConcurrentQueue<T>
{
    public ConcurrentQueue<T> Queue;

    public EventfulConcurrentQueue()
    {
        Queue = new ConcurrentQueue<T>();
    }

    public void Enqueue(T item)
    {
        Queue.Enqueue(item);
        OnItemEnqueued();
    }

    public int Count => Queue.Count;


    public bool TryDequeue(out T result)
    {
        var success = Queue.TryDequeue(out result);

        if (success)
        {
            OnItemDequeued(result);
        }
        return success;
    }

    public event EventHandler ItemEnqueued;
    public event EventHandler<ItemDequeuedEventArgs<T>> ItemDequeued;

    void OnItemEnqueued()
    {
        ItemEnqueued?.Invoke(this, EventArgs.Empty);
    }

    void OnItemDequeued(T item)
    {
        ItemDequeued?.Invoke(this, new ItemDequeuedEventArgs<T> { Item = item });
    }
}

public sealed class ItemDequeuedEventArgs<T> : EventArgs
{
    public T Item { get; set; }
}

我用来将任务添加到队列的代码:

Parallel.ForEach(pieces, piece =>
{
    //Open a http request with the range
    var request = new HttpRequestMessage { RequestUri = new Uri(url) };
    request.Headers.Range = new RangeHeaderValue(piece.start, piece.end);

    //Send the request
    var downloadTask = client.SendAsync(request).Result;

    //Use interlocked to increment Tasks done by one
    Interlocked.Add(ref OctaneEngine.TasksDone, 1);

    //Add the task to the queue along with the start and end value
    asyncTasks.Enqueue(new Tuple<Task, FileChunk>(
        downloadTask.Content.ReadAsStreamAsync().ContinueWith(
        task =>
        {
            using (var fs = new FileStream(piece._tempfilename,
                FileMode.OpenOrCreate, FileAccess.Write))
            {
                task.Result.CopyTo(fs);
            }
        }), piece));
});

我用于稍后启动任务的代码:

Parallel.ForEach(asyncTasks.Queue, async (task, state) =>
{
    if (asyncTasks.Count > 0)
    {
        await Task.Run(() => task);
        asyncTasks.TryDequeue(out task);
        Interlocked.Add(ref TasksDone, 1);
    }
});

我不确定发生了什么,因此我们将不胜感激任何帮助!谢谢!

【问题讨论】:

  • 您的代码存在一个重大缺陷,任何试图回答问题的尝试都是徒劳的恕我直言。 Parallel.ForEachis not async-friendly。传递的 lambda 是 async void。不仅所有任务都会立即开始,而且Parallel.ForEach 也会在任务完成之前立即完成。
  • 您真正想要解决的问题是什么?在多个线程上下载数据,但在单个线程上将数据写入磁盘?网络和磁盘都是 IO 系统,所以只要你使用“真正的”异步方法来做 IO,多线程通常没有什么好处。
  • @JonasH 我基本上是在尝试将文件的不同部分同时流式传输到临时文件。当他们完成后,我试图将它们加入主文件。这是my project,以防你需要看看我在做什么
  • @GregJ 您是否测试过这个概念并观察到任何显着的性能提升?这对我来说听起来像是一种去优化。系统中最慢的部分可能是网络,其次最慢的通常是磁盘。这种方法会使磁盘访问量增加三倍,这通常不会提高性能。

标签: c# .net asynchronous async-await task


【解决方案1】:

在我看来,您在这里过于复杂了。

我不确定您是否需要任何队列。

这种方法允许网络访问工作同时发生:

using var semaphore = new SemaphoreSlim(1);
var tasks = pieces.Select(piece =>
{
    var request = new HttpRequestMessage { RequestUri = new Uri(url) };
    request.Headers.Range = new RangeHeaderValue(piece.start, piece.end);

    //Send the request
    var download = await client.SendAsync(request);

    //Use interlocked to increment Tasks done by one
    Interlocked.Increment(ref OctaneEngine.TasksDone);

    var stream = await download.Content.ReadAsStreamAsync();
    
    using (var fs = new FileStream(piece._tempfilename, FileMode.OpenOrCreate,
        FileAccess.Write))
    {
        await semaphore.WaitAsync(); // Only allow one concurrent file write
        await stream.CopyToAsync(fs);
    }

    semaphore.Release();

    Interlocked.Increment(ref TasksDone);
});

await Task.WhenAll(tasks);

由于您的工作涉及 I/O,因此尝试通过 Parallel.ForEach 使用多个线程几乎没有什么好处;等待async 方法将释放线程来处理其他请求。

【讨论】:

  • 这样有效吗?在我看来,尝试同时写入文件系统的 10 个(或 100 个)任务将比按顺序执行相同任务要慢。
  • 我正在使用 enqueue 和 dequeue 事件进行一种回调,以获取事件状态的进度。 My project 在这里,以防你需要看看我是如何做到的。我真的不知道如何跟踪事件的进展。
  • @TheodorZoulias 是的,取决于存储,它可能会降低性能。您可以使用信号量来允许 HTTP 请求同时发生,但序列化文件写入。我已经更新了我的答案。