【问题标题】:Partitioner does not seems to work on single node?分区器似乎不适用于单节点?
【发布时间】:2016-06-23 15:45:37
【问题描述】:

我已经编写了 map reduce 代码以及自定义分区。自定义分区按某些条件对键进行排序。我在驱动程序类中设置了 setNumReduceTasks=6。 但是我正在我的单机上测试这段代码。我只得到一个减速器输出文件,而不是 6 个减速器文件。 分区器不能在单机上工作吗?是否需要多节点集群才能看到自定义分区器的效果? 对此的任何见解将不胜感激。

【问题讨论】:

    标签: hadoop mapreduce hdfs reduce bigdata


    【解决方案1】:

    我在一台机器上有一个双节点集群。这里是exactly what I did。从那里你可以看到我是这样做的(在执行时):

    指定reducer的个数,例如两个

    -D mapred.reduce.tasks=2
    

    【讨论】:

    • @gsmaras-谢谢回答。但我只是在 netbeans 中测试我的代码。之后我将捆绑一个 jar 并在多节点上执行它。分区器是否可以在单台机器上从 IDE 工作。
    • 不客气。我也使用过 Eclipse,但我对 Netbeans 和 IDE 无能为力,可能有一些内部因素会影响这种情况。无论如何,好问题,我会投票。
    【解决方案2】:

    当你将reducer的数量设置为大于1时,Partitioner总是工作,即使它是一个单节点集群。

    我已经在单节点集群上测试了以下代码,它按预期工作:

    public final class SortMapReduce extends Configured implements Tool {
    
    public static void main(final String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new SortMapReduce(), args);
        System.exit(res);
    }
    
    public int run(final String[] args) throws Exception {
    
        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);
    
        Configuration conf = super.getConf();
    
        Job job = Job.getInstance(conf);
    
        job.setJarByClass(SortMapReduce.class);
        job.setMapperClass(Map.class);
        job.setReducerClass(Reduce.class);
    
        job.setInputFormatClass(KeyValueTextInputFormat.class);
    
        job.setMapOutputKeyClass(Person.class);
        job.setMapOutputValueClass(Text.class);
    
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
    
        job.setPartitionerClass(PersonNamePartitioner.class);
    
        job.setNumReduceTasks(5);
    
        FileInputFormat.setInputPaths(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);
    
        if (job.waitForCompletion(true)) {
            return 0;
        }
        return 1;
    }
    
    public static class Map extends Mapper<Text, Text, Person, Text> {
    
        private Person outputKey = new Person();
    
        @Override
        protected void map(Text pointID, Text firstName, Context context) throws IOException, InterruptedException {
            outputKey.set(pointID.toString(), firstName.toString());
            context.write(outputKey, firstName);
        }
    }
    
    public static class Reduce extends Reducer<Person, Text, Text, Text> {
    
        Text pointID = new Text();
    
        @Override
        public void reduce(Person key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
            pointID.set(key.getpointID());
            for (Text firstName : values) {
                context.write(pointID, firstName);
            }
        }
    }
    

    }

    分区类:

    public class PersonNamePartitioner extends Partitioner<Person, Text> {
    
    @Override
    public int getPartition(Person key, Text value, int numPartitions) {
    
        return Math.abs(key.getpointID().hashCode() * 127) % numPartitions;
    }
    

    }

    运行命令:

    hadoop jar /home/hdfs/SecondarySort.jar org.test.SortMapReduce /demo/data/Customer/acct.txt /demo/data/Customer/output2

    谢谢,

    【讨论】:

      【解决方案3】:

      仔细查看您的自定义分区器。它可能会为传递给它的所有键返回相同的分区值。

      在这种情况下,它是一个低效的分区器,它将所有键发送到同一个减速器。因此,即使您将 reducer 的数量设置为 6,也只有一个 reducer 将拥有所有 key-value,而其余 5 个 reducer 将没有任何处理。

      因此,您将获得处理所有记录的唯一 reducer 的输出。

      分区器不能在单机上工作 机器? 分区器也可以在单机伪集群中工作。

      是否需要多节点集群才能看到自定义的效果 分区器? 没有。

      【讨论】:

        猜你喜欢
        • 2011-10-02
        • 1970-01-01
        • 1970-01-01
        • 2015-10-31
        • 2020-04-04
        • 2023-04-06
        • 2017-08-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多