【发布时间】:2026-02-01 21:55:01
【问题描述】:
这是我正在处理的一个有趣的 RxJava 小谜题。假设我有一个无限的Observable<List<Parent>> infiniteParentListStream,每个Parent 都有一个无限的Observable<List<Child>> infiniteChildListStream 属性。
我想在发出的List<Parent> 中获取所有Parent 实例,并将它们发出的每个List<Child> 项目合并为一个完整的List<Child>,反映所有父母的所有孩子。
Parent 中的Observable<List<Child>> infiniteChildListStream 属性是无限的,这使得toList() 任务有点挑战性。
public final class NestedInfiniteTest {
private static final BehaviorSubject<Integer> parentSubject = BehaviorSubject.create(1);
private static final BehaviorSubject<Integer> childSubject = BehaviorSubject.create(1);
public static void main(String[] args) {
Observable<List<Parent>> infiniteParentListStream = parentSubject
.map(i -> Arrays.asList(new Parent(), new Parent(), new Parent()))
.cache(1);
Observable<List<Child>> allCurrentChildren = infiniteParentListStream.<List<Child>>flatMap(parentList ->
Observable.from(parentList)
.flatMap(p -> p.getInfiniteChildListStream().flatMap(Observable::from)).toList()
).cache(1);
allCurrentChildren.subscribe(cl -> System.out.println("WHOLE CHILD LIST SIZE: " + cl.size()));
}
private static final class Parent {
private final Observable<List<Child>> infiniteChildListStream = childSubject
.map(i -> Arrays.asList(new Child(), new Child(), new Child())).cache(1);
public Observable<List<Child>> getInfiniteChildListStream() {
return infiniteChildListStream;
}
}
private static final class Child {
}
}
当然,我发现的一个解决方法是通过调用first() 将infiniteChildListStream 变为有限。但这不太理想,因为它不再更新。
Observable<List<Child>> allCurrentChildren = infiniteParentListStream.<List<Child>>flatMap(parentList ->
Observable.from(parentList)
.flatMap(p -> p.getInfiniteChildListStream().first().flatMap(Observable::from)).toList()
).cache(1);
我觉得有一种方法可以手动调用Observable.create() 或使用flatMap() 技巧来解决这个问题。有没有更好的方法来做到这一点,并让事情与无限的来源反应?在我在这个 SSCCE 之外的实际应用程序中,这些 observable 是无限的,因为驱动 Parent 和 Child 的数据源可能会更改并发出新值...
我想我的问题的根源是如何将多个无限 Observable<List<T>> 合并到一个 Observable<List<T>> 中?
【问题讨论】:
-
更新:我认为
Observable.combineLatest()可能会提供解决方案?
标签: java reactive-programming rx-java