【问题标题】:RxJS reduce doesn't continueRxJS减少不继续
【发布时间】:2015-11-08 16:04:45
【问题描述】:

为什么 flatMap 不会触发下游缩减?

我得到如下代码:

handleFiles.flatMap(files =>
  Rx.Observable.from(files).
  flatMap((file, i) => fileReader(file, i)).
  reduce((form, file, i) => {
    form.append('file[' + i + ']', result);
    console.log('reduce step', file);
    return form;
  }, new FormData()).
  tap(console.log.bind(console, 'after reduce'))
).
subscribe(console.log.bind(console, 'response'));

问题是“减少后”点击永远不会被击中。为什么?

日志是这样的:

reduce step [data]
reduce step [data]

截图:

【问题讨论】:

    标签: javascript reactive-programming rxjs reactive-extensions-js


    【解决方案1】:

    问题不在flatMap;这妨碍了reduce 的工作方式。

    reduce 读取整个流并将其缩减为单个值,仅在源流关闭时发出。如果您的from(files) 流没有结束,那么reduce 将永远不会输出它的值。

    尝试改用scan;它会发出每个中间步骤,似乎正是您要寻找的。​​p>

    【讨论】:

    • Files 是一个 JS 数组,简单明了。我添加了一个屏幕截图。我之前进行过扫描并且确实有效...但是由于它是一个 JS 数组聚合应该执行 onComplete,对吗?
    • 不过,这是一个不错的选择。我发现了我的错误;未完成 fileReader 中的主题。呵呵!
    【解决方案2】:

    如果 files 是一个数组,那么如果从 fileReader 返回的 observable 存在,reduce 应该终止。所以对于这段代码,问题在于 fileReader 返回了一个未完成的 observable。

    【讨论】:

      【解决方案3】:

      这是一个使用 reduce 和非终止 observable 的示例;

      使用窗口时间:

      import { fromEvent, interval, timer } from 'rxjs';
      import { reduce, filter, windowTime, map, mergeMap } from 'rxjs/operators';
      
      const interval$ = interval(1000);
      const observable = interval$.pipe(
        windowTime(2000), // each window is 2s
        mergeMap(window$ => window$.pipe(
          reduce((a,x) => { // reduce to array
            return [...a,x];
          }, []),
          filter(x => !!x.length) // in the background timer is still running so suppress empty events
        )), // flatten the Observable-of-Observables
      );
      const subscription = observable.subscribe(x => console.log(x));
      setTimeout(() => subscription.unsubscribe(), 10000);
      

      使用缓冲时间:

      import { fromEvent } from 'rxjs';
      import { bufferTime, filter, map } from 'rxjs/operators';
      
      let count = 1;
      const clicks = fromEvent(document, 'click');
      const observable = clicks.pipe(
        bufferTime(1000), // batch into array every 1s
        filter(x => !!x.length), // ignore events without clicks
        map(x => x.reduce((a,y) => ({...a, [count++]: y}), {})),
      );
      observable.subscribe(x => console.log(x));
      

      使用审计时间:

      import { fromEvent } from 'rxjs';
      import { tap, auditTime, map } from 'rxjs/operators';
      
      let buffer = [];
      const clicks = fromEvent(document, 'click');
      const observable = clicks.pipe(
        tap((event) => buffer.push(event)),
        auditTime(1000), // buffer every 1s after 1st click is detected
        map((_lastEvent) => { // ignore last event
          const events = buffer; // save off buffer
          buffer = []; // clear buffer
          return events.reduce((a,e,i) => ({...a, [i]: e}),{});
        }),
      );
      observable.subscribe((events) => console.log(events));
      

      使用 takeUntil 并重复:

      注意:take/repeat 将重置 observable(即,interval-counter 保持在 0 并且事件可能会丢失)

      import { fromEvent, timer, interval } from 'rxjs';
      import { takeUntil, reduce, repeat, filter } from 'rxjs/operators';
      
      const interval$ = interval(1000);
      const timer$ = timer(2000);
      
      const observable = interval$.pipe(
        takeUntil(timer$), // unsubscribe from stream every 2s so reduce terminates
        reduce((acc, event) => [...acc, event], []), // reduce to array of events
        filter(x => !!x.length), // suppress emission of empty stream
        repeat(), // resubscribe to stream
      );
      // console will only show array of [0] since takeUntil stops right when interval emits
      const subscription = observable.subscribe(x => console.log(x));
      setTimeout(() => subscription.unsubscribe(), 10000);
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2019-09-19
        • 1970-01-01
        • 2018-11-30
        • 1970-01-01
        相关资源
        最近更新 更多