【问题标题】:RxJava - Control one Observable by another oneRxJava - 控制一个 Observable 由另一个 Observable
【发布时间】:2018-02-14 03:27:31
【问题描述】:

更新

我正在寻找一种方法来控制另一个 observable 的流量。例如,让我们有 2 个单调递增(重要)可观察的整数:

source  : 1----2-2---2--3--3--4----4--5---6----8---9---10--------11------
control : -1----3----------------5-----------6-------9-----------12------

我需要生成一个新的 observable,其元素与源完全匹配,但它们的时间由 control observable 控制,方式如下:源值应始终小于或等于控制值。这意味着只有大于最近发布的控件的所有源值应该等到它们被控件“释放”

source         : 1----2-2---2--3--3--4----4--5---6----8---9---10--------11------
control        : -1----3----------------5-----------6-------9-----------12------
expected result: -1----2-2--2--3--3-----4-4--5------6-------8-9---------10-11---

请看下面的代码示例:

private static <T, C> Observable<T> combine(Observable<T> source, Observable<C> control, BiFunction<T, C, Boolean> predicate) {
    // ???
}

@Test
public void testControl() throws InterruptedException {
    Subject<Integer> control = PublishSubject.create();
    Observable<Integer> source = Observable.fromArray(1, 2, 2, 2, 3, 3, 4, 4, 5, 6, 8, 10, 11);
    Observable<Integer> combined = combine(source, control, (s, c) -> s <= c);
    control.subscribe(val -> System.out.println("Control: " + val));
    combined.observeOn(Schedulers.io()).subscribe(val -> System.out.println("Value: " + val));

    control.onNext(3); // should release 1,2,2,2,3,3
    Thread.sleep(1000);
    control.onNext(6); // should release 4,4,5,6
    Thread.sleep(1000);
    control.onNext(11); // should release 8,10,11
    Thread.sleep(1000);
}

【问题讨论】:

  • 你能添加一些代码吗?
  • @Devstr,我已经添加了代码并试图让问题更清楚

标签: java rx-java rx-java2


【解决方案1】:

由于我没有找到任何优雅的解决方案,我最终自己实现了它。如果有人可以提出更优雅的解决方案,我会很高兴(在这种情况下,我将不接受这个答案并接受更好的答案)。以下是我的解决方案:

private static <T, C> Observable<T> combine(Observable<T> source, Observable<C> control, BiFunction<T, C, Boolean> predicate) {
    return Observable.create(emitter -> {
        Queue<T> buffer = new ArrayDeque<>();
        AtomicReference<C> lastControl = new AtomicReference<>();
        CompletableSubject sourceCompletable = CompletableSubject.create();
        CompletableSubject controlCompletable = CompletableSubject.create();
        Disposable disposable = new CompositeDisposable(
                control.subscribe(
                        val -> {
                            lastControl.set(val);
                            synchronized (buffer) {
                                while (!buffer.isEmpty() && predicate.apply(buffer.peek(), val)) {
                                    emitter.onNext(buffer.poll());
                                }
                            }
                        },
                        emitter::onError,
                        controlCompletable::onComplete),
                source.subscribe(
                        val -> {
                            C lastControlVal = lastControl.get();
                            synchronized (buffer) {
                                if (lastControlVal != null && predicate.apply(val, lastControlVal)) {
                                    emitter.onNext(val);
                                } else {
                                    buffer.add(val);
                                }
                            }
                        },
                        emitter::onError,
                        sourceCompletable::onComplete),
                controlCompletable.andThen(sourceCompletable).subscribe(emitter::onComplete));
        emitter.setDisposable(disposable);
    });
}

【讨论】:

  • 这样的解决方案让我怀疑 RxJava 是否是解决您问题的正确工具..
【解决方案2】:

您需要将buffer 功能(将为您缓冲项目)与combineLatest 功能(将确定何时可以释放缓冲区)结合起来。

Observable<Integer> source;
Observable<Integer> control;

Observable<Integer> canRelease = Observable.combineLatest(source, control, (s, c) -> s < c ? s : null).filter(val -> val != null);

Observable<Integer> result = source.buffer(canRelease).flatMap(Observable::from);

生成的 Observable 将缓冲源项目,直到 canRelease 中有值。 canRelease 将在每次源 Observable 最新项小于控制 Observable 最新项时发出;

【讨论】:

  • 抱歉,问题好像不够清楚。我试图让它更清楚
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多