【问题标题】:How to read binary-protobuf gz files in Spark / Spark Streaming?如何在 Spark / Spark Streaming 中读取 binary-protobuf gz 文件?
【发布时间】:2016-05-05 12:18:11
【问题描述】:

我要从local/hdfs/kafka中读取gz文件,解压解析。谁有这方面的经验?

或者其他类型如 bin.tar.gz

【问题讨论】:

  • 是的,我已经读过了。但看起来它只支持文本类型。特别是,如果gz文件合并了几个bin文件?
  • 我可以更正反序列化 bin 数据。但是gz文件没有办法。
  • 嗨@steven,在读取 protobuf gz 文件方面有什么进展吗?

标签: apache-spark gzip protocol-buffers


【解决方案1】:

这就是我所做的: 1. 读取二进制数据 = sc.binaryFiles(path) 2.提取内容

data = (data
        .map(lambda x: (x[0], ungzip(x[1])))
        )


def ungzip(df):
    compressed_file = io.BytesIO(df)
    decompressed_file = gzip.GzipFile(fileobj=compressed_file)

    return decompressed_file.read()
  1. 解析消息

def _VarintDecoder(掩码):

    local_ord = ord

    def DecodeVarint(buffer, pos):
        result = 0
        shift = 0
        while 1:
            if pos > len(buffer) - 1:
                raise NotEnoughDataExcption("Not enough data to decode varint")
            b = local_ord(buffer[pos])
            result |= ((b & 0x7f) << shift)
            pos += 1
            if not (b & 0x80):
                result &= mask
                return (result, pos)
            shift += 7
            if shift >= 64:
                raise ValueError('Too many bytes when decoding varint.')

    return DecodeVarint

.

def parse_binary(data):
    decoder = _VarintDecoder((1 << 64) - 1)
    next_pos, pos = 0, 0
    messages = []
    try:
        while 1:
            next_pos, pos = decoder(data[1], pos)
            messages.append((data[0], data[1][pos:pos + next_pos]))
            pos += next_pos
    except:
        return messages

.

data = (data
        .flatMap(lambda x: parse_binary(x))
        )

在此之后,您每行有一个 protobuf 消息,您可以并行应用您的 protobuf_parsing 函数

【讨论】:

    【解决方案2】:

    您可以使用sc.binaryFiles 读取二进制文件并对内容字节做任何您喜欢的事情。

    关于tar.gz,见Read whole text files from a compression in Spark

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-09-04
      • 1970-01-01
      • 2020-01-03
      • 2018-01-28
      • 2016-09-26
      相关资源
      最近更新 更多