Friday, October 31, 2014

Grep a String that Includes a Tab Character

If you want to grep a file for a string that contains a tab as a separator,
you can write [^I] in the pattern. Strictly speaking, [^I] is a bracket expression that matches any single character other than "I", which includes a tab, so it works for this purpose.

ex: to search doc.txt for the string "I    love    you" (separated by tabs):

     $ cat doc.txt | grep 'I[^I]love[^I]you'
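
If you want to match the tab character itself rather than "any character except I", a sketch using GNU grep's Perl-compatible mode (assuming GNU grep is installed):

     $ grep -P 'I\tlove\tyou' doc.txt

Another option is to type a literal tab inside the quotes by pressing Ctrl+V and then Tab.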

Tuesday, October 21, 2014

Binary Log File (hex) to TSV

/*
*  This program converts a binary file to a TSV file (using tabs to split columns)
*  Usage:
* $ scalac ProberLogTSV.scala
* $ scala -classpath . ProberLogTSV /home/hduser/log00001.dat
*  Result:
*       $ cat /home/hduser/testfile.tsv
*
*/
import scala.io.Source
import scala.io.Codec

import java.nio.charset.CodingErrorAction
import java.nio.file.{Paths, Files}
import java.nio.charset.StandardCharsets
import java.io._
import java.lang._

object ProberLogTSV {
  def main(args: Array[String]) {

    //val logFileDir   = args(0)
    //val totalFiles   = java.lang.Integerargs.getInteger(1)
    //var fileDirArray = new Array[String](totalFiles)
    val logFile = args(0)

    // Convert binary file (ex: hex file) to an ASCII string with lines
    val logData = binaryToString(logFile)

    // Convert the string to tab-separated form
    val tabFile = convertToTabFile(logData)

    Files.write(Paths.get("/home/hduser/testfile.tsv"), tabFile.getBytes(StandardCharsets.UTF_8))
    //print(tabFile)
  }

  def binaryToString(logFile: String) = {
    val src = scala.io.Source.fromFile(logFile, "ISO-8859-1")
    val charArray = src.toArray // translate Source to a Char array
    src.close()
    var strBuf = new StringBuffer()
    for (i <- 0 to charArray.length - 1) {
      if (charArray(i).equals('\0')) { // replace '\0' with '\n' to split lines
        charArray(i) = '\n'
      }
      strBuf.append(charArray(i))
    }
    strBuf.toString
  }

  def convertToTabFile(logData: String) = {
    val lines: Array[String] = logData.split(System.getProperty("line.separator"))
    for (i <- 0 until lines.length) {
      var charArray = lines(i).toCharArray()
      var count = 1
      for (j <- 0 to charArray.length - 1) {
        if (charArray(j).equals(' ') && count <= 3) { // only the first three spaces become tabs
          charArray(j) = '\t'
          count += 1
        }
      }
      lines(i) = new String(charArray)
    }
    var strBuf = new StringBuffer()
    for (str <- lines) {
      strBuf.append(str).append('\n')
    }
    strBuf.toString()
  }

}

Thursday, October 16, 2014

Hive output result to file

[Problem]
Write the results that Hive has computed or collected out to HDFS or to local disk.

[Method]  INSERT OVERWRITE
Reference:
http://stackoverflow.com/questions/18129581/how-do-i-output-the-results-of-a-hiveql-query-to-csv

ex: (insert overwrite directory writes, by default, to a path on HDFS)
hive> insert overwrite directory '/user/hduser/temp' select Avg(times) from hivetableerr;

Result:
$ hdfs dfs -cat /user/hduser/temp/000000_0
47.0

To write to local disk instead, use:

hive> insert overwrite local directory '/home/hduser/temp' select Avg(times) from hivetableerr;

Another way to write to local disk, which does not require being inside the Hive shell, is to run the query directly as a bash command:

$ hive -e 'select Avg(times) from hivetableerr;' > /home/hduser/temp
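
When the result has several columns, the field delimiter of the exported file can also be set; a sketch, assuming Hive 0.11 or later:

hive> insert overwrite local directory '/home/hduser/temp'
      row format delimited fields terminated by ','
      select * from hivetableerr;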


SQL Function


SQL functions can be issued through Hive
to run computations over the HBase data, for example AVG()

ex: 

hive>  select AVG(times) from hivetableerr;


[Result]

Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2014-10-16 15:45:47,503 Stage-1 map = 0%,  reduce = 0%
2014-10-16 15:46:05,165 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.19 sec
2014-10-16 15:46:19,847 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 3.49 sec
MapReduce Total cumulative CPU time: 3 seconds 490 msec
Ended Job = job_1412160247541_0074
MapReduce Jobs Launched:
Job 0: Map: 1  Reduce: 1   Cumulative CPU: 3.49 sec   HDFS Read: 255 HDFS Write: 5 SUCCESS
Total MapReduce CPU Time Spent: 3 seconds 490 msec
OK
47.0
Time taken: 44.88 seconds, Fetched: 1 row(s)
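
Other aggregate functions work the same way; for instance, a sketch against the same table:

hive> select count(*), max(times) from hivetableerr;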

HBase SELECT with JOIN through Hive

[Problem]
We created the two HBase tables below and want to find Mary's error count (errorInfo:times) by joining them.

"hivetable" 
cf
RowKey
id
id2
Jack
1423
Mary
1425
1745

 "hivetableErr"
errorInfo
RowKey
times
1423
43
1425
51
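
For reference, the two tables could be created and filled from the HBase shell roughly like this (a sketch using the values above):

hbase> create 'hivetable', 'cf'
hbase> put 'hivetable', 'Jack', 'cf:id', '1423'
hbase> put 'hivetable', 'Mary', 'cf:id', '1425'
hbase> put 'hivetable', 'Mary', 'cf:id2', '1745'
hbase> create 'hivetableErr', 'errorInfo'
hbase> put 'hivetableErr', '1423', 'errorInfo:times', '43'
hbase> put 'hivetableErr', '1425', 'errorInfo:times', '51'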


[Method]
Step 1: Create two Hive tables linked to the two HBase tables above

Create "hivetable"
create external table hivetable(key string, id int, id2 int)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties("hbase.columns.mapping" = "cf:id, cf:id2")
tblproperties("hbase.table.name" = "hivetable");

Create "hivetableerr"
create external table hivetableerr(id int, times int)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties("hbase.columns.mapping" = "errorInfo:times")
tblproperties("hbase.table.name" = "hivetableErr");

The join query is as follows:

hive > select times from hivetableerr join hivetable on(hivetable.id = hivetableerr.id) where hivetable.key = "Mary";

SELECT FROM: select times from hivetableerr
JOIN ON: join the two tables, matching hivetableerr.id against hivetable.id
WHERE: restrict the lookup to the row whose key is Mary

[Result]

Total jobs = 1
14/10/16 14:41:01 WARN conf.Configuration: file:/tmp/hduser/hive_2014-10-16_14-40-59_385_3743927975833041-1/-local-10006/jobconf.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval;  Ignoring.
14/10/16 14:41:01 WARN conf.Configuration: file:/tmp/hduser/hive_2014-10-16_14-40-59_385_3743927975833041-1/-local-10006/jobconf.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts;  Ignoring.
Execution log at: /tmp/hduser/hduser_20141016144040_963a7821-4ec5-4343-9a53-fc4a413057c1.log
2014-10-16 02:41:02     Starting to launch local task to process map join;      maximum memory = 477102080
2014-10-16 02:41:04     Dump the side-table into file: file:/tmp/hduser/hive_2014-10-16_14-40-59_385_3743927975833041-1/-local-10003/HashTable-Stage-3/MapJoin-mapfile10--.hashtable
2014-10-16 02:41:04     Uploaded 1 File to: file:/tmp/hduser/hive_2014-10-16_14-40-59_385_3743927975833041-1/-local-10003/HashTable-Stage-3/MapJoin-mapfile10--.hashtable (282 bytes)
2014-10-16 02:41:04     End of local task; Time Taken: 1.425 sec.
Execution completed successfully
MapredLocal task succeeded
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_1412160247541_0072, Tracking URL = http://master:8088/proxy/application_1412160247541_0072/
Kill Command = /usr/local/hadoop/bin/hadoop job  -kill job_1412160247541_0072
Hadoop job information for Stage-3: number of mappers: 1; number of reducers: 0
2014-10-16 14:41:17,420 Stage-3 map = 0%,  reduce = 0%
2014-10-16 14:41:25,978 Stage-3 map = 100%,  reduce = 0%, Cumulative CPU 1.99 sec
MapReduce Total cumulative CPU time: 1 seconds 990 msec
Ended Job = job_1412160247541_0072
MapReduce Jobs Launched:
Job 0: Map: 1   Cumulative CPU: 1.99 sec   HDFS Read: 258 HDFS Write: 3 SUCCESS
Total MapReduce CPU Time Spent: 1 seconds 990 msec
OK
51
Time taken: 27.719 seconds, Fetched: 1 row(s)

Wednesday, October 15, 2014

Hive connect to HBase Table

[Problem]
We already know how to create a table in HBase through Hive,
but what if we want Hive to connect directly to a table that already exists in HBase?



[Table Connection]
Reference: http://item.iqadd.com/item/hive-hbase-integration

Hive Code
CREATE EXTERNAL TABLE hivetable(key String, value int)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES("hbase.columns.mapping" = "cf:id")
TBLPROPERTIES("hbase.table.name" = "hivetable");

Explanation

CREATE EXTERNAL TABLE hivetable(key String, value int)
Creates an external table named "hivetable". The table itself lives in another database; Hive stores only the table's metadata and operates directly on the linked table in the external database.

STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
Says that the table above is accessed through the HBase storage handler provided by Hive; Hive also ships storage handlers for other databases and file systems.

WITH SERDEPROPERTIES("hbase.columns.mapping" = "cf:id")
Selects the specific data to map (particular columns, ranges, filters). In this example the Hive table only maps the cf:id column of the HBase table (to map several columns, use e.g. "cf:id, cf:id2, cf2:name" ...).

TBLPROPERTIES("hbase.table.name" = "hivetable");
Specifies the HBase table to link to; here it links to the HBase table "hivetable".

This way, whenever the data in the HBase table "hivetable" changes, the change can be observed directly from the Hive table "hivetable".
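
A quick way to verify the link (a sketch; the row below is made up):

hbase> put 'hivetable', 'Tom', 'cf:id', '1777'
hive>  select * from hivetable;

The new row should show up in the Hive query result without any extra loading step.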

Monday, October 13, 2014

Java TCP server to receive data from the Linux nc command

[Problem]

The Linux nc (netcat) command can send data or messages over TCP/UDP, but I could not find a way to build a multi-threaded TCP server with nc or other Linux commands; nc can only transfer data one-to-one,
as in Put Data into Remote HBase by using Linux Bash Script. So I wrote a multi-threaded server in Java instead.

------------------------------------------------------------------------------------------------------------
[Concept]


Reference:
How to write RAW data to a file using Java? e.g same as: nc -l 8000 > capture.raw
Multithreaded Server in Java
-------------------------------------------------------------------------------------------------------------
[Code]

tcpserver.java  (Main Class)

package tcpserver;

public class tcpserver {
    public static void main(String args[]) {
        MultiThreadedServer server = new MultiThreadedServer(9075);
        System.out.println("***************************");
        System.out.println("** TCP Collection Server **");
        System.out.println("***************************");
        new Thread(server).start();
    }
}

MultiThreadedServer.java

package tcpserver;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

public class MultiThreadedServer implements Runnable{
protected int          serverPort   = 50200;
    protected ServerSocket serverSocket = null;
    protected boolean      isStopped    = false;
    protected Thread       runningThread= null;

    public MultiThreadedServer(int port){
        this.serverPort = port;
        System.out.println("System port setting : " + port);
    }

    public void run(){

        synchronized(this){
            this.runningThread = Thread.currentThread();  // remember the thread that runs this server
        }

        openServerSocket();

        System.out.println("[INFO] Start Listening!");
        while(!isStopped()){
            Socket clientSocket = null;
            try {
                clientSocket = this.serverSocket.accept(); //wait client
            } catch (IOException e) {
                if(isStopped()) {
                    System.out.println("[ERROR] Server Stopped.") ;
                    return;
                }
                throw new RuntimeException("[ERROR] Error accepting client connection", e);
            }
            // Create a New Thread for client
            new Thread(
            new WorkerRunnable(clientSocket)
            ).start();
        }
        System.out.println("Server Stopped.") ;
    }


    private synchronized boolean isStopped() {
        return this.isStopped;
    }

    public synchronized void stop(){
        this.isStopped = true;
        try {
            this.serverSocket.close();
        } catch (IOException e) {
            throw new RuntimeException("Error closing server", e);
        }
    }

    private void openServerSocket() {
        try {
            this.serverSocket = new ServerSocket(this.serverPort);
            System.out.println("[INFO] Create a ServerSocket");
        } catch (IOException e) {
            throw new RuntimeException("Cannot open port " + this.serverPort, e);
        }
    }
}

WorkerRunnable.java

package tcpserver;

import java.io.*;
import java.net.*;

public class WorkerRunnable implements Runnable{
    protected Socket clientSocket = null;

    public WorkerRunnable(Socket clientSocket){
        this.clientSocket = clientSocket;
    }

    public void run(){
        try{
            byte[] buff = new byte[1024];
            int bytes_read = 0;
            InputStream data = clientSocket.getInputStream();   // get input data
            while((bytes_read = data.read(buff)) != -1){         // print only the bytes actually read
                System.out.println(new String(buff, 0, bytes_read));
                //String str = new String(buff);
            }
            data.close();
        }
        catch(IOException e){
            e.printStackTrace();
        }
    }
}
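
To exercise the server from another machine, something like this should work (assuming the server runs on 192.168.0.7 with the port 9075 set in tcpserver.java):

$ echo "hello tcp server" | nc -w 3 192.168.0.7 9075

Each connection is handled by its own WorkerRunnable thread, so several clients can send data at the same time.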

-------------------------------------------------------------------------------------------------------------
[Result]

Sunday, October 12, 2014

Put Data into Remote HBase by using Linux Bash Script


1. Start a TCP listening server on 192.168.0.7:5002,
and forward the received data to the HBase shell

start_collection.sh
while true;
do nc -l 192.168.0.7 5002
done | hbase shell

[What the script does]
nc -l : opens port 5002 on IP 192.168.0.7 and listens for any TCP packet sent to that port
(nc -lu listens for UDP packets instead)
while true; ... do ... done : nc exits after handling one connection, so this loop keeps it receiving
| hbase shell : pipes whatever nc receives (HBase shell operations) into the hbase shell

--------------------------------------------------------------------------------------------------------------------
2. Send Info to Server

tcollector.sh
#!/bin/bash
set -e
while true;
do awk -v now=`date +%s` \
'{ print "put " $1", " now", " $2", " $3}' HBase_Info
sleep 15
done | nc -w 30 192.168.0.7 5002

[What the script does]
set -e : abort the script as soon as any command fails

awk -v now=`date +%s` \
'{ print "put " $1", " now", " $2", " $3}' HBase_Info
    - sets the awk variable now to the current time (date +%s gives seconds since the epoch)
    - reads the file HBase_Info line by line; in each line
                  the first field  $1 = 'hiveTableonHB'
                  the second field $2 = 'cf:time'
                  the third field  $3 = 'test'

nc -w 30 192.168.0.7 5002 : sends everything printed by the awk command above to 192.168.0.7:5002

HBase_Info
'hiveTableonHB' 'cf:time' 'test'
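
Each pass of the loop therefore prints an HBase shell command like the following (the row key is the epoch timestamp from date +%s; the timestamp shown is just an example):

put 'hiveTableonHB', 1413177758, 'cf:time', 'test'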

--------------------------------------------------------------------------------------------------------------------
[RESULT]

1.8.7-p357 :004 > scan 'hiveTableonHB'
ROW                      COLUMN+CELL
 1413177758              column=cf:time, timestamp=1413177976111, value=test
 1413180302              column=cf:time, timestamp=1413180310949, value=test
 1413180317              column=cf:time, timestamp=1413180324753, value=test
6 row(s) in 0.0270 seconds

Tuesday, October 7, 2014

Implementing an HBase operations interface

[To be improved]
- Use a connection pool
- Add Spark computation


Features:
0. Operation menu (HBaseGet.java)
[Implemented]
- Choose between computation and query
- Commands are validated; an invalid command prompts again
- Choose whether to exit

[Planned improvements]
- Add a graphical interface (menus, parameter boxes, etc.)

1. Computation         (CountJob.java)
[Basic features]
- Table, column, and output location are selectable; the row range still has to be changed in the code
- Automatically validates the entered table name, column family name, and column name

[Planned improvements]

- Computations to support:
- Average
- Write the result as a text file to HDFS

[Planned improvements]
1. conf.set("mapred.jar", ...) needs a local path; find a way to set it (automatically?)
2. Move the average / standard deviation computation into a separate class

2. Query         (ScanHTable.java)
[Implemented]
      - Simple filters can be set

      [Planned improvements]
      - A better way to enter filter values

3. Create Table
     [Implemented]
     - Table name and column families can be entered manually

    [Planned improvements]
    - Automatically create tables with a large number of column families

4. Delete Table
     [Implemented]
     - Table name and column families can be entered manually

    [Planned improvements]
    - Should disable be a separate operation?

5. List Table
     [Implemented]
     - List the existing table names


6. Import Data to Table
- Online
- Offline (HBaseimporttsv_v4.jar)

Wednesday, October 1, 2014

Java: check whether a String contains a substring

In Java, to check whether a String contains a subString, i.e. to match part of a string,
for example to check whether the String "ERROR543" contains "ERROR",

you can use

public class test {
    public static void main(String[] args){

        String a = "ERROR543";
        if(a.indexOf("ERROR") != -1)
            System.out.println("yes");
        else
            System.out.println("NO");
    }
}
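
Since Java 5 the same check can be written more directly with String.contains:

if(a.contains("ERROR"))
    System.out.println("yes");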

Eclipse with Hadoop plugin

[Reference]
https://github.com/winghc/hadoop2x-eclipse-plugin  (package download)
http://blog.jourk.com/compile-hadoop-eclipse-plugin-2-4-1-plugin.html  (method)
http://blog.csdn.net/yueritian/article/details/23868175  (Debug)


Software:
eclipse:  eclipse-java-luna-R-linux-gtk-x86_64.tar.gz
Hadoop: 2.4.1
# Because Hadoop 2.4.1 does not ship an Eclipse plugin, we need to build one ourselves
Plugin build:
Download: https://github.com/winghc/hadoop2x-eclipse-plugin
1. Modify:
(as of 2014/09/15 this tool targets Hadoop 2.2.0, so it has to be adjusted for 2.4.1)
[Change 1]
  Find ~/hadoop2x-eclipse-plugin-master/src/contrib/eclipse-plugin/build.xml
  Add the commons-collections-3.2.1.jar setting:
...
<!--     Add this line          -->
<copy file="${hadoop.home}/share/hadoop/common/lib/commons-collections-${commons-collections.version}.jar" todir="${build.dir}/lib" verbose="true"/>
<!--                            -->
...

    In the same build.xml, change
        lib/commons-lang-2.5.jar
    to
        lib/commons-lang-2.6.jar

    and add
        lib/commons-collections-${commons-collections.version}.jar
 
[Change 2]
Find ~/hadoop2x-eclipse-plugin-master/ivy/libraries.properties
Add
commons-collections.version=3.2.1

2. Build the jar file
$cd src/contrib/eclipse-plugin
$ant jar -Dversion=2.4.1 -Declipse.home=/home/hdp2/eclipse -Dhadoop.home=/home/hdp2/hadoop-2.4.1

Then the jar file will appear in ~/hadoop2x-eclipse-plugin-master/build/contrib/eclipse-plugin

# -Dversion : the installed Hadoop version
  -Declipse.home : the dir of ECLIPSE_HOME
  -Dhadoop.home : the dir of HADOOP_HOME

3. Move the plugin to eclipse/plugins
$cp build/contrib/eclipse-plugin/hadoop-eclipse-plugin-2.4.1.jar /home/hdp2/eclipse/plugins

4. Start eclipse with debug parameter:
$ eclipse/eclipse -clean -consolelog -debug

[Note] This flag matters: if clicking Map/Reduce Locations does nothing after Eclipse starts, the error info can be read from the terminal!

5. After Eclipse has started
- Windows > Open Perspective > Other > Map/Reduce
- Windows > Show View > Other > MapReduce Tools > Map/Reduce Locations
- Click the blue elephant icon in the lower right corner > configure the Hadoop server location
[Note] If no settings dialog appears when you click the elephant, check the error message in the terminal

- Setting:
Location name : master
MapReduce Master >>  Host: 192.168.0.7  Port: 8032
DFS Master     >>  Host:(use M/R Master host) Port:9000
User name : hduser (hadoop user name)

Eclipse Submit Job from Hadoop Client to remote Hadoop Master

[Software]
    Hadoop2.4.1
    Eclipse IDE for Java Developers Luna Release (4.4.0)
[Problem]
If you want to submit a job from the local side to a remote Hadoop server by running the Java application directly in Eclipse, without copying the jar file to the Hadoop server (e.g. Eclipse on 192.168.0.51 and the Hadoop master on 192.168.0.7), do the following.

1. Point to the jar file that will be created in a later step
Add the following line where you set up your job Configuration, so that it points to the jar file that will be exported in step 4:

conf.set("mapred.jar", "/home/hdp2/workspace/HBaseGet/HbaseGet_v3.jar");

[Reference]
http://stackoverflow.com/questions/21793565/class-not-found-exception-in-eclipse-wordcount-program

[NOTICE 1]
"/home/hdp2/workspace/HBaseGet/HbaseGet_v3.jar" is the local path of the jar file

[NOTICE 2]
If you skip this step, Eclipse may report an error like:
[Error]
... Class org.apache....  Map not found ...


2. Set the YARN master location
 Add the following to "yarn-site.xml":
<property>  
<name>yarn.resourcemanager.address</name>  
<value>master:8032</value>  
</property>  
<property>  
<name>yarn.resourcemanager.scheduler.address</name>  
<value>master:8030</value>  
</property>  
<property>  
<name>yarn.resourcemanager.resource-tracker.address</name>  
<value>master:8031</value>  
</property>  

3. Set Configuration for Client Side
 Select "Run Configuration" > "Classpath" > "Advanced" > "Add External Folder"  > Select the Hadoop and HBase conf folder(ex. $HADOOP_HOME/etc/hadoop and $HBASE_HOME/conf)

[IMPORTANT !]
The conf settings in both the Hadoop and HBase conf folders MUST be consistent with the conf on the Hadoop and HBase master!!
# The simplest way is to copy the conf folders from the master to the local machine

[Notice]
Because Eclipse runs locally, without the correct server configuration,
skipping this step leads to errors such as:
[Error 1]
Opening socket connection to server localhost/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
[Error 2]
"eclipse"  ... Retrying connect to server: 0.0.0.0/0.0.0.0:8032 ....

4. Export Jar File
"File" > "Export..." > "Java" > "JAR file" > select resource to export > "Finish"

5. Run Application
"Run Configuration" > Set "Arguments" > "Run"


6. Staging problem (Multiple users in Hadoop)
If you are a Hadoop client rather than the master (e.g. client_username="hdp2", Hadoop_username="hduser"), running the app may produce an error like:

2012-10-09 10:06:31,233 ERROR org.apache.hadoop.security.UserGroupInformation: PriviledgedActionException as:root cause:org.apache.hadoop.security.AccessControlException: Permission denied: user=root, access=READ_EXECUTE, inode=”system”:mapred:supergroup:rwx——

[NEED TO DO]
http://amalgjose.wordpress.com/2013/02/09/setting-up-multiple-users-in-hadoop-clusters/
1. Create a new user(hdp2) at Hadoop master server
$sudo adduser hdp2

2. Create a home directory for the client user on HDFS
$hdfs dfs -mkdir /user/hdp2

3. Log in to the Hadoop master server with the new username (hdp2) and run any example jar (such as WordCount)
It will create the staging folder for the new user under /tmp on HDFS
(Not sure why this is needed......)
4. Change ownership and permissions
$hdfs dfs -chown -R hdp2:hadoop /tmp/hadoop-yarn/staging/hdp2/
$hdfs dfs -chmod 777 /tmp/