【发布时间】:2023-05-24 21:07:01
【问题描述】:
我正在使用 RxJava 来收集单独发出的 Observable 列表,并将它们组合成一个 Observable 列表(本质上与 flatMap 相反)。这是我的代码:
// myEvent.findMemberships() returns an Observable<List<Membership>>
myEvent.findMemberships()
.flatMap(new Func1<List<Membership>, Observable<User>>() {
@Override
public Observable<User> call(List<Membership> memberships) {
List<User> users = new ArrayList<User>();
for (Membership membership : memberships) {
users.add(membership.getUser());
}
return Observable.from(users);
}
})
.toList()
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<List<User>>() {
@Override
public void onCompleted() { }
@Override
public void onError(Throwable e) {
Timber.e(e, "Error when trying to get memberships");
}
@Override
public void onNext(List<User> users) {
Timber.d("%d users emitted", users.size());
}
})
我注意到我的 onNext 从未被调用过。我似乎无法理解这一点。如果我删除 .toList 调用并基本上输出单个用户(如下所示),它会通过发出每个项目来工作。
subscriptions //
.add(currentEvent.findMemberships()
.flatMap(new Func1<List<Membership>, Observable<User>>() {
@Override
public Observable<User> call(List<Membership> memberships) {
List<User> users = new ArrayList<User>();
for (Membership membership : memberships) {
users.add(membership.getUser());
}
return Observable.from(users);
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<User>() {
@Override
public void onCompleted() { }
@Override
public void onError(Throwable e) {
Timber.e(e, "Error when trying to get memberships");
}
@Override
public void onNext(User user) {
Timber.d("%d users emitted", user.getName());
}
}));
第一季度。我对 .toList 的理解不正确吗?
第二季度。如何将单独发出的Observable<Object>s 流组合成单个Observable<List<Object>>?
** 编辑
@kjones 完全解决了这个问题。我没有通过 findMemberships 调用 onComplete。我在下面添加了代码 sn-p。我的真实用例与一堆更多的转换有点复杂,这就是我需要使用 .toList 调用的原因。正如@zsxwing 也正确指出的那样,对于这个用例,一个简单的地图就足够了。
public Observable<List<Membership>> findMemberships() {
return Observable.create(new Observable.OnSubscribe<List<Membership>>() {
@Override
public void call(Subscriber<? super List<Membership>> subscriber) {
try {
// .....
List<Membership> memberships = queryMyDb();
subscriber.onNext(memberships);
// BELOW STATEMENT FIXES THE PROBLEM ><
// subscriber.onCompleted();
} catch (SQLException e) {
// ...
}
}
});
}
【问题讨论】:
-
Subscription.unsubscribe何时被调用?它可以在发射任何东西之前被调用。顺便说一句,您可以使用map将Observable<List<Membership>>转换为Observable<List<User>>。 -
感谢@zsxwing 订阅得到正确处理。问题在于未调用 onComplete。你是对的,对于这个简单的用例,一张地图就足够了。