【发布时间】:2020-01-14 23:29:47
【问题描述】:
我有以下代码
string dataDirectory = _settingsProvider.DataSettings.BaseDirectory;
_solverManagementService.MergedPointCloudProducer(dataDirectory, cancellationToken)
.Subscribe(PointCloudMergerCompleted);
SolverManagementService _solverManagementService 在哪里
Public class SolverManagementService : ISolverManagementService
{
public IObservable<IPointCloud> MergedPointCloudProducer(string dataDirectory,
CancellationToken token)
{
return Observable.Create<IPointCloud>(
observer =>
{
PairCollectionProducer(dataDirectory, token)
.Subscribe(pairCollection =>
{
observer.OnNext(_icpBatchSolverService.RecursivelyMergeAsync(
pairCollection, token));
},
onCompleted: () =>
{
observer.OnCompleted();
});
return () => { };
});
}
... // Other methods.
}
但是这里_icpBatchSolverService.RecursivelyMergeAsync(pairCollection, token) 是昂贵的,虽然这会返回一个Task<IPointCloud>,但我不会线程化这个并且这个调用会阻塞。由于RecursivelyMergeAsync 返回一个Task<IPointCloud> 可以等待,所以我修改了代码以使用async/await
public IObservable<IPointCloud> MergedPointCloudProducer(string dataDirectory,
CancellationToken token)
{
return Observable.Create<IPointCloud>(
observer =>
{
PairCollectionProducer(dataDirectory, token)
.Subscribe(async (pairCollection) =>
{
observer.OnNext(await _icpBatchSolverService.RecursivelyMergeAsync(
pairCollection, token));
},
onCompleted: () =>
{
observer.OnCompleted();
});
return () => { };
});
}
但现在它立即返回并且控制台应用程序关闭。我确信这可以在不需要Semephores 的情况下完成,但我是 RX 新手。如何将 RecursivelyMergeAsync 配置为同时运行每个返回的 pairCollection 而不会阻塞并在所有递归合并完成时收到通知?
注意。在单元测试中,我执行以下操作
public class IcpBatchSolverServiceTests
{
private Mock<ISettingsProvider> _mockSettingsProvider;
private IIcpBatchSolverService _icpBatchSolverService;
[OneTimeSetUp]
public void Setup()
{
_mockSettingsProvider = new Mock<ISettingsProvider>();
_mockSettingsProvider.Setup(m => m.IcpSolverSettings).Returns(new IcpSolverSettings());
_mockSettingsProvider.Object.IcpSolverSettings.MaximumDegreeOfParallelism = 6;
Log.Logger = new LoggerConfiguration()
.WriteTo.Console()
.CreateLogger();
var serviceProvider = new ServiceCollection()
.AddLogging(builder =>
{
builder.SetMinimumLevel(LogLevel.Trace);
builder.AddSerilog(Log.Logger);
})
.BuildServiceProvider();
ILogger<IcpBatchSolverServiceTests> logger = serviceProvider
.GetService<ILoggerFactory>()
.CreateLogger<IcpBatchSolverServiceTests>();
_icpBatchSolverService = new IcpBatchSolverService(_mockSettingsProvider.Object, logger);
}
[Test]
public async Task CanSolveBatchAsync()
{
IPointCloud @static = PointCloudFactory.GetRandomPointCloud(1000);
List<IPointCloud> pointCloudList = PointCloudFactory.GenerateRandomlyRotatedBatch(@static, 12);
IPartitioningService<IPointCloud> ps = new PointCloudPartitioningService();
IPointCloud result = await _icpBatchSolverService.RecursivelyMergeAsync(ps.Partition(pointCloudList), CancellationToken.None);
Assert.AreEqual(@static.Vertices.Length, result.Vertices.Length);
}
}
而且这个过程完美地同时进行。
编辑。概述当为不同的几何图形(不同角度的不同几何图形的深度图)提供一个文件夹时我需要做的处理,命名约定为 .NNNN.exr,其中 NNNN 是某个数值。对于一批文件。
- 使用不同几何图形的文件名将这些文件批处理到集合中。
foreach 文件批处理
- [*Serial*] 调用 C++ API 从图像文件中提取深度图。
- [*Parallel*] 将
DepthMaps转换为PointClouds。这可以一次完成。 - [*Parallel*] 使用 ICP 算法合并点云(昂贵),但将
TaskScheduler的并发限制为两个线程(根据机器架构/内存等选择)
最后,我使用第 3 步中的合并点云再次调用 C++ API。所以在 RX 中,我当前的完整管道看起来像
public class SolverManagementService : ISolverManagementService
{
private readonly IIcpBatchSolverService _icpBatchSolverService;
private readonly IDepthMapToPointCloudAdapter _pointCloudAdapter;
private readonly ILogger<SolverManagementService> _logger;
public SolverManagementService(
IIcpBatchSolverService icpBatchSolverService,
IDepthMapToPointCloudAdapter pointCloudAdapter,
ILogger<SolverManagementService> logger)
{
_icpBatchSolverService = icpBatchSolverService ?? throw new ArgumentNullException("icpBatchSolverService cannot be null");
_pointCloudAdapter = pointCloudAdapter ?? throw new ArgumentNullException("pointCloudAdapter cannot be null");
_logger = logger;
}
public IObservable<IPointCloud> MergedPointCloudProducer(string dataDirectory, CancellationToken token)
{
return Observable.Create<IPointCloud>(
observer =>
{
PairCollectionProducer(dataDirectory, token)
.Subscribe(pairCollection =>
{
observer.OnNext(_icpBatchSolverService.RecursivelyMergeAsync(pairCollection, token).Result);
},
onCompleted: () =>
{
observer.OnCompleted();
});
return () => { };
});
}
public IObservable<PairCollection<IPointCloud>> PairCollectionProducer(string dataDirectory, CancellationToken token)
{
return Observable.Create<PairCollection<IPointCloud>>(
observer =>
{
Parallel.ForEach(
Utils.GetFileBatches(dataDirectory),
(fileBatch) =>
{
var producer = RawDepthMapProducer(fileBatch, token);
ConcurrentBag<IPointCloud> bag = new ConcurrentBag<IPointCloud>();
producer.Subscribe(rawDepthMap =>
{
bag.Add(_pointCloudAdapter.GetPointCloudFromDepthMap(rawDepthMap));
_logger?.LogDebug($"Thread {Thread.CurrentThread.ManagedThreadId}: {bag.Count:N0} PointCloud(s) added to concurrent bag");
},
onCompleted: () =>
{
PointCloudPartitioningService ps = new PointCloudPartitioningService();
observer.OnNext(ps.Partition(bag.ToList()));
_logger?.LogDebug($"Thread {Thread.CurrentThread.ManagedThreadId}: PointCloud PairCollection generated " +
$"for file set \"{Path.GetFileNameWithoutExtension(bag.FirstOrDefault().Source)}\"");
});
});
observer.OnCompleted();
return () => { };
});
}
public IObservable<RawDepthMap> RawDepthMapProducer(List<string> filePaths, CancellationToken token)
{
return Observable.Create<RawDepthMap>(
observer =>
{
int index = 0;
foreach(var filePath in filePaths)
{
token.ThrowIfCancellationRequested();
var extractor = DepthMapExtractorFactory.GetDepthMapExtractor(filePath);
observer.OnNext(extractor.GetDepthMap(filePath, index++));
_logger?.LogDebug($"Thread {Thread.CurrentThread.ManagedThreadId}: DepthMap extracted from \"{filePath}\"");
}
observer.OnCompleted();
return () => { };
});
}
}
我正在寻求: 1. 我上面的代码有什么问题 _icpBatchSolverService.RecursivelyMergeAsync 返回一个 Task<IPointCloud 并且是并发的,我希望这个拖链同时运行。 2. 我的代码还有什么问题?
【问题讨论】:
-
你不应该在create方法中调用subscribe。
-
我正在链接
IObservable提要。我的PairCollectionProducer也是和IObservable。 -
您还将 async 和 await 与 observable 混合在一起,这是一个很大的禁忌 - 说明原因。如果你把它清理干净,问题无疑会变得清晰。
-
我认为问题在于生成的
IObservable在所有异步操作完成之前就完成了。您可能需要推迟observer.OnCompleted()通话。 -
@MoonKnight - 你的主要问题是你在
Subscribe中调用OnNext。这是灾难的秘诀。此外,过度使用Observable.Create会让您的生活变得艰难。我真的需要从你那里得到一个minimal reproducible example 才能解决这一切,但我怀疑你可以用一些纯 RX 查询来做你需要的事情。你能提供给我完整的、可编译的代码吗?
标签: c# observable system.reactive rx.net