【问题标题】:RXJS: Optimal processing a particular event in an Observable streamRXJS:优化处理 Observable 流中的特定事件
【发布时间】:2017-07-20 17:03:54
【问题描述】:

我正在订阅一个可观察的“无限”事件流(“infiniteObservable”)。我想对事件做一些特别的事情,例如#1,然后继续。我可以想象有两种方法。 1:使用布尔标志从流中查找事件并将其设置为使用布尔值处理并继续处理流。 2:对同一个infiniteObservable使用两个observable引用,一个用于正常处理,另一个使用elementAt(1)处理感兴趣的项目。

有没有办法在没有布尔值、计数器或多个观察者的情况下优雅地做到这一点?

下面的工作示例代码 1。这段代码可以折叠成一个流(不带布尔值吗?):

//current (non-elegant? uses boolean, etc) code:
infiniteObservable.subscribe((item) => {
  this.item = item;
  if (this.item && !boolItemFound) {
    // ... do some processing
    boolItemFound = true;
  }
});

我相信必须有一种更优雅的方式来使用 rxjs 运算符的强大功能,而无需求助于布尔值。我在下面有一个替代方案,我发现它同样不优雅,因为它使用了对同一个 infinitObservable 的两个引用(但没有布尔值)。下面的工作示例代码 2(可以折叠成单个流吗?):

const normalInfiniteObservable  = infiniteObservable;
const firstInfiniteObservable= infiniteObservable;

normalInfiniteObservable .subscribe((item) => this.item = item); //process items normally

// meanwhile, grab item(1) from special processing
firstInfiniteObservable.elementAt(1).subscribe((item) => {
  // do something with item(1) only...
});

(注意:尽管上述方法有效,但单元测试/业力会引发 ArgumentOutOfRangeError) 关于如何组合上述两个流并避免使用布尔值的任何建议(以上)?

感谢您的想法!

【问题讨论】:

  • and then continue. - 你是什么意思?
  • @Maximus,编辑并添加了详细信息。

标签: javascript angular rxjs observable


【解决方案1】:

如果您需要一些特殊情况处理,并且它不应该编辑流(副作用),您可以使用范围压缩第一个流发射,这样您就可以拥有一个流和一个索引,并执行逻辑对于第 N 个元素。

// given a source observable 
src$.map( ( src, index ) => { src, index } )
.do(
    ( srcWithIndex ) => {
        //get index from zippedSrc.index
        // do write custom logic here
        if srcWithIndex.index == 3 [...]
    }
 )
// stream is not interrupted

【讨论】:

  • 我的信息流有“无限事件”(已编辑),仅供参考。我正在研究你的回复。
  • 抱歉,我记得地图 fn 接受第二个参数,即发出的索引,我会尽快编辑我的回复
猜你喜欢
  • 2019-08-08
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2020-02-05
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2018-01-11
相关资源
最近更新 更多