【发布时间】:2018-02-14 03:27:31
【问题描述】:
更新
我正在寻找一种方法来控制另一个 observable 的流量。例如,让我们有 2 个单调递增(重要)可观察的整数:
source : 1----2-2---2--3--3--4----4--5---6----8---9---10--------11------
control : -1----3----------------5-----------6-------9-----------12------
我需要生成一个新的 observable,其元素与源完全匹配,但它们的时间由 control observable 控制,方式如下:源值应始终小于或等于控制值。这意味着只有大于最近发布的控件的所有源值应该等到它们被控件“释放”
source : 1----2-2---2--3--3--4----4--5---6----8---9---10--------11------
control : -1----3----------------5-----------6-------9-----------12------
expected result: -1----2-2--2--3--3-----4-4--5------6-------8-9---------10-11---
请看下面的代码示例:
private static <T, C> Observable<T> combine(Observable<T> source, Observable<C> control, BiFunction<T, C, Boolean> predicate) {
// ???
}
@Test
public void testControl() throws InterruptedException {
Subject<Integer> control = PublishSubject.create();
Observable<Integer> source = Observable.fromArray(1, 2, 2, 2, 3, 3, 4, 4, 5, 6, 8, 10, 11);
Observable<Integer> combined = combine(source, control, (s, c) -> s <= c);
control.subscribe(val -> System.out.println("Control: " + val));
combined.observeOn(Schedulers.io()).subscribe(val -> System.out.println("Value: " + val));
control.onNext(3); // should release 1,2,2,2,3,3
Thread.sleep(1000);
control.onNext(6); // should release 4,4,5,6
Thread.sleep(1000);
control.onNext(11); // should release 8,10,11
Thread.sleep(1000);
}
【问题讨论】:
-
你能添加一些代码吗?
-
@Devstr,我已经添加了代码并试图让问题更清楚