【问题标题】:Storm - Writing to HDFS using compressionStorm - 使用压缩写入 HDFS
【发布时间】:2017-04-18 12:57:53
【问题描述】:

我想将风暴拓扑中传入的所有原始数据存储在 HDFS 集群中。 这是 JSON 或二进制数据,以 2k / 秒的速率传入。

我试图使用 HDFS 螺栓 (http://storm.apache.org/releases/0.10.0/storm-hdfs.htmlà ,但它不允许使用普通的 hdfs 螺栓进行压缩 只能使用序列文件螺栓进行压缩。 我不想使用序列文件,因为我没有真正的密钥。

另外,我已经有 Cassandra 用于存储我的密钥/值内容并满足我的请求。 使用 Cassandra 存储我的原始数据需要太多磁盘(开销)(不是这篇文章的目标来讨论这个问题)。

谁能帮我解决这个问题? 我可以使用java Hadoop驱动客户端来实现吗? 有没有人的代码sn-p?

【问题讨论】:

  • 并不是我会推荐序列文件,但缺少密钥不应该阻止你。您可以使用 NullWritable 作为键。

标签: hadoop hdfs apache-storm


【解决方案1】:

好吧,没有办法像我想要的那样即时压缩。 不过我找到了解决方案,如果有人需要,我在这里分享。

这个问题不仅与 Storm 相关,而且是一个更普遍的 Hadoop 问题。

我所有的数据都是使用 HdfsBolt 写入的:

    RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter("|");

    //Synchronize data buffer with the filesystem every 1000 tuples
    // Need to be configurable
    SyncPolicy syncPolicy = new CountSyncPolicy(1000);

    // Rotate data files when they reach five MB
    // need to be configuration
    FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(10.0f, FileSizeRotationPolicy.Units.MB);

    // Use default, Storm-generated file names
    FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath("/datadir/in_progress") ;

    // Instantiate the HdfsBolt
    HdfsBolt bolt = new HdfsBolt()
        .withFsUrl("hdfs://"+dfsHost+":"+dfsPort)
        .withFileNameFormat(fileNameFormat)
        .withRecordFormat(format)
        .withRotationPolicy(rotationPolicy)
        .withSyncPolicy(syncPolicy)
        .addRotationAction(new MoveFileAction().withDestination("/datadir/finished"));

这给我的螺栓的每个执行者一个文件.. 不容易处理,但没关系 :)

然后我使用 hadoop 流(在 namenode 上的 cron 或类似的东西中)安排自动压缩:

    hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar \
        -Dmapred.reduce.tasks=0 \
        -Dmapred.output.compress=true \
        -Dmapred.compress.map.output=true \
        -Dmapred.output.compression.codec=org.apache.hadoop.io.compress.BZip2Codec \
        -input /datadir/finished \
        -output /datadir/archives \
        -mapper /bin/cat \
        -inputformat org.apache.hadoop.mapred.TextInputFormat \
        -outputformat org.apache.hadoop.mapred.TextOutputFormat

这里我还有一个问题: 一个输入文件被压缩成一个档案。 所以我的 10MB 输入文件(每个用于一个工人)被压缩成一个 1MB 的 gzip(或 bzip)-> 这会产生这么多的小文件,这是 hadoop 中的一个问题

为了解决这个问题,我将尝试查看 hadoop 归档 (HAR) 功能。

我还需要清除 /datadir/finished 中已经压缩的文件

希望我能得到你们的反馈 保持联系

问候, 巴斯蒂安

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2013-06-18
    • 1970-01-01
    • 1970-01-01
    • 2016-10-25
    • 1970-01-01
    • 2023-03-09
    • 1970-01-01
    相关资源
    最近更新 更多