【问题标题】:rxjs multiple observable switchrxjs 多个可观察开关
【发布时间】:2021-08-22 03:53:45
【问题描述】:

我有 2 个 observables,我想实现以下目标:

Observable1中获取值,然后忽略Observable1,只等待Observable2中的值那么同样,从Observable1 以此类推..

有没有办法在 rxjs 中实现这一点?操作决策树没有用。我也玩过switchMap(),但这只能做第一部分

【问题讨论】:

    标签: typescript rxjs


    【解决方案1】:

    因为是星期五,所以我写了一个实现。

    这是 javascript 不是打字稿,但你可以看到我是如何解决它的。

    function roundRobinMerge(...observables) {
      const indexedObservables = observables.map(
        (obs, index) =>
            obs.pipe(
                map(
                   v => ({
                        index: index,
                        value: v
                    })
              )
          )
      );
      let nextIndexToListenTo = 0;
      return merge(...indexedObservables)
        .pipe(
            filter(n => n.index === nextIndexToListenTo),
            tap(n => nextIndexToListenTo = (n.index + 1)%indexedObservables.length),
            map(n => n.value)
        );
    }
    

    要查看此操作,请转至 https://rxviz.com,然后复制并粘贴以下代码

    const { interval, merge } = Rx;
    const { take, map, filter, tap, share } = RxOperators;
    
    function roundRobinMerge(...observables) {
      const indexedObservables = observables.map(
        (obs, index) =>
            obs.pipe(
                map(
                v => ({
                        index: index,
                        value: v
                    })
              )
          )
      );
      let nextIndexToListenTo = 0;
      return merge(...indexedObservables)
        .pipe(
            filter(n => n.index === nextIndexToListenTo),
            tap(n => nextIndexToListenTo = (n.index + 1)%indexedObservables.length),
            map(n => n.value)
        );
    }
    
    // Now to demonstrate it
    function mapper(label) {
      let index = 0;
      return () => `${label}${++index}`;
    };
    
    const a = interval(750)
        .pipe(
            map(mapper('a')),
            take(10),
          share()
       );
    
    const b = interval(1300)
       .pipe(
         map(mapper('b')),
         take(10),
         share()
       );
    
    
    merge(
      [
        a,
        b,
        roundRobinMerge(a, b)
      ]
    );
    
    

    【讨论】:

      【解决方案2】:

      对此可能有更优雅的解决方案,但我通常会使用scan 并创建一个little state machine

      import { merge, Subject } from 'rxjs';
      import { scan, filter, map, take } from 'rxjs/operators';
      
      const left = new Subject<number>();
      const right = new Subject<number>();
      const example = merge(
        left.pipe(map(val => ({ tag: 'left', val }))),
        right.pipe(map(val => ({ tag: 'right', val })))
      ).pipe(
        scan<{ tag: string; val: number }, { previous: string; emit?: number }>(
          (acc, { tag, val }) => {
            if (acc.previous !== tag) {
              return { previous: tag, emit: val };
            }
            return { ...acc, emit: null };
          },
          {
            previous: 'right',
            emit: null
          }
        ),
        filter(x => x.emit !== null),
        map(x => x.emit)
      );
      console.clear();
      const subscribe = example
        .pipe(take(10))
        .subscribe(val => console.log('output', val));
      
      right.next(1);
      left.next(2);
      right.next(3);
      left.next(4);
      left.next(5);
      right.next(6);
      
      

      【讨论】:

      • 我在 stackblitz 中检查了您的解决方案,并按以下方向(obs2、obs1、obs2、obs2、obs1)填充了样本发射(obs1 和 obs2)。您的解决方案似乎不适用于所需的边缘情况。如果 observable 2 最初发出该值,尽管它应该从 obs 1 开始。如果一个 observable 双重发出 (obs1 - obs2 - obs2 - obs1),那么 obs2 值会被发出,尽管它不应该发出。
      • @JonathanStellwag 我已对其进行了更新,以便 obs1(在我的示例中留下)将首先发出。我不确定你的另一点。在我的例子中,左边的发射频率是右边的五倍,它给了我预期的结果。是由于日志语句造成的混乱吗?只有 output x 应该是交替的,而不是 merged x 可能重复
      • 我指的是:“然后忽略 Observable1,只等待来自 Observable2 的值”。我在 Stackblitz 中测试了这个用例,但它不起作用。 observable 1 连续发射。
      • 通过改变发出的值,你改变了扫描函数的行为。我使用字符串“left”和“right”来区分不同的 observables。我已编辑您的stackblitz 以更正它
      • 对不起!我没有详细阅读您的管道并在这里犯了错误。我现在看到了你的解决方案。不过,您的解决方案仅适用于固定值。要解决此问题,您可以在扫描中创建一个状态,该状态创建一个元组/对象,该元组/对象包含可观察对象的索引和值。然后您的管道也可以处理动态值(不仅是左/右)。但我必须感谢它适用于您给定的价值观。再次 - 抱歉,我没有看到这一点。
      【解决方案3】:

      您可能可以通过 Subjects 实现您想要的游戏。

      这是一个可能的解决方案。

      您从创建某种开关 的概念开始,该开关在Observable1Observable2 通知时随时触发。一旦 switch 发出,我们将switchMap 发送到源可观察对象之一,即Observable1Observable2),我们等待它通知,然后我们将switchMap 发送到另一个,我们再次等待它通知,最后我们next 开关重新开始。

      看代码大概逻辑更清楚了。

      // these are the 2 switches
      const switch1 = new Subject<any>();
      const switch2 = new Subject<any>();
      
      // res1 and res2 are the Subjects that notify the results we want
      const res1 = new Subject<any>();
      const res2 = new Subject<any>();
      
      // for each source Observable we create a pipe along the logic explained above
      const switchToObs2 = switch1.pipe(
        switchMap(() =>
          observable1.pipe(first()).pipe(
            tap((v) => res1.next(v)),
            switchMap(() => observable2),
            tap((v) => switch1.next(v))
          )
        )
      );
      const switchToObs1 = switch2.pipe(
        switchMap(() =>
          observable2.pipe(first()).pipe(
            tap((v) => res2.next(v)),
            switchMap(() => observable1),
            tap((v) => switch2.next(v))
          )
        )
      );
      
      // we subscribe to the 2 pipes to activate the processing
      merge(switchToObs2, switchToObs1).subscribe();
      
      // we start the switches
      switch1.next("start");
      switch2.next("start");
      
      // eventually we subscribe to the result
      merge(res2, res1).subscribe(console.log);
      

      Here a stackblitz 使用此解决方案

      【讨论】:

        【解决方案4】:

        似乎有很多方法可以解决这个问题。这是另一个expand

        import { interval } from 'rxjs';
        import { expand, mapTo, take } from 'rxjs/operators';
        
        const left = interval(1000).pipe(mapTo('left'));
        const right = interval(5000).pipe(mapTo('right'));
        const example = left.pipe(
          take(1),
          expand(x => (x === 'left' ? right : left).pipe(take(1)))
        );
        const subscribe = example.subscribe(val => console.log(val));
        

        如果有人有办法使用buffer* 运算符之一,我会很感兴趣

        【讨论】:

          【解决方案5】:

          我为您创建了一个自定义运算符 (toggleEmit),它接受 N 可观察对象并在它们之间切换。

          const result$ = toggleEmit(source1$, source2$);
          

          实现的功能:

          • N observables 可以提供给 toggleEmit 操作符
          • Observables 将一一发射并开始重复。方向是从索引0 到索引N.length。当最后一个 observable 发出时,它再次从 0 开始
          • Observable 在一次运行中只能发射一次:0 - N.length。多个发射是distincted

          仅供参考: 代码实际上非常直接。它看起来很大,因为我添加了几个 cmets 以避免混淆。如果您有问题评论,我会尽力回答。

          const { Subject, merge } = rxjs;
          const { map, scan, distinctUntilChanged, filter } = rxjs.operators;
          
          const source1$ = new Subject();
          const source2$ = new Subject();
          
          function toggleEmit(...observables) {
            // amount of all observables
            const amount = observables.length;
            
            // create your updating state that contains the last index and value
            const createState = (value, index) => ({ index, value });
          
            /*
            * This function updates your state at every emit
            * Keep in mind that updateState contains 3 functions:
            * 1. is called directly: updateState(index, amount)
            * 2. is called by the map operator: map(val => updateState(index, amount)(val)) -> Its just shorthand written
            * 3. is called by the scan operator: fn(state)
            */
            const updateState = (index, amount) => update => state =>
              // Check initial object for being empty and index 0
              Object.keys(state).length == 0 && index == 0
              // Check if new index is one higher
              || index == state.index + 1
              // Check if new index is at 0 and last was at end of observables
              || state.index == amount - 1 && index == 0
                ? createState(update, index)
                : state
            
            // Function is used to avoid same index emit twice
            const noDoubleEmit = (prev, curr) => prev.index == curr.index
          
            return merge(
              ...observables.map((observable, index) =>
                observable.pipe(map(updateState(index, amount)))
              )
            ).pipe(
              scan((state, fn) => fn(state), {}),
              filter(state => Object.keys(state).length != 0),
              distinctUntilChanged(noDoubleEmit),
              map(state => state.value),
            );
          }
          
          const result$ = toggleEmit(source1$, source2$);
          
          result$.subscribe(console.log);
          
          source2$.next(0);
          source1$.next(1);
          source2$.next(2);
          source2$.next(3);
          source1$.next(4);
          &lt;script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.5.3/rxjs.umd.min.js"&gt;&lt;/script&gt;

          【讨论】:

            【解决方案6】:

            您可以将您的来源放在一个数组中,并使用switchMapBehaviorSubject

            const sources = [a$, b$];
            const source$ = new BehaviorSubject<number>(0);
            
            const emit$ = source$.pipe(
              switchMap(n => sources[n]),
              map((val, i) => {
                const nextIndex = (i+1) % sources.length;
                source$.next(nextIndex);
                return val;
              })
            );
            

            细分:

            • source$ 发出您要收听的源的索引
            • switchMap 订阅源并发出其值
            • map 调用 source$.next() 发出下一个索引,然后简单地返回接收到的值

            请注意,这适用于 1 个或多个来源。

            这是一个有效的 StackBlitz 演示。

            【讨论】:

              【解决方案7】:

              这是另一个使用repeat 的有趣示例(无论如何在我看来)。从一个 observable 中取一个值,然后从第二个 observable 中取另一个值,然后重复:

              import { concat, interval, of } from 'rxjs';
              import { take, switchMap, repeat, mapTo } from 'rxjs/operators';
              
              const left = interval(10000).pipe(mapTo('left'));
              const right = interval(1000).pipe(mapTo('right'));
              
              const example = left.pipe(
                take(1),
                switchMap(leftVal => concat(of(leftVal), right.pipe(take(1)))),
                repeat()
              );
              console.clear();
              const subscribe = example.subscribe(val => console.log('output', val));
              

              stackblitz

              【讨论】:

                猜你喜欢
                • 1970-01-01
                • 1970-01-01
                • 2017-04-13
                • 2022-01-23
                • 2019-10-18
                • 2016-05-17
                • 2017-06-11
                • 2016-10-29
                • 1970-01-01
                相关资源
                最近更新 更多