【问题标题】:Pause RxJS stream based on a value in the stream根据流中的值暂停 RxJS 流
【发布时间】:2019-06-25 05:53:56
【问题描述】:

我有一个简单的组件,带有一个按钮,可以启动和暂停由 RxJS 计时器生成的数字流。

import { Component, OnInit } from '@angular/core';
import { BehaviorSubject, Observable, timer, merge } from 'rxjs';
import { filter, bufferToggle, windowToggle, mergeMap, mergeAll, share } from 'rxjs/operators';

@Component({
  selector: 'my-app',
  template: `<button (click)="toggle()">{{ (active$ | async) ? 'Pause' : 'Play' }}</button>`,
  styleUrls: [ './app.component.css' ]
})
export class AppComponent implements OnInit {
  active$ = new BehaviorSubject<boolean>(true);

  ngOnInit(): void {
    const on$ = this.active$.pipe(filter(v => v));
    const off$ = this.active$.pipe(filter(v => !v));

    const stream$ = timer(500, 500).pipe(share());

    const out$ = merge(
      stream$.pipe(
        bufferToggle(off$, () => on$),
        mergeAll(),
      ),
      stream$.pipe(
        windowToggle(on$, () => off$),
        mergeAll(),
      ),
    );

    out$.subscribe(v => console.log(v));
  }

  toggle(): void {
    this.active$.next(!this.active$.value);
  }
}

这很好用,但我需要再添加一项功能!

我需要根据流中满足条件的值自动暂停流。

例如,如果最新值为 5 的倍数,则暂停流。


你有什么想法吗?

这是 stackblitz https://stackblitz.com/edit/angular-6hjznn 上的一个可运行示例

【问题讨论】:

  • 那么预期的输出应该是什么?
  • 顺序应该一样。不仅通过单击按钮 bat 还应通过接收流中的某些值来暂停流。

标签: javascript angular rxjs


【解决方案1】:

可以 (1) 扩展您当前的 bufferToggle / windowToggle 方法或 (2) 使用自定义缓冲区实现。

1。扩展bufferToggle / windowToggle方法

你可以在bufferToggle之后添加一个数组到算子队列中。

  1. bufferToggle 发出时,将这些值附加到数组中。
  2. 从数组中取值,直到数组中的某个元素与停止条件匹配。
  3. 发出这些值并暂停您的流。

可暂停 (Demo)

pausable 运算符将发出与停止条件匹配的值,然后立即停止流。

export function pausable<T, O>(
  on$: Observable<any>, // when on$ emits 'pausable' will emit values from the buffer and all incoming values 
  off$: Observable<O>, // when off$ emits 'pausable' will stop emitting and buffer incoming values
  haltCondition: (value: T) => boolean, // if 'haltCondition' returns true for a value in the stream the stream will be paused
  pause: () => void, // pauses the stream by triggering the given on$ and off$ observables
  spread: boolean = true // if true values from the buffer will be emitted separately, if 'false' values from the buffer will be emitted in an array
) {
  return (source: Observable<T>) => defer(() => { // defer is used so that each subscription gets its own buffer
    let buffer: T[] = [];
    return merge(
      source.pipe(
        bufferToggle(off$, () => on$),
        tap(values => buffer = buffer.concat(values)), // append values to your custom buffer
        map(_ => buffer.findIndex(haltCondition)), // find the index of the first element that matches the halt condition
        tap(haltIndex => haltIndex >= 0 ? pause() : null), // pause the stream when a value matching the halt condition was found
        map(haltIndex => buffer.splice(0, haltIndex === -1 ? customBuffer.length : haltIndex + 1)), // get all values from your custom buffer until a haltCondition is met
        mergeMap(toEmit => spread ? from(toEmit) : toEmit.length > 0 ? of(toEmit) : EMPTY) // optional value spread (what your mergeAll did)
      ),
      source.pipe(
        windowToggle(on$, () => off$),
        mergeMap(x => x),
        tap(value => haltCondition(value) ? pause() : null), // pause the stream when an unbuffered value matches the halt condition
      ),
    );
  });
}

您可以根据您的特定需求调整此运算符,例如使用较少的输入参数并将share合并到其中,请参阅this version with less parameters

用法

active$ = new BehaviorSubject<boolean>(true);
on$ = this.active$.pipe(filter(v => v));
off$ = this.active$.pipe(filter(v => !v));

interval(500).pipe(
  share(),
  pausable(on$, off$, v => this.active$.value && this.pauseOn(v), () => this.active$.next(false))
).subscribe(console.log);

pauseOn = (value: number) => value > 0 && value % 10 === 0

2。完全自定义的缓冲区

您可以采用完全自定义的方法,只使用一个类似于Brandon's approach 的可观察输入。

bufferIf (Demo)

bufferIf 将在给定的 condition 发出 true 时缓冲传入的值,并在 conditionfalse 时从缓冲区发出所有值或传递新值。

export function bufferIf<T>(condition: Observable<boolean>) {
  return (source: Observable<T>) => defer(() => {
    const buffer: T[] = [];
    let paused = false;
    let sourceTerminated = false;
    return merge( // add a custon streamId to values from the source and the condition so that they can be differentiated later on
      source.pipe(map(v => [v, 0]), finalize(() => sourceTerminated = true)),
      condition.pipe(map(v => [v, 1]))
    ).pipe( // add values from the source to the buffer or set the paused variable
      tap(([value, streamId]) => streamId === 0 ? buffer.push(value as T) : paused = value as boolean), 
      switchMap(_ => new Observable<T>(s => {
        setTimeout(() => { // map to a stream of values taken from the buffer, setTimeout is used so that a subscriber to the condition outside of this function gets the values in the correct order (also see Brandons answer & comments)
          while (buffer.length > 0 && !paused) s.next(buffer.shift())
        }, 0)
      })), // complete the stream when the source terminated and the buffer is empty
      takeWhile(_ => !sourceTerminated || buffer.length > 0, true) 
    );
  })
} 

用法

pause$ = new BehaviorSubject<boolean>(false);

interval(500).pipe(
  bufferIf(this.pause$),
  tap(value => this.pauseOn(value) ? this.pause$.next(true) : null)
).subscribe(console.log);

pauseOn = (value: number) => value > 0 && value % 10 === 0

【讨论】:

    【解决方案2】:

    这是一个自定义的暂停操作符,当暂停信号为true 时,它只会将值累积到缓冲区中,并在暂停信号为false 时将它们一一发出。

    将它与一个简单的tap 运算符结合使用,以在值达到特定条件时切换行为主体暂停信号,并且您有一些东西会在按钮单击时暂停,并且在值满足条件时也会暂停(12 的倍数)本例):

    这里是pause 运算符:

    function pause<T>(pauseSignal: Observable<boolean>) {
      return (source: Observable<T>) => Observable.create(observer => {
        const buffer = [];
        let paused = false;
        let error;
        let isComplete = false;
    
        function notify() {
          while (!paused && buffer.length) {
            const value = buffer.shift();
            observer.next(value);
          }
    
          if (!buffer.length && error) {
            observer.error(error);
          }
    
          if (!buffer.length && isComplete) {
            observer.complete();
          }
        }
    
        const subscription = pauseSignal.subscribe(
          p => {
            paused = !p;
            setTimeout(notify, 0);
          },
          e => {
            error = e;
            setTimeout(notify, 0);
          },
          () => {});
    
        subscription.add(source.subscribe(
          v => {
            buffer.push(v);
            notify();
          },
          e => {
            error = e;
            notify();
          },
          () => {
            isComplete = true;
            notify();
          }
        ));
    
        return subscription;
      });
    }
    

    这是它的用法:

    const CONDITION = x => (x > 0) && ((x % 12) === 0); // is multiple
    this.active$ = new BehaviorSubject<boolean>(true);
    const stream$ = timer(500, 500);
    const out$ = stream$.pipe(
      pause(this.active$),
      tap(value => {
        if (CONDITION(value)) {
          this.active$.next(false);
        }
      }));
    
    this.d = out$.subscribe(v => console.log(v));
    

    还有一个工作示例:https://stackblitz.com/edit/angular-bvxnbf

    【讨论】:

    • 有一个小错误。如果在暂停后我等待大约 10 秒然后按“播放”它会发出接下来的 12 个项目并停止(这没关系),但按钮文本是“暂停”(应该是“播放”)
    • 谢谢。所以 BehaviorSubject 有一个“特性”,如果你递归地设置值,那么观察者会收到乱序的值。我想我通过在暂停信号触发时在对notify 的调用周围使用setTimeout 解决了这个问题。可能有一种方法可以使用 observeOn 和调度程序来解决这个问题,但是 shrug
    【解决方案3】:

    这是一个简单的方法。将timer() 用作发射器,并单独增加计数。这使您可以更直接地进行控制。

    export class AppComponent implements OnInit {
      active = true;
      out$: Observable<number>;
    
      count = 0;
    
      ngOnInit(): void {
    
        const stream$ = timer(500, 500);
    
        this.out$ = stream$.pipe(
          filter(v => this.active),
          map(v => {
            this.count += 1;
            return this.count;
          }),
          tap(v => {
            if (this.count % 5 === 0) {
              this.active = false;
            }
          })
        )
      }
    
    }
    

    https://stackblitz.com/edit/angular-nzs7zh

    【讨论】:

    • 这错过了有缓冲区的部分。我们不必停止价值的释放。我们必须将它们保存在缓冲区中并暂停打印。
    • @TarasHupalo 使用缓冲区只是一种不同的方式。你从使用缓冲区中得到了什么,而你这样做没有得到什么?
    • 这种方法过滤流。我不想失去价值。发射器可以是其他东西然后timer()
    【解决方案4】:

    我假设所需的行为与获取计时器本身发出的值无关,而不是暂停对正在进行的流的通知(在您的示例中,即使我们没有看到,计时器也会继续正在打印的值),暂停时实际上停止发射是可以的。

    我的解决方案灵感来自Stopwatch recipe

    下面的解决方案使用两个单独的按钮来播放和暂停,但您可以根据自己的喜好进行调整。我们在组件的 ngAfterViewInit 钩子中将 (ViewChild) 按钮传递给服务,然后订阅流。

    // pausable.component.ts
      ngAfterViewInit() {
        this.pausableService.initPausableStream(this.start.nativeElement, this.pause.nativeElement);
    
        this.pausableService.counter$
          .pipe(takeUntil(this.unsubscribe$)) // don't forget to unsubscribe :)
          .subscribe((state: State) => {
            console.log(state.value); // whatever you need
        });
      }
    
    // pausable.service.ts
    import { Injectable } from '@angular/core';
    
    import { merge, fromEvent, Subject, interval, NEVER } from 'rxjs';
    import { mapTo, startWith, scan, switchMap, tap, map } from 'rxjs/operators';
    
    export interface State {
      active: boolean;
      value: number;
    }
    
    @Injectable({
      providedIn: 'root'
    })
    export class PausableService {
    
      public counter$;
    
      constructor() { }
    
      initPausableStream(start: HTMLElement, pause: HTMLElement) {
    
        // convenience functions to map an element click to a result
        const fromClick = (el: HTMLElement) => fromEvent(el, 'click');
        const clickMapTo = (el: HTMLElement, obj: {}) => fromClick(el).pipe(mapTo(obj));
    
        const pauseByCondition$ = new Subject();
        const pauseCondition = (state: State): boolean => state.value % 5 === 0 && state.value !== 0;
    
        // define the events that may trigger a change
        const events$ = merge(
          clickMapTo(start, { active: true }),
          clickMapTo(pause, { active: false }),
          pauseByCondition$.pipe(mapTo({ active: false }))
        );
    
        // switch the counter stream based on events
        this.counter$ = events$.pipe(
          startWith({ active: true, value: 0 }),
          scan((state: State, curr) => ({ ...state, ...curr }), {}),
          switchMap((state: State) => state.active
            ? interval(500).pipe(
              tap(_ => ++state.value),
              map(_ => state))
            : NEVER),
          tap((state: State) => {
            if (pauseCondition(state)) {
              pauseByCondition$.next(); // trigger pause
            }
          })
        );
      }
    
    }
    
    

    【讨论】:

      【解决方案5】:

      使用windowToggle 并使用 active.next(false) 尽可能简单 工作示例:https://stackblitz.com/edit/angular-pdw7kw

       defer(() => {
            let count = 0;
            return stream$.pipe(
              windowToggle(on$, () => off$),
              exhaustMap(obs => obs),
              mergeMap(_ => {
                if ((++count) % 5 === 0) {
                  this.active$.next(false)
                  return never()
                }
                return of(count)
              }),
            )
          }).subscribe(console.log)
      

      【讨论】:

      • 条件可以通过始终基于值而不是计数器而有所不同。 5 的倍数只是一个例子。
      • 你可以在 defer 运算符中定义任何局部变量来控制你的逻辑
      【解决方案6】:

      您的示例实际上非常接近工作解决方案,不需要新的自定义运算符。

      请参阅此处的“缓冲”部分:

      https://medium.com/@kddsky/pauseable-observables-in-rxjs-58ce2b8c7dfd

      这里的工作示例:

      https://thinkrx.io/gist/cef1572743cbf3f46105ec2ba56228cd

      它使用您已经拥有的相同方法,使用 bufferTogglewindowToggle,看起来主要区别是您需要 share 您的暂停/活动主题-

      【讨论】:

      • 这不能回答问题。 OP 想要一个基于流中满足给定条件的值暂停的流。该值也可能来自缓冲区,因此不会在每次取消暂停时释放整个缓冲区。
      猜你喜欢
      • 2021-06-12
      • 2015-06-15
      • 1970-01-01
      • 2021-02-19
      • 2016-12-17
      • 2012-09-18
      • 1970-01-01
      • 1970-01-01
      • 2019-05-06
      相关资源
      最近更新 更多