好吧,没有办法像我想要的那样即时压缩。
不过我找到了解决方案,如果有人需要,我在这里分享。
这个问题不仅与 Storm 相关,而且是一个更普遍的 Hadoop 问题。
我所有的数据都是使用 HdfsBolt 写入的:
RecordFormat format = new DelimitedRecordFormat().withFieldDelimiter("|");
//Synchronize data buffer with the filesystem every 1000 tuples
// Need to be configurable
SyncPolicy syncPolicy = new CountSyncPolicy(1000);
// Rotate data files when they reach five MB
// need to be configuration
FileRotationPolicy rotationPolicy = new FileSizeRotationPolicy(10.0f, FileSizeRotationPolicy.Units.MB);
// Use default, Storm-generated file names
FileNameFormat fileNameFormat = new DefaultFileNameFormat().withPath("/datadir/in_progress") ;
// Instantiate the HdfsBolt
HdfsBolt bolt = new HdfsBolt()
.withFsUrl("hdfs://"+dfsHost+":"+dfsPort)
.withFileNameFormat(fileNameFormat)
.withRecordFormat(format)
.withRotationPolicy(rotationPolicy)
.withSyncPolicy(syncPolicy)
.addRotationAction(new MoveFileAction().withDestination("/datadir/finished"));
这给我的螺栓的每个执行者一个文件.. 不容易处理,但没关系 :)
然后我使用 hadoop 流(在 namenode 上的 cron 或类似的东西中)安排自动压缩:
hadoop jar share/hadoop/tools/lib/hadoop-streaming-2.7.3.jar \
-Dmapred.reduce.tasks=0 \
-Dmapred.output.compress=true \
-Dmapred.compress.map.output=true \
-Dmapred.output.compression.codec=org.apache.hadoop.io.compress.BZip2Codec \
-input /datadir/finished \
-output /datadir/archives \
-mapper /bin/cat \
-inputformat org.apache.hadoop.mapred.TextInputFormat \
-outputformat org.apache.hadoop.mapred.TextOutputFormat
这里我还有一个问题:
一个输入文件被压缩成一个档案。
所以我的 10MB 输入文件(每个用于一个工人)被压缩成一个 1MB 的 gzip(或 bzip)-> 这会产生这么多的小文件,这是 hadoop 中的一个问题
为了解决这个问题,我将尝试查看 hadoop 归档 (HAR) 功能。
我还需要清除 /datadir/finished 中已经压缩的文件
希望我能得到你们的反馈
保持联系
问候,
巴斯蒂安