Wednesday, 19 December 2012

Hadoop MapReduce Chaining


Small data processing tasks can be accomplished by a single MapReduce job, but complex tasks need to be broken down into simpler subtasks, each handled by its own MapReduce job.

Chaining MapReduce jobs in a sequence:
Two jobs can be executed manually one after the other, but it is more convenient to automate the execution sequence. You can chain MR jobs to run sequentially, with the output of one MapReduce job being the input to the next.
Chaining MapReduce jobs is analogous to Unix pipes.

mapreduce-1 | mapreduce-2 | mapreduce-3 | ...

Chaining MapReduce jobs involves calling the driver of one MapReduce job after another. The driver for each job has to create a new JobConf object and set its input path to the output path of the previous job. Because JobClient.runJob() blocks until the job completes, calling the drivers one after another is enough to keep the jobs in order. At the end, you can delete the intermediate data generated at each step of the chain.
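
The wiring described above can be tried without a cluster. The following is a minimal sketch in plain Java, not Hadoop code: a Map stands in for the file system, each "job" is a function from input lines to output lines, and the /tmp/chain/step-N path scheme and the two sample stages are assumptions made up for the illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Toy model of sequential job chaining; a Map stands in for the file system. */
class SequentialChainSketch {

    /** Runs the stages in order; each stage reads the path the previous one wrote. */
    static List<String> runChain(Map<String, List<String>> fs, String inputPath,
                                 List<Function<List<String>, List<String>>> stages) {
        String in = inputPath;
        List<String> intermediates = new ArrayList<>();
        for (int i = 0; i < stages.size(); i++) {
            String out = "/tmp/chain/step-" + (i + 1);   // hypothetical path scheme
            fs.put(out, stages.get(i).apply(fs.get(in)));
            if (i < stages.size() - 1) {
                intermediates.add(out);                  // not the final output
            }
            in = out;                                    // becomes the next job's input
        }
        List<String> result = fs.get(in);
        // Delete the intermediate data once the whole chain has finished.
        for (String path : intermediates) {
            fs.remove(path);
        }
        return result;
    }

    /** Sample stage 1: upper-case every line. */
    static List<String> upperCase(List<String> lines) {
        List<String> out = new ArrayList<>();
        for (String line : lines) {
            out.add(line.toUpperCase());
        }
        return out;
    }

    /** Sample stage 2: drop lines shorter than three characters. */
    static List<String> dropShort(List<String> lines) {
        List<String> out = new ArrayList<>();
        for (String line : lines) {
            if (line.length() >= 3) {
                out.add(line);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        Map<String, List<String>> fs = new HashMap<>();
        fs.put("/input", Arrays.asList("ab", "hadoop", "hdfs"));
        List<Function<List<String>, List<String>>> stages = new ArrayList<>();
        stages.add(SequentialChainSketch::upperCase);
        stages.add(SequentialChainSketch::dropShort);
        System.out.println(runChain(fs, "/input", stages)); // prints [HADOOP, HDFS]
    }
}
```

In a real driver the same pattern applies with JobConf objects and HDFS paths: the output path of job N is passed as the input path of job N+1, and the intermediate directories are removed after the last job.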

Chaining MapReduce jobs with complex dependencies:

Sometimes the subtasks of a complex data processing task don’t run sequentially, and their MapReduce jobs are therefore not chained in a linear fashion. For example, mapreduce1 may process one data set while mapreduce2 independently processes another data set. The third job, mapreduce3, performs an inner join of the first two jobs’ output. It depends on the other two and can execute only after both mapreduce1 and mapreduce2 have completed. But mapreduce1 and mapreduce2 aren’t dependent on each other.

Hadoop has a mechanism to simplify the management of such (nonlinear) job dependencies via the Job and JobControl classes (in the org.apache.hadoop.mapred.jobcontrol package). A Job object is a representation of a MapReduce job. You instantiate a Job object by passing a JobConf object to its constructor. In addition to holding job configuration information, Job also holds dependency information, specified through the addDependingJob() method. For Job objects x and y,
x.addDependingJob(y)
means x will not start until y has finished. Whereas Job objects store the configuration and dependency information, JobControl objects do the managing and monitoring of the job execution. You can add jobs to a JobControl object via the addJob() method.

After adding all the jobs and dependencies, start the JobControl so that it submits and monitors the jobs. JobControl implements Runnable, so its run() method is normally executed in a separate thread, for example via new Thread(jobControl).start(). JobControl has methods like allFinished() and getFailedJobs() to track the execution of the jobs within the batch.
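
To make the dependency rule concrete, here is a small plain-Java model of it. This is a sketch under stated assumptions, not the Hadoop JobControl API: the class DependencySketch and its String-based job names are invented for the illustration, and it starts jobs one at a time, whereas the real JobControl may run independent jobs such as mapreduce1 and mapreduce2 concurrently.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Toy model of dependency-driven job execution (not the Hadoop JobControl API). */
class DependencySketch {
    // job name -> names of the jobs it has to wait for
    private final Map<String, Set<String>> dependsOn = new LinkedHashMap<>();

    void addJob(String name) {
        dependsOn.putIfAbsent(name, new LinkedHashSet<>());
    }

    /** Mirrors x.addDependingJob(y): x will not start until y has finished. */
    void addDependingJob(String x, String y) {
        addJob(x);
        addJob(y);
        dependsOn.get(x).add(y);
    }

    /** Returns the order in which the jobs get started. */
    List<String> run() {
        List<String> finished = new ArrayList<>();
        while (finished.size() < dependsOn.size()) {
            boolean progressed = false;
            for (Map.Entry<String, Set<String>> e : dependsOn.entrySet()) {
                // A job is ready once every job it depends on has finished.
                if (!finished.contains(e.getKey()) && finished.containsAll(e.getValue())) {
                    finished.add(e.getKey());
                    progressed = true;
                }
            }
            if (!progressed) {
                throw new IllegalStateException("cyclic dependency");
            }
        }
        return finished;
    }

    static List<String> demo() {
        DependencySketch control = new DependencySketch();
        control.addJob("mapreduce3");            // added first on purpose
        control.addJob("mapreduce1");
        control.addJob("mapreduce2");
        control.addDependingJob("mapreduce3", "mapreduce1");
        control.addDependingJob("mapreduce3", "mapreduce2");
        return control.run();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [mapreduce1, mapreduce2, mapreduce3]
    }
}
```

Even though mapreduce3 is registered first, it only starts after both jobs it depends on are done, which is the behaviour addDependingJob() is meant to give.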

You can describe a chain of MapReduce jobs with the pseudo-regular expression:

[MAP | REDUCE]+
where a reducer REDUCE comes after a mapper MAP, and this [MAP | REDUCE] sequence can repeat one or more times, one right after another.

The analogous expression for a job using ChainMapper and ChainReducer would be
MAP+ | REDUCE | MAP*
The job runs multiple mappers in sequence to preprocess the data, and after running reduce it can optionally run multiple mappers in sequence to postprocess the data. The beauty of this mechanism is that you write the pre- and postprocessing steps as standard mappers. You can run each one of them individually if you want.
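
The MAP+ | REDUCE | MAP* shape can be illustrated with ordinary function composition. The sketch below is plain Java rather than ChainMapper/ChainReducer code; the helper names mapAll and reduceCount and the sample sentences are assumptions. It also processes whole lists stage by stage for simplicity, whereas a chained Hadoop job passes records through the mappers one at a time.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.Function;

/** Plain-Java illustration of the MAP+ | REDUCE | MAP* shape. */
class ChainShapeSketch {

    /** Sends every record through each mapper in turn (MAP+ or MAP*). */
    static List<String> mapAll(List<String> records,
                               List<Function<String, List<String>>> mappers) {
        List<String> current = records;
        for (Function<String, List<String>> mapper : mappers) {
            List<String> next = new ArrayList<>();
            for (String record : current) {
                next.addAll(mapper.apply(record));   // a mapper may emit 0..n records
            }
            current = next;
        }
        return current;
    }

    /** REDUCE: groups identical keys (sorted) and counts them. */
    static List<String> reduceCount(List<String> keys) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String key : keys) {
            counts.merge(key, 1, Integer::sum);
        }
        List<String> out = new ArrayList<>();
        for (Map.Entry<String, Integer> e : counts.entrySet()) {
            out.add(e.getKey() + "=" + e.getValue());
        }
        return out;
    }

    static List<String> demo() {
        List<Function<String, List<String>>> pre = new ArrayList<>();
        pre.add(line -> Arrays.asList(line.split(" ")));                 // MAP 1: tokenize
        pre.add(word -> Collections.singletonList(word.toUpperCase()));  // MAP 2: upper-case

        List<Function<String, List<String>>> post = new ArrayList<>();
        post.add(rec -> Collections.singletonList("word " + rec));       // MAP after REDUCE

        List<String> input = Arrays.asList("to be or", "not to be");
        return mapAll(reduceCount(mapAll(input, pre)), post);
    }

    public static void main(String[] args) {
        // prints [word BE=2, word NOT=1, word OR=1, word TO=2]
        System.out.println(demo());
    }
}
```

Each pre- and postprocessing step is an independent function, which mirrors the point above that the steps are written as standard mappers and can be run on their own.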


Let’s look at the signature of the ChainMapper.addMapper() method to understand in detail how to add each step to the chained job. The signatures and functions of ChainReducer.setReducer() and ChainReducer.addMapper() are analogous.

public static <K1,V1,K2,V2> void addMapper(JobConf job,
             Class<? extends Mapper<K1,V1,K2,V2>> klass,
             Class<? extends K1> inputKeyClass,
             Class<? extends V1> inputValueClass,
             Class<? extends K2> outputKeyClass,
             Class<? extends V2> outputValueClass,
             boolean byValue,
             JobConf mapperConf)

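This method has eight arguments. The first and last are the global and local JobConf objects, respectively. The second argument (klass) is the Mapper class that will do the data processing. The four arguments inputKeyClass, inputValueClass, outputKeyClass, and outputValueClass are the input/output class types of the Mapper class. The byValue flag states whether the mapper's output key/value pairs are passed to the next step in the chain by value (as copies) or by reference; passing by value is the safe choice, while passing by reference avoids the copying cost.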
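
The byValue flag in the signature decides whether a key/value pair is handed to the next step as a copy or as the same object. One way the difference can surface is when a mapper reuses a single mutable object for all of its output while a later step still holds on to what it received. The sketch below shows that effect in plain Java; it is not Hadoop code, and the StringBuilder is only a stand-in (an assumption for the illustration) for a reused Writable such as Text.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java illustration of passing a reused mutable object by value or by reference. */
class ByValueSketch {

    /** Emits each token through one reused buffer and reports what the receiver ends up with. */
    static List<String> run(String line, boolean byValue) {
        StringBuilder reused = new StringBuilder();        // stands in for a reused Text
        List<CharSequence> received = new ArrayList<>();   // objects the next step keeps
        for (String token : line.split(" ")) {
            reused.setLength(0);
            reused.append(token);
            // byValue == true hands over a copy; false hands over the same object.
            received.add(byValue ? new StringBuilder(reused) : reused);
        }
        List<String> seen = new ArrayList<>();
        for (CharSequence cs : received) {
            seen.add(cs.toString());
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run("a b c", true));   // prints [a, b, c]
        System.out.println(run("a b c", false));  // prints [c, c, c]
    }
}
```

With copies, the receiver keeps three distinct values; with shared references, every entry reflects the last token written into the reused buffer.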
 
Please see the next post, which demonstrates the usage of chaining in MapReduce jobs.

For any query please drop a comment...... 

Hadoop MapReduce Chaining Example

As discussed in the previous post [Hadoop MapReduce Chaining], here I will apply mapper/reducer chaining to the word count example.

My job follows this chaining sequence:
MAP+ | REDUCE | MAP*
The job runs multiple mappers in sequence to preprocess the data and, after running the reducer, runs one more mapper to postprocess the data. Mappers that run before the reduce phase do the preprocessing, and mappers that run after the reduce phase do the postprocessing.

This job consists of the following classes:
  • ChainWordCountDriver 
  • TokenizerMapper
  • UpperCaserMapper
  • WordCountReducer
  • LastMapper
ChainWordCountDriver takes an input file whose records are split into whitespace-separated tokens. It calls the mappers and the reducer in the following sequence:
TokenizerMapper --> UpperCaserMapper --> WordCountReducer --> LastMapper
The output of each phase becomes the input of the next phase.

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.mapred.lib.ChainMapper;
import org.apache.hadoop.mapred.lib.ChainReducer;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ChainWordCountDriver extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        JobConf conf = new JobConf(getConf(), ChainWordCountDriver.class);
        conf.setJobName("wordcount");

        Path outputPath = new Path("/home/impadmin/testdata/CustomerOutput");
        FileSystem fs = FileSystem.get(new URI(outputPath.toString()), conf);
        // Delete the output directory (recursively) if it already exists,
        // so it does not have to be removed manually before each run.
        fs.delete(outputPath, true);

        // Set the input and output paths.
        FileInputFormat.setInputPaths(conf, "/home/impadmin/testdata/Customer");
        FileOutputFormat.setOutputPath(conf, outputPath);

        // Input and output are plain text files.
        conf.setInputFormat(TextInputFormat.class);
        conf.setOutputFormat(TextOutputFormat.class);

        // addMapper takes the global JobConf, the mapper class, its input and
        // output key/value types, a flag saying whether output key/value pairs
        // are passed by value (true) or by reference (false), and a JobConf
        // that is local to this step.
        JobConf mapAConf = new JobConf(false);
        ChainMapper.addMapper(conf, TokenizerMapper.class, LongWritable.class, Text.class, Text.class, IntWritable.class, true, mapAConf);

        JobConf mapBConf = new JobConf(false);
        ChainMapper.addMapper(conf, UpperCaserMapper.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, mapBConf);

        // Exactly one reducer per chained job.
        JobConf reduceConf = new JobConf(false);
        ChainReducer.setReducer(conf, WordCountReducer.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, reduceConf);

        // Mappers added through ChainReducer run after the reducer.
        JobConf mapCConf = new JobConf(false);
        ChainReducer.addMapper(conf, LastMapper.class, Text.class, IntWritable.class, Text.class, IntWritable.class, true, mapCConf);

        // Blocks until the job has finished.
        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int res = ToolRunner.run(new Configuration(), new ChainWordCountDriver(), args);
        System.exit(res);
    }
}

TokenizerMapper - splits each input record into whitespace-separated tokens and emits every token with a count of 1
public class TokenizerMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
    private final static IntWritable one = new IntWritable(1);
    // Reused for every token to avoid allocating a new Text per record.
    private Text word = new Text();

    public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        String line = value.toString();
        System.out.println("Line:" + line);   // debug output, ends up in the task logs
        StringTokenizer itr = new StringTokenizer(line);
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            output.collect(word, one);
        }
    }
}

UpperCaserMapper - converts each token received from TokenizerMapper to upper case
public class UpperCaserMapper extends MapReduceBase implements Mapper<Text, IntWritable, Text, IntWritable> {

    public void map(Text key, IntWritable value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        String word = key.toString().toUpperCase();
        System.out.println("Upper Case:" + word);   // debug output
        // The count is passed through unchanged.
        output.collect(new Text(word), value);
    }
}

WordCountReducer - does nothing special: it writes each distinct key once, without summing the values
public class WordCountReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {

    public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
        // The values are not iterated, so the emitted count is always 0.
        // It is only a placeholder; LastMapper ignores it.
        int sum = 0;
        output.collect(key, new IntWritable(sum));
    }
}

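LastMapper - splits the key sent by the reducer on commas and writes the first two parts to the final output file
public class LastMapper extends MapReduceBase implements Mapper<Text, IntWritable, Text, IntWritable> {

    // Note: the value emitted below is a Text although the declared output
    // value type is IntWritable; the raw OutputCollector hides this mismatch
    // from the compiler.
    public void map(Text key, IntWritable value, OutputCollector output, Reporter reporter) throws IOException {
        String[] word = key.toString().split(",");
        System.out.println("Last Mapper:" + key);   // debug output
        // Skip keys without a comma instead of failing on word[1].
        if (word.length < 2) {
            return;
        }
        output.collect(new Text(word[0]), new Text(word[1]));
    }
}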

Input data is like this:
customerId,customerName,contactNumber
1,Stephanie Leung,555-555-5555
2,Edward Kim,123-456-7890
3,Jose Madriz,281-330-8004
4,David Stork,408-555-0000

The order of execution is:
1. The driver calls the mappers and the reducer in the following sequence.
2. TokenizerMapper reads each record, splits it on whitespace, and sends each token to UpperCaserMapper.
3. UpperCaserMapper converts the token to upper case and sends it to WordCountReducer.
4. WordCountReducer just writes the key.
5. LastMapper splits the key written by the reducer on commas and writes the first two parts.
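
The steps above can be traced on two of the sample records without running Hadoop. The following plain-Java sketch only imitates the data flow; the class name ChainTraceSketch is made up, a single reducer with keys in sorted order is assumed, and keys without a comma are skipped here (an assumption, since the sample data never produces one).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.TreeSet;

/** Plain-Java trace of Tokenizer -> UpperCaser -> Reducer -> LastMapper. */
class ChainTraceSketch {

    static List<String> trace(List<String> lines) {
        // Steps 2 and 3: split on whitespace, then upper-case each token.
        // Step 4: the reducer sees every distinct key once, in sorted order.
        Set<String> reducerKeys = new TreeSet<>();
        for (String line : lines) {
            StringTokenizer itr = new StringTokenizer(line);
            while (itr.hasMoreTokens()) {
                reducerKeys.add(itr.nextToken().toUpperCase());
            }
        }
        // Step 5: split each key on commas and keep the first two parts,
        // joined by a tab as a text output file would show them.
        List<String> output = new ArrayList<>();
        for (String key : reducerKeys) {
            String[] parts = key.split(",");
            if (parts.length >= 2) {
                output.add(parts[0] + "\t" + parts[1]);
            }
        }
        return output;
    }

    static List<String> demo() {
        return trace(Arrays.asList(
                "1,Stephanie Leung,555-555-5555",
                "2,Edward Kim,123-456-7890"));
    }

    public static void main(String[] args) {
        for (String line : demo()) {
            System.out.println(line);
        }
    }
}
```

For these two records the trace yields four lines: 1 STEPHANIE, 2 EDWARD, KIM 123-456-7890 and LEUNG 555-555-5555. Because the tokenizer splits on whitespace rather than on commas, a name such as "Stephanie Leung" ends up in two different tokens.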



Tuesday, 4 September 2012

HDFS Client API

In this post, let us explore the basic Hadoop file system APIs:

  • Create a directory in HDFS
  • Copy a file from the local file system to HDFS
  • Read the HDFS file
  • Delete the HDFS file
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HDFSClient {
    public static Configuration conf = new Configuration();
   
    static{
        conf.addResource(new Path("/home/impadmin/sws/hadoop-0.20.2/conf/core-site.xml"));
    }
   
    public static void createHdfsfile(String fromLocalFile,String hdfsFile) throws IOException {
        FileSystem hdfs = FileSystem.get(conf);
        Path localDir = new Path(fromLocalFile);
        Path hdfsDir = new Path(hdfsFile);
        hdfs.copyFromLocalFile(localDir, hdfsDir);
    }

    public static void readFile(String file) {
        try {
            FileSystem fileSystem = FileSystem.get(conf);
            Path path = new Path(file);
            if (!fileSystem.exists(path)) {
                System.out.println("File " + file + " does not exist");
                return;
            }
            // Copy the HDFS file to a local file of the same name in the
            // working directory and echo its content to stdout.
            String filename = file.substring(file.lastIndexOf('/') + 1);
            try (FSDataInputStream in = fileSystem.open(path);
                 OutputStream out = new BufferedOutputStream(
                         new FileOutputStream(new File(filename)))) {
                byte[] b = new byte[1024];
                int numBytes;
                while ((numBytes = in.read(b)) > 0) {
                    // Only the first numBytes bytes of the buffer are valid.
                    out.write(b, 0, numBytes);
                    System.out.write(b, 0, numBytes);
                }
                System.out.flush();
            }
            fileSystem.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public static void deleteFile(String file) throws IOException {
        FileSystem fileSystem = FileSystem.get(conf);
        Path path = new Path(file);
        if (!fileSystem.exists(path)) {
            System.out.println("File " + file + " does not exist");
            return;
        }
        // The second argument requests a recursive delete (needed for directories).
        boolean success = fileSystem.delete(path, true);
        System.out.println("Deleted " + file + ": " + success);
        fileSystem.close();
    }

    public static void mkdir(String dir) throws IOException {
        FileSystem fileSystem = FileSystem.get(conf);
        Path path = new Path(dir);
        if (fileSystem.exists(path)) {
            System.out.println("Dir " + dir + " already exists...");
            return;
        }
        // mkdirs also creates any missing parent directories.
        boolean success = fileSystem.mkdirs(path);
        System.out.println("Created " + dir + ": " + success);
        fileSystem.close();
    }

    public static void main(String[] args) throws IOException {
        // Create a directory in HDFS
        HDFSClient.mkdir("hdfs://localhost:9000/user/impadmin/testDir");

        String srcFile = "file:///home/impadmin/testdata/Order";
        String destFile = "hdfs://localhost:9000/user/impadmin/testDir";

        // Copy the file from the local file system into the HDFS directory
        HDFSClient.createHdfsfile(srcFile, destFile);

        // Read the file
        HDFSClient.readFile("hdfs://localhost:9000/user/impadmin/testDir/Order");

        // Delete the file
        HDFSClient.deleteFile("hdfs://localhost:9000/user/impadmin/testDir/Order");
    }
}


Monday, 27 August 2012

Hbase Map Reduce : Demo MR job - Import tsv file to hbase table

A MapReduce job that demonstrates how to import a TSV file from HDFS into an HBase table:

Let us start by creating an HBase table named 'sample3':

create 'sample3','region','time','product','sale','profit'
Here 'region', 'time', 'product', 'sale' and 'profit' are the column families of the table.

The structure of the table is:
hbase(main):043:0> describe 'sample3'
DESCRIPTION                                                                  ENABLED
 {NAME => 'sample3', FAMILIES => [                                           true
  {NAME => 'product', BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', COMPRESSION => 'NONE', VERSIONS => '3', TTL => '2147483647', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'},
  {NAME => 'profit', BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', COMPRESSION => 'NONE', VERSIONS => '3', TTL => '2147483647', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'},
  {NAME => 'region', BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', COMPRESSION => 'NONE', VERSIONS => '3', TTL => '2147483647', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'},
  {NAME => 'sale', BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', COMPRESSION => 'NONE', VERSIONS => '3', TTL => '2147483647', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'},
  {NAME => 'time', BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', COMPRESSION => 'NONE', VERSIONS => '3', TTL => '2147483647', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}]}


Let us take a sample TSV (tab-separated) file with data like the following:
1    India    Haryana    Chandigarh    2009    April    P1    1    5
2    India    Haryana    Ambala    2009    May    P1    2    10
3    India    Haryana    Panipat    2010    June    P2    3    15
4    United States    California    Fresno    2009    April    P2    2    5
5    United States    California    Long Beach    2010    July    P2    4    10
6    United States    California    San Fransico    2011    August    P1    6    20

Note: Place this file in HDFS

Let us have a look at the map reduce job code:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class ImportFromFile {
   
   
    static class ImportMapper extends
            Mapper<LongWritable, Text, ImmutableBytesWritable, Writable> {

        @Override
        public void map(LongWritable offset, Text line, Context context)
                throws IOException, InterruptedException {
            String[] arr = line.toString().split("\t");
            // Boundary check: a row key plus eight data fields are expected.
            // Adjust this to the layout of your TSV file.
            if (arr.length < 9) {
                System.err.println("Skipping malformed line at offset " + offset.get());
                return;
            }
            // The first field is the row key; the remaining fields are
            // written as family:qualifier cells.
            byte[] rowKey = Bytes.toBytes(arr[0]);
            Put put = new Put(rowKey);
            put.add(Bytes.toBytes("region"), Bytes.toBytes("country"), Bytes.toBytes(arr[1]));
            put.add(Bytes.toBytes("region"), Bytes.toBytes("state"), Bytes.toBytes(arr[2]));
            put.add(Bytes.toBytes("region"), Bytes.toBytes("city"), Bytes.toBytes(arr[3]));
            put.add(Bytes.toBytes("time"), Bytes.toBytes("year"), Bytes.toBytes(arr[4]));
            put.add(Bytes.toBytes("time"), Bytes.toBytes("month"), Bytes.toBytes(arr[5]));
            put.add(Bytes.toBytes("product"), Bytes.toBytes("productid"), Bytes.toBytes(arr[6]));
            put.add(Bytes.toBytes("sale"), Bytes.toBytes("unit"), Bytes.toBytes(arr[7]));
            put.add(Bytes.toBytes("profit"), Bytes.toBytes("total"), Bytes.toBytes(arr[8]));
            context.write(new ImmutableBytesWritable(rowKey), put);
        }
    }


    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        String table = "sample3";
        // HDFS path of the TSV file to import.
        String input = "/home/impadmin/testdata/hbaseolap/olapdata";

        Job job = new Job(conf, "Import from file " + input + " into table "
                + table);
        job.setJarByClass(ImportFromFile.class);
        job.setMapperClass(ImportMapper.class);
        // Write the mapper output straight into the HBase table.
        job.setOutputFormatClass(TableOutputFormat.class);
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Writable.class);
        // Map-only job: no reduce phase is needed for a plain import.
        job.setNumReduceTasks(0);
        FileInputFormat.addInputPath(job, new Path(input));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}
After running this job, the rows of the file are imported into the HBase table.
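
The field-to-column mapping used by the mapper can be checked on a single line without HBase. The sketch below is plain Java with no HBase classes; the class name TsvMappingSketch and the family:qualifier string notation are assumptions for the illustration, while the column names themselves are the ones used in the job above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Plain-Java check of how one TSV line maps to family:qualifier cells. */
class TsvMappingSketch {

    // Columns for TSV fields 1..8; field 0 is the row key.
    static final String[] COLUMNS = {
        "region:country", "region:state", "region:city",
        "time:year", "time:month",
        "product:productid", "sale:unit", "profit:total"
    };

    /** Returns the row key, i.e. the first tab-separated field. */
    static String rowKey(String line) {
        return line.split("\t", 2)[0];
    }

    /** Maps the remaining fields to their columns, rejecting short lines. */
    static Map<String, String> toCells(String line) {
        String[] fields = line.split("\t");
        if (fields.length < COLUMNS.length + 1) {
            throw new IllegalArgumentException(
                    "expected 9 tab-separated fields, got " + fields.length);
        }
        Map<String, String> cells = new LinkedHashMap<>();
        for (int i = 0; i < COLUMNS.length; i++) {
            cells.put(COLUMNS[i], fields[i + 1]);
        }
        return cells;
    }

    public static void main(String[] args) {
        String line = "1\tIndia\tHaryana\tChandigarh\t2009\tApril\tP1\t1\t5";
        System.out.println(rowKey(line) + " -> " + toCells(line));
    }
}
```

For the first sample row this gives row key 1 with region:country=India, time:month=April and profit:total=5, which is what a scan of 'sample3' should show after the import.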

Cheers
Geetanjali
