【问题标题】:avoiding mutable state with RxJava使用 RxJava 避免可变状态
【发布时间】:2017-09-29 04:39:02
【问题描述】:

我正在学习 RxJava,但在避免可变状态时遇到了问题。

我要解决的问题很简单:有一个项目输入流和一个项目组输入流。每个项目都属于一个组(具有组标识符)并具有一些数据。每个组都有一个标识符,也有一些数据。许多项目可能属于同一组。目标是将这些输入流组合成一个(项目、组)对的输出流,这样:

  • (项目,组)对仅在项目及其组都已知时才发出
  • 每次收到更新的项目数据时,都必须发出一个更新的(项目、组)对
  • 当接收到该组的更新数据时,必须发出对应于属于该组的所有项目的(项目,组)对

这是一个工作实现(ItemWithGroup 是一个表示(项目,组)对的类):

public class StateMutationWithinOperator {
    private Map<Integer, Group> allGroups = new HashMap<>();
    private Map<Integer, List<Item>> allItems = new HashMap<>();

    public Observable<ItemWithGroup> observe(Observable<Item> items, Observable<Group> groups) {
        return Observable.merge(
            items.flatMap(this::processItem), 
            groups.flatMap(this::processGroup));
    }

    private Observable<ItemWithGroup> processItem(Item item) {
        allItems.computeIfAbsent(item.groupId, missing -> new ArrayList<>())
                .add(item);

        return Observable.just(allGroups)
                .filter(groups -> groups.containsKey(item.groupId))
                .map(groups -> new ItemWithGroup(item, groups.get(item.groupId)));
    }

    private Observable<ItemWithGroup> processGroup(Group group) {
        allGroups.put(group.id, group);

        return Observable.just(allItems)
                .filter(items -> items.containsKey(group.id))
                .flatMapIterable(items -> items.get(group.id))
                .map(item -> new ItemWithGroup(item, group));
    }
}

我想通过避免改变存储在 allGroups 和 allItems 字段中的共享状态来避免在 processItem() 和 processGroup() 中产生副作用。我如何做到这一点?

提前致谢!

【问题讨论】:

  • 你有一个发布(选择器)可以使用可以轻松地拆分 observables。您可以将您的 allGroups/allItems 放入该发布运算符。
  • 感谢您的回复!如果我理解正确,您的建议是使用 publish(selector) 运算符来拆分输入流并使用拆分输入流来更新状态和发出新的输出项,例如:inputGroups.publish(groups -&gt; { groups.subscribe(group -&gt; allGroups.put(group.id, group)); return groups.flatMap(this::processGroup); }); 它更好,但发布() 运算符仍然具有改变 allGroups 的副作用。当两个组同时来自两个线程时,这可能会导致对 allGroups 的并发写入。
  • 回答我自己:根据observable contract,observables 中的发射必须是序列化的,所以两个组更新同时发生的情况是不可能的。但是,我们仍然在 publish() 方法中产生了副作用。是不是很糟糕,会导致什么问题?

标签: java rx-java rx-java2


【解决方案1】:

这是基于@PhoenixWang 的建议的改进:

public class StateMutationWithinOperator {
  private Map<Integer, Group> allGroups = new HashMap<>();
  private Map<Integer, List<Item>> allItems = new HashMap<>();

  public Observable<ItemWithGroup> observe(Observable<Item> inputItems,
                                           Observable<Group> inputGroups) {
    Observable<ItemWithGroup> processedItems = inputGroups.publish(groups -> {
      groups.subscribe(group -> allGroups.put(group.id, group));

      return groups.flatMap(this::processGroup);
    });

    Observable<ItemWithGroup> processedGroups = inputItems.publish(items -> {
      items.subscribe(item
          -> allItems
          .computeIfAbsent(item.groupId, missing -> new ArrayList<>())
          .add(item));

      return items.flatMap(this::processItem);
    });

    return Observable.merge(processedItems, processedGroups);
  }

  private Observable<ItemWithGroup> processItem(Item item) {
    return Observable.just(allGroups)
        .filter(groups -> groups.containsKey(item.groupId))
        .map(groups -> new ItemWithGroup(item, groups.get(item.groupId)));
  }

  private Observable<ItemWithGroup> processGroup(Group group) {
    return Observable.just(allItems)
        .filter(items -> items.containsKey(group.id))
        .flatMapIterable(items -> items.get(group.id))
        .map(item -> new ItemWithGroup(item, group));
  }
}

更好,但是 publish() 操作符仍然会改变共享状态。我可以看到的一个问题是,我们可能同时从两个不同的线程接收一个项目和一个组,并最终同时读取和写入同一个地图。

【讨论】:

    【解决方案2】:

    首先,使用SerializedSubject 确保遵守线程安全。其次,使用ConcurrentMap 来保存中间表示; compute() 方法是原始的,可以防止竞争插入。第三,可以使用observeOn()强制订阅groupBy()操作符为串行。

    这是一个工作模型:

    public class ItemWithGroupTest {
    
        static class Item {
            final Integer id;
            final Integer groupId;
            final Integer value;
    
            @Override
            public String toString() {
                return "Item [id=" + id + ", groupId=" + groupId + ", value=" + value + "]";
            }
    
            public Item( Integer id, Integer groupId, Integer value ) {
                super();
                this.id = id;
                this.groupId = groupId;
                this.value = value;
            }
    
        }
    
        static class Group {
            final Integer id;
            final Integer value;
    
            @Override
            public String toString() {
                return "Group [id=" + id + ", value=" + value + "]";
            }
    
            public Group( Integer id, Integer value ) {
                super();
                this.id = id;
                this.value = value;
            }
        }
    
        static class ItemWithGroup {
            final Item item;
            final Group group;
    
            @Override
            public String toString() {
                return "ItemWithGroup [item=" + item + ", group=" + group + "]";
            }
    
            public ItemWithGroup( Item item, Group group ) {
                super();
                this.item = item;
                this.group = group;
            }
        }
    
        private final Logger logger =
                LoggerFactory.getLogger( ItemWithGroupTest.class );
        private SerializedSubject<Item, Item> originItems;
        private SerializedSubject<Group, Group> originGroups;
        private Map<Integer, SerializedSubject<Group, Group>> mapGroups;
        private SerializedSubject<ItemWithGroup, ItemWithGroup> itemGroupOutput;
    
        @Test
        public void testCreation() throws Exception {
    
            originGroups.onNext( new Group( Integer.valueOf( 3 ), Integer.valueOf( 42 ) ) );
            originItems.onNext( new Item( Integer.valueOf( 1 ), Integer.valueOf( 3 ), Integer.valueOf( 2 ) ) );
            originItems.onNext( new Item( Integer.valueOf( 2 ), Integer.valueOf( 3 ), Integer.valueOf( 8 ) ) );
            originItems.onNext( new Item( Integer.valueOf( 4 ), Integer.valueOf( 2 ), Integer.valueOf( 13 ) ) );
            originItems.onNext( new Item( Integer.valueOf( 1 ), Integer.valueOf( 3 ), Integer.valueOf( 31 ) ) );
            originGroups.onNext( new Group( Integer.valueOf( 3 ), Integer.valueOf( 44 ) ) );
            originGroups.onNext( new Group( Integer.valueOf( 2 ), Integer.valueOf( 41 ) ) );
    
        }
    
        @Before
        public void setup() {
            originItems = PublishSubject.<Item>create().toSerialized();
            originGroups = PublishSubject.<Group>create().toSerialized();
            mapGroups = Maps.newConcurrentMap();
            itemGroupOutput = PublishSubject.<ItemWithGroup>create().toSerialized();
            itemGroupOutput
                    .subscribe( v -> logger.debug( "output is {}", v ) );
    
            originGroups
                    .doOnNext( v -> logger.debug( "input group {}", v ) )
                    .subscribe();
            originItems
                    .doOnNext( v -> logger.debug( "input item {}", v ) )
                    .subscribe();
    
            originGroups.groupBy( group -> group.id )
                    .subscribe( gv -> {
                        Integer key = gv.getKey();
                        gv.subscribe( getMapGroup( key ) );
                    } );
            originItems.groupBy( item -> item.id )
                    .subscribe( itemsGroupedByGroupId -> {
                        Observable<Item> itemV = itemsGroupedByGroupId.share();
                        itemV
                                .take( 1 )
                                .flatMap( vFirst -> Observable.combineLatest( itemV.startWith( vFirst ),
                                        getMapGroup( vFirst.groupId ),
                                        ( i, g ) -> new ItemWithGroup( i, g ) ) )
                                .subscribe( ig -> itemGroupOutput.onNext( ig ) );
                    } );
        }
    
        public Subject<Group, Group> getMapGroup( Integer key ) {
            return mapGroups.compute( key,
                    ( id, obs ) -> obs != null ? obs : BehaviorSubject.<Group>create().toSerialized() );
        }
    
    }
    

    您将无法避免可变状态。您可以控制可变性出现的时间和地点。在上面的代码中,它出现在同步区域内部,例如在compute()groupBy() 运算符内部。

    【讨论】:

    • 鲍勃,谢谢你的建议!这是非常有见地的!控制可变性而不是试图避免它的建议,序列化主题的使用和 groupBy() 运算符是我现在要反思的想法。我已将您的实现插入到我的单元测试中。它不仅在第一次尝试时运行良好,而且还让我在自己的实现中发现了一个错误:Item 类没有 id,因此没有正确处理 Item 更新。我将根据您的意见修改我的实施并提出一个新的实施。非常感谢!
    猜你喜欢
    • 2019-01-15
    • 2023-03-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多