Monday 27 August 2012

HBase MapReduce : Demo MR job - Import a TSV file into an HBase table

A MapReduce job to demonstrate how to import a TSV file from HDFS into an HBase table:

Let us start by creating an HBase table named 'sample3':

create 'sample3','region','time','product','sale','profit'

Here 'region', 'time', 'product', 'sale' and 'profit' are the different column families of the table.

The structure of the table is:
hbase(main):043:0> describe 'sample3'
DESCRIPTION                                                                                                                                              ENABLED                                                                           
 {NAME => 'sample3', FAMILIES => [{NAME => 'product', BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', COMPRESSION => 'NONE', VERSIONS => '3', TTL => '2 true                                                                              
 147483647', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}, {NAME => 'profit', BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', COMP                                                                                   
 RESSION => 'NONE', VERSIONS => '3', TTL => '2147483647', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}, {NAME => 'region', BLOOMFIL                                                                                   
 TER => 'NONE', REPLICATION_SCOPE => '0', COMPRESSION => 'NONE', VERSIONS => '3', TTL => '2147483647', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCK                                                                                   
 CACHE => 'true'}, {NAME => 'sale', BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', COMPRESSION => 'NONE', VERSIONS => '3', TTL => '2147483647', BLOCKS                                                                                   
 IZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}, {NAME => 'time', BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', COMPRESSION => 'NONE', V                                                                                   
 ERSIONS => '3', TTL => '2147483647', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}]}


Let us have a sample TSV (tab-separated) file containing data like that shown below:
1    India    Haryana    Chandigarh    2009    April    P1    1    5
2    India    Haryana    Ambala    2009    May    P1    2    10
3    India    Haryana    Panipat    2010    June    P2    3    15
4    United States    California    Fresno    2009    April    P2    2    5
5    United States    California    Long Beach    2010    July    P2    4    10
6    United States    California    San Francisco    2011    August    P1    6    20

Note: Place this file in HDFS
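
For example, assuming the file is saved locally as olapdata (the local file name is just an assumption), it can be copied to the HDFS path used by the job below:

hadoop fs -put olapdata /home/impadmin/testdata/hbaseolap/olapdata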

Let us have a look at the MapReduce job code:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class ImportFromFile {
   
   
    // Mapper that turns each tab-separated line of the input file into a Put for the HBase table
    static class ImportMapper extends
            Mapper<LongWritable, Text, ImmutableBytesWritable, Writable> {
   
        @Override
        public void map(LongWritable offset, Text line, Context context)
                throws IOException {
            try {
                String lineString = line.toString();
                String[] arr = lineString.split("\t");
                // Apply boundary checks according to your tsv file
                // The first column (id) is used as the row key
                Put put = new Put(arr[0].getBytes());
                put.add("region".getBytes(), "country".getBytes(), Bytes.toBytes(arr[1]));
                put.add("region".getBytes(),"state".getBytes(), Bytes.toBytes(arr[2]));
                put.add("region".getBytes(),"city".getBytes(), Bytes.toBytes(arr[3]));
                put.add("time".getBytes(),"year".getBytes(), Bytes.toBytes(arr[4]));
                put.add("time".getBytes(),"month".getBytes(), Bytes.toBytes(arr[5]));
                put.add("product".getBytes(),"productid".getBytes(), Bytes.toBytes(arr[6]));
                put.add("sale".getBytes(),"unit".getBytes(), Bytes.toBytes(arr[7]));
                put.add("profit".getBytes(),"total".getBytes(), Bytes.toBytes(arr[8]));
                // Emit the row key and the Put; TableOutputFormat writes it into the table
                context.write(new ImmutableBytesWritable(arr[0].getBytes()), put);
            } catch (Exception e) {
                // Log and skip malformed lines
                e.printStackTrace();
            }
        }
    }


    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        String table = "sample3";
        String input = "/home/impadmin/testdata/hbaseolap/olapdata";

        Job job = new Job(conf, "Import from file " + input + " into table "
                + table);
        job.setJarByClass(ImportFromFile.class);
        job.setMapperClass(ImportMapper.class);
        // Write the mapper output directly into the HBase table
        job.setOutputFormatClass(TableOutputFormat.class);
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Writable.class);
        // Map-only job: no reducers are needed
        job.setNumReduceTasks(0);
        FileInputFormat.addInputPath(job, new Path(input));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}
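
To run the job, package the class into a jar and submit it with hadoop, making sure the HBase jars and configuration are on the classpath. A minimal sketch, assuming the class is packaged in a jar named importfromfile.jar (the jar name is only a placeholder):

export HADOOP_CLASSPATH=`hbase classpath`
hadoop jar importfromfile.jar ImportFromFile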
After running this job, the file contents are imported into the HBase table.
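
To verify the import, a quick scan from the HBase shell should show rows with keys 1 to 6 and the region, time, product, sale and profit families populated:

scan 'sample3'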

Cheers
Geetanjali

For any queries, please drop a comment.