【问题标题】:How do I sequence actions in RxJS/redux-observable vs redux-saga?如何在 RxJS/redux-observable 和 redux-saga 中对操作进行排序?
【发布时间】:2020-07-08 04:47:15
【问题描述】:

我已经开始深入学习 RxJs,其中一个原因是掌握了redux-observable 副作用的方法,但我发现 sagas 更方便和“声明性”。我已经学习了merge/flat/concat/switchMap 操作符,但它并没有帮助我弄清楚如何在 rxjs 中对事物进行排序。

这是我所说的“排序”的示例,在 Timer 应用程序的实例上,可能会在一段时间后安排启动,使用 redux-saga 实现:

export function* timerSaga() {
  while (true) {
    yield take('START');

    const { startDelay } = yield select(); // scheduled delay

    const [cancelled] = yield race([
      take('CANCEL_START'),
      delay(startDelay)
    ]);

    if (!cancelled) {
      yield race([
        call(function*() {
           while (true) {
             yield delay(10);
             yield put({ type: 'TICK' });
           }
        }),
        take(['STOP', 'RESET']
      ]);
    }
  }
}

我发现这个例子在逻辑上非常一致和清晰。我不知道如何用 redux-observable 来实现它。请简单地给我一些代码,它可以重现相同的逻辑,但使用 rxjs 运算符。

【问题讨论】:

    标签: javascript reactjs rxjs redux-saga redux-observable


    【解决方案1】:

    在 sagas(生成器)和 epics(observables)之间,改变很重要 你思考事件如何到达你的代码的方式。

    生成器满足迭代器和可迭代协议,其中涉及拉动 来自源的值/事件(在本例中为 Redux 操作)和阻塞 执行直到这些事件到达。

    Observables 是 push 而不是 pull。我们描述和命名事件流 我们感兴趣的,然后我们订阅它们。没有阻塞 调用,因为我们所有的代码都是由事件发生时触发的。

    此代码复制了 saga 示例中的行为。

    import { interval, timer } from 'rxjs';
    import { withLatestFrom, mapTo, exhaustMap, takeUntil } from 'rxjs/operators';
    import { ofType } from 'redux-observable';
    
    const myEpic = (action$, state$) => {
      // A stream of all the "cancel start" actions
      const cancelStart$ = action$.pipe(ofType('CANCEL_START'));
    
      // This observable will emit delayed start events that are not cancelled.
      const delayedCancellableStarts$ = action$.pipe(
        // When a start action occurs...
        ofType('START'), 
    
        // Grab the latest start delay value from state...
        withLatestFrom(state$, (_, { startDelay }) => startDelay),
    
        exhaustMap(
          // ...and emit an event after our delay, unless our cancel stream
          // emits first, then do nothing until the next start event arrives.
    
          // exhaustMap means we ignore all other start events while we handle
          // this one.
          (startDelay) => timer(startDelay).pipe(takeUntil(cancelStart$))
        )
      );
    
      // On subscribe, emit a tick action every 10ms
      const tick$ = interval(10).pipe(mapTo({ type: 'TICK' }));
    
      // On subscribe, emit only STOP or RESET actions
      const stopTick$ = action$.pipe(ofType('STOP', 'RESET'));
    
      // When a start event arrives, start ticking until we get a message to
      // stop. Ignore all start events until we stop ticking.
      return delayedCancellableStarts$.pipe(
        exhaustMap(() => tick$.pipe(takeUntil(stopTick$)))
      );
    };
    

    重要的是,即使我们正在创建和命名这些可观察的流,它们的行为也是惰性的 - 在订阅之前它们都不会被“激活”,而当您向 redux-observable 中间件提供这个史诗般的函数时就会发生这种情况。

    【讨论】:

    • 非常感谢您提供这样一个解释性的答案!现在一切都清楚了!
    【解决方案2】:

    我假设 take() 返回一个 observable,没有测试代码。它可能可以转换为如下所示的 rx 时尚。

    这里的关键是repeat()takeUntil()

    // outter condition for starting ticker
    forkJoin(take('START'), select())
        .pipe(
            switchMap(([, startDelay]) =>
                // inner looping ticker
                timer(10).pipe(switchMap(_ => put({type: 'TICK'})), repeat(),
                    takeUntil(race(
                        take('CANCEL_START'),
                        delay(startDelay)
                    ))
                )
                /////////////////////
            )
        )
    

    【讨论】:

    • 此解决方案涉及 RxJS 运算符和 redux-saga 实用程序函数(selecttakeputdelay)的混合,并且不会按预期工作。跨度>
    • 您可以将 take 替换为 hot observable 并将 select 替换为 state observable,答案是演示如何链接和组合您想要的逻辑。
    猜你喜欢
    • 2020-01-17
    • 1970-01-01
    • 2018-02-24
    • 2018-06-29
    • 1970-01-01
    • 2023-03-14
    • 1970-01-01
    • 2018-03-16
    • 2019-02-24
    相关资源
    最近更新 更多