【问题标题】:RxJava rx.exceptions.MissingBackpressureException with filter and map带有过滤器和映射的 RxJava rx.exceptions.MissingBackpressureException
【发布时间】:2015-12-13 17:05:50
【问题描述】:


我刚开始玩 RxJava/RxAndroid,在理解如何正确处理背压方面遇到了一些问题。

我有一个可观察的文件扫描器,它可以扫描目录并发出文件。应尽快处理这些文件,不要跳过任何文件。

所以管道看起来像这样: Observable<File> -> Filter<File, Boolean> {check if file is of type .xyz}

不幸的是,我收到 rx.exceptions.MissingBackpressureException 错误。所以我读到了背压,如果我理解正确的话,损失更少的选项只是缓冲区和窗口。

我试过onBackpressureBuffer(), buffer() and window()。虽然所有onBackpressureX() 命令似乎都没有效果,但buffer() 将项目分组为List<File>。我的问题是:

  1. 我应该如何过滤这些组? filter(<List<File>>, Boolean) 毫无意义……
  2. 如何在我的文件扫描器中实现可观察的背压处理,以便它等到我的管道/操作员/订阅者有容量?
  3. 使用例如转换项目是一种好习惯吗? map() 进入 XYZ-Entities 并将它们存储在单独的列表中,而不是活跃的订阅者,而是作为运营商的副作用?

一些反馈甚至提示会很有帮助,我们将不胜感激。

【问题讨论】:

  • 尝试使用onBackPressureDrop()
  • 感谢您的建议。由于 order(?),onBackPressureX() 命令不起作用 - 请参阅我的答案。此外,一项要求是操作是无损的。

标签: java android reactive-programming rx-java rx-android


【解决方案1】:

我想我找到了解决问题的方法: 此代码无效:

Observable<File> task = scanner.getProcessDirectoryTask(mountPoint);
Subscription _subscription = task
       .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .onBackpressureBuffer(10000)
        .filter(new Func1<File, Boolean>() {
            @Override
            public Boolean call(File file) {
                return file.getAbsolutePath().endsWith("xyz");
            }
        })
        .buffer(100)
        .subscribe(new Observer<List<File>>() { /*whatever you want to do*/ }

但是这段代码工作的:

Observable<File> task = scanner.getProcessDirectoryTask(mountPoint);
Subscription _subscription = task
        .onBackpressureBuffer(10000)
        .filter(new Func1<File, Boolean>() {
            @Override
            public Boolean call(File file) {
                return file.getAbsolutePath().endsWith("xyz");
            }
        })
        .buffer(100)
       .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<List<File>>() { /*whatever you want to do*/ }

所以看来subscribeOn()observeOn() 的顺序有很大的不同!

我的第三个问题有点离题但仍然开放。也许有人可以对此发表评论。

【讨论】:

  • 是的,Schedulers.io() 创建另一个线程来处理您对与 MainThread 不同的线程中的文件的请求,因此您无法与订阅中的视图交互。 AndroidSchedulers.mainThread 创建要在 MainThread 中执行的订阅者。
  • 必须在observeOn等敏感操作前加上onBackpressureXXX。但是getProcessDirectoryTask 是如何实现的呢?标准来源应支持背压。
  • 它是使用 Observable.create() 和 OnSubscribe 中长时间运行的文件扫描操作创建的。
猜你喜欢
  • 1970-01-01
  • 2017-01-17
  • 2019-05-01
  • 2021-06-29
  • 2012-07-23
  • 1970-01-01
  • 1970-01-01
  • 2012-04-24
  • 1970-01-01
相关资源
最近更新 更多