Tuesday, September 30, 2014

Import a CSV file to HBase (using a JAR file)

[Software]
    Hadoop 2.4.1
    Eclipse IDE for Java Developers Luna Release (4.4.0)
    HBase 0.98.5

Reference:
- http://hbase.apache.org/xref/org/apache/hadoop/hbase/mapreduce/SampleUploader.html

Step:
- Create Table (a programmatic alternative using the HBase Java API is sketched after the code listing below)
$hbase shell
hbase> create 'TEST3','m0','m1','m2','m3','m4','m5','m6','m7','m8','m9','m10','m11','m12','m13','m14','m15'
- Put the input file into HDFS (an example hadoop fs command for this step is shown after this list)
- The input is a log file with:
  - 98 columns in total
  - 1 timestamp
  - 1 ??
  - 16 monitored servers (6 values per server)
- Run "HBimporttsv_v2.jar" to insert the log file into the HBase table
$ hadoop jar Downloads/HBimporttsv_v2.jar HBimporttsv.Hbaseimporttsv /user/hduser/test.log2 "TEST3"
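
To put the log file into HDFS (second step above), a hadoop fs command such as the following can be used; the local file name test.log2 and the HDFS path /user/hduser/test.log2 are assumptions taken from the run command above:
$ hadoop fs -put test.log2 /user/hduser/test.log2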

[Code]
where Hbaseimporttsv.java is:

/*
 * This program is the operation of importing csv file into HBase
 *
 */
package HBimporttsv;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.*;
import org.apache.hadoop.hbase.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.*;
//import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.util.GenericOptionsParser;

public class Hbaseimporttsv {
private static final String NAME = "SampleUploader";
public static int NUM_OF_SERVER = 16; // number of monitored servers
public static int NUM_OF_VAR = 6; // number of values per server
//public static String[]  VAR = {"var1","var2","var3","var4","var5","var6"};//server info type
public static String[] VAR = {
"Tj.00","Cal Tj.00","Tc.00","DutV00","DutA00","ErrCode00",
"Tj.01","Cal Tj.01","Tc.01","DutV01","DutA01","ErrCode01",
"Tj.02","Cal Tj.02","Tc.02","DutV02","DutA02","ErrCode02",
"Tj.03","Cal Tj.03","Tc.03","DutV03","DutA03","ErrCode03",
"Tj.04","Cal Tj.04","Tc.04","DutV04","DutA04","ErrCode04",
"Tj.05","Cal Tj.05","Tc.05","DutV05","DutA05","ErrCode05",
"Tj.06","Cal Tj.06","Tc.06","DutV06","DutA06","ErrCode06",
"Tj.07","Cal Tj.07","Tc.07","DutV07","DutA07","ErrCode07",
"Tj.08","Cal Tj.08","Tc.08","DutV08","DutA08","ErrCode08",
"Tj.09","Cal Tj.09","Tc.09","DutV09","DutA09","ErrCode08",
"Tj.10","Cal Tj.10","Tc.10","DutV10","DutA10","ErrCode10",
"Tj.11","Cal Tj.11","Tc.11","DutV11","DutA11","ErrCode11",
"Tj.12","Cal Tj.12","Tc.12","DutV12","DutA12","ErrCode12",
"Tj.13","Cal Tj.13","Tc.13","DutV13","DutA13","ErrCode13",
"Tj.14","Cal Tj.14","Tc.14","DutV14","DutA14","ErrCode14",
"Tj.15","Cal Tj.15","Tc.15","DutV15","DutA15","ErrCode15",
};

public static int NUM_OF_TOTAL_COLUMNS = 98;

static class Uploader
    extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

      //private long checkpoint = 100;
      //private long count = 0;

      @Override
      public void map(LongWritable key, Text line, Context context)
      throws IOException {
        // Input is a CSV file; split the line on commas.
        // Example (truncated -- a real line has 98 comma-separated fields):
        //   Input:  14/09/15 18:20:35, Z00 B00 ,50.5,53.26,53.45,1251.06,291.25,FF
        //   Output: values[0]="14/09/15 18:20:35", values[1]=" Z00 B00 ",
        //           values[2]="50.5", values[3]="53.26", values[4]="53.45",
        //           values[5]="1251.06", values[6]="291.25", values[7]="FF"
        String[] values = line.toString().split(",");
        if(values.length != NUM_OF_TOTAL_COLUMNS)
          return;

        // Extract values[0] >> timestamp
        byte [] timestamp = Bytes.toBytes(values[0]);
     
        // Extract values[1] >> ??
        //byte [] ?? = Bytes.toBytes(values[1]);
     
        // Create Put
        Put put = new Put(timestamp); // use the first field (timestamp) as the row key
        // server values start at values[2]
        for (int j = 0; j < NUM_OF_SERVER; j++) {
          for (int i = 0; i < NUM_OF_VAR; i++) {
            put.add(Bytes.toBytes("m" + j),                           // column family name
                    Bytes.toBytes(VAR[(j * NUM_OF_VAR) + i]),         // column qualifier
                    Bytes.toBytes(values[2 + (j * NUM_OF_VAR) + i])); // value
          }
        }

        // Uncomment below to disable WAL. This will improve performance but means
        // you will experience data loss in the case of a RegionServer crash.
        // put.setWriteToWAL(false);

        try {
          context.write(new ImmutableBytesWritable(timestamp), put);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
     
        /*
        // Set status every checkpoint lines
        if(++count % checkpoint == 0) {
          context.setStatus("Emitting Put " + count);
        }
        */
      }
    }

public static Job configureJob(Configuration conf, String [] args)
  throws IOException {
    Path inputPath = new Path(args[0]); // input path
    String tableName = args[1]; // Table name which is already in Database
    Job job = new Job(conf, NAME + "_" + tableName);
    job.setJarByClass(Uploader.class);
    FileInputFormat.setInputPaths(job, inputPath);
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapperClass(Uploader.class);
    // No reducers: write straight to the table. initTableReducerJob is called
    // because it sets up the TableOutputFormat, so the map output is written directly to the table.
    TableMapReduceUtil.initTableReducerJob(tableName, null, job);
    job.setNumReduceTasks(0);
    return job;
  }

public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Wrong number of arguments: " + otherArgs.length);
      System.err.println("Usage: " + NAME + " <input> <tablename>");
      System.exit(-1);
    }
    Job job = configureJob(conf, otherArgs);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
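
For reference, the TEST3 table created in the HBase shell above can also be created programmatically with the HBase 0.98 client API (the listing above imports HTableDescriptor, HColumnDescriptor and TableName but does not use them; they are only needed for code like this). A minimal sketch, assuming the same table name and the m0 to m15 column families; the class name CreateTest3Table is just for illustration:

package HBimporttsv;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateTest3Table {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HBaseAdmin admin = new HBaseAdmin(conf); // HBase 0.98 admin API
    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("TEST3"));
    // one column family per monitored server: m0 .. m15
    for (int i = 0; i < 16; i++) {
      desc.addFamily(new HColumnDescriptor("m" + i));
    }
    if (!admin.tableExists("TEST3")) {
      admin.createTable(desc);
    }
    admin.close();
  }
}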


Check Result:
$hbase shell
hbase>  get 'TEST3','14/09/16 06:35:38'
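
To look at more than one row, a scan with a row limit (the LIMIT value here is arbitrary) also works:
hbase> scan 'TEST3', {LIMIT => 5}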
