Wednesday 19 December 2012

Hadoop MapReduce Chaining


Small data processing tasks can be accomplished by a single MapReduce job, but complex tasks need to be broken down into simpler subtasks, each accomplished by an individual MapReduce job.

Chaining MapReduce jobs in a sequence:
Although two jobs can be executed manually one after the other, it is more convenient to automate the execution sequence. You can chain MapReduce jobs to run sequentially, with the output of one MapReduce job becoming the input to the next.
Chaining MapReduce jobs is analogous to Unix pipes.

mapreduce-1 | mapreduce-2 | mapreduce-3 | ...

Chaining MapReduce jobs involves calling the driver of one MapReduce job after another. The driver of each job has to create a new JobConf object and set its input path to the output path of the previous job. The intermediate data generated at each step of the chain can be deleted at the end.
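A minimal sketch of such a sequential driver is shown below. This is only an illustration of the pattern, not the example from the next post; the class names FirstMapper, FirstReducer, SecondMapper, and SecondReducer are hypothetical placeholders for your own implementations.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

public class SequentialChainDriver {

    public static void main(String[] args) throws Exception {
        Path input = new Path(args[0]);
        Path intermediate = new Path(args[1]);   // output of job 1, input of job 2
        Path output = new Path(args[2]);

        // First job: reads the raw input and writes intermediate data
        JobConf job1 = new JobConf(SequentialChainDriver.class);
        job1.setJobName("mapreduce-1");
        job1.setMapperClass(FirstMapper.class);       // hypothetical mapper/reducer classes
        job1.setReducerClass(FirstReducer.class);
        FileInputFormat.setInputPaths(job1, input);
        FileOutputFormat.setOutputPath(job1, intermediate);
        JobClient.runJob(job1);                       // blocks until job 1 finishes

        // Second job: its input path is the output path of the first job
        JobConf job2 = new JobConf(SequentialChainDriver.class);
        job2.setJobName("mapreduce-2");
        job2.setMapperClass(SecondMapper.class);
        job2.setReducerClass(SecondReducer.class);
        FileInputFormat.setInputPaths(job2, intermediate);
        FileOutputFormat.setOutputPath(job2, output);
        JobClient.runJob(job2);

        // Delete the intermediate data once the chain has finished
        intermediate.getFileSystem(job2).delete(intermediate, true);
    }
}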

Chaining MapReduce jobs with complex dependencies:

Sometimes the subtasks of a complex data processing task don't run sequentially, and their MapReduce jobs are therefore not chained in a linear fashion. For example, mapreduce1 may process one data set while mapreduce2 independently processes another data set. A third job, mapreduce3, performs an inner join of the output of the first two jobs. It is dependent on the other two and can execute only after both mapreduce1 and mapreduce2 are completed, but mapreduce1 and mapreduce2 are not dependent on each other.

Hadoop has a mechanism to simplify the management of such (nonlinear) job dependencies via the Job and JobControl classes. A Job object is a representation of a MapReduce job. You instantiate a Job object by passing a JobConf object to its constructor. In addition to holding job configuration information, Job also holds dependency information, specified through the addDependingJob() method. For Job objects x and y,

x.addDependingJob(y)

means x will not start until y has finished. Whereas Job objects store the configuration and dependency information, JobControl objects do the managing and monitoring of the job execution. You can add jobs to a JobControl object via the addJob() method.

After adding all the jobs and dependencies, run the JobControl object, typically in its own thread, so it can submit jobs as their dependencies are satisfied and monitor their execution. JobControl has methods like allFinished() and getFailedJobs() to track the execution of the various jobs within the batch.
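A minimal sketch of this pattern, assuming conf1, conf2, and conf3 are already-configured JobConf objects (placeholder names) for mapreduce1, mapreduce2, and the joining job mapreduce3:

import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.jobcontrol.Job;
import org.apache.hadoop.mapred.jobcontrol.JobControl;

public class DependentJobsRunner {

    public static void runDependentJobs(JobConf conf1, JobConf conf2, JobConf conf3)
            throws Exception {
        // Wrap each configured JobConf in a Job object
        Job mapreduce1 = new Job(conf1);
        Job mapreduce2 = new Job(conf2);
        Job mapreduce3 = new Job(conf3);

        // mapreduce3 cannot start until both mapreduce1 and mapreduce2 have finished
        mapreduce3.addDependingJob(mapreduce1);
        mapreduce3.addDependingJob(mapreduce2);

        // JobControl does the managing and monitoring of the batch
        JobControl control = new JobControl("join-example");
        control.addJob(mapreduce1);
        control.addJob(mapreduce2);
        control.addJob(mapreduce3);

        // Run the JobControl in its own thread and poll until the batch finishes
        Thread controlThread = new Thread(control);
        controlThread.start();
        while (!control.allFinished()) {
            Thread.sleep(5000);
        }
        if (!control.getFailedJobs().isEmpty()) {
            System.err.println("Failed jobs: " + control.getFailedJobs());
        }
        control.stop();
    }
}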

You can think of chaining MapReduce jobs using the pseudo-regular expression:

[MAP | REDUCE]+
where a reducer REDUCE comes after a mapper MAP, and this [MAP | REDUCE] sequence can repeat itself one or more times, one right after another. 

The analogous expression for a job using ChainMapper and ChainReducer would be:
MAP+ | REDUCE | MAP*
The job runs one or more mappers in sequence to preprocess the data, and after running the reducer it can optionally run one or more mappers in sequence to postprocess the data. The beauty of this mechanism is that you write the pre- and postprocessing steps as standard mappers. You can run each one of them individually if you want.


Let’s look at the signature of the ChainMapper.addMapper() method to understand in detail how to add each step to the chained job. The signature and function of ChainReducer.setReducer() and ChainReducer.addMapper() are analogous.

public static <K1,V1,K2,V2> void addMapper(JobConf job,
             Class<? extends Mapper<K1,V1,K2,V2>> klass,
             Class<? extends K1> inputKeyClass,
             Class<? extends V1> inputValueClass,
             Class<? extends K2> outputKeyClass,
             Class<? extends V2> outputValueClass,
             boolean byValue,
             JobConf mapperConf)

This method has eight arguments. The first and last are the global and local JobConf objects, respectively. The second argument (klass) is the Mapper class that will do the data processing. The four arguments inputKeyClass, inputValueClass, outputKeyClass, and outputValueClass are the input/output key and value class types of the Mapper class. The byValue flag controls whether key/value pairs are passed between the chained stages by value or by reference; passing by value (true) is the safe choice, whereas passing by reference avoids copying but assumes the mapper does not modify the objects after emitting them.
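For instance, when two mappers are chained, the output key/value types declared for the first must line up with the input key/value types declared for the second. The snippet below is only a shape illustration: job is the global JobConf, FirstMapper and SecondMapper are placeholder classes, and the complete working driver is shown in the next post.

// First mapper in the chain: LongWritable/Text in, Text/IntWritable out
ChainMapper.addMapper(job, FirstMapper.class,
        LongWritable.class, Text.class,
        Text.class, IntWritable.class,
        true, new JobConf(false));

// Second mapper: its input types match the output types of the first
ChainMapper.addMapper(job, SecondMapper.class,
        Text.class, IntWritable.class,
        Text.class, IntWritable.class,
        true, new JobConf(false));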
 
Please see the next post, which demonstrates the usage of chaining in MapReduce jobs.

For any query please drop a comment...... 

Hadoop MapReduce Chaining Example

As discussed in the previous post [Hadoop MapReduce Chaining], here I will apply mapper/reducer chaining to the word count example.

I will use the following sequence of chaining in my job:
MAP+ | REDUCE | MAP* 
The job runs multiple mappers in sequence to preprocess the data, and after running the reducer, it runs multiple mappers in sequence to postprocess the data. The mappers before the reduce phase can be thought of as the preprocessing of the data, and the mappers after the reduce phase as the postprocessing.

This job consists of the following classes:
  • ChainWordCountDriver 
  • TokenizerMapper
  • UpperCaserMapper
  • WordCountReducer
  • LastMapper
ChainWordCountDriver takes an input file whose records can be split into tokens. It calls the different mappers and the reducer in the following sequence:
TokenizerMapper -- > UpperCaserMapper -- > WordCountReducer -->  LastMapper
Here the output of one phase becomes the input of the next phase.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.*;
import org.apache.hadoop.mapred.lib.ChainMapper;
import org.apache.hadoop.mapred.lib.ChainReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ChainWordCountDriver extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(getConf(), ChainWordCountDriver.class);
        conf.setJobName("wordcount");

        Path outputPath = new Path("/home/impadmin/testdata/CustomerOutput");
        FileSystem fs = FileSystem.get(new URI(outputPath.toString()), conf);
        // Delete the output directory if it already exists, so it doesn't have to be deleted manually
        fs.delete(outputPath, true);

        // Set the input and output paths
        FileInputFormat.setInputPaths(conf, "/home/impadmin/testdata/Customer");
        FileOutputFormat.setOutputPath(conf, outputPath);

        // The input and output are text files, so use TextInputFormat and TextOutputFormat
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        // addMapper takes the global conf object, the mapper class, its input and output key/value
        // types, whether key/value pairs are passed by value, and a local JobConf specific to this mapper
        JobConf mapAConf = new JobConf(false);
        ChainMapper.addMapper(conf, TokenizerMapper.class, LongWritable.class, Text.class,
                Text.class, IntWritable.class, true, mapAConf);

        JobConf mapBConf = new JobConf(false);
        ChainMapper.addMapper(conf, UpperCaserMapper.class, Text.class, IntWritable.class,
                Text.class, IntWritable.class, true, mapBConf);

        JobConf reduceConf = new JobConf(false);
        ChainReducer.setReducer(conf, WordCountReducer.class, Text.class, IntWritable.class,
                Text.class, IntWritable.class, true, reduceConf);

        // LastMapper splits the key on commas and emits Text/Text pairs
        JobConf mapCConf = new JobConf(false);
        ChainReducer.addMapper(conf, LastMapper.class, Text.class, IntWritable.class,
                Text.class, Text.class, true, mapCConf);

        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new ChainWordCountDriver(), args);
        System.exit(res);
    }
}
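Assuming the classes are packaged into a jar (the jar name here is just a placeholder), the chained job can be launched with the usual hadoop jar command; the input and output paths are hardcoded in the driver, so no arguments are needed:

hadoop jar chainwordcount.jar ChainWordCountDriver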

TokenizerMapper - parses each record of the input file into tokens
public class TokenizerMapper extends MapReduceBase
        implements Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output,
            Reporter reporter) throws IOException {
        String line = value.toString();
        System.out.println("Line:" + line);
        // Split the record on whitespace and emit each token with a count of 1
        StringTokenizer itr = new StringTokenizer(line);
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            output.collect(word, one);
        }
    }
}

UpperCaserMapper - uppercases the token passed from TokenizerMapper
public class UpperCaserMapper extends MapReduceBase
        implements Mapper<Text, IntWritable, Text, IntWritable> {

    public void map(Text key, IntWritable value, OutputCollector<Text, IntWritable> output,
            Reporter reporter) throws IOException {
        // Uppercase the incoming token and pass its count through unchanged
        String word = key.toString().toUpperCase();
        System.out.println("Upper Case:" + word);
        output.collect(new Text(word), value);
    }
}

WordCountReducer - does nothing special; it simply writes each key to the output (the count is not used by the next stage)
public class WordCountReducer extends MapReduceBase
        implements Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterator<IntWritable> values,
            OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        // Forward each unique key; the count is not used by the next stage
        output.collect(key, new IntWritable(0));
    }
}

LastMapper - splits the record sent from the reducer and writes it into the final output file
public class LastMapper extends MapReduceBase
        implements Mapper<Text, IntWritable, Text, Text> {

    public void map(Text key, IntWritable value, OutputCollector<Text, Text> output,
            Reporter reporter) throws IOException {
        // Split the uppercased record on commas and write the first two fields
        String[] fields = key.toString().split(",");
        System.out.println("Last Mapper:" + key);
        output.collect(new Text(fields[0]), new Text(fields[1]));
    }
}

Input data is like this:
customerId,customerName,contactNumber
1,Stephanie Leung,555-555-5555
2,Edward Kim,123-456-7890
3,Jose Madriz,281-330-8004
4,David Stork,408-555-0000

Here the order of execution will be (a hand trace of one record follows the list):
1. The driver configures the chain so the mappers and the reducer are called in the sequence shown above.
2. Each record is read by TokenizerMapper, which splits it on whitespace into tokens and sends each token to UpperCaserMapper.
3. UpperCaserMapper uppercases each token and sends it to WordCountReducer.
4. WordCountReducer simply writes out each unique key.
5. LastMapper splits the key written by the reducer on commas and writes the result to the final output.
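
To make the data flow concrete, here is a hand trace of the first input record through the chain. This is worked out by hand from the code above, not captured job output, and it follows only one record for illustration (the reducer actually sees keys from all records grouped and sorted).

Input record:       1,Stephanie Leung,555-555-5555
TokenizerMapper:    (1,Stephanie, 1)   (Leung,555-555-5555, 1)
UpperCaserMapper:   (1,STEPHANIE, 1)   (LEUNG,555-555-5555, 1)
WordCountReducer:   (1,STEPHANIE, 0)   (LEUNG,555-555-5555, 0)
LastMapper:         (1, STEPHANIE)     (LEUNG, 555-555-5555)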


For any query please drop a comment......