【问题标题】:Collecting Observables to a List doesn't seem to emit the collection at once将 Observables 收集到列表似乎不会立即发出集合
【发布时间】:2023-05-24 21:07:01
【问题描述】:

我正在使用 RxJava 来收集单独发出的 Observable 列表,并将它们组合成一个 Observable 列表(本质上与 flatMap 相反)。这是我的代码:

        // myEvent.findMemberships() returns an Observable<List<Membership>>

        myEvent.findMemberships()
             .flatMap(new Func1<List<Membership>, Observable<User>>() {
               @Override
               public Observable<User> call(List<Membership> memberships) {
                 List<User> users = new ArrayList<User>();
                 for (Membership membership : memberships) {
                   users.add(membership.getUser());
                 }
                 return Observable.from(users);
               }
             })
             .toList()
             .subscribeOn(Schedulers.newThread())
             .observeOn(AndroidSchedulers.mainThread())
             .subscribe(new Observer<List<User>>() {
               @Override
               public void onCompleted() { }

               @Override
               public void onError(Throwable e) {
                 Timber.e(e, "Error when trying to get memberships");
               }

               @Override
               public void onNext(List<User> users) {
                 Timber.d("%d users emitted", users.size());
               }
             })

我注意到我的 onNext 从未被调用过。我似乎无法理解这一点。如果我删除 .toList 调用并基本上输出单个用户(如下所示),它会通过发出每个项目来工作。

subscriptions //
    .add(currentEvent.findMemberships()
             .flatMap(new Func1<List<Membership>, Observable<User>>() {
               @Override
               public Observable<User> call(List<Membership> memberships) {
                 List<User> users = new ArrayList<User>();
                 for (Membership membership : memberships) {
                   users.add(membership.getUser());
                 }
                 return Observable.from(users);
               }
             })
             .subscribeOn(Schedulers.newThread())
             .observeOn(AndroidSchedulers.mainThread())
             .subscribe(new Observer<User>() {
               @Override
               public void onCompleted() { }

               @Override
               public void onError(Throwable e) {
                 Timber.e(e, "Error when trying to get memberships");
               }

               @Override
               public void onNext(User user) {
                 Timber.d("%d users emitted", user.getName());
               }
             }));

第一季度。我对 .toList 的理解不正确吗?

第二季度。如何将单独发出的Observable&lt;Object&gt;s 流组合成单个Observable&lt;List&lt;Object&gt;&gt;

** 编辑

@kjones 完全解决了这个问题。我没有通过 findMemberships 调用 onComplete。我在下面添加了代码 sn-p。我的真实用例与一堆更多的转换有点复杂,这就是我需要使用 .toList 调用的原因。正如@zsxwing 也正确指出的那样,对于这个用例,一个简单的地图就足够了。

public Observable<List<Membership>> findMemberships() {
return Observable.create(new Observable.OnSubscribe<List<Membership>>() {
  @Override
  public void call(Subscriber<? super List<Membership>> subscriber) {
    try {
      // .....
      List<Membership> memberships = queryMyDb();

      subscriber.onNext(memberships);

      // BELOW STATEMENT FIXES THE PROBLEM ><
      // subscriber.onCompleted();

    } catch (SQLException e) {
      // ...
    }
  }
});

}

【问题讨论】:

  • Subscription.unsubscribe 何时被调用?它可以在发射任何东西之前被调用。顺便说一句,您可以使用mapObservable&lt;List&lt;Membership&gt;&gt; 转换为Observable&lt;List&lt;User&gt;&gt;
  • 感谢@zsxwing 订阅得到正确处理。问题在于未调用 onComplete。你是对的,对于这个简单的用例,一张地图就足够了。

标签: java frp rx-java


【解决方案1】:

看起来myEvent.findMemberships() 调用返回的 Observable 从未调用 onComplete。你能出示这段代码吗?

如果是这种情况,它将解释您所看到的行为。 .toList() 将不会发出列表,直到所有项目都已发出(由 onComplete 发出信号)。

没有.toList() 的第二个版本将按如下方式进行:

.findMemberships()
    emits a single List<Membership>
.flatMap()
    transforms List<Membership> into a single List<User>
    Observable.from(users) creates an observable that emits each user
.subscribe()
    onNext() is called for each user
    onCompleted() is never called.

您的原始版本:

.findMemberships()
    emits a single List<Membership>
.flatMap()
    transforms List<Membership> into a single List<User>
    Observable.from(users) creates an observable that emits each user
.toList()
    buffers each User waiting for onCompleted() to be called
    onCompleted is never called because the .findMemberships Observable never completes

有几种解决方案:

1) 在完成时调用 findMemberShips() Observable。如果 findMemberShips() 返回的 Observable 是 Rx 主题(PublishSubject、BehaviorSubject 等),这可能是不可取的

2) 使用Observable.just() 而不是Observable.from()。您已经在 .flatMap() 中有一个 List&lt;User&gt;,只需将其返回即可。使用Observable.from(users) 创建一个可发出每个用户的 Observable。 Observable.just(users) 将创建一个发出单个 List&lt;User&gt; 的 Observable。不需要.toList()

3) 使用.map() 而不是.flatMap()。同样,不需要.toList()。由于每个List&lt;Membership&gt; 都会转换为List&lt;User&gt;,因此您只需使用.map()

myEvent
    .findMemberships()
    .map(new Func1<List<Membership>, List<User>>() {
        @Override
        public List<User> call(List<Membership> memberships) {
            List<User> users = new ArrayList<User>();
            for (Membership membership : memberships) {
                users.add(membership.getUser());
            }
            return users;
         }
    })

【讨论】:

  • 圣牛。你完全解决了这个问题。问题在于我的 Observable api 没有调用 onComplete。我将添加代码 sn-p 以显示我最初是如何创建它的。我的真实用例更加做作,这就是为什么我真的需要使用 .toList (有一堆更必要的转换),这就是直接映射不起作用的原因。谢谢。你的基础很强大!
  • 已经坚持了好几个小时,很棒的答案