【问题标题】:Make an IObservable subscription concurrent使 IObservable 订阅并发
【发布时间】:2020-01-14 23:29:47
【问题描述】:

我有以下代码

string dataDirectory = _settingsProvider.DataSettings.BaseDirectory;
_solverManagementService.MergedPointCloudProducer(dataDirectory, cancellationToken)
    .Subscribe(PointCloudMergerCompleted);

SolverManagementService _solverManagementService 在哪里

Public class SolverManagementService : ISolverManagementService
{
    public IObservable<IPointCloud> MergedPointCloudProducer(string dataDirectory,
        CancellationToken token)
    {
        return Observable.Create<IPointCloud>(
            observer =>
            {
                PairCollectionProducer(dataDirectory, token)
                    .Subscribe(pairCollection =>
                    {
                        observer.OnNext(_icpBatchSolverService.RecursivelyMergeAsync(
                            pairCollection, token));
                    },
                    onCompleted: () =>
                    {
                        observer.OnCompleted();
                    });
                return () => { };
            });
    }
    ... // Other methods. 
}

但是这里_icpBatchSolverService.RecursivelyMergeAsync(pairCollection, token) 是昂贵的,虽然这会返回一个Task&lt;IPointCloud&gt;,但我不会线程化这个并且这个调用会阻塞。由于RecursivelyMergeAsync 返回一个Task&lt;IPointCloud&gt; 可以等待,所以我修改了代码以使用async/await

public IObservable<IPointCloud> MergedPointCloudProducer(string dataDirectory,
    CancellationToken token)
{
    return Observable.Create<IPointCloud>(
        observer =>
        {
            PairCollectionProducer(dataDirectory, token)
                .Subscribe(async (pairCollection) =>
                {
                    observer.OnNext(await _icpBatchSolverService.RecursivelyMergeAsync(
                        pairCollection, token));
                },
                onCompleted: () =>
                {
                    observer.OnCompleted();
                });
            return () => { };
        });
}

但现在它立即返回并且控制台应用程序关闭。我确信这可以在不需要Semephores 的情况下完成,但我是 RX 新手。如何将 RecursivelyMergeAsync 配置为同时运行每个返回的 pairCollection 而不会阻塞并在所有递归合并完成时收到通知?

注意。在单元测试中,我执行以下操作

public class IcpBatchSolverServiceTests
{
    private Mock<ISettingsProvider> _mockSettingsProvider; 
    private IIcpBatchSolverService _icpBatchSolverService;

    [OneTimeSetUp]
    public void Setup()
    {
        _mockSettingsProvider = new Mock<ISettingsProvider>();

        _mockSettingsProvider.Setup(m => m.IcpSolverSettings).Returns(new IcpSolverSettings());
        _mockSettingsProvider.Object.IcpSolverSettings.MaximumDegreeOfParallelism = 6;

        Log.Logger = new LoggerConfiguration()
            .WriteTo.Console()
            .CreateLogger();

        var serviceProvider = new ServiceCollection()
            .AddLogging(builder =>
            {
                builder.SetMinimumLevel(LogLevel.Trace);
                builder.AddSerilog(Log.Logger);
            })
            .BuildServiceProvider();

        ILogger<IcpBatchSolverServiceTests> logger = serviceProvider
            .GetService<ILoggerFactory>()
            .CreateLogger<IcpBatchSolverServiceTests>();

        _icpBatchSolverService = new IcpBatchSolverService(_mockSettingsProvider.Object, logger);
    }

    [Test]
    public async Task CanSolveBatchAsync()
    {
        IPointCloud @static = PointCloudFactory.GetRandomPointCloud(1000);
        List<IPointCloud> pointCloudList = PointCloudFactory.GenerateRandomlyRotatedBatch(@static, 12);

        IPartitioningService<IPointCloud> ps = new PointCloudPartitioningService();
        IPointCloud result = await _icpBatchSolverService.RecursivelyMergeAsync(ps.Partition(pointCloudList), CancellationToken.None);

        Assert.AreEqual(@static.Vertices.Length, result.Vertices.Length);
    }
}

而且这个过程完美地同时进行。


编辑。概述当为不同的几何图形(不同角度的不同几何图形的深度图)提供一个文件夹时我需要做的处理,命名约定为 .NNNN.exr,其中 NNNN 是某个数值。对于一批文件。

  1. 使用不同几何图形的文件名将这些文件批处理到集合中。

foreach 文件批处理

  1. [*Serial*] 调用 C++ API 从图像文件中提取深度图。
  2. [*Parallel*] 将DepthMaps 转换为PointClouds。这可以一次完成。
  3. [*Parallel*] 使用 ICP 算法合并点云(昂贵),但将 TaskScheduler 的并发限制为两个线程(根据机器架构/内存等选择)

最后,我使用第 3 步中的合并点云再次调用 C++ API。所以在 RX 中,我当前的完整管道看起来像

public class SolverManagementService : ISolverManagementService
{
    private readonly IIcpBatchSolverService _icpBatchSolverService;
    private readonly IDepthMapToPointCloudAdapter _pointCloudAdapter;
    private readonly ILogger<SolverManagementService> _logger;

    public SolverManagementService(
        IIcpBatchSolverService icpBatchSolverService,
        IDepthMapToPointCloudAdapter pointCloudAdapter,
        ILogger<SolverManagementService> logger)
    {
        _icpBatchSolverService = icpBatchSolverService ?? throw new ArgumentNullException("icpBatchSolverService cannot be null");
        _pointCloudAdapter = pointCloudAdapter ?? throw new ArgumentNullException("pointCloudAdapter cannot be null");
        _logger = logger; 
    }

    public IObservable<IPointCloud> MergedPointCloudProducer(string dataDirectory, CancellationToken token)
    {
        return Observable.Create<IPointCloud>(
            observer =>
            {
                PairCollectionProducer(dataDirectory, token)
                    .Subscribe(pairCollection =>
                    {
                        observer.OnNext(_icpBatchSolverService.RecursivelyMergeAsync(pairCollection, token).Result);
                    },
                    onCompleted: () =>
                    {
                        observer.OnCompleted();
                    });
                return () => { };
            });
    }

    public IObservable<PairCollection<IPointCloud>> PairCollectionProducer(string dataDirectory, CancellationToken token)
    {
        return Observable.Create<PairCollection<IPointCloud>>(
            observer =>
            {
                Parallel.ForEach(
                    Utils.GetFileBatches(dataDirectory), 
                    (fileBatch) =>
                {
                    var producer = RawDepthMapProducer(fileBatch, token);
                    ConcurrentBag<IPointCloud> bag = new ConcurrentBag<IPointCloud>();

                    producer.Subscribe(rawDepthMap =>
                    {
                        bag.Add(_pointCloudAdapter.GetPointCloudFromDepthMap(rawDepthMap));
                        _logger?.LogDebug($"Thread {Thread.CurrentThread.ManagedThreadId}: {bag.Count:N0} PointCloud(s) added to concurrent bag");
                    }, 
                    onCompleted: () =>
                    {
                        PointCloudPartitioningService ps = new PointCloudPartitioningService();
                        observer.OnNext(ps.Partition(bag.ToList()));

                        _logger?.LogDebug($"Thread {Thread.CurrentThread.ManagedThreadId}: PointCloud PairCollection generated " +
                            $"for file set \"{Path.GetFileNameWithoutExtension(bag.FirstOrDefault().Source)}\"");
                    });
                });
                observer.OnCompleted();
                return () => { };
            });
    }

    public IObservable<RawDepthMap> RawDepthMapProducer(List<string> filePaths, CancellationToken token)
    {
        return Observable.Create<RawDepthMap>(
            observer =>
            {
                int index = 0;
                foreach(var filePath in filePaths)
                {
                    token.ThrowIfCancellationRequested();
                    var extractor = DepthMapExtractorFactory.GetDepthMapExtractor(filePath);

                    observer.OnNext(extractor.GetDepthMap(filePath, index++));
                    _logger?.LogDebug($"Thread {Thread.CurrentThread.ManagedThreadId}: DepthMap extracted from \"{filePath}\"");
                }
                observer.OnCompleted();
                return () => { };
            });
    }
}

我正在寻求: 1. 我上面的代码有什么问题 _icpBatchSolverService.RecursivelyMergeAsync 返回一个 Task&lt;IPointCloud 并且是并发的,我希望这个拖链同时运行。 2. 我的代码还有什么问题?

【问题讨论】:

  • 你不应该在create方法中调用subscribe。
  • 我正在链接IObservable 提要。我的PairCollectionProducer 也是和IObservable
  • 您还将 async 和 await 与 observable 混合在一起,这是一个很大的禁忌 - 说明原因。如果你把它清理干净,问题无疑会变得清晰。
  • 我认为问题在于生成的IObservable在所有异步操作完成之前就完成了。您可能需要推迟observer.OnCompleted() 通话。
  • @MoonKnight - 你的主要问题是你在Subscribe 中调用OnNext。这是灾难的秘诀。此外,过度使用Observable.Create 会让您的生活变得艰难。我真的需要从你那里得到一个minimal reproducible example 才能解决这一切,但我怀疑你可以用一些纯 RX 查询来做你需要的事情。你能提供给我完整的、可编译的代码吗?

标签: c# observable system.reactive rx.net


【解决方案1】:

我将留下一个通用的答案,因为上面的代码过于广泛,无法归结为。

有两种语法可用于定义异步行为。第一个是async/await 模式,第二个是Subscribe() 模式(反应式)。

异步与并发是一回事吗?

不,绝对不是。对于那些可能正在阅读本文但不知道的人来说,异步意味着“它稍后发生”,而不是“它同时发生”。通过使用这两种语法中的任何一种,您都可以定义在满足某些谓词后立即发生的行为。一个非常常见的用例是处理从 Web 服务器返回的响应。您需要发出请求,然后在响应返回时执行某些操作。

并发是不同的。例如,您可以使用Task.Run()Parallel.ForEach() 调用并发。在这两种情况下,您都在定义一个分叉。在Task.Run 的情况下,您稍后可能会执行Task.WaitAll。在Parallel.ForEach 的情况下,它将为您执行分叉/连接。当然,reactive 有自己的一组 fork/join 操作。

当我等待或订阅时会发生什么?

以下两行代码都有相同的行为,这种行为让很多程序员感到困惑:

var result = await myAsync();

myObservable.Subscribe(result =&gt; { ... });

在这两种情况下,程序的控制流都以可预测但可能令人困惑的方式移动。在第一种情况下,控制流在等待await 时返回给父调用者。在第二个中,控制流转到下一行代码,在返回结果时调用 lambda 表达式。

我在学习如何使用它们的人中看到的一个常见情况是尝试将 lambda 中的变量分配给父作用域中的地址。这是行不通的,因为在执行 lambda 之前很久,该范围就会不复存在。使用async/await 不太可能做一些愚蠢的事情,但您还必须记住,控制流将向上调用堆栈,直到定义下一个同步操作。 This article explains it in a little more depththis article 比较容易理解。

【讨论】:

  • 谢谢。我显然需要阅读更多关于 Reactive 以获得我需要的并发性,而无需混合咒语。我实际上不喜欢使用 RX 链接操作的方式,所以我可能会将其删除并在没有 RX 的情况下重新编写。以为我会把它用作学习练习,但我似乎还有很多东西要学。非常感谢您在这里的时间,非常感谢。
  • @MoonKnight- 我实际上认为您的情况需要 Rx。链接的工作方式与 LINQ 链接的工作方式非常相似。它用于将一个 observable 转换为另一个 observable。然后在最后调用 subscribe 函数以开始整个过程​​。你通常也不需要使用 Observable.create - 你的第一个 observable 来自你正在调用的任何东西。
  • 我也是这么想的,但是我可以很容易地编写我需要的并发代码,并且由于缺乏知识,RX 似乎让我陷入困境(也许我应该投资学习我需要的东西)和我需要的东西的复杂性。我已经更新了这个问题,如果你能提供任何明确的例子来说明我如何调整我必须做的事情,我很乐意提供你选择的赏金。
  • 嗯,Rx 确实不是一个并发 框架,它是一个恰好支持有限并发度的异步 框架。看看Task Parallel Library
  • 是的,我想我最好离开这里。再次感谢您花时间帮助我。最受赞赏。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-09-02
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多