【问题标题】:Get one event per 1000 milliseconds interval每 1000 毫秒间隔获取一个事件
【发布时间】:2020-04-01 16:26:16
【问题描述】:

我有无限的事件流,可以发出一些连续的事件部分,我想每毫秒每 1000 个事件发生一个。

我尝试了debounceTime/auditTime/throttleTime,但它们不包括我想要的所有事件 - 以演示我创建的行为 playground on stackblitz,它在 10 个事件的一部分中每 300 毫秒触发一次事件:

  • debounceTime(1000) 将只给出事件#10

  • throttleTime(1000) 将给出事件 1,5,9 但它会省略 #10 这是必不可少的

  • auditTime(1000) 将提供事件4,8

我在这里想要的是获取事件1,5,9,10(每 1000 毫秒间隔一个事件)。我如何做到这一点?

const events$ = interval(300).pipe(
  map(num => `Firing event ${num + 1}`)
);

const source = events$.pipe(
  tap(console.log),
  take(10),
  // make some debouncing??
  map(x => `Received ${x}!`)
);

source.subscribe(x =>
  console.log(
    "%c" + x,
    "background: purple; color: white; padding: 3px 5px; border-radius: 3px;"
  )
);

我还尝试使用zip / combineLatest 并通过间隔but no luck with that 发出值

【问题讨论】:

  • 你能解释一下最后一个是什么意思吗?你想在 1000ms 和最后一个已知值之后得到它的当前值吗?所以这将是 1 的历史链,对吗?
  • 我想每 1000 毫秒发出不超过 1 个事件,油门现在将省略 #10,因为它在 #9 之后太快了 - 我希望它触发 #10 作为它最后的唯一事件1000ms 间隔
  • 那应该怎么带10? 1 默认为输入,2/300 输出,3/600 输出,4/900 输出,5/1200 输入,6/300 输出,7/600 输出,8/900 输出,9/1200 输入,10/300 -出去。或者你想算作1/0进、2/300出、3/600出、4/900出、/1000复位、5/200进、6/500出、7/800出、1000/复位、8/100 进、9/400 出、10/700 出……也不行。
  • 如果你想要 10 个只是因为它是第 10 个并且你期望 10 个组发射 - 请告诉我。
  • 好的,明白了你的意思,我不希望正好有 10 个 - 我希望有一组事件,如果限制它们,我可能会丢失最后一个重要的事件。因此,如果在当前 1000 毫秒间隔内的最新事件之后没有更多事件发生 - 它应该被流式传输。我考虑过通过merge(events$.throttleTime(1000), events$.debounceTime(1000) 组合流,但我不确定这会涵盖所有情况

标签: rxjs reactivex


【解决方案1】:

更新

基于cmets中的讨论

const events$ = timer(0, 6000).pipe(
  take(3),
  switchMap(x =>
    timer(0, 300).pipe(
      map(num => `event #${num + 1}`),
      take(x > 1 ? 9 : 10)
    )
  )
);

const source = merge(
  events$.pipe(
    tap(e => console.log(`%cStream: ${e}`, "color: blue;")),
    debounceTime(1000),
    tap(x => console.log(`%cdebounceTime captured: ${x}`, "color: red;"))
  ),
  events$.pipe(
    throttleTime(1000),
    tap(x => console.log(`%cthrottleTime captured: ${x}`, "color: green;"))
  ),
).pipe(
  // we need to avoid duplicates (like case with 9).
  // if all events aren't unique you need to use the original solution below.
  distinctUntilChanged(), // <-- if all events are unique.
  map(x => `Received ${x}!`)
);

source.subscribe(x =>
  console.log(
    "%c" + x,
    "background: purple; color: white; padding: 3px 5px; border-radius: 3px;"
  )
);

原创

我希望这就是你想要的:https://take.ms/VP7tA

const events$ = interval(300).pipe(
    map(num => `Firing event ${num + 1}`)
);

const source = concat(events$.pipe(
    tap(console.log),
    take(10),
), timer(1000).pipe(switchMapTo(EMPTY)), events$.pipe(
    tap(console.log),
    take(10),
));

let lastTimer = 0;
const last$ = new Subject<number>();
merge(
    source.pipe(
      scan((state, event) => {
        state[1] = null;
        const stamp = new Date().getTime();
        clearTimeout(lastTimer);
        if (stamp - state[0] < 1000) {
          lastTimer = setTimeout(() => last$.next(event), (stamp - state[0]) + 50);
          return state;
        }
        state[0] = stamp;
        state[1] = event;
        return state;
      }, [0, null]),
      filter(([, event]) => event !== null),
      map(([, event]) => event || 0),
    ),
    last$,
).pipe(
    map(x => `Received ${JSON.stringify(x)}!`)
).subscribe(x =>
    console.log(
        "%c" + x,
        "background: purple; color: white; padding: 3px 5px; border-radius: 3px;"
    )
);

【讨论】:

  • 老兄,这真是太棒了,但它不是过于复杂吗?您如何看待同时使用 2 个流 - 我的解决方案有什么缺陷? stackblitz.com/edit/rxjs-take-once-per-second?file=index.ts
  • 我不会说它过于复杂。我会说它没有经过打磨:) 编写自己的 rx 运算符并将此逻辑放入其中肯定是有意义的,例如throttleWithLast(1000)。但为此我需要更多时间,你知道的。你需要记住的是last$ 没有完成,如果你不想实现throttleWithLast,你需要处理它。在您拥有的代码中 - 将take 的数量减少到take(9)。然后你得到第 9 个事件 2 次。除此之外它看起来比我的好多了:)
  • 看起来distinctUntilChanged 可以用 9 完成这项工作。如果没问题,我已经根据您的解决方案更新了响应。
  • 是的,这对于简单值来说就像一个魅力 - 我拥有的是代表整个应用程序状态的巨大 source$ 对象,我需要将完整更新传递给订阅者,我无法比较它们很容易-我不能通过distinctUntilChanged((prevState, nextState) =&gt; prevState.prop === nextState.prop)依赖一些state$.prop,因为它们中的任何一个都可以改变-只有deepEqual会在这里有所帮助,但我相信这会显着减慢应用程序(
  • 再次嗨 :) 它绝对有意义并简化了实现,一旦你有一种简单的方法来区分数据 debounced$ / throttled$ / distinctUntilChanged 将提供一个直接且可读的解决方案。
【解决方案2】:

这是解决此问题的一种方法:

const events$ = interval(300).pipe(
  map(num => `Firing event ${num + 1}`),
  share(),
);

const source$ = events$.pipe(
  take(10),
);

const lastValue$ = source$.pipe(last());

merge(source$.pipe(throttleTime(1000)), lastValue$).subscribe(console.log);

share() 将确保源(由 interval 生成的 observable)只订阅一次

last() 将在其源(本例中为source$)由于take(10) 而完成时返回最新的nexted

编辑

根据我对问题的理解,这将是另一种选择:

const events$ = merge(
  timer(300), // 1
  timer(600), // 2
  timer(900), // 3
  timer(1301), // 4
  timer(1500), // 5
  timer(1800), // 6
  timer(2302), // 7
).pipe(
  map((_, num) => `Firing event ${num + 1}`),
  share(),
);

const lastValue$ = events$.pipe(debounceTime(1000));

const source$ = events$.pipe(
  throttleTime(1000),
  buffer(lastValue$)
)
.subscribe(console.log)
  • debounceTime(1000) - 如果 1 秒过去而没有任何通知

  • buffer(lastValue$) - 当lastValue$ 发出时,发送收集的值

StackBlitz

【讨论】:

  • 对不起,我没有预先调整大小 - 编辑了我的问题,流是无限的,10 个事件只是事件部分的示例,所以我不能使用 last()。我无尽的流给出了部分事件 - 例如它提供了 10 个间隔为 300 毫秒的事件,我需要以 1000 毫秒的时间限制它们,但我还需要当前部分的最后一个事件,之后我将继续观察流
  • 我认为你仍然可以使用 last(),因为它会因为 take() 而完成。也因为 take(n),当达到 n 个排放量时,它会从源中取消订阅,所以如果你仍然想接收值,你可以使用 bufferCount;我会更新我的答案
  • @godblessstrawberry 已更新。
  • 浪费了很多时间,这值得一些反馈。如果我能对我的答案中的错误/误导性得到一些解释,我将不胜感激。谢谢。
  • mate 我很欣赏你的回答并且没有对此投反对票,尽管你的解决方案似乎不起作用 - 我在 stackblitz.com/edit/… 尝试了 9 个事件的 3 个部分 - 它省略了最后一个 #9,而且它提供了数组同时发生 3 个事件 - 而不是在它们实际触发时
猜你喜欢
  • 2013-09-27
  • 2013-05-26
  • 1970-01-01
  • 2011-05-27
  • 2018-11-16
  • 1970-01-01
  • 2013-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多