【问题标题】:Merging small files in hadoop在hadoop中合并小文件
【发布时间】:2016-12-30 10:35:55
【问题描述】:

我在 HDFS 中有一个目录(最终目录),其中每分钟加载一些文件(例如:10 mb)。 一段时间后,我想将所有小文件合并为一个大文件(例如:100 mb)。但用户不断将文件推送到最终目录。这是一个连续的过程。

所以我第一次需要将前 10 个文件合并为一个大文件(例如:large.txt)并将文件保存到 Finaldir。

现在我的问题是如何获取除前 10 个文件之外的下 10 个文件?

可以帮帮我吗

【问题讨论】:

标签: hadoop mapreduce hive hdfs hadoop2


【解决方案1】:

@Andrew 为您指出了一个 6 年前在面向批处理的世界中适合的解决方案。
但现在是 2016 年,您正在运行微批量数据流,需要非阻塞解决方案。

我就是这样做的:

  • 创建一个包含 3 个分区的 EXTERNAL 表,映射到 3 个目录 例如new_datareorghistory
  • 将新文件输入new_data
  • 实现一个作业来运行批处理压缩,并定期运行它

现在批量压缩逻辑:

  1. 确保在压缩运行时不会执行任何 SELECT 查询,否则会返回重复项
  2. 选择所有适合压缩的文件(定义你自己的 条件) 并将它们从new_data 目录移动reorg
  3. 合并所有这些reorg 文件的内容,在history 目录中的一个新文件中(随意GZip 它,Hive 将识别.gz 扩展)
  4. 删除 reorg 中的文件

所以这基本上是 2010 年的旧故事,除了您现有的数据流可以继续将新文件转储到 new_data 中,同时压缩安全地在单独的目录中运行。如果压缩作业崩溃,您可以安全地调查/清理/恢复压缩,而不会影响数据流。


顺便说一句,我不是 2010 年基于“Hadoop 流”工作的解决方案的忠实拥护者——一方面,“流”现在有非常不同的含义;另一方面,“Hadoop 流式处理”在过去很有用,但现在已经不为人知了;在紧握的手 [*] 上,您可以使用 Hive 查询非常简单地做到这一点,例如
INSERT INTO TABLE blahblah PARTITION (stage='history')
SELECT a, b, c, d
FROM blahblah
WHERE stage='reorg'
;

在查询之前使用几个SET some.property = somevalue,您可以定义将在结果文件上应用的压缩编解码器、您想要多少个文件(或者更准确地说,您想要多大的文件to be - Hive 将相应地运行合并)等。

查看hive.merge.mapfileshive.merge.mapredfiles 下的https://cwiki.apache.org/confluence/display/Hive/Configuration+Properties(或hive.merge.tezfiles,如果您使用TEZ)和hive.merge.smallfiles.avgsize,然后查看hive.exec.compress.outputmapreduce.output.fileoutputformat.compress.codec - 加上hive.hadoop.supports.splittable.combineinputformat 以减少数量映射容器,因为您的输入文件非常小。


[*] 这里是非常古老的 SF 参考 :-)

【讨论】:

    【解决方案2】:

    这里还有一个替代方案,这仍然是@Andrew 在他的 cmets 中指出的传统方法,但有额外的步骤将您的输入文件夹作为缓冲区来接收及时将它们推送到 tmp 目录的小文件,并且合并它们并将结果推送回输入。

    第一步:创建一个tmp目录

    hadoop fs -mkdir tmp
    

    第2步:将所有小文件在某个时间点移动到tmp目录

    hadoop fs -mv input/*.txt tmp
    

    第 3 步 - 在 hadoop-streaming jar 的帮助下合并小文件

    hadoop jar $HADOOP_HOME/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar \
                       -Dmapred.reduce.tasks=1 \
                       -input "/user/abc/input" \
                       -output "/user/abc/output" \
                       -mapper cat \
                       -reducer cat
    

    第 4 步 - 将输出移动到输入文件夹

    hadoop fs -mv output/part-00000 input/large_file.txt
    

    第 5 步 - 删除输出

     hadoop fs -rm -R output/
    

    第 6 步 - 从 tmp 中删除所有文件

    hadoop fs -rm tmp/*.txt
    

    从第 2 步到第 6 步创建一个 shell 脚本,并安排它定期运行以定期合并较小的文件(根据您的需要可能每分钟)

    安排 cron 作业以合并小文件的步骤

    第1步:在上述步骤(2到6)的帮助下创建一个shell脚本/home/abc/mergejob.sh

    重要说明:需要在脚本中指定hadoop的绝对路径才能被cron理解

    #!/bin/bash
    /home/abc/hadoop-2.6.0/bin/hadoop fs -mv input/*.txt tmp
    wait
    /home/abc/hadoop-2.6.0/bin/hadoop jar /home/abc/hadoop-2.6.0/share/hadoop/tools/lib/hadoop-streaming-2.6.0.jar \
                       -Dmapred.reduce.tasks=1 \
                       -input "/user/abc/input" \
                       -output "/user/abc/output" \
                       -mapper cat \
                       -reducer cat
    wait
    /home/abc/hadoop-2.6.0/bin/hadoop fs -mv output/part-00000 input/large_file.txt
    wait
    /home/abc/hadoop-2.6.0/bin/hadoop fs -rm -R output/
    wait
    /home/abc/hadoop-2.6.0/bin/hadoop fs -rm tmp/*.txt
    

    第 2 步:使用 cron 安排脚本每分钟运行一次,使用 cron 表达式

    a) 通过选择编辑器来编辑 crontab

    >crontab -e
    

    b) 在末尾添加以下行并退出编辑器

    * * * * * /bin/bash /home/abc/mergejob.sh > /dev/null 2>&1
    

    合并作业将安排为每分钟运行一次。

    希望这对您有所帮助。

    【讨论】:

    • 感谢 Addy、Samson 和 Andrew 抽出宝贵时间发布答案... Andy 如果可能,请您发布 shell 脚本和调度部分。我对 shell 脚本和调度非常陌生。
    • @Raj- 更新了答案中的调度部分,希望对您有所帮助
    • @Aditya 我将 hadoop-streaming-2.6.0.jar 复制到 HDFS 并在脚本中给出了它的路径。但是,在执行上述脚本时,我得到了 Not a valid JAR 异常。
    猜你喜欢
    • 2013-10-30
    • 1970-01-01
    • 2012-09-17
    • 2018-03-12
    • 2019-05-08
    • 1970-01-01
    • 1970-01-01
    • 2011-04-02
    • 1970-01-01
    相关资源
    最近更新 更多