【问题标题】:Nestjs @Sse : return result of a promise in rxjs observableNestjs @Sse :在 rxjs observable 中返回一个承诺的结果
【发布时间】:2021-04-27 08:01:38
【问题描述】:

我试图在 nest doc example 之外更简单地在控制器中实现 @Sse(),但直到现在我才使用 rxjs,所以我有点困惑。

流程是:

  1. 客户端发送带有文件负载的POST 请求
  2. 服务器(希望)发回新创建的project,并带有一个道具status:UPLOADED
  3. 客户端订阅下面描述的 sse 路由作为参数传递它刚刚从服务器收到的projectId
  4. 与此同时,服务器是doingSomeStuff,这可能需要 10 秒到一分钟。当doingSomeStuff 完成时,db 中的项目状态从UPLOADED 更新为PARSED

我需要 @Sse 修饰函数以 x 时间间隔执行“状态检查”并返回 project.status(当时可能已更新,也可能未更新)

我现在的代码:

  @Sse('sse/:projectId')
  sse(@Param('projectId') projectId: string): Observable<any> {
    const projId$ = from(this.projectService.find(projectId)).pipe(
      map((p) => ({
        data: {
          status: p.status,
        },
      })),
    );
    return interval(1000).pipe(switchMap(() => projId$));
  }

我没有把服务代码放在这里,因为它是一个简单的mongooseModel.findById 包装器。

我的问题是返回的状态仍然是UPLOADED 并且永远不会更新。似乎承诺不会在每个滴答声中重新执行。如果我在我的服务中使用 console.log,我可以看到我的日志只打印一次,初始值为 project,而我希望在每次滴答时看到一个新日志。

【问题讨论】:

    标签: rxjs nestjs server-sent-events


    【解决方案1】:

    这是一个两步过程。

    1. 我们使用 rxjs 中的 from 运算符从 this.service.findById() 生成的 Promise 中创建一个 observable。我们还使用map 运算符来设置当有人订阅这个 observable 时我们需要的对象的格式。

    2. 我们希望每 x 秒返回一次这个 observable。 interval(x) 创建一个 observable,它在每 x 毫秒后发出一个值。因此,只要间隔发出一个值,我们就使用它,然后使用 switchMapprojId$。只要外部 observable 发出值,switchMap 运算符就会切换到内部 observable。

    请注意:由于您的服务器可能需要 10 秒才能执行此操作,因此您应该相应地设置 intervalValue。在下面的代码 sn-p 中,我将其设置为 10,000 毫秒,即 10 秒。

    const intervalValue = 10000;
    
    @Sse('sse/:projectId')
    sse(@Param('projectId') projectId: string): Observable < any > {
      return interval(intervalValue).pipe(
        switchMap(() => this.projectService.find(projectId)),
        map((p) => ({
          data: {
            status: p.status,
          }
        })));
    }
    
    
    // OR
    
    @Sse('sse/:projectId')
    sse(@Param('projectId') projectId: string): Observable < any > {
      const projId$ = defer(() => this.service.findById(projectId)).pipe(
        map(() => ({
          data: {
            _: projectId
          }
        }))
      );
    
      return interval(intervalValue).pipe(switchMap(() => projId$));
    }

    【讨论】:

    • 非常清楚,就像一个魅力......在map anon 函数参数中添加了 return project 以访问返回值并执行map((p) =&gt; ({ data: { _: p.anyProperty } })) ,这似乎可以解决问题。谢谢。只是为了确定:在间隔的每个“滴答”处,findById 都会重新执行,并且返回的值是新承诺的结果,而不是每次都返回相同的结果?
    • 没错。在间隔的每个滴答声中,findById 将重新执行。切换映射迫使它在每个刻度上切换到 findById observable。
    • 对不起,我“无法解决”这个问题,因为它在进一步实施后似乎不起作用。我编辑了我最初的问题。
    • @Pierre_T 已编辑答案。请立即验证并告诉我它是否有效。
    • 没有明确的间隔就没有办法做到这一点吗?我只想在获取一些数据时向客户端发送事件,但是 nextjs 迫使我们使用令人讨厌的 Observeble
    【解决方案2】:

    @softmarshmallow 您可以观看模型更改并使用可观察流来发送它。 像这样的

    import { Controller, Param, Sse } from '@nestjs/common'
    import { filter, map, Observable, Subject } from 'rxjs'
    
    @Controller('project')
    export class ProjectStatusController {
      private project$ = new Subject()
    
      // watch model event
      // this method should be called when project is changed
      onProjectChange(project) {
        this.project$.next(project)
      }
    
      @Sse('sse/:projectId')
      sse(@Param('projectId') projectId: string): Observable<any> {
        return this.project$.pipe(
          filter((project) => project.projectId === projectId),
          map((project) => ({
            data: {
              status: project.status,
            },
          })),
        )
      }
    }
    
    

    【讨论】:

      猜你喜欢
      • 2017-09-10
      • 1970-01-01
      • 1970-01-01
      • 2017-05-16
      • 2015-11-27
      • 2018-02-03
      • 1970-01-01
      • 1970-01-01
      • 2015-06-05
      相关资源
      最近更新 更多