【Question title】: MapReduce - reducer class results not correct
【Posted】: 2016-05-19 14:51:49
【Question description】:

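I have an Adcampaign driver, mapper, and reducer class. The first two classes run fine. The reducer class also runs, but its results are incorrect. This is a sample project I downloaded from the web to practice writing MapReduce programs.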

Brief description of the program (problem statement):

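For this problem, suppose we run an online advertising company. We run ad campaigns for clients (such as Pepsi and Sony), and the ads are displayed on popular sites such as news sites (CNN, Fox) and social media sites (Facebook). To track how well a campaign performs, we record the ads we serve and the ads users click.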

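Scenario

Here is the sequence of events:
  1. We serve an ad to a user.
  2. If the ad appears in the user's browser, i.e. the user saw the ad, we track this event as VIEWED_EVENT.
  3. If the user clicks the ad, we track this event as CLICKED_EVENT.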

Sample data:

1293868800864,319248,1,flickr.com,12
1293868801728,625828,1,npr.org,19
1293868802592,522177,2,wikipedia.org,16
1293868803456,535052,2,cnn.com,20
1293868804320,287430,2,sfgate.com,2
1293868805184,616809,2,sfgate.com,1
1293868806048,704032,1,nytimes.com,7
1293868806912,631825,2,amazon.com,11
1293868807776,610228,2,npr.org,6
1293868808640,454108,2,twitter.com,18

Input log file format and description:
Log files: each record has the following comma-separated format:
timestamp, user_id, action_id (view/click), domain, campaign_id
E.g.: 1262332801728, 899523, 1, npr.org, 19
◾timestamp : Unix timestamp in milliseconds
◾user_id : each user has a unique ID
◾action_id : 1=view, 2=click
◾domain : the domain on which the ad was served
◾campaign_id : identifies the campaign the ad was part of
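
To make the record layout concrete, here is a minimal plain-Java sketch that parses one line in the format described above. It is not part of the original project: `AdEvent` and `parse` are hypothetical names chosen for illustration, and returning an empty `Optional` for malformed lines is an assumption about how bad records should be handled.

```java
import java.util.Optional;

// Hypothetical helper (not from the original project): one parsed log record
// of the form "timestamp, user_id, action_id, domain, campaign_id".
final class AdEvent {
    final long timestamp;   // Unix time in milliseconds
    final int userId;
    final int actionId;     // 1 = view, 2 = click
    final String domain;
    final int campaignId;

    private AdEvent(long timestamp, int userId, int actionId, String domain, int campaignId) {
        this.timestamp = timestamp;
        this.userId = userId;
        this.actionId = actionId;
        this.domain = domain;
        this.campaignId = campaignId;
    }

    // Returns an empty Optional when the line does not have exactly five
    // fields or when a numeric field cannot be parsed.
    static Optional<AdEvent> parse(String line) {
        String[] tokens = line.split(",");
        if (tokens.length != 5) {
            return Optional.empty();
        }
        try {
            return Optional.of(new AdEvent(
                    Long.parseLong(tokens[0].trim()),
                    Integer.parseInt(tokens[1].trim()),
                    Integer.parseInt(tokens[2].trim()),
                    tokens[3].trim(),
                    Integer.parseInt(tokens[4].trim())));
        } catch (NumberFormatException e) {
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        // The example record from the format description above.
        AdEvent event = parse("1262332801728, 899523, 1, npr.org, 19").orElseThrow();
        System.out.println("campaign=" + event.campaignId + " action=" + event.actionId);
    }
}
```

Note that the timestamp needs a `long`, since millisecond timestamps do not fit in an `int`.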

The expected reducer output is: campaign ID, total views, total clicks. Example:

12, 3, 2
13, 100, 23
14, 23, 12

I checked the mapper's logs and its output looks fine, but the reducer's final output is wrong.

Reducer class:

  public class AdcampaignReducer extends Reducer<IntWritable, IntWritable, IntWritable, Text>
{

    //  Key/value : IntWritable/List of IntWritables for every campaign, we are getting all actions for that 
    //  campaign as an iterable list. We are iterating through action_ids and calculating views and click 
    //  Once we are done calculating, we write out the results. This is possible because all actions for a campaign are grouped and sent to one reducer. 

    //Text k= new Text(); 

    public void reduce(IntWritable key, Iterable<IntWritable> results, Context context) throws IOException, InterruptedException 
   { 

        int campaign = key.get();
        //k = key.get();

        int clicks = 0;
        int views = 0;

        for(IntWritable i:results)
        {
                int action = i.get();
                if (action ==1)
                    views = views+1;
                else if (action == 2)
                    clicks = clicks + 1;


        }

        String statistics = "Total Clicks =" +clicks + "and Views =" + views;

        context.write(new IntWritable(campaign), new Text(statistics));


    }

}

Mapper class:

  public class AdcampaignMapper extends Mapper<LongWritable, Text, IntWritable, IntWritable> {  

     private long numRecords = 0; 

     @Override
     public void map(LongWritable key, Text record, Context context) throws IOException, InterruptedException {


         String[] tokens = record.toString().split(",");

         if (tokens.length !=5)
         {
             System.out.println("*** invalid record  : " + record);

         }

         String actionStr = tokens[2];
         String campaignStr = tokens[4];     


         try{

             //System.out.println("during parseint"); //used to debug 
             System.out.println("actionStr =" + actionStr + "and campaign str = " + campaignStr);

             int actionid = Integer.parseInt(actionStr.trim());                      
             int campaignid = Integer.parseInt(campaignStr.trim());


             //System.out.println("during intwritable"); //used to debug
             IntWritable outputKeyFromMapper = new IntWritable(actionid);
             IntWritable outputValueFromMapper = new IntWritable(campaignid);


             context.write(outputKeyFromMapper, outputValueFromMapper);

         }
         catch(Exception e){
             System.out.println("*** there is exception"); 
             e.printStackTrace(); 
         }

         numRecords = numRecords+1;


     }



}

Driver:

 public class Adcampaign {

     public static void main(String[] args) throws Exception {
            if (args.length != 2) {
                System.err.println("Usage: MaxClosePrice <input path> <output path>");
                System.exit(-1);
            }


            //reads the default configuration of cluster from the configuration xml files
            // https://www.quora.com/What-is-the-use-of-a-configuration-class-and-object-in-Hadoop-MapReduce-code

            Configuration conf = new Configuration();


            //Initializing the job with the default configuration of the cluster          

            Job job = new Job(conf, "Adcampaign");

            //first argument is job itself
            //second argument is location of the input dataset
            FileInputFormat.addInputPath(job, new Path(args[0]));

            //first argument is the job itself
            //second argument is the location of the output path        
            FileOutputFormat.setOutputPath(job, new Path(args[1]));        


            //Defining input Format class which is responsible to parse the dataset into a key value pair   
            //Configuring the input/output path from the filesystem into the job
            // InputFormat is responsible for 3 main tasks.
            //      a. Validate inputs - meaning the dataset exists in the location specified.
            //      b. Split up the input files into logical input splits. Each input split will be assigned a mapper.
            //      c. Recordreader implementation to extract logical records

            job.setInputFormatClass(TextInputFormat.class);

            //Defining output Format class which is responsible for writing the final key-value output of the MR framework to a text file on disk
            //OutputFormat does 2 main things
            //  a. Validates the output specification, e.g. whether the output directory already exists. If the directory exists, it throws an error.
            //  b. Provides the RecordWriter implementation that writes the job's output files
            //Hadoop comes with several output format implementations.

            job.setOutputFormatClass(TextOutputFormat.class);

            //Assigning the driver class name           
            job.setJarByClass(Adcampaign.class);

            //Defining the mapper class name            
            job.setMapperClass(AdcampaignMapper.class);

            //Defining the Reducer class name
            job.setReducerClass(AdcampaignReducer.class);

            //setting the second argument as a path in a path variable           
            Path outputPath = new Path(args[1]);

            //deleting the output path automatically from hdfs so that we don't have delete it explicitly            
            outputPath.getFileSystem(conf).delete(outputPath);

            job.setMapOutputKeyClass(IntWritable.class);

            job.setMapOutputValueClass(IntWritable.class);

            // Exit with status 0 if the job completes successfully, 1 otherwise

            System.exit(job.waitForCompletion(true) ? 0 : 1);


     }

}

【Question discussion】:

  • What do you mean by "reducer output not good"? Can you share a few lines of the output?

Tags: hadoop mapreduce


【Solution 1】:

You want the output grouped by campaign_id, so campaign_id should be the key emitted by the mapper. Then, in the reducer, check whether each value is a view or a click.

     String actionStr = tokens[2];
     String campaignStr = tokens[4];     

     int actionid = Integer.parseInt(actionStr.trim());                      
     int campaignid = Integer.parseInt(campaignStr.trim());

     IntWritable outputKeyFromMapper = new IntWritable(actionid);
     IntWritable outputValueFromMapper = new IntWritable(campaignid);

Here outputKeyFromMapper should hold campaignid (and outputValueFromMapper should hold actionid), because records are sorted and grouped by the mapper's output key.

Let me know if this helps.

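【Discussion】:

  • Ishan, I'll try it and let you know.
  • Thanks a lot, Ishan.
【Solution 2】:
  1. The mapper's output key should be campaignid, and its value should be actionid.
  2. If you want to count the number of records in the mapper, use counters.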
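
The effect of the key choice can be sketched without a cluster. The following plain-Java stand-in imitates the map, shuffle, and reduce steps using ordinary collections; it does not use any Hadoop classes. `CampaignStatsSketch`, `mapAndGroup`, and `reduce` are hypothetical names, and the records in `main` are made up in the post's format so that one campaign has several events.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java stand-in for the map -> shuffle -> reduce flow (no Hadoop classes).
// All names here are hypothetical and chosen for illustration only.
final class CampaignStatsSketch {

    // "Map" + "shuffle": emit (campaign_id, action_id) and group the values by
    // key, mirroring how the framework groups on the mapper's output key.
    static Map<Integer, List<Integer>> mapAndGroup(List<String> lines) {
        Map<Integer, List<Integer>> grouped = new TreeMap<>();
        for (String line : lines) {
            String[] tokens = line.split(",");
            if (tokens.length != 5) {
                continue; // skip malformed records instead of indexing into them
            }
            int actionId = Integer.parseInt(tokens[2].trim());
            int campaignId = Integer.parseInt(tokens[4].trim());
            grouped.computeIfAbsent(campaignId, k -> new ArrayList<>()).add(actionId);
        }
        return grouped;
    }

    // "Reduce": count views (1) and clicks (2) for one campaign.
    // Returns a two-element array {views, clicks}.
    static int[] reduce(List<Integer> actionIds) {
        int views = 0;
        int clicks = 0;
        for (int action : actionIds) {
            if (action == 1) {
                views++;
            } else if (action == 2) {
                clicks++;
            }
        }
        return new int[] {views, clicks};
    }

    public static void main(String[] args) {
        // Made-up records, not taken from the original sample data.
        List<String> lines = List.of(
                "1293868800864,319248,1,flickr.com,12",
                "1293868801728,625828,2,npr.org,12",
                "1293868802592,522177,1,cnn.com,12",
                "1293868803456,535052,2,cnn.com,19");
        for (Map.Entry<Integer, List<Integer>> entry : mapAndGroup(lines).entrySet()) {
            int[] stats = reduce(entry.getValue());
            // campaign ID, total views, total clicks
            System.out.println(entry.getKey() + ", " + stats[0] + ", " + stats[1]);
        }
    }
}
```

If the key and value were swapped, as in the question's mapper, the grouping would produce only two groups (action 1 and action 2), and the reducer would compare campaign IDs against 1 and 2, which explains the wrong counts.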

【Discussion】:

    【Solution 3】:

    Your mapper and reducer look fine. Add the following lines to your Driver class and try again:

            job.setOutputKeyClass( IntWritable.class );
            job.setOutputValueClass( Text.class );
    

    【Discussion】:

    • I'll try it within 10 hours and let you know, Mahendra.