【问题标题】:RxJava- Consolidating multiple, infinite Observable<List<T>>?RxJava- 合并多个无限的 Observable<List<T>>?
【发布时间】:2026-02-01 21:55:01
【问题描述】:

这是我正在处理的一个有趣的 RxJava 小谜题。假设我有一个无限的Observable&lt;List&lt;Parent&gt;&gt; infiniteParentListStream,每个Parent 都有一个无限的Observable&lt;List&lt;Child&gt;&gt; infiniteChildListStream 属性。

我想在发出的List&lt;Parent&gt; 中获取所有Parent 实例,并将它们发出的每个List&lt;Child&gt; 项目合并为一个完整的List&lt;Child&gt;,反映所有父母的所有孩子。

Parent 中的Observable&lt;List&lt;Child&gt;&gt; infiniteChildListStream 属性是无限的,这使得toList() 任务有点挑战性。

public final class NestedInfiniteTest {

    private static final BehaviorSubject<Integer> parentSubject = BehaviorSubject.create(1);
    private static final BehaviorSubject<Integer> childSubject = BehaviorSubject.create(1);

    public static void main(String[] args) {


        Observable<List<Parent>> infiniteParentListStream = parentSubject
                .map(i -> Arrays.asList(new Parent(), new Parent(), new Parent()))
                .cache(1);

        Observable<List<Child>> allCurrentChildren = infiniteParentListStream.<List<Child>>flatMap(parentList ->
                Observable.from(parentList)
                        .flatMap(p -> p.getInfiniteChildListStream().flatMap(Observable::from)).toList()
        ).cache(1);

        allCurrentChildren.subscribe(cl -> System.out.println("WHOLE CHILD LIST SIZE: " + cl.size()));
    }

    private static final class Parent {
        private final Observable<List<Child>> infiniteChildListStream = childSubject
                .map(i -> Arrays.asList(new Child(), new Child(), new Child())).cache(1);

        public Observable<List<Child>> getInfiniteChildListStream() {
            return infiniteChildListStream;
        }
    }
    private static final class Child {

    }
}

当然,我发现的一个解决方法是通过调用first()infiniteChildListStream 变为有限。但这不太理想,因为它不再更新。

Observable<List<Child>> allCurrentChildren = infiniteParentListStream.<List<Child>>flatMap(parentList ->
        Observable.from(parentList)
                .flatMap(p -> p.getInfiniteChildListStream().first().flatMap(Observable::from)).toList()
).cache(1);

我觉得有一种方法可以手动调用Observable.create() 或使用flatMap() 技巧来解决这个问题。有没有更好的方法来做到这一点,并让事情与无限的来源反应?在我在这个 SSCCE 之外的实际应用程序中,这些 observable 是无限的,因为驱动 ParentChild 的数据源可能会更改并发出新值...

我想我的问题的根源是如何将多个无限 Observable&lt;List&lt;T&gt;&gt; 合并到一个 Observable&lt;List&lt;T&gt;&gt; 中?

【问题讨论】:

  • 更新:我认为Observable.combineLatest() 可能会提供解决方案?

标签: java reactive-programming rx-java


【解决方案1】:

我想我是通过使用Observable.combineLatest() 弄明白的。为了增强测试,我还修改了源 observables 以根据对象推送的整数值创建不同的List 大小。这看起来效果很好。

public final class NestedInfiniteTest {

    private static final BehaviorSubject<Integer> parentSubject = BehaviorSubject.create(1);
    private static final BehaviorSubject<Integer> childSubject = BehaviorSubject.create(1);

    public static void main(String[] args) {

        Observable<List<Parent>> infiniteParentListStream = parentSubject
                .map(i -> IntStream.range(0,i).mapToObj(val -> new Parent()).collect(Collectors.toList()))
                .cache(1);

        Observable<List<Child>> allCurrentChildren = infiniteParentListStream.flatMap(parentList ->
                Observable.<Observable<List<Child>>>create(s -> {
                    parentList.stream().map(Parent::getInfiniteChildListStream).forEach(s::onNext);
                    s.onCompleted();
                })
                .toList() //List<<Observable<List<Child>>>>
                .flatMap(consolidatedChildList -> Observable.combineLatest(consolidatedChildList, new FuncN<List<Child>>() {
                    @Override
                    public List<Child> call(Object... args) {
                        ArrayList<Child> list = new ArrayList<>();
                        for (Object obj : args) {
                            list.addAll((List<Child>) obj);
                        }
                        return list;
                    }
                }))
        );


        allCurrentChildren.subscribe(cl -> System.out.println("WHOLE CHILD LIST SIZE: " + cl.size()));
        childSubject.onNext(10);
        parentSubject.onNext(5);
        childSubject.onNext(2);
    }

    private static final class Parent {
        private final Observable<List<Child>> infiniteChildListStream = childSubject
                .map(i -> IntStream.range(0, i).mapToObj(val -> new Child()).collect(Collectors.toList())).cache(1);

        public Observable<List<Child>> getInfiniteChildListStream() {
            return infiniteChildListStream;
        }
    }
    private static final class Child {

    }
}

输出:

WHOLE CHILD LIST SIZE: 1   //parentSubject = 1, childSubject = 1
WHOLE CHILD LIST SIZE: 10  //parentSubject = 1, childSubject = 10
WHOLE CHILD LIST SIZE: 50  //parentSubject = 5, childSubject = 10
WHOLE CHILD LIST SIZE: 2   //parentSubject = 5, childSubject = 2, adjusting
WHOLE CHILD LIST SIZE: 42  //adjusting
WHOLE CHILD LIST SIZE: 34  //adjusting
WHOLE CHILD LIST SIZE: 26  //adjusting
WHOLE CHILD LIST SIZE: 18  //adjusting
WHOLE CHILD LIST SIZE: 10  //parentSubject = 5, childSubject = 2, done!

更新:创建了一个 Transformer 来执行这个任务

public static class CombinedListTransformer<T,R> implements Observable.Transformer<List<T>,List<R>> {

    private final Func1<T,Observable<List<R>>> listMapper;

    public CombinedListTransformer(Func1<T,Observable<List<R>>> listMapper) {
        this.listMapper = listMapper;
    }
    @Override
    public Observable<List<R>> call(Observable<List<T>> sourceList) {
        return sourceList.flatMap(sl ->
            Observable.from(sl).map(t -> listMapper.call(t)).toList() //List<Observable<List<R>>
            .flatMap(consolidatedChildList -> Observable.combineLatest(consolidatedChildList, args -> {
                ArrayList<R> list = new ArrayList<>();
                for (Object obj : args) {
                    list.addAll((List<R>) obj);
                }
                return list;
            }))
        );
    }
}

【讨论】:

  • 为什么不使用 sl -> Observable.from(sl).map(listMapper) 而不是 Observable.create?
  • 刚刚更新,你说得对,它让它变得不那么冗长了。
  • listMapper 已经是一个 Func1 所以你不必写 t -> listMapper.call(t)
  • 哦是的...我总是忘记函数也是对象。