【发布时间】:2026-01-17 04:50:01
【问题描述】:
我正在尝试将我的任务添加到自定义并发队列,但它一直在启动。我怎样才能将任务对象添加到队列而不启动它,以便稍后在代码中启动它?基本上它应该为每个片段做的是获取请求流,将其流式传输到一个文件,同时开始下一个片段。
我的自定义并发队列:
public sealed class EventfulConcurrentQueue<T> : ConcurrentQueue<T>
{
public ConcurrentQueue<T> Queue;
public EventfulConcurrentQueue()
{
Queue = new ConcurrentQueue<T>();
}
public void Enqueue(T item)
{
Queue.Enqueue(item);
OnItemEnqueued();
}
public int Count => Queue.Count;
public bool TryDequeue(out T result)
{
var success = Queue.TryDequeue(out result);
if (success)
{
OnItemDequeued(result);
}
return success;
}
public event EventHandler ItemEnqueued;
public event EventHandler<ItemDequeuedEventArgs<T>> ItemDequeued;
void OnItemEnqueued()
{
ItemEnqueued?.Invoke(this, EventArgs.Empty);
}
void OnItemDequeued(T item)
{
ItemDequeued?.Invoke(this, new ItemDequeuedEventArgs<T> { Item = item });
}
}
public sealed class ItemDequeuedEventArgs<T> : EventArgs
{
public T Item { get; set; }
}
我用来将任务添加到队列的代码:
Parallel.ForEach(pieces, piece =>
{
//Open a http request with the range
var request = new HttpRequestMessage { RequestUri = new Uri(url) };
request.Headers.Range = new RangeHeaderValue(piece.start, piece.end);
//Send the request
var downloadTask = client.SendAsync(request).Result;
//Use interlocked to increment Tasks done by one
Interlocked.Add(ref OctaneEngine.TasksDone, 1);
//Add the task to the queue along with the start and end value
asyncTasks.Enqueue(new Tuple<Task, FileChunk>(
downloadTask.Content.ReadAsStreamAsync().ContinueWith(
task =>
{
using (var fs = new FileStream(piece._tempfilename,
FileMode.OpenOrCreate, FileAccess.Write))
{
task.Result.CopyTo(fs);
}
}), piece));
});
我用于稍后启动任务的代码:
Parallel.ForEach(asyncTasks.Queue, async (task, state) =>
{
if (asyncTasks.Count > 0)
{
await Task.Run(() => task);
asyncTasks.TryDequeue(out task);
Interlocked.Add(ref TasksDone, 1);
}
});
我不确定发生了什么,因此我们将不胜感激任何帮助!谢谢!
【问题讨论】:
-
您的代码存在一个重大缺陷,任何试图回答问题的尝试都是徒劳的恕我直言。
Parallel.ForEachis not async-friendly。传递的 lambda 是 async void。不仅所有任务都会立即开始,而且Parallel.ForEach也会在任务完成之前立即完成。 -
您真正想要解决的问题是什么?在多个线程上下载数据,但在单个线程上将数据写入磁盘?网络和磁盘都是 IO 系统,所以只要你使用“真正的”异步方法来做 IO,多线程通常没有什么好处。
-
@JonasH 我基本上是在尝试将文件的不同部分同时流式传输到临时文件。当他们完成后,我试图将它们加入主文件。这是my project,以防你需要看看我在做什么
-
@GregJ 您是否测试过这个概念并观察到任何显着的性能提升?这对我来说听起来像是一种去优化。系统中最慢的部分可能是网络,其次最慢的通常是磁盘。这种方法会使磁盘访问量增加三倍,这通常不会提高性能。
标签: c# .net asynchronous async-await task