MapReduce job to demonstrate how to import a TSV file from HDFS into an HBase table:
Let us start by creating an HBase table.
We will use a table named 'sample3':
create 'sample3','region','time','product','sale','profit'
Here 'region', 'time', 'product', 'sale' and 'profit' are the column families of the table.
The structure of the table is:
hbase(main):043:0> describe 'sample3'
DESCRIPTION                                                                                ENABLED
 {NAME => 'sample3', FAMILIES => [                                                         true
  {NAME => 'product', BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', COMPRESSION => 'NONE', VERSIONS => '3', TTL => '2147483647', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'},
  {NAME => 'profit', BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', COMPRESSION => 'NONE', VERSIONS => '3', TTL => '2147483647', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'},
  {NAME => 'region', BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', COMPRESSION => 'NONE', VERSIONS => '3', TTL => '2147483647', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'},
  {NAME => 'sale', BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', COMPRESSION => 'NONE', VERSIONS => '3', TTL => '2147483647', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'},
  {NAME => 'time', BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', COMPRESSION => 'NONE', VERSIONS => '3', TTL => '2147483647', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}]}
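A value in any of these families is addressed by family:qualifier. For instance, a single cell could be written from the hbase shell like this (the row key and value here are just illustrative placeholders):
put 'sample3', 'row1', 'region:country', 'India'
The MapReduce job below builds the same kind of family:qualifier puts, one per field of each input line.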
Let us take a sample TSV (tab-separated values) file containing data like the rows shown below:
1 India Haryana Chandigarh 2009 April P1 1 5
2 India Haryana Ambala 2009 May P1 2 10
3 India Haryana Panipat 2010 June P2 3 15
4 United States California Fresno 2009 April P2 2 5
5 United States California Long Beach 2010 July P2 4 10
6 United States California San Fransico 2011 August P1 6 20
Note: place this file in HDFS before running the job; an example command is shown below.
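Assuming the local file is named olapdata and using the same HDFS path that the job below expects (both names are assumptions, adjust to your setup; on older Hadoop releases -mkdir creates parent directories without the -p flag):
hadoop fs -mkdir -p /home/impadmin/testdata/hbaseolap
hadoop fs -put olapdata /home/impadmin/testdata/hbaseolap/olapdata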
Now let us have a look at the MapReduce job code:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class ImportFromFile {

    // Map-only job: every TSV line is turned into one Put keyed by its first column.
    static class ImportMapper extends
            Mapper<LongWritable, Text, ImmutableBytesWritable, Writable> {

        @Override
        public void map(LongWritable offset, Text line, Context context)
                throws IOException {
            try {
                // Fields: id, country, state, city, year, month, productid, unit, total
                String[] arr = line.toString().split("\t");
                // Apply boundary checks (e.g. arr.length) according to your TSV file
                byte[] rowKey = Bytes.toBytes(arr[0]);
                Put put = new Put(rowKey);
                put.add(Bytes.toBytes("region"), Bytes.toBytes("country"), Bytes.toBytes(arr[1]));
                put.add(Bytes.toBytes("region"), Bytes.toBytes("state"), Bytes.toBytes(arr[2]));
                put.add(Bytes.toBytes("region"), Bytes.toBytes("city"), Bytes.toBytes(arr[3]));
                put.add(Bytes.toBytes("time"), Bytes.toBytes("year"), Bytes.toBytes(arr[4]));
                put.add(Bytes.toBytes("time"), Bytes.toBytes("month"), Bytes.toBytes(arr[5]));
                put.add(Bytes.toBytes("product"), Bytes.toBytes("productid"), Bytes.toBytes(arr[6]));
                put.add(Bytes.toBytes("sale"), Bytes.toBytes("unit"), Bytes.toBytes(arr[7]));
                put.add(Bytes.toBytes("profit"), Bytes.toBytes("total"), Bytes.toBytes(arr[8]));
                context.write(new ImmutableBytesWritable(rowKey), put);
            } catch (Exception e) {
                // Log and skip malformed lines instead of failing the whole job
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        String table = "sample3";
        // HDFS path of the TSV file; adjust to where you placed it
        String input = "/home/impadmin/testdata/hbaseolap/olapdata";
        Job job = new Job(conf, "Import from file " + input + " into table " + table);
        job.setJarByClass(ImportFromFile.class);
        job.setMapperClass(ImportMapper.class);
        // TableOutputFormat writes the emitted Puts directly into the HBase table
        job.setOutputFormatClass(TableOutputFormat.class);
        job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
        job.setOutputKeyClass(ImmutableBytesWritable.class);
        job.setOutputValueClass(Writable.class);
        // Map-only job: the mappers write straight to HBase, so no reducers are needed
        job.setNumReduceTasks(0);
        FileInputFormat.addInputPath(job, new Path(input));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
After this job runs, the file's contents are imported into the HBase table.
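One way to launch the job and verify the result from the command line (the jar name here is an assumption; package the class into any jar you like, and make sure the HBase jars are on the classpath as discussed in the comments below):
hadoop jar importfromfile.jar ImportFromFile
hbase shell
hbase(main):001:0> count 'sample3'
hbase(main):002:0> scan 'sample3'
The scan should show one row per input line, with the values spread across the region, time, product, sale and profit families.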
Cheers
Geetanjali
For any query, please drop a comment.
nicely done
The guide looks good. However, I'm a bit of a newbie to Hbase, and I'm getting a whole bunch of errors while compiling that look like: "class ImportFromFile is public, should be declared in a file named ImportFromFile.java" and "package org.apache.hadoop.hbase does not exist".
Answer to the above query:
Please share the steps you are following to execute this. The simple steps to execute the above program are:
1. Hadoop and HBase must be running on your machine.
2. Create a class named "ImportFromFile" in Eclipse; the source file must be named ImportFromFile.java, which is what the first compile error is complaining about.
3. The table 'sample3' must already exist in the running HBase instance.
4. Run it from Eclipse using 'Run as Java Application'.
Hope this works for you now.
To remove all classpath-related errors, I found the best way was to copy all the jars from the HBase lib directory into the hadoop/lib directory, since Hadoop automatically includes the jars present in its lib directory when it starts. Errors like the one above ("package org.apache.hadoop.hbase does not exist") are also solved by copying hbase.jar from the HBase directory (usually at the top level when you open the directory) into your hadoop/lib directory.
For running this we require only the HBase core jar and the ZooKeeper jar on the classpath.
Copying all the HBase jars into hadoop/lib is unnecessary extra load.
Any of the solutions below can be used to include the jars:
1. Add only the jars which are required.
2. Create libraries in your Eclipse project named hadoop-lib and hbase-lib which include the Hadoop- and HBase-specific jars respectively.
3. If running in distributed mode, you can add the external/third-party jars to the HADOOP_CLASSPATH environment variable as shown below:
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/path/to/jars
Please restart your cluster after making changes in any of the conf files.
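If your HBase version's bin/hbase script supports the classpath command, a convenient sketch for picking up all the jars HBase itself needs (the exact jars added depend on your HBase version) is:
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$(hbase classpath)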
Ty for the prompt reply!!
ReplyDeleteHi Geetanjali,
Thanks for posting this example; it helped me kick-start my HBase problem solving. I am basically new to Java but have some complex needs for building data in HBase.
In the above example you showed how to load the data from a file into an HBase table.
I want some additional checks before I put the data.
my row key is,
String myRowKey = arr[0] + "|" + arr[1] + "|" + arr[2];
basically it is the three fields joined by '|'.
So, before I put the data in the table:
1) I want to retrieve the row for the above row key, if available.
2) If we get no row, simply put the data.
3) If we do get a row, then further check the value of the second column (all columns are in the same family), which is "id".
4) Increment the value of "id" and add the data to the same row key, such that new column names are created dynamically based on the "id" value.
For example:
rowkey,id,price1
goog|nyse|20130101, 1, 234.5
Now we get the same data in the next run for goog and date 20130101,
so the resulting row should be
rowkey,id,price1,price2
goog|nyse|20130101,2,234.5,234.67
where 234.67 is the new value.
Can you please assist with sample code?
Also let me know if you require any further clarification.
Thanks Mihir for your comment.
There are two ways to check whether a row with the same row key exists:
1.
Get g = new Get(rowKey);
Result r = table.get(g);
if (r.isEmpty()) {
    Put p = new Put(rowKey); // Prepare the Put object as per your requirement
    table.put(p);
} else {
    byte[] value = r.getValue(family1, qualifier1); // Read the existing "id" cell as per your column definition
    int id = Bytes.toInt(value);
    id++;
    // Prepare a Put object with the updated id value (and the new priceN column) and insert it into the table
}
2.
Put p = new Put(rowKey);
p.add(family1, qualifier2, cellData);
// Note: has() only looks at this Put object's own familyMap, not at the table,
// so it tells you whether the column was already added to this particular Put:
boolean alreadyAdded = p.has(family1, qualifier1);
if (!alreadyAdded) {
    p.add(family1, qualifier1, Bytes.toBytes(1)); // e.g. initialise the id column
}
table.put(p);
P.S.: has(byte[] family, byte[] qualifier) helps to determine whether this Put object's familyMap contains a value assigned to the given family and qualifier; it does not query the table, so for an existence check against the table use option 1.
I hope this solution works for you. Please let me know if you need any other clarification.
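If all you need is "insert only when the id cell does not exist yet, otherwise bump the id", the client API's checkAndPut can also do the existence check and the first put in one atomic call. Below is a minimal sketch using the same old-style HTable API as the job above; the table name, column family and qualifier names, and the sample values are assumptions, and the read-modify-write in the else branch is not atomic across concurrent clients:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class UpsertWithId {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "quotes");     // hypothetical table name
        byte[] row = Bytes.toBytes("goog|nyse|20130101");
        byte[] family = Bytes.toBytes("data");         // hypothetical column family
        byte[] idQual = Bytes.toBytes("id");

        // First attempt: write id=1 and price1, but only if the "id" cell does not exist yet.
        // Passing null as the expected value makes checkAndPut succeed only when the cell is absent.
        Put first = new Put(row);
        first.add(family, idQual, Bytes.toBytes(1));
        first.add(family, Bytes.toBytes("price1"), Bytes.toBytes("234.5"));
        boolean inserted = table.checkAndPut(row, family, idQual, null, first);

        if (!inserted) {
            // The row already has an id: read it, increment it, and add a new priceN column.
            Result r = table.get(new Get(row));
            int id = Bytes.toInt(r.getValue(family, idQual)) + 1;
            Put update = new Put(row);
            update.add(family, idQual, Bytes.toBytes(id));
            update.add(family, Bytes.toBytes("price" + id), Bytes.toBytes("234.67"));
            table.put(update);
        }
        table.close();
    }
}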