【发布时间】:2017-09-29 04:39:02
【问题描述】:
我正在学习 RxJava,但在避免可变状态时遇到了问题。
我要解决的问题很简单:有一个项目输入流和一个项目组输入流。每个项目都属于一个组(具有组标识符)并具有一些数据。每个组都有一个标识符,也有一些数据。许多项目可能属于同一组。目标是将这些输入流组合成一个(项目、组)对的输出流,这样:
- (项目,组)对仅在项目及其组都已知时才发出
- 每次收到更新的项目数据时,都必须发出一个更新的(项目、组)对
- 当接收到该组的更新数据时,必须发出对应于属于该组的所有项目的(项目,组)对
这是一个工作实现(ItemWithGroup 是一个表示(项目,组)对的类):
public class StateMutationWithinOperator {
private Map<Integer, Group> allGroups = new HashMap<>();
private Map<Integer, List<Item>> allItems = new HashMap<>();
public Observable<ItemWithGroup> observe(Observable<Item> items, Observable<Group> groups) {
return Observable.merge(
items.flatMap(this::processItem),
groups.flatMap(this::processGroup));
}
private Observable<ItemWithGroup> processItem(Item item) {
allItems.computeIfAbsent(item.groupId, missing -> new ArrayList<>())
.add(item);
return Observable.just(allGroups)
.filter(groups -> groups.containsKey(item.groupId))
.map(groups -> new ItemWithGroup(item, groups.get(item.groupId)));
}
private Observable<ItemWithGroup> processGroup(Group group) {
allGroups.put(group.id, group);
return Observable.just(allItems)
.filter(items -> items.containsKey(group.id))
.flatMapIterable(items -> items.get(group.id))
.map(item -> new ItemWithGroup(item, group));
}
}
我想通过避免改变存储在 allGroups 和 allItems 字段中的共享状态来避免在 processItem() 和 processGroup() 中产生副作用。我如何做到这一点?
提前致谢!
【问题讨论】:
-
你有一个发布(选择器)可以使用可以轻松地拆分 observables。您可以将您的 allGroups/allItems 放入该发布运算符。
-
感谢您的回复!如果我理解正确,您的建议是使用 publish(selector) 运算符来拆分输入流并使用拆分输入流来更新状态和发出新的输出项,例如:
inputGroups.publish(groups -> { groups.subscribe(group -> allGroups.put(group.id, group)); return groups.flatMap(this::processGroup); });它更好,但发布() 运算符仍然具有改变 allGroups 的副作用。当两个组同时来自两个线程时,这可能会导致对 allGroups 的并发写入。 -
回答我自己:根据observable contract,observables 中的发射必须是序列化的,所以两个组更新同时发生的情况是不可能的。但是,我们仍然在 publish() 方法中产生了副作用。是不是很糟糕,会导致什么问题?