【问题标题】:Rxjs Subject#next asynchronous or not?Rxjs Subject#next 是否异步?
【发布时间】:2018-06-28 08:41:11
【问题描述】:

您好,请问以下代码是否按顺序执行?即日志“Worker has finished task C”是否总是在日志“Finished 3 task(s)”之后发生?

长问题:使用scan 运算符,我可以确定任务是按顺序执行的,我并不担心。让我担心的是,我希望从上次订阅中仅在任务 C 完成后才做某事,我不确定 o.complete() 的位置是否能保证这一点。例如将start() 运行do.("A") -> do.("B") -> do.("C") 而不等待扫描完成并立即运行o.complete(),给出输出:

Worker has finished task C
Doing task A
Finished 1 task(s)
Doing task B
Finished 2 task(s)
Doing task C
Finished 3 task(s)

如果是这样,你如何修复代码以实现我所描述的?

https://stackblitz.com/edit/typescript-xhhwme

class Worker {
  private tasks: Subject<string>;
  public init(): Observable<number> {
    this.tasks = new Subject<string>();
    return this.tasks.scan((count, task) => {
      console.log("Doing task " + task);
      return ++count;
    }, 0).asObservable();
  }
  public do(task: string): void {
    this.tasks.next(task);
  }
}

function start(worker: Worker): Observable<void> {
  return Observable.create(o => {
    const monitor = worker.init();
    monitor.subscribe(c => console.log("Finished " + c + " task(s)"));
    worker.do("A");
    worker.do("B");
    worker.do("C");
    o.complete();
    worker.do("D");
  });
}

const worker = new Worker();
start(worker).subscribe({
  complete: () => console.log("Worker has finished task C")
});

【问题讨论】:

    标签: typescript rxjs


    【解决方案1】:

    TLDR:Subject.next 是同步的。

    如果源是同步的,则响应式流是同步的,除非您明确将它们设为异步或将它们与异步流混合。这些都不会在您的代码中发生。一些例子:

    //Synchronous
    of(1,2)
      .subscribe(console.log);
    
    //asynchronous because of async source
    interval(1000)
      .subscribe(console.log);
    
    //aynchronous because one stream is async (interval)
    of(1,2)
      .pipe(
        mergeMap(x => interval(1000).pipe(take(2)))
      )
      .subscribe(console.log);
    
    //async because we make it async
    of(1,2, asyncScheduler)
      .subscribe(console.log);
    

    在您的示例中发生了什么? Observable.create 中的所有内容都将立即执行。当您调用 worker.do("A"); 时,this.tasks.next(task); 会发出一个新值,并(同步)执行 tasks 流链。 BC 也是如此。

    当您调用o.complete(); 时,start(worker) 流完成并打印"Worker has finished task C"。然后Dtasks流执行。

    您可以在这些文章中找到有关异步/同步行为的更多详细信息:

    【讨论】:

    • 这是 SO 答案应该是什么的完美示例。不仅解决了这些问题,而且提供了相关的进修资料。你可以成为一个真正的好老师,谢谢!
    猜你喜欢
    • 1970-01-01
    • 2019-12-19
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-11-04
    • 2017-03-12
    • 1970-01-01
    • 2016-08-10
    相关资源
    最近更新 更多