【问题标题】:In rxjs how can I create groups of sequential values?在 rxjs 中,如何创建连续值组?
【发布时间】:2025-11-22 22:05:02
【问题描述】:

我有一个这种形式的值流:

[1,2,3,1,2,1,1,1,2...]

我想将其转换为以下形式的组流:

[[1,2,3],[1,2],[1],[1],[1,2]...]

每次值变为 1 时都应创建新组。

【问题讨论】:

    标签: rxjs reactive-streams


    【解决方案1】:

    您可以使用bufferWhen() 运算符收集发出的值,直到可观察对象发出一个值。在此示例中,我使用主题将流的浅表副本发送到缓冲区。

    每当发出数字1 时,缓冲区就会发出。如果流以1 开头,则发出一个空数组。所以我把它过滤掉了。

    const {from, Subject} = rxjs;
    const {filter, bufferWhen, tap, skip} = rxjs.operators;
    
    const stream = from([1,2,3,1,2,1,1,1,2]);
    const trigger = new Subject();
    
    stream.pipe(
       tap(v => trigger.next(v)),
       bufferWhen(() => trigger.pipe(filter(v => v === 1))),
       filter(v => v.length)
    ).subscribe(x => console.log(x));
    <script src="https://unpkg.com/@reactivex/rxjs@6.x/dist/global/rxjs.umd.js"></script>

    如果您想在值减少时发出触发器,您可以使用scan()。这在逻辑上可能要好一些,但使用1 作为触发器符合问题。

    【讨论】:

    • 工作正常!我想不出这样的解决方案:)。如果数组从1以外的其他地方开始,也可以使用过滤器而不是skip(1)
    【解决方案2】:

    为了记录,我发布了一个替代已接受答案的解决方案。

    注意:对于有限流,concat(of(1)) 必须在 scan() 之前,distinctUntilChanged 才能发出最后一个 observable。

    const {of, from, ReplaySubject} = rxjs;
    const {map, concat, scan, distinctUntilChanged, concatAll, toArray, delay} = rxjs.operators;
    
    //considering infinite stream
    const stream = from([1,2,3,1,2,1,1,1,2]);
    stream.pipe(
        scan( (acc, val) => {
            if(val===1){
                acc.complete()
                acc=new ReplaySubject();
            }
            acc.next(val);
            return acc;
        }, new ReplaySubject()), 
        distinctUntilChanged(), 
        map(toArray()), 
        concatAll() );
    

    很高兴收集有关首选解决方案的一些反馈。

    【讨论】:

      最近更新 更多