【发布时间】:2021-08-22 03:53:45
【问题描述】:
我有 2 个 observables,我想实现以下目标:
从Observable1中获取值,然后忽略Observable1,只等待Observable2中的值那么同样,从Observable1 以此类推..
有没有办法在 rxjs 中实现这一点?操作决策树没有用。我也玩过switchMap(),但这只能做第一部分
【问题讨论】:
标签: typescript rxjs
我有 2 个 observables,我想实现以下目标:
从Observable1中获取值,然后忽略Observable1,只等待Observable2中的值那么同样,从Observable1 以此类推..
有没有办法在 rxjs 中实现这一点?操作决策树没有用。我也玩过switchMap(),但这只能做第一部分
【问题讨论】:
标签: typescript rxjs
因为是星期五,所以我写了一个实现。
这是 javascript 不是打字稿,但你可以看到我是如何解决它的。
function roundRobinMerge(...observables) {
const indexedObservables = observables.map(
(obs, index) =>
obs.pipe(
map(
v => ({
index: index,
value: v
})
)
)
);
let nextIndexToListenTo = 0;
return merge(...indexedObservables)
.pipe(
filter(n => n.index === nextIndexToListenTo),
tap(n => nextIndexToListenTo = (n.index + 1)%indexedObservables.length),
map(n => n.value)
);
}
要查看此操作,请转至 https://rxviz.com,然后复制并粘贴以下代码
const { interval, merge } = Rx;
const { take, map, filter, tap, share } = RxOperators;
function roundRobinMerge(...observables) {
const indexedObservables = observables.map(
(obs, index) =>
obs.pipe(
map(
v => ({
index: index,
value: v
})
)
)
);
let nextIndexToListenTo = 0;
return merge(...indexedObservables)
.pipe(
filter(n => n.index === nextIndexToListenTo),
tap(n => nextIndexToListenTo = (n.index + 1)%indexedObservables.length),
map(n => n.value)
);
}
// Now to demonstrate it
function mapper(label) {
let index = 0;
return () => `${label}${++index}`;
};
const a = interval(750)
.pipe(
map(mapper('a')),
take(10),
share()
);
const b = interval(1300)
.pipe(
map(mapper('b')),
take(10),
share()
);
merge(
[
a,
b,
roundRobinMerge(a, b)
]
);
【讨论】:
对此可能有更优雅的解决方案,但我通常会使用scan 并创建一个little state machine:
import { merge, Subject } from 'rxjs';
import { scan, filter, map, take } from 'rxjs/operators';
const left = new Subject<number>();
const right = new Subject<number>();
const example = merge(
left.pipe(map(val => ({ tag: 'left', val }))),
right.pipe(map(val => ({ tag: 'right', val })))
).pipe(
scan<{ tag: string; val: number }, { previous: string; emit?: number }>(
(acc, { tag, val }) => {
if (acc.previous !== tag) {
return { previous: tag, emit: val };
}
return { ...acc, emit: null };
},
{
previous: 'right',
emit: null
}
),
filter(x => x.emit !== null),
map(x => x.emit)
);
console.clear();
const subscribe = example
.pipe(take(10))
.subscribe(val => console.log('output', val));
right.next(1);
left.next(2);
right.next(3);
left.next(4);
left.next(5);
right.next(6);
【讨论】:
您可能可以通过 Subjects 实现您想要的游戏。
这是一个可能的解决方案。
您从创建某种开关 的概念开始,该开关在Observable1 或Observable2 通知时随时触发。一旦 switch 发出,我们将switchMap 发送到源可观察对象之一,即Observable1 和Observable2),我们等待它通知,然后我们将switchMap 发送到另一个,我们再次等待它通知,最后我们next 开关重新开始。
看代码大概逻辑更清楚了。
// these are the 2 switches
const switch1 = new Subject<any>();
const switch2 = new Subject<any>();
// res1 and res2 are the Subjects that notify the results we want
const res1 = new Subject<any>();
const res2 = new Subject<any>();
// for each source Observable we create a pipe along the logic explained above
const switchToObs2 = switch1.pipe(
switchMap(() =>
observable1.pipe(first()).pipe(
tap((v) => res1.next(v)),
switchMap(() => observable2),
tap((v) => switch1.next(v))
)
)
);
const switchToObs1 = switch2.pipe(
switchMap(() =>
observable2.pipe(first()).pipe(
tap((v) => res2.next(v)),
switchMap(() => observable1),
tap((v) => switch2.next(v))
)
)
);
// we subscribe to the 2 pipes to activate the processing
merge(switchToObs2, switchToObs1).subscribe();
// we start the switches
switch1.next("start");
switch2.next("start");
// eventually we subscribe to the result
merge(res2, res1).subscribe(console.log);
Here a stackblitz 使用此解决方案
【讨论】:
似乎有很多方法可以解决这个问题。这是另一个expand。
import { interval } from 'rxjs';
import { expand, mapTo, take } from 'rxjs/operators';
const left = interval(1000).pipe(mapTo('left'));
const right = interval(5000).pipe(mapTo('right'));
const example = left.pipe(
take(1),
expand(x => (x === 'left' ? right : left).pipe(take(1)))
);
const subscribe = example.subscribe(val => console.log(val));
如果有人有办法使用buffer* 运算符之一,我会很感兴趣
【讨论】:
我为您创建了一个自定义运算符 (toggleEmit),它接受 N 可观察对象并在它们之间切换。
const result$ = toggleEmit(source1$, source2$);
实现的功能:
N observables 可以提供给 toggleEmit 操作符0 到索引N.length。当最后一个 observable 发出时,它再次从 0 开始0 - N.length。多个发射是distincted
仅供参考: 代码实际上非常直接。它看起来很大,因为我添加了几个 cmets 以避免混淆。如果您有问题评论,我会尽力回答。
const { Subject, merge } = rxjs;
const { map, scan, distinctUntilChanged, filter } = rxjs.operators;
const source1$ = new Subject();
const source2$ = new Subject();
function toggleEmit(...observables) {
// amount of all observables
const amount = observables.length;
// create your updating state that contains the last index and value
const createState = (value, index) => ({ index, value });
/*
* This function updates your state at every emit
* Keep in mind that updateState contains 3 functions:
* 1. is called directly: updateState(index, amount)
* 2. is called by the map operator: map(val => updateState(index, amount)(val)) -> Its just shorthand written
* 3. is called by the scan operator: fn(state)
*/
const updateState = (index, amount) => update => state =>
// Check initial object for being empty and index 0
Object.keys(state).length == 0 && index == 0
// Check if new index is one higher
|| index == state.index + 1
// Check if new index is at 0 and last was at end of observables
|| state.index == amount - 1 && index == 0
? createState(update, index)
: state
// Function is used to avoid same index emit twice
const noDoubleEmit = (prev, curr) => prev.index == curr.index
return merge(
...observables.map((observable, index) =>
observable.pipe(map(updateState(index, amount)))
)
).pipe(
scan((state, fn) => fn(state), {}),
filter(state => Object.keys(state).length != 0),
distinctUntilChanged(noDoubleEmit),
map(state => state.value),
);
}
const result$ = toggleEmit(source1$, source2$);
result$.subscribe(console.log);
source2$.next(0);
source1$.next(1);
source2$.next(2);
source2$.next(3);
source1$.next(4);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.3/rxjs.umd.min.js"></script>
【讨论】:
您可以将您的来源放在一个数组中,并使用switchMap 和BehaviorSubject:
const sources = [a$, b$];
const source$ = new BehaviorSubject<number>(0);
const emit$ = source$.pipe(
switchMap(n => sources[n]),
map((val, i) => {
const nextIndex = (i+1) % sources.length;
source$.next(nextIndex);
return val;
})
);
细分:
source$ 发出您要收听的源的索引switchMap 订阅源并发出其值map 调用 source$.next() 发出下一个索引,然后简单地返回接收到的值请注意,这适用于 1 个或多个来源。
这是一个有效的 StackBlitz 演示。
【讨论】:
这是另一个使用repeat 的有趣示例(无论如何在我看来)。从一个 observable 中取一个值,然后从第二个 observable 中取另一个值,然后重复:
import { concat, interval, of } from 'rxjs';
import { take, switchMap, repeat, mapTo } from 'rxjs/operators';
const left = interval(10000).pipe(mapTo('left'));
const right = interval(1000).pipe(mapTo('right'));
const example = left.pipe(
take(1),
switchMap(leftVal => concat(of(leftVal), right.pipe(take(1)))),
repeat()
);
console.clear();
const subscribe = example.subscribe(val => console.log('output', val));
【讨论】: