As discussed in the previous post [Hadoop MapReduce Chaining], here I will apply mapper/reducer chaining to the word count example.
I will follow this sequence of chaining in my job:
MAP+ | REDUCE | MAP*
The job runs multiple mappers in sequence to preprocess the data, and after the reducer runs, it runs multiple mappers in sequence to postprocess the data. The mappers before the reduce phase do the preprocessing, and the mappers after the reduce phase do the postprocessing.
This job consists of the following classes:
- ChainWordCountDriver
- TokenizerMapper
- UpperCaserMapper
- WordCountReducer
- LastMapper
ChainWordCountDriver takes an input file whose records can be split into tokens. It wires up the mappers and the reducer in the following sequence:
TokenizerMapper --> UpperCaserMapper --> WordCountReducer --> LastMapper
Here the output of one phase becomes the input of the next phase.
public class ChainWordCountDriver extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(getConf(), ChainWordCountDriver.class);
        conf.setJobName("wordcount");

        Path outputPath = new Path("/home/impadmin/testdata/CustomerOutput");
        FileSystem fs = FileSystem.get(new URI(outputPath.toString()), conf);
        // Delete the output directory if it already exists, so it does not have to be deleted manually.
        fs.delete(outputPath, true);

        // Set the input and output paths.
        FileInputFormat.setInputPaths(conf, "/home/impadmin/testdata/Customer");
        FileOutputFormat.setOutputPath(conf, outputPath);

        // The input and output are plain text, so use TextInputFormat and TextOutputFormat.
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        JobConf mapAConf = new JobConf(false);
        ChainMapper.addMapper(conf, TokenizerMapper.class, LongWritable.class, Text.class, Text.class, IntWritable.class, true, mapAConf);
        // addMapper takes the global JobConf, the mapper class, the mapper's input and output key/value types,
        // whether keys/values are passed by value or by reference, and a local JobConf specific to this mapper.
        JobConf mapBConf = new JobConf(false);
        ChainMapper.addMapper(conf, UpperCaserMapper.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, mapBConf);

        JobConf reduceConf = new JobConf(false);
        ChainReducer.setReducer(conf, WordCountReducer.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, reduceConf);

        // LastMapper splits the reducer's key and emits Text values, so its output value class is Text.
        JobConf mapCConf = new JobConf(false);
        ChainReducer.addMapper(conf, LastMapper.class, Text.class, IntWritable.class, Text.class, Text.class, true, mapCConf);

        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new ChainWordCountDriver(), args);
        System.exit(res);
    }
}
TokenizerMapper - parses each input record and emits every token with a count of 1.

public class TokenizerMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        String line = value.toString();
        System.out.println("Line:" + line);
        // Split the line on whitespace and emit (token, 1) for every token.
        StringTokenizer itr = new StringTokenizer(line);
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            output.collect(word, one);
        }
    }
}
UpperCaserMapper - uppercases the token passed from TokenizerMapper.

public class UpperCaserMapper extends MapReduceBase implements Mapper<Text, IntWritable, Text, IntWritable> {

    public void map(Text key, IntWritable value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        String word = key.toString().toUpperCase();
        System.out.println("Upper Case:" + word);
        output.collect(new Text(word), value);
    }
}
WordCountReducer - does nothing special; it just writes each key with a count of 0.

public class WordCountReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        // No aggregation is performed; every key is passed through with a count of 0.
        int sum = 0;
        output.collect(key, new IntWritable(sum));
    }
}
LastMapper - splits the key written by the reducer on the comma and writes the parts to the final output file.

public class LastMapper extends MapReduceBase implements Mapper<Text, IntWritable, Text, Text> {

    public void map(Text key, IntWritable value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        // The incoming key looks like "1,STEPHANIE" or "LEUNG,555-555-5555"; split it on the comma.
        String[] word = key.toString().split(",");
        output.collect(new Text(word[0]), new Text(word[1]));
    }
}
Input data is like this:
customerId,customerName,contactNumber
1,Stephanie Leung,555-555-5555
2,Edward Kim,123-456-7890
3,Jose Madriz,281-330-8004
4,David Stork,408-555-0000
The order of execution will be:
1. The driver wires up the mappers and the reducer in the sequence given above.
2. Each record is read by TokenizerMapper, which splits it on whitespace and sends every token to UpperCaserMapper.
3. UpperCaserMapper uppercases the token and sends it to WordCountReducer.
4. WordCountReducer just writes the key with a count of 0.
5. LastMapper splits the key written by the reducer on the comma and writes the parts to the final output.
For any query, please drop a comment.
Good Work, Helpful for beginners
Thanks a lot
Can you show the final output?
Final Output would be:
1 STEPHANIE
2 EDWARD
3 JOSE
4 DAVID
KIM 123-456-7890
LEUNG 555-555-5555
MADRIZ 281-330-8004
STORK 408-555-0000
It reads the first row from the file and splits the line on whitespace, which tokenizes "1,Stephanie Leung,555-555-5555" into "1,Stephanie" and "Leung,555-555-5555". It passes (1,Stephanie) to UpperCaserMapper,
and then passes (Leung,555-555-5555) to UpperCaserMapper, and LastMapper just splits these keys on the comma and writes the parts.
Output from different phases:
Line:1,Stephanie Leung,555-555-5555
TokenizerMapper - 1,Stephanie --- 1
UpperCaserMapper output - 1,STEPHANIE --- 1
TokenizerMapper - Leung,555-555-5555 --- 1
UpperCaserMapper output - LEUNG,555-555-5555 --- 1
WordCountReducer output - 1,STEPHANIE --- 0
LastMapper output - 1 --- STEPHANIE
WordCountReducer output - LEUNG,555-555-5555 --- 0
LastMapper output - LEUNG --- 555-555-5555
Line:2,Edward Kim,123-456-7890
TokenizerMapper - 2,Edward --- 1
UpperCaserMapper output - 2,EDWARD --- 1
TokenizerMapper - Kim,123-456-7890 --- 1
UpperCaserMapper output - KIM,123-456-7890 --- 1
WordCountReducer output - 2,EDWARD --- 0
LastMapper output - 2 --- EDWARD
WordCountReducer output - KIM,123-456-7890 --- 0
LastMapper output - KIM --- 123-456-7890
The same happens for the remaining rows.
I want to read multiple files in parallel and compare both files. I mean Mapper1 will read file1 and Mapper2 will read file2, and pass Mapper1's values to Mapper2 for comparison. Will that work, or is there any other option available?
As per your problem statement, you can try the following solution.
Suppose file1 has content:
red
blue
yellow
file2's content
red
pink
When these files are processed by the mapper, you can prepare the following key/value pair:
Mapper's (key, val) --- (file's content, file's name)
In this way, the mappers' output would be:
(red,file1)
(blue,file1)
(yellow,file1)
(red,file2)
(pink,file2)
With this set of mapper output, the reducer's input would be:
(red,(file1,file2))
(blue,file1)
(yellow,file1)
(red,file2)
(pink,file2)
Check for the file1 and file2 values in each pair:
Having only file1 in the value list means that word appears only in file1.
Having only file2 in the value list means that word appears only in file2.
Having file1,file2 in the value list means that word is common to both files.
Hope this will work for you.
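A minimal sketch of this idea with the old mapred API used in this post (imports omitted, as elsewhere in the post; the class names FileTagMapper and FileCompareReducer are illustrative, not from the question) could look like this:

public class FileTagMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {

    public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        // Tag every word with the name of the file its input split came from.
        String fileName = ((FileSplit) reporter.getInputSplit()).getPath().getName();
        output.collect(new Text(value.toString().trim()), new Text(fileName));
    }
}

public class FileCompareReducer extends MapReduceBase implements Reducer<Text, Text, Text, Text> {

    public void reduce(Text key, Iterator<Text> values, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
        // Collect the distinct file names seen for this word.
        Set<String> files = new HashSet<String>();
        while (values.hasNext()) {
            files.add(values.next().toString());
        }
        // A word seen in more than one file is common; otherwise it belongs to the single file in the set.
        String tag = (files.size() > 1) ? "common to both files" : files.iterator().next();
        output.collect(key, new Text(tag));
    }
}

Both files would simply be added as input paths with FileInputFormat.setInputPaths, so there is no need to pass one mapper's data to another mapper.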
Crystal clear and very helpful. Looking for the same kind of post on secondary sorting; I am very confused about it.
Good Job..!!
Can you please send me the .java file?
Thanks Dhaval for your comment...
Regarding the .java files, you can create them at your end:
Create Java files in Eclipse named ChainWordCountDriver, TokenizerMapper, UpperCaserMapper, WordCountReducer, and LastMapper.
Paste the code given in the post above into the respective files, and you are ready with your Java files.
Getting a runtime exception in the ChainMapper example:
java.lang.ArrayIndexOutOfBoundsException
Help me out.
From the log you have shared, it seems you are getting this error in LastMapper, but I am not able to identify the exact line that is throwing the exception.
Here, I am also assuming that your input file has data like:
1,Stephanie Leung,555-555-5555
2,Edward Kim,123-456-7890
3,Jose Madriz,281-330-8004
4,David Stork,408-555-0000
In LastMapper, we split the key into an array, and with your input data the array might not have the expected values, so that might be one reason for this.
Another thing you can try is to add the necessary breakpoints in these Java classes in Eclipse and debug them to identify the issue.
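As a quick check, a defensive version of LastMapper's map method (a sketch, assuming the exception comes from indexing the array produced by split) could skip malformed keys instead of failing:

public void map(Text key, IntWritable value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
    String[] parts = key.toString().split(",");
    if (parts.length < 2) {
        // Keys without a comma would otherwise throw ArrayIndexOutOfBoundsException; count and skip them.
        reporter.incrCounter("LastMapper", "MalformedKeys", 1);
        return;
    }
    output.collect(new Text(parts[0]), new Text(parts[1]));
}

The "MalformedKeys" counter in the job output would then tell you how many records did not have the expected format.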
Thanks Getanjali..!
Got it..!
Good job. Thank you.
Thank you for your great tutorial. I have made some changes in your code to make it work with the new MapReduce API. The code works and produces the correct result, but it does not write the result to the output file. What can be the problem?
ReplyDelete@Ceyhun Karimov
Please make sure you have set the correct output path in the lines of code below:
//Setting the input and output path
FileInputFormat.setInputPaths(conf, "/home/impadmin/testdata/Customer");
FileOutputFormat.setOutputPath(conf, outputPath);
Since you have rewritten the code using the new API, add the paths using that API.
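For reference, a minimal sketch of the path setup with the new API (using the same paths as in this post; the exact Job construction depends on your Hadoop version) would be:

Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "wordcount");   // new Job(conf, "wordcount") on older releases
// These must be the new-API classes from org.apache.hadoop.mapreduce.lib.input / .output.
FileInputFormat.addInputPath(job, new Path("/home/impadmin/testdata/Customer"));
FileOutputFormat.setOutputPath(job, new Path("/home/impadmin/testdata/CustomerOutput"));
// Submit the job and wait for it to finish before checking the output directory.
System.exit(job.waitForCompletion(true) ? 0 : 1);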
Thank you for the wonderful example. I am trying to chain mappers with a composite key. A composite key needs custom implementations of the comparator, grouping comparator, and partitioner. Is there a way I can specify them with ChainMapper and ChainReducer?
Yes, all of this can be set on the job object:
ReplyDeletejob.setMapOutputKeyClass(CompositeKey.class);
job.setPartitionerClass(CustomPartitioner.class);
job.setGroupingComparatorClass(CustomGroupingComparator.class);
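If you keep the old JobConf-based driver from this post instead of the new-API Job, the equivalent settings would be the sketch below (CompositeKey, CustomPartitioner, and CustomGroupingComparator are the class names from the answer above, and CompositeKeyComparator is a hypothetical sort comparator; for the old API they must implement org.apache.hadoop.mapred.Partitioner and RawComparator respectively):

// Old-API equivalents, set on the same JobConf that ChainMapper/ChainReducer use.
conf.setMapOutputKeyClass(CompositeKey.class);
conf.setPartitionerClass(CustomPartitioner.class);
conf.setOutputKeyComparatorClass(CompositeKeyComparator.class);        // sort comparator (hypothetical class)
conf.setOutputValueGroupingComparator(CustomGroupingComparator.class); // grouping comparator

These apply to the chained job as a whole, not to an individual mapper in the chain, because partitioning and grouping happen only once, between the ChainMapper stage and the ChainReducer stage.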
I followed your blog to implement chaining in my project, but I am getting lots of errors and I am not able to rectify them.
Please do help me. Below is the error log:
java.lang.Exception: java.lang.RuntimeException: Error in configuring object
at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:354)
Caused by: java.lang.RuntimeException: Error in configuring object
at org.apache.hadoop.util.ReflectionUtils.setJobConf(ReflectionUtils.java:93)
at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:64)
at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:117)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:426)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:366)
at org.apache.hadoop.mapred.LocalJobRunner$Job$MapTaskRunnable.run(LocalJobRunner.java:223)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
One reason might be that not all of the required Hadoop jars have been configured on the classpath.