【问题标题】:Merging a new rxjs observable into existing subscribed observable将新的 rxjs observable 合并到现有的订阅 observable 中
【发布时间】:2019-04-15 20:05:15
【问题描述】:

我有一个已订阅的 rxjs observable:

const o = new rxjs.Subject();
o.subscribe( (v) => console.log("value: " + v);

由于一些不相关的事件,例如用户单击按钮,我需要从服务器获取数据,并将该数据放入现有的 observable o。使用 axios,我有这样的东西:

const aPromise = axios.get("someurl...")

然后我执行以下操作:

const secondObservable = rxjs.of(aPromise)
                             .subscribe( (response) => o.next(response.data) )

我的问题是,是否有更好的方法将响应数据放入已订阅的现有 observable 中,也许使用运算符?

【问题讨论】:

  • 或许axios.get(...).then(response => o.next(response.data)).catch(error => o.error(error);

标签: rxjs


【解决方案1】:

如果您知道您需要来自原始流的数据并且可能会通过单击按钮获得更新 - 那么我们将需要创建一个按钮单击流,例如

const btnClick$ = new rxjs.Subject();

onClick(){
  this.btnClick$.next(void 0);
}

然后我们将把这个btnClick$流变成一个http-get请求流,使用switchMap

const request$ = btnClick$.pipe(
  // each click will be turned into a http-get
  switchMap(() => rxjs.of(axios.get("someurl...")))
)

我们已经准备好在点击按钮时使用来自服务器的原始数据流和可能的数据更新流的结果:

// we'll take results both from original stream
// and our new possible source of data
merge(original$, request$).subscribe(data => {
  console.log(data);
})

这是一个使用模拟的小应用:

const { fromEvent, of, merge } = rxjs;
const { map, delay, switchMap } = rxjs.operators;

// create a stream of btn clicks
const theButton = document.getElementById('the-btn');
const btnClicks$ = fromEvent(theButton, 'click');

console.log('Processing...');

// clicks turned into http-gets
const request$ = btnClicks$.pipe(
  switchMap(() => makeRequest())
);

// original source of data
const original$ = of('Please, click the button!').pipe(delay(500));

merge(original$, request$)
  .subscribe(data => {
    console.log(data);
  });

// mock for requests
function makeRequest(){
  return of(Date.now()).pipe(delay(500))
}
<script src="https://unpkg.com/rxjs@6.4.0/bundles/rxjs.umd.min.js"></script>

<button
  id="the-btn"
  >Click me!</button>

希望对你有帮助

【讨论】:

  • 这比他们希望简化的 OP 的原始解决方案更加复杂。
  • @AdrianBrand,我想我认为Subject 只是作为示例给出的,而在实际代码中,OP 有Observable。此外,恕我直言,merge 的方法具有更好的关注点分离。虽然我同意你和 Richard Matsen 建议的方法看起来更简单。
  • 虽然比较复杂,但我喜欢使用mergeswitchMap。我是否认为switchMap 用于将新的可观察/承诺“转换”为值,因为仅使用map 不会将其从可观察/承诺变为值?
  • @AntKutschera,是的,你是对的。需要switchMap 将值替换为值流(在我们的例子中是带有http-get 请求流的点击事件)。另请注意,有类似的运算符,如mergeMapconcatMapexhaustMap,请检查this comparison 以选择适合您需要的运算符。总帐
【解决方案2】:

您可以直接将调用结果传递给您的 observable。

const axios = {
  get: url => Promise.resolve('some value')
};

const o = new rxjs.Subject();
o.subscribe(v => { console.log("value: " + v); });

axios.get('url').then(data => { o.next(data); });
&lt;script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/6.4.0/rxjs.umd.min.js"&gt;&lt;/script&gt;

【讨论】:

  • 我认为,使用这种方法,from(axios.get('url')).subscribe(o) 会很酷。 UPD 呃,但它会完成主题,该死的。忽略这个,对不起。但是,您还需要一些错误推送。通过.subscribe(o)... :)
  • 我最初确实尝试过。
  • 我的意思是说您还需要添加错误传播,例如.then(o.next.bind(o), o.error.bind(o)).
  • 此解决方案不会将 axios 承诺包装在 observable 中,但仍会在第一个 observable 上调用 next。我可以接受,如果它是标准的并且被社区接受,但我想知道是否有更可接受的方式来做这件事,而无需致电next
  • 无需包装承诺只是为了打开它。
猜你喜欢
  • 2017-10-10
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-07-07
  • 1970-01-01
  • 2021-11-16
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多