【发布时间】:2021-08-21 20:14:05
【问题描述】:
我不确定这个标题是否有意义,这是我能想到的最好的,所以这是我的场景。
我有一个 ASP.NET Core 应用程序,我更多地将其用作外壳并用于 DI 配置。在Startup 中,它添加了一堆IHostedServices 作为单例,以及它们的依赖项,也作为单例,除了SqlConnection 和DbContext 的小例外,我们稍后会介绍。托管服务是一组类似的服务:
- 侦听来自 GPS 设备的传入报告并将其放入侦听缓冲区。
- 从侦听缓冲区中解析项目并放入已解析的缓冲区中。
最终会有一个服务读取已解析的缓冲区并实际处理已解析的报告。它通过将它从缓冲区中取出的报告传递给处理程序并等待它完成以移动到下一个来完成此操作。这在过去一年中运行良好,但似乎我们现在遇到了可扩展性问题,因为它一次处理一个报告,服务器上的平均处理时间为 62 毫秒,其中包括 Dapper 访问数据库以获取所需的数据以及保存更改的 EF Core 之旅。
但是,如果处理程序决定报告的信息需要触发后台作业,那么我怀疑它需要 100 毫秒或更长时间才能完成。随着时间的推移,缓冲区填满的速度超过了处理程序可以处理的速度,直到可以处理数千个报告为止,即使不是 100 也可以是 10 秒。这是一个问题,因为通知会延迟,并且如果在服务器在午夜重新启动时缓冲区仍然已满,则可能会丢失数据。
话虽如此,我正试图弄清楚如何使处理并行化。经过昨天的大量实验,我决定在缓冲区上使用Parallel.ForEach,使用GetConsumingEnumerable()。这很好用,除了我不知道该怎么做甚至打电话的奇怪行为。随着缓冲区被填满并且ForEach 正在对其进行迭代,它将开始将处理“分块”为不断增加的二的倍数。分块的大小受MaxDegreeOfParallelism 设置的影响。例如(N# = 缓冲区中的下一个报告数):
MDP = 1
- N3 = 1 次
- N6 = 2 一次
- N12 = 4 次
- ...
MDP = 2
- N6 = 1 次
- N12 = 2 次li>
- N24 = 4 次
- ...
MDP = 4
- N12 = 1 次
- N24 = 2 次li>
- N48 = 4 次
- ...
MDP = 8(我的 CPU 核心数)
- N24 = 1 次
- N48 = 2 次li>
- N96 = 一次 4 个
- ...
这可以说比我现在的串行执行更糟糕,因为到一天结束时,它会在实际处理它们之前缓冲并等待,例如,50 万个报告。
有没有办法解决这个问题?我对Parallel.ForEach 不是很有经验,所以从我的角度来看,这是一种奇怪的行为。最终,我正在寻找一种方法来并行处理报告,只要它们在缓冲区中,所以如果有其他方法可以实现这一点,我会全力以赴。这大致就是我的代码。处理报告的处理程序确实使用IServiceProvider 创建范围并获取SqlConnection 和DbContext 的实例。提前感谢您的任何建议!
public sealed class GpsReportService :
IHostedService {
private readonly GpsReportBuffer _buffer;
private readonly Config _config;
private readonly GpsReportHandler _handler;
private readonly ILogger _logger;
public GpsReportService(
GpsReportBuffer buffer,
Config config,
GpsReportHandler handler,
ILogger<GpsReportService> logger) {
_buffer = buffer;
_config = config;
_handler = handler;
_logger = logger;
}
public Task StartAsync(
CancellationToken cancellationToken) {
_logger.LogInformation("GPS Report Service => starting");
Task.Run(Process, cancellationToken).ConfigureAwait(false);// Is ConfigureAwait here correct usage?
_logger.LogInformation("GPS Report Service => started");
return Task.CompletedTask;
}
public Task StopAsync(
CancellationToken cancellationToken) {
_logger.LogInformation("GPS Parsing Service => stopping");
_buffer.CompleteAdding();
_logger.LogInformation("GPS Parsing Service => stopped");
return Task.CompletedTask;
}
// ========================================================================
// Utilities
// ========================================================================
private void Process() {
var options = new ParallelOptions {
MaxDegreeOfParallelism = 8,
CancellationToken = CancellationToken.None
};
Parallel.ForEach(_buffer.GetConsumingEnumerable(), options, async report => {
try {
await _handler.ProcessAsync(report).ConfigureAwait(false);
} catch (Exception e) {
if (_config.IsDevelopment) {
throw;
}
_logger.LogError(e, "GPS Report Service");
}
});
}
private async Task ProcessAsync() {
while (!_buffer.IsCompleted) {
try {
var took = _buffer.TryTake(out var report, 10);
if (!took) {
continue;
}
await _handler.ProcessAsync(report!).ConfigureAwait(false);
} catch (Exception e) {
if (_config.IsDevelopment) {
throw;
}
_logger.LogError(e, "GPS Report Service");
}
}
}
}
public sealed class GpsReportBuffer :
BlockingCollection<GpsReport> {
}
【问题讨论】:
-
您似乎混淆了任务和线程,这绝不是一个好主意。
-
Parallel.ForEach用于数据并行,而不是异步操作。它旨在通过对内存数据进行分区并在每个 CPU 内核上使用大约一个工作任务来处理大块内存数据,以减少内核之间的同步。它甚至会使用当前线程,这会导致错误地假设它正在阻塞。当所有核心都忙时,为什么不使用线程?无论如何它都无法运行。它不是用于处理消息流、异步操作或启动其他线程。使用 Task.Run 会适得其反 - Parallel.ForEach 无论如何都会使用所有内核 -
如果您使用正确的类 - 数据流块或通道而不是 BlockingCollection 和 Parallel.Foreach,您可以极大地简化问题
-
您如何处理 GPS 数据? Reactive Extensions 允许对事件流进行处理、聚合和分析。您可以使用它来计算每个“事物”在特定时间窗口中的行进距离,使用类似 LINQ 的操作,例如
events.Buffer(TimeSpan.FromMinutes(1)).Select(list=>list.Average(gps=>gps.Speed));
标签: c# asp.net-core task-parallel-library parallel.foreach asp.net-core-5.0