【问题标题】:How to combine two Observables into one Observable Array using RxJs如何使用 RxJs 将两个 Observable 组合成一个 Observable Array
【发布时间】:2019-12-18 20:24:34
【问题描述】:

我正在尝试将两个 Observable 组合成一个 Observable 数组,我的 Angular 应用程序可以随着时间的推移订阅和接收值。该场景正在使用聊天机器人。用户提供输入,机器人做出响应,然后将两者组合在一个数组中以用于聊天流。

见下文:

聊天输入组件获取用户输入:

  onNewComment(value: string): void {
    this.chatService.addComment(value);
  }

然后这会与chat.service.ts 通话

import { Injectable, Output } from '@angular/core';
import { Observable, BehaviorSubject, Subject, combineLatest, from, merge, concat } from 'rxjs';
import { environment } from '../../environments/environment';
import { HttpClient, HttpHeaders } from '@angular/common/http';
import { UserInput } from '../interfaces/user-input';
import { map, tap, switchMap, mergeMap, toArray, scan, shareReplay, take } from 'rxjs/operators';
import { of } from 'rxjs';
import { text } from '@angular/core/src/render3';
import { Division } from '../interfaces/division';


const headers = new HttpHeaders();
headers.append(
    'Content-Type', 'application/json'
);

@Injectable({
  providedIn: 'root'
})
export class ChatService {
  apiUrl = environment.apiUrl;
  conversationSubject: BehaviorSubject<any[]> = new BehaviorSubject<any>([]);
  conversationArray$: Observable<any> =  this.conversationSubject.asObservable();
  private commentAddedSubject = new Subject<any>();
  commentAddedAction$ = this.commentAddedSubject.asObservable();


  sessionToken$ = this.http.get<any>(`${this.apiUrl}/api/session`).pipe(
    map(data => data.session_id),
    tap(data => this.addComment('')),
    shareReplay(1)
  );

  commentWithSessionToken$ = combineLatest([
    this.sessionToken$,
    this.commentAddedAction$
  ]).pipe(
    map(([sessionToken, comment]) => {
      return {
        session_id: sessionToken,
        input: {
          text: comment
        },
        };
      }
    ),
  );


  botResponse$ = this.commentWithSessionToken$
    .pipe(
      switchMap(comment =>
        of(comment)
          .pipe(
            mergeMap(data => this.http.post<any>(`${this.apiUrl}/api/message`,
              {
                userInput: data
              },
              {
                headers: headers
              })
              ),
            )
          )
      );

  addComment(newComment?: string) {
    this.commentAddedSubject.next(newComment);
  }

  constructor(
    private http: HttpClient,
  ) { }
}


我尝试了多种方法将 commentWithSessionToken$botResponse$ 组合成一组用户输入和机器人响应,但没有任何运气。我需要的最终结果是这样的:

[commentWithSessionToken$, botResponse$, commentWithSessionToken$, etc, etc]

任何建议将不胜感激。谢谢。

【问题讨论】:

  • 请使用 rxjs forkJoin
  • 您可能应该mergeconcat 两个流,然后scanreduce 它们。

标签: angular rxjs


【解决方案1】:

请使用 rxjs forkJoin

forkJoin(commentWithSessionToken$, botResponse$, commentWithSessionToken$, etc, etc)

返回类型为observable&lt;[commentWithSessionToken$, botResponse$,...]&gt;

【讨论】:

    【解决方案2】:

    我认为你应该合并你的 observables 并扫描它们

    chat$ = merge(this.commentWithSessionToken$, this.botResponse$).pipe(
      scan((acc, curr) => ([...acc, curr]), []),
    );
    

    【讨论】:

    • 谢谢,这正是我想要的。
    猜你喜欢
    • 1970-01-01
    • 2019-03-05
    • 1970-01-01
    • 2017-08-04
    • 2018-11-16
    • 1970-01-01
    • 2020-06-19
    • 1970-01-01
    • 2019-01-23
    相关资源
    最近更新 更多