【问题标题】:Hadoop 1 input file = 1 output file, map-onlyHadoop 1 个输入文件 = 1 个输出文件,仅映射
【发布时间】:2013-07-30 18:10:48
【问题描述】:

我是 Hadoop 新手,但这是我上个月的一个学习项目。

为了尽量保持这种模糊性以对其他人有用,让我先抛出基本目标....假设:

  1. 您有一个庞大的数据集(显然),包含数百万个基本 ASCII 文本文件。
    • 每个文件都是一个“记录”。
  2. 记录存储在目录结构中,以识别客户和日期
    • 例如/user/hduser/data/customer1/YYYY-MM-DD, /user/hduser/data/customer2/YYYY-MM-DD
  3. 您想模仿输出结构的输入结构
    • 例如/user/hduser/out/customer1/YYYY-MM-DD, /user/hduser/out/customer2/YYYY-MM-DD

我查看了多个线程:

还有更多……我也一直在阅读 Tom White 的 Hadoop 书籍。我一直在热切地尝试学习这一点。而且我经常在新 API 和旧 API 之间切换,这增加了尝试学习这一点的困惑。

很多人都指向MultipleOutputs(或旧的api版本),但我似乎无法产生我想要的输出——例如,MultipleOutputs 似乎不接受“/”来创建目录结构在写()

需要采取哪些步骤来创建具有所需输出结构的文件? 目前我有一个WholeFileInputFormat 类和相关的 RecordReader,它有一个 (NullWritable K, ByteWritable V) 对(如果需要可以更改)

我的地图设置:

public class MapClass extends Mapper<NullWritable, BytesWritable, Text, BytesWritable> {
    private Text filenameKey;
    private MultipleOutputs<NullWritable, Text> mos;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        InputSplit split = context.getInputSplit();
        Path path = ((FileSplit) split).getPath();
        filenameKey = new Text(path.toString().substring(38)); // bad hackjob, until i figure out a better way.. removes hdfs://master:port/user/hduser/path/
        mos = new MultipleOutputs(context);
    }
}

还有一个cleanup()函数调用mos.close()ma​​p()函数目前未知(我在这里需要帮助)

这些信息足以让新手找到答案吗?我接下来的想法是在每个 map() 任务中创建一个 MultipleOutputs() 对象,每个任务都有一个新的基本输出字符串,但我不确定它是否有效,甚至是正确的行动。

建议将不胜感激,此时程序中的任何内容都可以更改,除了输入 - 我只是想学习框架 - 但我想尽可能接近这个结果(稍后我可能会考虑将记录合并到更大的文件中,但它们已经是每条记录 20MB,我想确保它在我无法在记事本中阅读之前能够正常工作

编辑:这个问题可以通过修改/扩展 TextOutputFormat.class 来解决吗?似乎它可能有一些可行的方法,但我不确定我需要覆盖哪些方法......

【问题讨论】:

  • 我没有尝试过,但是“Hadoop 权威指南”一书说新 API 的 MultipleOutputs 支持使用文件路径分隔符 (/)。你是说它不起作用?
  • @Rags 我执行 MultipleOutputs 时可能出错

标签: hadoop mapreduce output


【解决方案1】:

如果您关闭推测执行,则没有什么可以阻止您在映射器中手动创建输出文件夹结构/文件,并将记录写入它们(忽略输出上下文/收集器)

例如,扩展 sn-p(设置方法),你可以做这样的事情(这基本上是多个输出正在做的事情,但假设关闭推测执行以避免两个映射任务正在尝试的文件冲突写入相同的输出文件):

import java.io.IOException;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MultiOutputsMapper extends
        Mapper<LongWritable, Text, NullWritable, NullWritable> {
    protected String filenameKey;
    private RecordWriter<Text, Text> writer;
    private Text outputValue;
    private Text outputKey;

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // operate on the input record
        // ...

        // write to output file using writer rather than context
        writer.write(outputKey, outputValue);
    }

    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {
        InputSplit split = context.getInputSplit();
        Path path = ((FileSplit) split).getPath();

        // extract parent folder and filename
        filenameKey = path.getParent().getName() + "/" + path.getName();

        // base output folder
        final Path baseOutputPath = FileOutputFormat.getOutputPath(context);
        // output file name
        final Path outputFilePath = new Path(baseOutputPath, filenameKey);

        // We need to override the getDefaultWorkFile path to stop the file being created in the _temporary/taskid folder
        TextOutputFormat<Text, Text> tof = new TextOutputFormat<Text, Text>() {
            @Override
            public Path getDefaultWorkFile(TaskAttemptContext context,
                    String extension) throws IOException {
                return outputFilePath;
            }
        };

        // create a record writer that will write to the desired output subfolder
        writer = tof.getRecordWriter(context);
    }

    @Override
    protected void cleanup(Context context) throws IOException,
            InterruptedException {
        writer.close(context);
    }
}

需要考虑的几点:

  • customerx/yyyy-MM-dd 路径是文件还是文件文件夹(如果是文件文件夹,则需要相应修改 - 此实现假设每个日期有一个文件,文件名为 yyyy-MM-dd)
  • 您可能希望查看LazyOutputFormat 以防止创建空的输出映射文件

【讨论】:

  • 我使用了您的骨架并从中学到了很多东西...作为学习工具,您的代码优秀..您是对的,yyyy-MM-dd 是另一个文件夹,里面有一个文件。我玩了一会儿,但让它工作了,一个棘手的问题是输入源需要是/user/hduser/data/*(带有星号)才能将任务映射到子目录中的所有文件.我还在作业配置中实现了NullOutputFormat(而不是 Lazy),并在您设置时使用TextOutputFormat(虽然懒惰是一种方便的格式!)非常感谢克里斯的指点!
  • @Chris,只是为了澄清,这个问题也不能使用 MultipleOutputs(新 API)解决吗? (使用 WholeFileInputFormat(isSplittable 为 false 的自定义类并使用 FileSplit 中的路径)?
  • @Rags,可能,但我对尝试做同样的事情但在基本输出路径中的路径分隔符有问题有一些模糊的记忆。也许这已在更新的版本中得到修复。当然值得一试。
  • @Chris,谢谢克里斯。我试过了,它工作正常。这种方法的唯一问题是,您仍然会被后缀“-m/r-#####”所困。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2019-07-08
  • 1970-01-01
  • 1970-01-01
  • 2012-05-09
  • 2014-01-06
  • 2011-05-14
  • 1970-01-01
相关资源
最近更新 更多