【问题标题】:ReactiveX: calculate frequency of distinct elements in an ObservableReactiveX:计算 Observable 中不同元素的频率
【发布时间】:2015-11-24 21:20:54
【问题描述】:

我有一个Observable<String>。我想把它变成一个Map<String, Int>,它告诉我每个不同字符串的出现次数。

observable 包含约 10 亿个元素,其中 1000 个是不同的(因此,不能将整个数据集存储在 RAM 中)。目前我遍历Observable 并更新HashMap。我还确保在同一个线程上观察以避免竞争条件。但是,获取元素频率本质上应该很容易并行化,因此最好利用它。

有没有办法做到这一点?

【问题讨论】:

  • 由于数据源是顺序的,并且更新操作 O(1),我不确定你是否会通过并行获得多少。
  • @akarnokd 数据源是在多个线程上计算的,使用我当前的方法我必须同步这会减慢速度。

标签: java scala reactive-programming rx-java reactivex


【解决方案1】:

您可以使用groupBy 而不是自己维护HashMapgroupBy 将为每个密钥创建一个Observable,您可以在不同的调度程序上订阅它。例如,

public class KeyCounter {
    int key;
    long count;

    public KeyCounter(int key, long count) {
        this.key = key;
        this.count = count;
    }

    @Override
    public String toString() {
        return "key: " + key + " count: "  + count;
    }
}

@Test
public void foo() {
    Observable<Integer> o = Observable.just(1, 2, 3, 2, 1);
    o.groupBy(i -> i).flatMap(
        group ->
            group.subscribeOn(Schedulers.computation()).countLong().map(count -> new KeyCounter(group.getKey(), count))
    ).subscribe(System.out::println);

    try {
        Thread.sleep(1000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
}

【讨论】:

    猜你喜欢
    • 2017-09-14
    • 2021-07-28
    • 2021-12-04
    • 2021-09-11
    • 2015-02-05
    • 2014-03-01
    • 2019-09-12
    • 2021-08-19
    相关资源
    最近更新 更多