【问题标题】:How to share an Observable when it's running and resubscribe when it's not如何在 Observable 运行时共享它并在它不运行时重新订阅
【发布时间】:2018-08-20 12:26:52
【问题描述】:

我有一个 Observable,当我订阅它时,我想要:

  • 如果正在执行,则返回当前执行的结果
  • 如果没有,请重新/执行 Observable。

例如:

我有一个wait 函数。该函数等待1秒,返回当前的time()

function wait(): Observable<number> {
  return new Observable<number>((observer) => {
    setTimeout(() => {
      observer.next((new Date).getTime());
      observer.complete();
    }, 1000);
  });
}

现在,我的代码每 0.4 秒调用一次 wait 函数。

  • 第一次调用wait(0.4 秒)将等待 1 秒并返回当前时间
  • wait 的第二次调用(0.8 秒),因为有wait 的执行,将返回与第一个点相同的可观察对象和相同的结果
  • 第三次调用wait(1.2 秒)将再次等待 1 秒,因为没有执行

【问题讨论】:

标签: rxjs rxjs6


【解决方案1】:

我不确定我是否理解您的示例,我认为第 3 次调用应该获得初始时间戳,而第 4 次调用应该获得新时间戳。

无论如何,您都可以通过像这样半共享 Observable 来做到这一点:

const timeStream = timer(1000)
    .pipe(map(() => (new Date).getTime()), share(), take(1));

function wait(): Observable<number> {
    return timeStream;
}

setTimeout(() => wait().subscribe(console.log), 400);
setTimeout(() => wait().subscribe(console.log), 800);
setTimeout(() => wait().subscribe(console.log), 1200);
setTimeout(() => wait().subscribe(console.log), 1600);

https://stackblitz.com/edit/typescript-wiqpbf

关键是 share() 和 take(1) 组合 - 本质上这将使 Observable 仅在“运行”时共享。


OP的最终解决方案:

import {Observable} from 'rxjs';
import {share, take} from 'rxjs/operators';

function myFunction(): Observable<number> {
  return new Observable<number>((observer) => {
    setTimeout(() => {
      observer.next((new Date).getTime());
      observer.complete();
    }, 1000);
  });
}

const task$ = myFunction().pipe(share(), take(1));

setInterval(() => {
  task$.subscribe(console.log);
}, 400);

【讨论】:

  • @JoniJnm 您的最终解决方案是混合 Observables 和标准 JS 调度原语。为什么?我建议坚持使用 Observables 或 vanilla JS - 例如,您的方法存在取消订阅/取消问题,其中 setTimeout 未取消。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-01-20
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多