【发布时间】:2014-09-11 21:13:26
【问题描述】:
我正在编写一个如下所示的 Pig 脚本:
...
myGroup = group simplifiedJoinData by (dir1, dir2, dir3, dir4);
betterGroup = foreach myGroup {
value1Value2 = foreach simplifiedJoinedGroup generate value1, value2;
distinctValue1Value2 = DISTINCT value1Value2; generate group, distinctValue1Value2;
}
store betterGroup into '/myHdfsPath/myMultiStorageTest' using MyMultiStorage('output', '0', 'none' );
请注意simplifiedJoinData的schema是simplifiedJoinedGroup: {dir1: long,dir2: long,dir3: chararray,dir4: chararray,value1: chararray,value2: chararray}
它使用一个自定义存储类(MyMultiStorage - 基本上是储钱罐中 MultiStorage 的修改版本),它可以写入多个输出文件。自定义存储类期望传递给它的值采用以下格式:
{group:(dir1:long,dir2:long,dir3:chararray,dir4:chararray), bag:{(value1:chararrary,value2:chararray)}}
我希望自定义存储类做的是输出多个文件,如下所示: dir/dir2/dir3/dir4/value1_values.txt dir/dir2/dir3/dir4/value2_values.txt
其中 value1_values.txt 包含所有 value1 值,value2_values.txt 包含所有 value2 值。理想情况下,我不希望编写多个必须在以后组合的部分文件(请注意,为了讨论的目的,该示例已被简化。真正的输出文件是二进制结构,不能与简单的 cat 组合)。我有这个适用于小型数据集;但是,当我使用更大的数据集运行时,我遇到了在 Hadoop 中出现输出文件名已存在或已被创建的异常的问题:
java.io.IOException: org.apache.hadoop.ipc.RemoteException: org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException
我怀疑这是因为多个映射器或化简器试图写入同一个文件,而我没有像 PigStorage 那样在文件名中使用部件 ID。但是,我本来希望通过对数据进行分组,对于每个 dir1、dir2、dir3、dir4 组合只有一个记录,因此,只有一个映射器或减速器会尝试为一个特定的文件编写一个给定运行。我已经尝试在没有推测执行的情况下运行 map 和 reduce 任务,但这似乎没有效果。显然我不明白这里发生了什么。
我的问题是:为什么我会收到 AlreadyBeingCreatedException?
如果我无法让单个 reducer 为每条记录写入所有数据,则必须在一个目录中写入多个部分输出文件(每个 reducer 一个)并在事后将它们组合起来是可以接受的。它只是不理想。但是,到目前为止,我还无法确定让自定义存储类确定唯一文件名的正确方法,并且我仍然会遇到多个 reducer 尝试创建/写入同一个文件的情况。作业配置或上下文中是否有特定方法可以让我在作业中协调各个部分?
提前感谢您提供的任何帮助。
【问题讨论】:
标签: java hadoop apache-pig