Environment:
- Hadoop 2.4.1
- Eclipse IDE for Java Developers, Luna Release (4.4.0)
- HBase 0.98.5
Reference:
- http://hbase.apache.org/xref/org/apache/hadoop/hbase/mapreduce/SampleUploader.html
Steps:
- Create the table (a Java sketch of the equivalent table creation follows this list):
$hbase shell
hbase> create 'TEST3','m0','m1','m2','m3','m4','m5','m6','m7','m8','m9','m10','m11','m12','m13','m14','m15'
- Put the input file in HDFS. Each line of the log file has 98 columns:
- 1 timestamp
- 1 ??
- 96 monitored-server fields (16 servers × 6 info fields per server)
- Run "HBimporttsv_v2.jar" to insert the log file into the HBase table:
$ hadoop jar Downloads/HBimporttsv_v2.jar HBimporttsv.Hbaseimporttsv /user/hduser/test.log2 "TEST3"
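As mentioned in the first step, the table can also be created from Java instead of the HBase shell. Below is a minimal sketch assuming the HBase 0.98 client API (the version listed in the environment above); the class name CreateTest3Table is only for illustration and is not part of HBimporttsv_v2.jar. It creates 'TEST3' with one column family per monitored server (m0 to m15), matching the shell command above.

package HBimporttsv;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

// Illustrative helper class (not part of HBimporttsv_v2.jar): creates TEST3 programmatically.
public class CreateTest3Table {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();   // reads hbase-site.xml from the classpath
        HBaseAdmin admin = new HBaseAdmin(conf);
        TableName name = TableName.valueOf("TEST3");
        if (!admin.tableExists(name)) {
            HTableDescriptor desc = new HTableDescriptor(name);
            for (int i = 0; i < 16; i++) {
                desc.addFamily(new HColumnDescriptor("m" + i)); // one column family per monitored server
            }
            admin.createTable(desc);
        }
        admin.close();
    }
}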
The source of Hbaseimporttsv.java (the class invoked by the hadoop jar command above) is:
/*
 * This program imports a CSV log file into an HBase table
 * using a map-only MapReduce job.
 */
package HBimporttsv;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class Hbaseimporttsv {

    private static final String NAME = "SampleUploader";

    public static int NUM_OF_SERVER = 16; // number of monitored servers
    public static int NUM_OF_VAR = 6;     // number of info fields per server
    // Column qualifiers: 6 info fields for each of the 16 servers
    public static String[] VAR = {
        "Tj.00","Cal Tj.00","Tc.00","DutV00","DutA00","ErrCode00",
        "Tj.01","Cal Tj.01","Tc.01","DutV01","DutA01","ErrCode01",
        "Tj.02","Cal Tj.02","Tc.02","DutV02","DutA02","ErrCode02",
        "Tj.03","Cal Tj.03","Tc.03","DutV03","DutA03","ErrCode03",
        "Tj.04","Cal Tj.04","Tc.04","DutV04","DutA04","ErrCode04",
        "Tj.05","Cal Tj.05","Tc.05","DutV05","DutA05","ErrCode05",
        "Tj.06","Cal Tj.06","Tc.06","DutV06","DutA06","ErrCode06",
        "Tj.07","Cal Tj.07","Tc.07","DutV07","DutA07","ErrCode07",
        "Tj.08","Cal Tj.08","Tc.08","DutV08","DutA08","ErrCode08",
        "Tj.09","Cal Tj.09","Tc.09","DutV09","DutA09","ErrCode09",
        "Tj.10","Cal Tj.10","Tc.10","DutV10","DutA10","ErrCode10",
        "Tj.11","Cal Tj.11","Tc.11","DutV11","DutA11","ErrCode11",
        "Tj.12","Cal Tj.12","Tc.12","DutV12","DutA12","ErrCode12",
        "Tj.13","Cal Tj.13","Tc.13","DutV13","DutA13","ErrCode13",
        "Tj.14","Cal Tj.14","Tc.14","DutV14","DutA14","ErrCode14",
        "Tj.15","Cal Tj.15","Tc.15","DutV15","DutA15","ErrCode15",
    };
    // 1 timestamp + 1 ?? + 16 servers * 6 fields = 98 columns per line
    public static int NUM_OF_TOTAL_COLUMNS = 98;

    static class Uploader
            extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

        //private long checkpoint = 100;
        //private long count = 0;

        @Override
        public void map(LongWritable key, Text line, Context context)
                throws IOException {
            // The input is a CSV file; split each line on commas.
            // Ex (truncated; a real line has 98 comma-separated fields):
            //   Input:  14/09/15 18:20:35, Z00 B00 ,50.5,53.26,53.45,1251.06,291.25,FF
            //   Output: values[0]="14/09/15 18:20:35", values[1]=" Z00 B00 ",
            //           values[2]="50.5", values[3]="53.26", values[4]="53.45",
            //           values[5]="1251.06", values[6]="291.25", values[7]="FF"
            String[] values = line.toString().split(",");
            if (values.length != NUM_OF_TOTAL_COLUMNS)
                return; // skip malformed lines

            // values[0] is the timestamp; use it as the row key.
            byte[] timestamp = Bytes.toBytes(values[0]);
            // values[1] is ??; not stored for now.
            //byte [] ?? = Bytes.toBytes(values[1]);

            Put put = new Put(timestamp);
            // Server info starts at values[2]: one column family "m<j>" per server,
            // one column qualifier VAR[j*NUM_OF_VAR+i] per info field.
            for (int j = 0; j < NUM_OF_SERVER; j++) {
                for (int i = 0; i < NUM_OF_VAR; i++) {
                    put.add(Bytes.toBytes("m" + j),                           // column family
                            Bytes.toBytes(VAR[(j * NUM_OF_VAR) + i]),         // column qualifier
                            Bytes.toBytes(values[2 + (j * NUM_OF_VAR) + i])); // value
                }
            }
            // Uncomment below to disable the WAL. This improves performance but
            // means data loss if a RegionServer crashes.
            // put.setWriteToWAL(false);
            try {
                context.write(new ImmutableBytesWritable(timestamp), put);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            /*
            // Set status every checkpoint lines
            if (++count % checkpoint == 0) {
                context.setStatus("Emitting Put " + count);
            }
            */
        }
    }

    public static Job configureJob(Configuration conf, String[] args)
            throws IOException {
        Path inputPath = new Path(args[0]); // input path in HDFS
        String tableName = args[1];         // the table must already exist in HBase
        Job job = new Job(conf, NAME + "_" + tableName);
        job.setJarByClass(Uploader.class);
        FileInputFormat.setInputPaths(job, inputPath);
        job.setInputFormatClass(TextInputFormat.class);
        job.setMapperClass(Uploader.class);
        // No reducers: the mapper writes straight to the table. initTableReducerJob
        // is still called because it sets up TableOutputFormat, which writes the
        // emitted Puts to the table.
        TableMapReduceUtil.initTableReducerJob(tableName, null, job);
        job.setNumReduceTasks(0);
        return job;
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.println("Wrong number of arguments: " + otherArgs.length);
            System.err.println("Usage: " + NAME + " <input> <tablename>");
            System.exit(-1);
        }
        Job job = configureJob(conf, otherArgs);
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Check Result:
$hbase shell
hbase> get 'TEST3','14/09/16 06:35:38'
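The same check can also be done from Java. Below is a minimal sketch assuming the HBase 0.98 client API; the class name CheckTest3Row is only for illustration and is not part of HBimporttsv_v2.jar. It fetches the row by its timestamp row key and prints every stored cell (column family, qualifier, and value).

package HBimporttsv;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

// Illustrative helper class (not part of HBimporttsv_v2.jar): reads one row back from TEST3.
public class CheckTest3Row {
    public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "TEST3");
        // The row key is the timestamp string written by the Uploader mapper.
        Get get = new Get(Bytes.toBytes("14/09/16 06:35:38"));
        Result result = table.get(get);
        if (result.isEmpty()) {
            System.out.println("Row not found");
        } else {
            for (Cell cell : result.rawCells()) {
                System.out.println(Bytes.toString(CellUtil.cloneFamily(cell)) + ":"
                        + Bytes.toString(CellUtil.cloneQualifier(cell)) + " = "
                        + Bytes.toString(CellUtil.cloneValue(cell)));
            }
        }
        table.close();
    }
}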