A MapReduce job to demonstrate how to import a TSV file from HDFS into an HBase table:
Let us start by creating an HBase table:
We have a table 'sample3':
create 'sample3','region','time','product','sale','profit'
Here 'region', 'time', 'product', 'sale' and 'profit' are the different column families of the table; the mapper below adds qualifiers such as region:country, time:year and product:productid under them.
The structure of the table is:
hbase(main):043:0> describe 'sample3'
DESCRIPTION (ENABLED => true)
{NAME => 'sample3', FAMILIES => [
 {NAME => 'product', BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', COMPRESSION => 'NONE', VERSIONS => '3', TTL => '2147483647', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'},
 {NAME => 'profit', BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', COMPRESSION => 'NONE', VERSIONS => '3', TTL => '2147483647', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'},
 {NAME => 'region', BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', COMPRESSION => 'NONE', VERSIONS => '3', TTL => '2147483647', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'},
 {NAME => 'sale', BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', COMPRESSION => 'NONE', VERSIONS => '3', TTL => '2147483647', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'},
 {NAME => 'time', BLOOMFILTER => 'NONE', REPLICATION_SCOPE => '0', COMPRESSION => 'NONE', VERSIONS => '3', TTL => '2147483647', BLOCKSIZE => '65536', IN_MEMORY => 'false', BLOCKCACHE => 'true'}]}
Let us take a sample TSV (tab-separated) file with data as shown below:
1 India Haryana Chandigarh 2009 April P1 1 5
2 India Haryana Ambala 2009 May P1 2 10
3 India Haryana Panipat 2010 June P2 3 15
4 United States California Fresno 2009 April P2 2 5
5 United States California Long Beach 2010 July P2 4 10
6 United States California San Francisco 2011 August P1 6 20
Note: Place this file in HDFS
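For example, assuming the file is saved locally as olapdata (the file name used by the driver's input path below), it can be copied into HDFS like this:
hadoop fs -mkdir /home/impadmin/testdata/hbaseolap
hadoop fs -put olapdata /home/impadmin/testdata/hbaseolap/olapdata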
Let us have a look at the MapReduce job code:
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
public class ImportFromFile {

  static class ImportMapper extends
      Mapper<LongWritable, Text, ImmutableBytesWritable, Writable> {

    @Override
    public void map(LongWritable offset, Text line, Context context)
        throws IOException {
      try {
        String lineString = line.toString();
        // Split the tab-separated line: rowkey, country, state, city,
        // year, month, productid, sale unit, profit total
        String[] arr = lineString.split("\t");
        // Apply boundary checks according to your tsv file
        // The first column of the line is used as the row key
        Put put = new Put(arr[0].getBytes());
        // Map each field to its column family:qualifier
        put.add("region".getBytes(), "country".getBytes(), Bytes.toBytes(arr[1]));
        put.add("region".getBytes(), "state".getBytes(), Bytes.toBytes(arr[2]));
        put.add("region".getBytes(), "city".getBytes(), Bytes.toBytes(arr[3]));
        put.add("time".getBytes(), "year".getBytes(), Bytes.toBytes(arr[4]));
        put.add("time".getBytes(), "month".getBytes(), Bytes.toBytes(arr[5]));
        put.add("product".getBytes(), "productid".getBytes(), Bytes.toBytes(arr[6]));
        put.add("sale".getBytes(), "unit".getBytes(), Bytes.toBytes(arr[7]));
        put.add("profit".getBytes(), "total".getBytes(), Bytes.toBytes(arr[8]));
        context.write(new ImmutableBytesWritable(arr[0].getBytes()), put);
      } catch (Exception e) {
        e.printStackTrace();
      }
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    String table = "sample3";
    // HDFS path of the tsv file uploaded above
    String input = "/home/impadmin/testdata/hbaseolap/olapdata";

    Job job = new Job(conf, "Import from file " + input + " into table "
        + table);
    job.setJarByClass(ImportFromFile.class);
    job.setMapperClass(ImportMapper.class);
    // Write the Puts directly into HBase through TableOutputFormat
    job.setOutputFormatClass(TableOutputFormat.class);
    job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Writable.class);
    // Map-only job: no reducers are needed
    job.setNumReduceTasks(0);
    FileInputFormat.addInputPath(job, new Path(input));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}
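To run the job, package the class into a jar and submit it with the HBase jars on the classpath. A minimal sketch (the jar name importfromfile.jar is only an assumed example, and how you add the HBase classpath may differ on your distribution):
export HADOOP_CLASSPATH=`hbase classpath`
hadoop jar importfromfile.jar ImportFromFile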
After running the job, the file's contents are imported into the HBase table.
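You can verify the import from the HBase shell by scanning or counting the table (the prompt numbers below are just illustrative):
hbase(main):044:0> scan 'sample3'
hbase(main):045:0> count 'sample3'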
Cheers
Geetanjali
For any query, please drop a comment.