
Tuesday, September 30, 2014

Import CSV file to HBase (Using Jar File)

[Software]
    Hadoop 2.4.1
    Eclipse IDE for Java Developers Luna Release (4.4.0)
    HBase 0.98.5

[Reference]
- http://hbase.apache.org/xref/org/apache/hadoop/hbase/mapreduce/SampleUploader.html

Step:
- Create Table (a Java sketch for creating the same table programmatically follows this step list)
$hbase shell
hbase> create 'TEST3','m0','m1','m2','m3','m4','m5','m6','m7','m8','m9','m10','m11','m12','m13','m14','m15'
- Input file in HDFS
  - Log file with 98 columns:
    - 1  timestamp
    - 1  ??
    - 16 monitored servers (6 fields per server)
- Run "HBimporttsv_v2.jar" to insert the log file into the HBase table
$ hadoop jar Downloads/HBimporttsv_v2.jar HBimporttsv.Hbaseimporttsv /user/hduser/test.log2 "TEST3"
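
(Optional) The same table can also be created from Java instead of the HBase shell. Below is a minimal sketch against the HBase 0.98 client API; the class name CreateTest3Table is made up for illustration.

package HBimporttsv;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class CreateTest3Table {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HBaseAdmin admin = new HBaseAdmin(conf);
    HTableDescriptor desc = new HTableDescriptor(TableName.valueOf("TEST3"));
    // One column family per monitored server: m0 .. m15
    for (int i = 0; i < 16; i++) {
      desc.addFamily(new HColumnDescriptor("m" + i));
    }
    if (!admin.tableExists("TEST3")) {
      admin.createTable(desc);
    }
    admin.close();
  }
}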

[Code]
where Hbaseimporttsv.java is:

/*
 * This program is the operation of importing csv file into HBase
 *
 */
package HBimporttsv;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.io.*;
import org.apache.hadoop.hbase.mapreduce.*;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.*;
//import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapreduce.*;
import org.apache.hadoop.util.GenericOptionsParser;

public class Hbaseimporttsv {
private static final String NAME = "SampleUploader";
public static int NUM_OF_SERVER = 16; // number of monitored servers
public static int NUM_OF_VAR = 6;     // number of fields per server
//public static String[]  VAR = {"var1","var2","var3","var4","var5","var6"};//server info type
public static String[] VAR = {
"Tj.00","Cal Tj.00","Tc.00","DutV00","DutA00","ErrCode00",
"Tj.01","Cal Tj.01","Tc.01","DutV01","DutA01","ErrCode01",
"Tj.02","Cal Tj.02","Tc.02","DutV02","DutA02","ErrCode02",
"Tj.03","Cal Tj.03","Tc.03","DutV03","DutA03","ErrCode03",
"Tj.04","Cal Tj.04","Tc.04","DutV04","DutA04","ErrCode04",
"Tj.05","Cal Tj.05","Tc.05","DutV05","DutA05","ErrCode05",
"Tj.06","Cal Tj.06","Tc.06","DutV06","DutA06","ErrCode06",
"Tj.07","Cal Tj.07","Tc.07","DutV07","DutA07","ErrCode07",
"Tj.08","Cal Tj.08","Tc.08","DutV08","DutA08","ErrCode08",
"Tj.09","Cal Tj.09","Tc.09","DutV09","DutA09","ErrCode08",
"Tj.10","Cal Tj.10","Tc.10","DutV10","DutA10","ErrCode10",
"Tj.11","Cal Tj.11","Tc.11","DutV11","DutA11","ErrCode11",
"Tj.12","Cal Tj.12","Tc.12","DutV12","DutA12","ErrCode12",
"Tj.13","Cal Tj.13","Tc.13","DutV13","DutA13","ErrCode13",
"Tj.14","Cal Tj.14","Tc.14","DutV14","DutA14","ErrCode14",
"Tj.15","Cal Tj.15","Tc.15","DutV15","DutA15","ErrCode15",
};

public static int NUM_OF_TOTAL_COLUMNS = 98; // = 1 timestamp + 1 ?? + 16 servers * 6 fields

static class Uploader
    extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {

      //private long checkpoint = 100;
      //private long count = 0;

      @Override
      public void map(LongWritable key, Text line, Context context)
      throws IOException {
        // Input is a CSV file
        // Split CSV line
        // Ex:
        // Input: 14/09/15 18:20:35, Z00 B00 ,50.5,53.26,53.45,1251.06,291.25,FF
        //        (only the first 8 of the 98 fields are shown)
        // Output: values[0]="14/09/15 18:20:35", values[1]="Z00 B00",
        //         values[2]="50.5", values[3]="53.26", values[4]="53.45",
        //         values[5]="1251.06", values[6]="291.25", values[7]="FF"
        String[] values = line.toString().split(",");
        if(values.length != NUM_OF_TOTAL_COLUMNS)
          return;

        // Extract values[0] >> timestamp
        byte [] timestamp = Bytes.toBytes(values[0]);
     
        // Extract values[1] >> ??
        //byte [] ?? = Bytes.toBytes(values[1]);
     
        // Create Put: the row key is the timestamp (first column of the line)
        Put put = new Put(timestamp);
        //int var_index = 2; // server info starts from values[2]
        for (int j = 0; j < NUM_OF_SERVER; j++) {
          for (int i = 0; i < NUM_OF_VAR; i++) {
            put.add(Bytes.toBytes("m" + j),                       // Column family name
                Bytes.toBytes(VAR[(j * NUM_OF_VAR) + i]),         // Column name
                Bytes.toBytes(values[2 + (j * NUM_OF_VAR) + i])); // Value
          }
        }

        // Uncomment below to disable WAL. This will improve performance but means
        // you will experience data loss in the case of a RegionServer crash.
        // put.setWriteToWAL(false);

        try {
          context.write(new ImmutableBytesWritable(timestamp), put);
        } catch (InterruptedException e) {
          e.printStackTrace();
        }
     
        /*
        // Set status every checkpoint lines
        if(++count % checkpoint == 0) {
          context.setStatus("Emitting Put " + count);
        }
        */
      }
    }

public static Job configureJob(Configuration conf, String [] args)
  throws IOException {
    Path inputPath = new Path(args[0]); // input path
    String tableName = args[1]; // Table name; the table must already exist in HBase
    Job job = new Job(conf, NAME + "_" + tableName);
    job.setJarByClass(Uploader.class);
    FileInputFormat.setInputPaths(job, inputPath);
    job.setInputFormatClass(TextInputFormat.class);
    job.setMapperClass(Uploader.class);
    // No reducers. Just write straight to the table. Call initTableReducerJob
    // because it sets up the TableOutputFormat, so map output goes directly to the table.
    TableMapReduceUtil.initTableReducerJob(tableName, null, job);
    job.setNumReduceTasks(0);
    return job;
  }

public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
    if (otherArgs.length != 2) {
      System.err.println("Wrong number of arguments: " + otherArgs.length);
      System.err.println("Usage: " + NAME + " <input> <tablename>");
      System.exit(-1);
    }
    Job job = configureJob(conf, otherArgs);
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}


Check Result:
$hbase shell
hbase>  get 'TEST3','14/09/16 06:35:38'
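
The same check can be done from Java with the client Get API. A minimal sketch, assuming the TEST3 table and the row key above; the class name GetTest3Row is made up for illustration.

package HBimporttsv;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;

public class GetTest3Row {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "TEST3");
    // The importer uses the log timestamp as the row key
    Get get = new Get(Bytes.toBytes("14/09/16 06:35:38"));
    Result result = table.get(get);
    // Read back one cell, e.g. column family "m0", column "Tj.00"
    byte[] value = result.getValue(Bytes.toBytes("m0"), Bytes.toBytes("Tj.00"));
    System.out.println("m0:Tj.00 = " + (value == null ? "<not found>" : Bytes.toString(value)));
    table.close();
  }
}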

Import CSV file to HBase (Using HBase Shell Command)

[Software]
    Hadoop 2.4.1
    HBase 0.98.5
[Reference]
- http://www.openscg.com/2013/08/hadoop-hbase-tutorial/  (Operations)
- http://wiki.apache.org/hadoop/Hbase/Shell  (HBase shell command)


Input type:

14/09/15 18:20:35, Z00 B00 ,0050.50     ,0053.26     ,0053.45     ,1251.06     ,0291.25      ,FF     
14/09/15 18:20:35, Z00 B01 ,0053.50     ,0055.80     ,0056.03     ,1249.79     ,0357.45      ,FF     
.......

Table:

|                   |  type   |                           m1                           |
|-------------------|---------|---------|---------|---------|---------|---------|------|
| HBASE_ROW_KEY     | states  | deg     | high    | heat    | length  | avg     | char |
|-------------------|---------|---------|---------|---------|---------|---------|------|
| 14/09/15 18:20:35 | Z00 B00 | 0050.50 | 0053.26 | 0053.45 | 1251.06 | 0291.25 | FF   |
| 14/09/15 18:20:35 | Z00 B01 | 0053.50 | 0055.80 | 0056.03 | 1249.79 | 0357.45 | FF   |
.......
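
To make the column mapping concrete, the sketch below writes the first sample line as a single Put through the HBase 0.98 client API; ImportTsv (used in the step below) performs the same mapping for every line of the file. The class name PutOneLogLine is made up for illustration.

package HBimporttsv;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class PutOneLogLine {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "log_data");
    // Row key = the timestamp field of the CSV line
    Put put = new Put(Bytes.toBytes("14/09/15 18:20:35"));
    put.add(Bytes.toBytes("type"), Bytes.toBytes("states"), Bytes.toBytes("Z00 B00"));
    put.add(Bytes.toBytes("m1"), Bytes.toBytes("deg"), Bytes.toBytes("0050.50"));
    put.add(Bytes.toBytes("m1"), Bytes.toBytes("high"), Bytes.toBytes("0053.26"));
    put.add(Bytes.toBytes("m1"), Bytes.toBytes("heat"), Bytes.toBytes("0053.45"));
    put.add(Bytes.toBytes("m1"), Bytes.toBytes("length"), Bytes.toBytes("1251.06"));
    put.add(Bytes.toBytes("m1"), Bytes.toBytes("avg"), Bytes.toBytes("0291.25"));
    put.add(Bytes.toBytes("m1"), Bytes.toBytes("char"), Bytes.toBytes("FF"));
    table.put(put);
    table.close();
  }
}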

Step:
$hbase shell
> create 'log_data', 'type','m1'  // Create table "log_data" with two column families, "type" and "m1"
> quit

$hbase org.apache.hadoop.hbase.mapreduce.ImportTsv '-Dimporttsv.separator=,' -Dimporttsv.columns=HBASE_ROW_KEY,type:states,m1:deg,m1:high,m1:heat,m1:length,m1:avg,m1:char log_data /user/hduser/test_log.csv

- org.apache.hadoop.hbase.mapreduce.ImportTsv
Runs the ImportTsv class packaged in hbase-server-${version}-hadoop2.jar, which lets HBase load CSV-formatted data.
- '-Dimporttsv.separator=,'
Tells HBase that the field delimiter of each line is ",".
- -Dimporttsv.columns
Maps the input columns to the column families created in the HBase shell ('type' and 'm1'). One of the columns must be HBASE_ROW_KEY, which is used as the row key;
the other columns use the format "columnfamilyname:columnname", e.g. "m1:deg".
- log_data
This argument is the target table name (the "log_data" table created in the HBase shell).
- /user/hduser/test_log.csv
This argument is the input file path, located on the HDFS that HBase is connected to.

$hbase shell
> scan 'log_data'  // view the imported data in the table
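
The imported rows can also be read back from Java with a Scan. A minimal sketch, assuming the 'log_data' table above; the class name ScanLogData is made up for illustration.

package HBimporttsv;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class ScanLogData {
  public static void main(String[] args) throws Exception {
    Configuration conf = HBaseConfiguration.create();
    HTable table = new HTable(conf, "log_data");
    Scan scan = new Scan();
    scan.addColumn(Bytes.toBytes("m1"), Bytes.toBytes("deg")); // only fetch m1:deg
    ResultScanner scanner = table.getScanner(scan);
    try {
      for (Result r : scanner) {
        System.out.println(Bytes.toString(r.getRow()) + " -> "
            + Bytes.toString(r.getValue(Bytes.toBytes("m1"), Bytes.toBytes("deg"))));
      }
    } finally {
      scanner.close();
      table.close();
    }
  }
}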

[Future Work]
1. The complete log, including the trailing columns that are currently dropped, has not been imported yet.
2. Import the data through a simpler interface or program.