【问题标题】:How to convert a stream of variable length packets into fixed length packets in RxJS?如何在 RxJS 中将可变长度数据包流转换为固定长度数据包?
【发布时间】:2018-08-20 14:17:22
【问题描述】:

我是 RxJS 的新手。

给定以下流

[ 'foo ', 'bar', ' b', 'az 12', '3', '4567' ]

我想把它转换成一个固定大小的数据包(比如 3 个字节)+ 余数

['foo', ' ba', 'r b', 'az ', '123', '456', '7']

在现实生活中它实际上是一个二进制数据的缓冲区。

我想知道 RxJS 的惯用方式是什么。

我找到的简单方法是:

from([ 'foo ', 'bar', ' b', 'az 12', '3', '4567' ])
.pipe(
    Rx.concatMap(v => from(v)),
    Rx.bufferCount(3),
    Rx.map(v => v.join(''))
)
.subscribe(v => console.log(v))

将所有内容拆分为单个字符似乎很浪费,所以我发现另一种方法是使用 .slice() 可能更好,但更冗长。

const bufferToSize = (chunkSize) => (source) =>
    Observable.create(subscriber => {
        let buffer = new Buffer('')

        return source.subscribe({
            next: (value) => {
                buffer += value

                while (buffer.length > chunkSize) {
                    subscriber.next(buffer.slice(0, chunkSize))
                    buffer = buffer.slice(chunkSize, buffer.length)
                }
            },
            complete: () => {
                subscriber.next(buffer)
                subscriber.complete()
            }
        })
    });

from([ 'foo ', 'bar', ' b', 'az 12', '3', '4567' ])
    .pipe(bufferToSize(3))
    .subscribe(v => console.log(v))

两者都返回预期结果

foo
 ba
r b
az
123
456
7

有没有更好的方法来做到这一点?或者至少更惯用的方式?

谢谢

【问题讨论】:

  • 我认为您的第一个解决方案很棒,切片无论如何都不会表现得更好;-)
  • 我认为第一个解决方案也是最简单的。请注意,您可以使用 concatMap(v => from(v)) 而不是 concatMap(v => v)concatAll()
  • 如果你对发射每个字节感到困惑,那么你可以考虑here描述的方法。
  • 我认为第一个解决方案很好只能达到一点 - 它会将整个值输入然后输出它,这对于小值/流来说很好,但是如果它是一个持续的不确定长度的数据流,它永远不会发出。 buffer.slice 选项更具可扩展性 - 如果您不需要这种可扩展性,则重量更重,但无论流多长,基本上都可以工作。除了您列出的两个之外,我想不出其他选择...

标签: javascript node.js rxjs observable


【解决方案1】:

您的第一个选项是完美的(from() 除外,只需使用 v => v)。

@Mark 说它会等待 observable 完成获取整个值,但事实并非如此。它只是等待收集 3 个字符,然后发出缓冲区。

我创建了延迟版本来向您展示这是连续流。

https://stackblitz.com/edit/buffer-mxsltx?file=index.ts&devtoolsheight=50

【讨论】:

    猜你喜欢
    • 2017-06-05
    • 2010-10-04
    • 2014-08-30
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-09-30
    相关资源
    最近更新 更多