Monday, 27 August 2012

HBase MapReduce: Demo MR job - Import a TSV file into an HBase table

A MapReduce job to demonstrate how to import a TSV file from HDFS into an HBase table:

Let us start by creating an HBase table:

We have a table 'sample3':

create 'sample3','region','time','product','sale','profit'
Here 'region', 'time', 'product', 'sale' and 'profit' are the column families of the table.

Structure of the table is:
hbase(main):043:0> describe 'sample3'
DESCRIPTION                                                    ENABLED
 {NAME => 'sample3', FAMILIES => [                             true
  {NAME => 'product', BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', COMPRESSION => 'NONE', VERSIONS => '3', TTL => '2147483647', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'},
  {NAME => 'profit', BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', COMPRESSION => 'NONE', VERSIONS => '3', TTL => '2147483647', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'},
  {NAME => 'region', BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', COMPRESSION => 'NONE', VERSIONS => '3', TTL => '2147483647', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'},
  {NAME => 'sale', BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', COMPRESSION => 'NONE', VERSIONS => '3', TTL => '2147483647', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'},
  {NAME => 'time', BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', COMPRESSION => 'NONE', VERSIONS => '3', TTL => '2147483647', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}]}


Let us have a sample TSV (tab-separated) file with data like that shown below:
1    India    Haryana    Chandigarh    2009    April    P1    1    5
2    India    Haryana    Ambala    2009    May    P1    2    10
3    India    Haryana    Panipat    2010    June    P2    3    15
4    United States    California    Fresno    2009    April    P2    2    5
5    United States    California    Long Beach    2010    July    P2    4    10
6    United States    California    San Fransico    2011    August    P1    6    20

Note: Place this file in HDFS

Let us have a look at the map reduce job code:

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class ImportFromFile {
   
   
    static class ImportMapper extends
            Mapper<LongWritable, Text, ImmutableBytesWritable, Writable> {

        // Number of tab-separated fields expected per line (row key + 8 values).
        private static final int EXPECTED_FIELDS = 9;

        @Override
        public void map(LongWritable offset, Text line, Context context)
                throws IOException, InterruptedException {
            String[] arr = line.toString().split("\t");
            // Boundary check: skip lines that do not have every expected field.
            // Adjust this according to your TSV file.
            if (arr.length < EXPECTED_FIELDS) {
                return;
            }
            // Bytes.toBytes always encodes as UTF-8, whereas String.getBytes()
            // depends on the platform default charset.
            byte[] rowKey = Bytes.toBytes(arr[0]);
            Put put = new Put(rowKey);
            put.add(Bytes.toBytes("region"), Bytes.toBytes("country"), Bytes.toBytes(arr[1]));
            put.add(Bytes.toBytes("region"), Bytes.toBytes("state"), Bytes.toBytes(arr[2]));
            put.add(Bytes.toBytes("region"), Bytes.toBytes("city"), Bytes.toBytes(arr[3]));
            put.add(Bytes.toBytes("time"), Bytes.toBytes("year"), Bytes.toBytes(arr[4]));
            put.add(Bytes.toBytes("time"), Bytes.toBytes("month"), Bytes.toBytes(arr[5]));
            put.add(Bytes.toBytes("product"), Bytes.toBytes("productid"), Bytes.toBytes(arr[6]));
            put.add(Bytes.toBytes("sale"), Bytes.toBytes("unit"), Bytes.toBytes(arr[7]));
            put.add(Bytes.toBytes("profit"), Bytes.toBytes("total"), Bytes.toBytes(arr[8]));
            // Errors are no longer swallowed: a failed write now fails the task.
            context.write(new ImmutableBytesWritable(rowKey), put);
        }
    }


    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        String table = "sample3";   // target table; it must already exist
        String input = "/home/impadmin/testdata/hbaseolap/olapdata";   // TSV file in HDFS

        Job job = new Job(conf, "Import from file " + input + " into table "
                + table);
        job.setJarByClass(ImportFromFile.class);
        job.setMapperClass(ImportMapper.class);
        // Write the mapper's Put objects straight into the HBase table.
        job.setOutputFormatClass(TableOutputFormat.class);
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Writable.class);
        // Map-only job: no reduce phase is needed for a plain import.
        job.setNumReduceTasks(0);
        FileInputFormat.addInputPath(job, new Path(input));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}
After running this job, the contents of the file are imported into the HBase table.
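
The per-line logic of the mapper can be tried out without a cluster. Below is a minimal sketch in plain Java, assuming the nine-column layout of the sample file. The class name TsvRowSketch, its methods and the "family:qualifier" string keys are only illustrative and are not part of the HBase API. It also shows one possible form of the boundary check that the comment in the mapper asks for.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of HBase: maps one TSV line to
// "family:qualifier" -> value, mirroring the columns used by the mapper.
final class TsvRowSketch {

    // Column layout assumed from the sample file; field 0 is the row key.
    private static final String[] COLUMNS = {
        "region:country", "region:state", "region:city",
        "time:year", "time:month",
        "product:productid", "sale:unit", "profit:total"
    };

    // Returns the row key, or null when the line has too few fields.
    static String rowKey(String line) {
        String[] fields = line.split("\t", -1);
        return fields.length < COLUMNS.length + 1 ? null : fields[0];
    }

    // Returns the cells of a line, or an empty map when the line is malformed.
    static Map<String, String> toCells(String line) {
        String[] fields = line.split("\t", -1); // -1 keeps trailing empty fields
        Map<String, String> cells = new LinkedHashMap<>();
        if (fields.length < COLUMNS.length + 1) {
            return cells; // boundary check: skip short lines
        }
        for (int i = 0; i < COLUMNS.length; i++) {
            cells.put(COLUMNS[i], fields[i + 1]);
        }
        return cells;
    }

    public static void main(String[] args) {
        String line = "1\tIndia\tHaryana\tChandigarh\t2009\tApril\tP1\t1\t5";
        System.out.println(rowKey(line) + " -> " + toCells(line));
    }
}
```

Whether a short line should be skipped, counted or should fail the job depends on your data; skipping is just the simplest choice for a demo.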

Cheers
Geetanjali

For any queries, please drop a comment.

8 comments:

  1. The guide looks good. However, I'm a bit of a newbie to Hbase, and I'm getting a whole bunch of errors while compiling that look like: "class ImportFromFile is public, should be declared in a file named ImportFromFile.java" and "package org.apache.hadoop.hbase does not exist".

    Answer to the above query:
    Please share the steps you are using to execute this. Simple steps to execute the above program are:
    1. Hadoop and HBase must be running on your machine.
    2. Create a class named "ImportFromFile" in Eclipse (since the class is public, its file must be named ImportFromFile.java).
    3. The table 'sample3' must already exist in the running HBase.
    4. Run it from Eclipse using 'Run as Java application'.

    Hope this will work for you now.

  2. To remove all classpath-related errors, I found the best way was to copy all files from the HBase lib directory to the hadoop/lib directory, since Hadoop includes every jar present in its lib directory when it loads. Errors like the one above, "package org.apache.hadoop.hbase does not exist", should also be solved by copying hbase.jar from the HBase directory (it usually sits at the top level when you open the directory) into your hadoop/lib directory.

  3. To run this, only the HBase core jar and the ZooKeeper jar are required on the classpath.
    Copying all HBase jars into hadoop/lib adds unnecessary load.
    Any of the solutions below can be used to include the jars:
    1. Add only the jars that are required.
    2. Create libraries in your Eclipse project named hadoop-lib and hbase-lib, which include the Hadoop-specific and HBase-specific jars respectively.
    3. If running in distributed mode, you can add the external/third-party jars to the HADOOP_CLASSPATH environment variable as shown below:
    export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/path/to/jars

    Please restart your cluster after making changes in any of the conf files.
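
    To confirm that the jars are actually visible at run time, a small check like the one below can help. This is only a sketch: the class ClasspathCheck is made up for illustration and is not part of Hadoop or HBase, and the three class names are ones I would expect to find in the Hadoop, HBase and ZooKeeper jars.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical diagnostic, not part of Hadoop or HBase: reports whether
// a class can be located on the current classpath.
final class ClasspathCheck {

    static boolean isOnClasspath(String className) {
        try {
            // initialize=false: only locate the class, do not run static initializers
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed representative classes from the Hadoop, HBase and ZooKeeper jars.
        List<String> required = Arrays.asList(
            "org.apache.hadoop.conf.Configuration",
            "org.apache.hadoop.hbase.HBaseConfiguration",
            "org.apache.zookeeper.ZooKeeper");
        for (String name : required) {
            System.out.println((isOnClasspath(name) ? "found   " : "MISSING ") + name);
        }
    }
}
```

    Run it with the same classpath you use for the job; any line marked MISSING points at a jar that still needs to be added.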

  4. Ty for the prompt reply!!

  5. Hi Geetanjali,

    Thanks for posting this example and it helped me kick start my HBase problem solving. I am basically new to Java but have some complex need to build data in HBase.
    In the above example you showed how to input the data from a file to HBase table.
    I want some additional check before I put the data.

    my row key is,
    String myRowKey = arr[0] + "|" + arr[1] + "|" + arr[2];

    basically it is
    ||

    So, before I put the data in table,
    1) I want to retrieve the row for above rowkey if available.
    2) If we get no rows, simply put the data
    3) If we get any row, then further check the value for column 2 (all cols are in same family) which is "id".
    4) increment value of "id", add the data to same rowkey, such that dynamically new column names are created based on "id" value.


    for e.g.
    rowkey,id,price1
    goog|nyse|20130101, 1, 234.5

    now we get same data in next run for goog and date 20130101.

    so the input should be
    rowkey,id,price1,price2
    goog|nyse|20130101,2,234.5,234.67

    where 234.67 is new value.

    Can you please assist with sample code?

    Also let me know if you require any further clarification.

  6. Thanks Mihir for your comment.

    There are two ways to check if row with same rowKey exists:

    1.
    Get g = new Get(rowKey);
    Result r = table.get(g);
    if (r.isEmpty()) {
        // No row yet: prepare the Put object as per your requirement
        Put p = new Put(rowKey);
        table.put(p);
    } else {
        // As per your column definition, read the id field
        byte[] value = r.getValue(family1, qualifier1);
        int id = Bytes.toInt(value); // assumes id was written with Bytes.toBytes(int)
        id++;
        // Prepare a Put object with the updated id value and insert it in the table
    }

    2.
    // Note: Put.has() inspects only the Put object itself, not the table,
    // so the stored row must still be read with a Get as in option 1.
    Put p = new Put(rowKey);
    p.add(family1, qualifier2, cellData);
    boolean isExist = p.has(family1, qualifier1);
    if (!isExist) {
        // id has not been set on this Put yet: add it before writing
        p.add(family1, qualifier1, Bytes.toBytes(1));
    }
    table.put(p);

    P.S. has(byte[] family, byte[] qualifier) determines whether this Put object's familyMap contains a value assigned to the given family and qualifier; it does not query the table.
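
    The read-then-update flow asked about in this thread can be tried out in memory first. The sketch below uses a plain Java map as a stand-in for a single-family table; PriceHistorySketch and its methods are made-up names, no HBase classes are involved, and the id is kept as a string for simplicity.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// In-memory stand-in for a single-family table: rowKey -> (qualifier -> value).
// Hypothetical names throughout; no HBase classes are used.
final class PriceHistorySketch {

    private final Map<String, Map<String, String>> table = new HashMap<>();

    // Builds the composite row key, e.g. goog|nyse|20130101.
    static String rowKey(String a, String b, String c) {
        return a + "|" + b + "|" + c;
    }

    // Stores a price under the next free column: price1, price2, ...
    void addPrice(String rowKey, String price) {
        // Plays the role of table.get(new Get(rowKey)).
        Map<String, String> row = table.get(rowKey);
        if (row == null) {
            row = new TreeMap<>();   // no row yet: start a new one
            table.put(rowKey, row);
        }
        int id = row.containsKey("id") ? Integer.parseInt(row.get("id")) : 0;
        id++;                                  // next id for this row
        row.put("id", String.valueOf(id));     // update the counter column
        row.put("price" + id, price);          // dynamically named column
    }

    Map<String, String> getRow(String rowKey) {
        return table.get(rowKey);
    }

    public static void main(String[] args) {
        PriceHistorySketch t = new PriceHistorySketch();
        String key = rowKey("goog", "nyse", "20130101");
        t.addPrice(key, "234.5");
        t.addPrice(key, "234.67");
        System.out.println(key + " -> " + t.getRow(key));
    }
}
```

    Against a real table, the read and the write are two separate calls, so two writers updating the same row at the same time could both read the same id. If that can happen in your job, this needs extra care.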

    I hope this solution works for you. Please let me know for any other clarification
