Wednesday, 19 December 2012

Hadoop MapReduce Chaining Example

As discussed in the previous post [Hadoop MapReduce Chaining], here I will apply mapper/reducer chaining to the WordCount example.

The job follows this chaining sequence:
MAP+ | REDUCE | MAP*
One or more mappers run in sequence before the reducer to preprocess the data, and zero or more mappers run in sequence after the reducer to postprocess its output.

This job consists of the following classes:
  • ChainWordCountDriver 
  • TokenizerMapper
  • UpperCaserMapper
  • WordCountReducer
  • LastMapper
ChainWordCountDriver takes an input file whose records are split into tokens on whitespace. It calls the mappers and the reducer in the following sequence:
TokenizerMapper --> UpperCaserMapper --> WordCountReducer --> LastMapper
The output of each phase becomes the input of the next phase.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.ChainMapper;
import org.apache.hadoop.mapred.lib.ChainReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ChainWordCountDriver extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(getConf(), ChainWordCountDriver.class);
        conf.setJobName("wordcount");

        Path outputPath = new Path("/home/impadmin/testdata/CustomerOutput");
        FileSystem fs = FileSystem.get(new URI(outputPath.toString()), conf);
        // Delete the output directory (recursively) if it already exists, so it need not be removed manually
        fs.delete(outputPath, true);

        // Set the input and output paths
        FileInputFormat.setInputPaths(conf, "/home/impadmin/testdata/Customer");
        FileOutputFormat.setOutputPath(conf, outputPath);

        // Input and output are plain text files
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        // addMapper takes the job-wide conf, the mapper class, the mapper's input key/value classes,
        // its output key/value classes, a byValue flag (true passes the output key/values to the next
        // stage by value, false by reference) and a JobConf that is local to this mapper
        JobConf mapAConf = new JobConf(false);
        ChainMapper.addMapper(conf, TokenizerMapper.class, LongWritable.class, Text.class, Text.class, IntWritable.class, true, mapAConf);

        JobConf mapBConf = new JobConf(false);
        ChainMapper.addMapper(conf, UpperCaserMapper.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, mapBConf);

        JobConf reduceConf = new JobConf(false);
        ChainReducer.setReducer(conf, WordCountReducer.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, reduceConf);

        // Mappers added through ChainReducer run after the reducer
        JobConf mapCConf = new JobConf(false);
        ChainReducer.addMapper(conf, LastMapper.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, mapCConf);

        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new ChainWordCountDriver(), args);
        System.exit(res);
    }
}

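TokenizerMapper - splits each input record into whitespace-separated tokens
import java.io.IOException;
import java.util.StringTokenizer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class TokenizerMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        String line = value.toString();
        System.out.println("Line:" + line);
        // StringTokenizer splits on whitespace, so "1,Stephanie Leung,555-555-5555" yields two tokens
        StringTokenizer itr = new StringTokenizer(line);
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            output.collect(word, one);
        }
    }
}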

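UpperCaserMapper - uppercases each token passed from TokenizerMapper
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class UpperCaserMapper extends MapReduceBase implements Mapper<Text, IntWritable, Text, IntWritable> {

    public void map(Text key, IntWritable value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        String word = key.toString().toUpperCase();
        System.out.println("Upper Case:" + word);
        output.collect(new Text(word), value);
    }
}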

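WordCountReducer - does nothing special; it just writes each key to the output
import java.io.IOException;
import java.util.Iterator;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reducer;
import org.apache.hadoop.mapred.Reporter;

public class WordCountReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        // The values are never added up, so every key is written with a count of 0
        int sum = 0;
        output.collect(key, new IntWritable(sum));
    }
}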
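
The reducer above writes 0 for every key because `sum` is never updated. That is harmless here only because LastMapper ignores the count. For an actual word count, the values would be accumulated first. A minimal sketch of that loop in plain Java, with `Integer` standing in for `IntWritable` and a hypothetical class name `SumSketch`, so that it runs without Hadoop:

```java
import java.util.Arrays;
import java.util.Iterator;

// Hypothetical stand-in for the body of WordCountReducer.reduce();
// Integer replaces IntWritable so the sketch runs without Hadoop.
class SumSketch {

    // Adds up every count that arrived for one key.
    static int sum(Iterator<Integer> values) {
        int sum = 0;
        while (values.hasNext()) {
            sum += values.next();
        }
        return sum;
    }

    public static void main(String[] args) {
        // Three occurrences of the same word, each emitted with a count of 1.
        Iterator<Integer> counts = Arrays.asList(1, 1, 1).iterator();
        System.out.println(sum(counts)); // prints 3
    }
}
```

Inside the Hadoop reducer the same loop would read `sum += values.next().get()` over an `Iterator<IntWritable>`.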

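LastMapper - splits the key sent from the reducer on the comma and writes the result to the final output file
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

public class LastMapper extends MapReduceBase implements Mapper<Text, IntWritable, Text, IntWritable> {

    // The collector is left as a raw type because the value written below is a Text, not the declared IntWritable
    public void map(Text key, IntWritable value, OutputCollector output, Reporter reporter) throws IOException {
        // Assumes the key contains a comma; without one, word[1] throws ArrayIndexOutOfBoundsException
        String[] word = key.toString().split(",");
        System.out.println("Last Mapper:" + key);
        output.collect(new Text(word[0]), new Text(word[1]));
    }
}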
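
LastMapper assumes that every key contains a comma. `String.split(",")` returns a single-element array for a key without one, so `word[1]` then throws `ArrayIndexOutOfBoundsException`. One possible guard, sketched in plain Java (the class and method names are hypothetical, and skipping such records is only one of several reasonable policies):

```java
import java.util.Optional;

// Hypothetical helper showing a bounds check before indexing the split result.
class SafeSplit {

    // Returns the first two comma-separated fields, or empty when the key has fewer than two.
    static Optional<String[]> firstTwoFields(String key) {
        String[] parts = key.split(",");
        if (parts.length < 2) {
            return Optional.empty();
        }
        return Optional.of(new String[] { parts[0], parts[1] });
    }

    public static void main(String[] args) {
        System.out.println(firstTwoFields("1,STEPHANIE").isPresent()); // prints true
        System.out.println(firstTwoFields("STEPHANIE").isPresent());   // prints false
    }
}
```

In the mapper, a record for which the result is empty could simply be skipped instead of collected.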

Input data is like this:
customerId,customerName,contactNumber
1,Stephanie Leung,555-555-5555
2,Edward Kim,123-456-7890
3,Jose Madriz,281-330-8004
4,David Stork,408-555-0000

The order of execution is:
1. The driver calls the mappers and the reducer in the sequence given above.
2. TokenizerMapper reads each record, splits it into tokens on whitespace, and sends each token to UpperCaserMapper.
3. UpperCaserMapper uppercases the token and sends it to WordCountReducer.
4. WordCountReducer just writes the key.
5. LastMapper splits the key written by the reducer on the comma and writes the result.
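
The flow of a record through these steps can be traced without a cluster. The following is a minimal sketch in plain Java, not the Hadoop job itself: the class name `ChainFlowSketch` is hypothetical, a `TreeMap` stands in for the shuffle (grouping and sorting by key), the header row is left out of the sample input, and `Locale.ROOT` is an addition that keeps the uppercasing independent of the machine's locale.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.StringTokenizer;
import java.util.TreeMap;

// Hypothetical plain-Java trace of the chain; no Hadoop classes are used.
class ChainFlowSketch {

    static List<String> run(List<String> lines) {
        // Map phase: TokenizerMapper followed by UpperCaserMapper.
        // The TreeMap plays the role of the shuffle: it groups values by key and sorts the keys.
        Map<String, List<Integer>> shuffled = new TreeMap<>();
        for (String line : lines) {
            StringTokenizer itr = new StringTokenizer(line);
            while (itr.hasMoreTokens()) {
                String upper = itr.nextToken().toUpperCase(Locale.ROOT);
                shuffled.computeIfAbsent(upper, k -> new ArrayList<>()).add(1);
            }
        }
        // Reduce phase: WordCountReducer passes each key on unchanged,
        // then LastMapper splits the key on the comma and writes both parts.
        List<String> output = new ArrayList<>();
        for (String key : shuffled.keySet()) {
            String[] word = key.split(",");
            output.add(word[0] + "\t" + word[1]);
        }
        return output;
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "1,Stephanie Leung,555-555-5555",
            "2,Edward Kim,123-456-7890",
            "3,Jose Madriz,281-330-8004",
            "4,David Stork,408-555-0000");
        run(lines).forEach(System.out::println);
    }
}
```

With the four sample rows this prints eight tab-separated lines, from `1	STEPHANIE` to `STORK	408-555-0000`: each row contributes one line for the token before the space and one for the token after it.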


If you have any questions, please drop a comment.

19 comments:

  1. Good Work, Helpful for beginners
    Thanks a lot

  2. Can you show the final output?

  3. Final Output would be:

    1 STEPHANIE
    2 EDWARD
    3 JOSE
    4 DAVID
    KIM 123-456-7890
    LEUNG 555-555-5555
    MADRIZ 281-330-8004
    STORK 408-555-0000

    The job reads the first row from the file and splits the line into tokens on whitespace, so "1,Stephanie Leung,555-555-5555" becomes the tokens "1,Stephanie" and "Leung,555-555-5555".
    Each token is passed to UpperCaserMapper and then to WordCountReducer, and LastMapper splits it on the comma and writes the two parts.

    Output from different phases:
    Line:1,Stephanie Leung,555-555-5555
    TokenizerMapper - 1,Stephanie --- 1
    UpperCaserMapper output - 1,STEPHANIE --- 1
    TokenizerMapper - Leung,555-555-5555 --- 1
    UpperCaserMapper output - LEUNG,555-555-5555 --- 1
    WordCountReducer output - 1,STEPHANIE --- 0
    LastMapper output - 1 --- STEPHANIE
    WordCountReducer output - LEUNG,555-555-5555 --- 0
    LastMapper output - LEUNG --- 555-555-5555

    Line:2,Edward Kim,123-456-7890
    TokenizerMapper - 2,Edward --- 1
    UpperCaserMapper output - 2,EDWARD --- 1
    TokenizerMapper - Kim,123-456-7890 --- 1
    UpperCaserMapper output - KIM,123-456-7890 --- 1
    WordCountReducer output - 2,EDWARD --- 0
    LastMapper output - 2 --- EDWARD
    WordCountReducer output - KIM,123-456-7890 --- 0
    LastMapper output - KIM --- 123-456-7890

    The same applies to the remaining rows.

  4. I want to read multiple files in parallel and compare them. I mean Mapper1 will read file1 and Mapper2 will read file2, and the Mapper1 file value will be passed to Mapper2 for comparison. Will that work, or is there any other option available?

    Replies
    1. This comment has been removed by the author.

    2. As per your problem statement, you can try the following solution
      Suppose file1 has content:
      red
      blue
      yellow

      file2's content
      red
      pink

      When these files are processed by the mapper, you can emit the following key/value pairs:
      Mapper's (key,val) --- (line content, file name)
      In this way, the mappers' output would be:
      (red,file1)
      (blue,file1)
      (yellow,file1)
      (red,file2)
      (pink,file2)

      With this mapper output, the reducer input after grouping by key would be:
      (red,(file1,file2))
      (blue,(file1))
      (yellow,(file1))
      (pink,(file2))

      Check for file1 and file2 in the value list of each key:
      Only file1 in the list means the word appears only in file1
      Only file2 in the list means the word appears only in file2
      Both file1 and file2 in the list means the word is common to both files

      Hope, this will work for you.
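
The tagging approach described in this reply can be tried out in plain Java before moving it into a job. This is only a sketch under stated assumptions: the class and method names are hypothetical, in-memory lists stand in for the two files, and a sorted map stands in for the shuffle.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical plain-Java version of the (word, file name) tagging idea.
class FileCompareSketch {

    // Map step: tag every word with the name of the file it came from.
    // Shuffle step: collect the tags per word, as the reducer would receive them.
    static Map<String, Set<String>> tagByFile(Map<String, List<String>> files) {
        Map<String, Set<String>> grouped = new TreeMap<>();
        for (Map.Entry<String, List<String>> file : files.entrySet()) {
            for (String word : file.getValue()) {
                grouped.computeIfAbsent(word, k -> new TreeSet<>()).add(file.getKey());
            }
        }
        return grouped;
    }

    // Reduce step: more than one tag means the word occurs in both files.
    static String classify(Set<String> sources) {
        if (sources.size() > 1) {
            return "common";
        }
        return "only in " + sources.iterator().next();
    }

    public static void main(String[] args) {
        Map<String, List<String>> files = new LinkedHashMap<>();
        files.put("file1", Arrays.asList("red", "blue", "yellow"));
        files.put("file2", Arrays.asList("red", "pink"));
        tagByFile(files).forEach((word, sources) ->
            System.out.println(word + " -> " + classify(sources)));
    }
}
```

For the sample contents above, `red` is reported as common, `blue` and `yellow` as only in file1, and `pink` as only in file2.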

    3. Crystal clear and very helpful. I am looking for the same kind of explanation for secondary sorting, which I find very confusing.

  5. Good Job..!!
    Can you please send me the .java file?

  6. Thanks Dhaval for your comment...

    Regarding the .java files, you can create them at your end.
    Create the Java files in Eclipse with these names:
    ChainWordCountDriver TokenizerMapper UpperCaserMapper WordCountReducer LastMapper

    Then paste the code from the post above into the respective files,
    and your Java files are ready.

  7. I am getting a runtime exception in the ChainMapper example:
    java.lang.ArrayIndexOutOfBoundsException

    Please help me out.

  8. From the log you have shared, it seems you are getting this error in LastMapper, but I am not able to identify the exact line that throws the exception.
    I am also assuming that your input file has data like this:

    1,Stephanie Leung,555-555-5555
    2,Edward Kim,123-456-7890
    3,Jose Madriz,281-330-8004
    4,David Stork,408-555-0000

    In LastMapper, we split the key into an array, and with your input data the array might not have the expected number of elements, so that may be one reason for the exception.
    You can also add the necessary breakpoints to these Java classes in Eclipse and debug the job to identify the issue.

  9. Good job.Thank you

  10. Thank you for your great tutorial. I have made some changes to your code to make it work with the new MapReduce API. The code runs and produces the correct result, but it does not write the result to the output file. What could be the problem?

  11. @Ceyhun Karimov

    Please make sure you have set the correct output path in below lines of code

    //Setting the input and output path
    FileInputFormat.setInputPaths(conf, "/home/impadmin/testdata/Customer");
    FileOutputFormat.setOutputPath(conf, outputPath);

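    Since you have rewritten the code with the new API, set the paths through the new API's FileInputFormat and FileOutputFormat classes (in org.apache.hadoop.mapreduce.lib.input and org.apache.hadoop.mapreduce.lib.output), passing the Job object instead of the JobConf.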

  12. Thank you for the wonderful example. I am trying to chain mappers with Composite key. Composite key needs custom implementation of Comparator, Grouping Comparator, Partitioner. Is there a way I can specify them in ChainMapper and ChainReducer ?

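  13. Yes, all of this can be set on the job object. With the new API's Job class:

    job.setMapOutputKeyClass(CompositeKey.class);
    job.setPartitionerClass(CustomPartitioner.class);
    job.setSortComparatorClass(CustomSortComparator.class);
    job.setGroupingComparatorClass(CustomGroupingComparator.class);

    With the old API used in this post, the JobConf equivalents are setPartitionerClass, setOutputKeyComparatorClass and setOutputValueGroupingComparator.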
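
What the partitioner, sort comparator and grouping comparator each contribute to a composite key can be seen in a plain-Java simulation. This is a sketch under assumptions, not Hadoop code: `CompositeKey` here is a hypothetical two-field key (a name plus an amount to order by), and the list handling stands in for what the framework does between map and reduce.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

// Hypothetical simulation of the three roles involved in a secondary sort.
class SecondarySortSketch {

    // Hypothetical composite key: a natural key plus a secondary field to order by.
    static final class CompositeKey {
        final String name;
        final int amount;

        CompositeKey(String name, int amount) {
            this.name = name;
            this.amount = amount;
        }
    }

    // Partitioner role: only the natural key picks the reducer,
    // so all records for one name meet in the same reducer.
    static int partition(CompositeKey key, int numReducers) {
        return (key.name.hashCode() & Integer.MAX_VALUE) % numReducers;
    }

    // Sort comparator role: natural key first, then the secondary field.
    static final Comparator<CompositeKey> SORT =
        Comparator.comparing((CompositeKey k) -> k.name).thenComparingInt(k -> k.amount);

    // Grouping comparator role: keys with the same name share one reduce call.
    static final Comparator<CompositeKey> GROUP =
        Comparator.comparing((CompositeKey k) -> k.name);

    // Simulates one reducer's input: sort the keys, then open a new group
    // whenever the grouping comparator reports a different natural key.
    static List<List<Integer>> groups(List<CompositeKey> keys) {
        List<CompositeKey> sorted = new ArrayList<>(keys);
        sorted.sort(SORT);
        List<List<Integer>> result = new ArrayList<>();
        CompositeKey previous = null;
        for (CompositeKey key : sorted) {
            if (previous == null || GROUP.compare(previous, key) != 0) {
                result.add(new ArrayList<>());
            }
            result.get(result.size() - 1).add(key.amount);
            previous = key;
        }
        return result;
    }

    public static void main(String[] args) {
        List<CompositeKey> keys = Arrays.asList(
            new CompositeKey("KIM", 30),
            new CompositeKey("LEUNG", 5),
            new CompositeKey("KIM", 10));
        System.out.println(groups(keys)); // prints [[10, 30], [5]]
    }
}
```

Each inner list corresponds to one reduce call, and its values arrive already ordered by the secondary field, which is the point of a secondary sort.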

  14. I followed your blog to implement chaining in my project, but I am getting a lot of errors and I am not able to fix them.

    Please help me. Below is the error log.

    java.lang.Exception: java.lang.RuntimeException: Error in configuring object
    at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:354)
    Caused by: java.lang.RuntimeException: Error in configuring object
    at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:93)
    at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:64)
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:117)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:426)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:366)
    at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:223)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

    Replies
    1. One reason might be that not all of the required Hadoop jars have been added to the classpath.
