Thursday, December 11, 2014

Hadoop Sequence file looks weird?

Sometimes files are sent to HDFS for storage through Flume (using the spooling directory source), with contents like the following:
a.txt
a file is here
b.txt
b file is here

When you inspect the resulting Hadoop sequence file on HDFS with
$ hdfs dfs -cat <SEQUENCEFILE>
it displays:

SEQ!org.apache.hadoop.io.LongWritableorg.apache.hadoop.io.TextK▒*▒▒     ▒▒▒▒͇J<▒▒a file is herJ<▒▒b file is here


This is because the file is stored as a sequence file, so it begins with the "SEQ" magic header, and "!org.apache.hadoop.io.LongWritableorg.apache.hadoop.io.Text" records the key/value class types of the data contained in this sequence file.
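
This is expected with the Flume HDFS sink, whose default hdfs.fileType is SequenceFile. A rough sketch of such an agent configuration (the agent, channel, and sink names and the paths here are hypothetical; hdfs.writeFormat = Text matches the LongWritable/Text key/value pair seen above):

agent.sources  = spool1
agent.channels = ch1
agent.sinks    = hdfs1

agent.sources.spool1.type     = spooldir
agent.sources.spool1.spoolDir = /var/spool/flume
agent.sources.spool1.channels = ch1

agent.channels.ch1.type = memory

agent.sinks.hdfs1.type             = hdfs
agent.sinks.hdfs1.channel          = ch1
agent.sinks.hdfs1.hdfs.path        = hdfs://master:9000/user/hduser/flume
agent.sinks.hdfs1.hdfs.fileType    = SequenceFile
agent.sinks.hdfs1.hdfs.writeFormat = Text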

To see the text content, you can use
$ hdfs dfs -text <SEQUENCEFILE>

1418355275278   a file is here
1418355275280   b file is here
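
To read the key/value pairs programmatically rather than from the shell, a minimal Java sketch could look like the following (assuming the Hadoop client libraries are on the classpath; the file path is passed as an argument and may be a full hdfs:// URI):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class ReadSeqFile {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Path path = new Path(args[0]); // e.g. the sequence file written by Flume

        // Key/value classes match the header shown above: LongWritable / Text
        SequenceFile.Reader reader =
                new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
        LongWritable key = new LongWritable();
        Text value = new Text();
        while (reader.next(key, value)) {
            System.out.println(key.get() + "\t" + value);
        }
        reader.close();
    }
}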


If you download the file (it is still a sequence file, so it has to be inspected through conversion tools such as strings, od, etc.):
$ hdfs dfs -get <SEQUENCEFILE>
$ strings <SEQUENCEFILE>

!org.apache.hadoop.io.LongWritable
org.apache.hadoop.io.Text
a file is here
b file is here

[Reference]
http://stackoverflow.com/questions/23827051/sequence-and-vectors-from-csv-file

Monday, December 1, 2014

Handling the Dictionary Results Produced by a Python Generator

In Django, when we want to display on a web page the HBase table scan result that Python obtains through the HappyBase scan function:

Example: 'pytest' Table
RowKey    f:id
"John"    "a"
"Mary"   "b"
"Tom"    "c"

views.py (returns the HBase scan result to detail.html)
...
connection = happybase.Connection('192.168.0.7')
connection.open()
table = connection.table('pytest')
result = table.scan(columns=['f:id'], filter="SingleColumnValueFilter('f', 'id', !=, 'binary:a')")
template = loader.get_template('app1/detail.html')
context = Context({'messages': result, })
return HttpResponse(template.render(context))
...

connection = happybase.Connection('192.168.0.7')
connection.open()
Connect to HBase with HappyBase (it actually connects to the HBase Thrift server, which listens on port 9090 by default, which is why no port needs to be specified here)

table = connection.table('pytest')
result = table.scan(columns=['f:id'], filter="SingleColumnValueFilter('f', 'id', !=, 'binary:a')")
Scan the "pytest" table on HBase, keeping only the rows whose 'f:id' is not equal to "a", and return only the id column of each row

template = loader.get_template('app1/detail.html')
context = Context({'messages': result, })
return HttpResponse(template.render(context))
Render the result into the specified HTML template and return it wrapped in an HttpResponse

----------------- HTML display method 1 ---------------------
If you display the values directly in the HTML:
detail.html
{% for v in messages %}
    {{v}}
    <br>

{% endfor %}

it will display
('Mary', {'f:id': 'b'})
('Tom', {'f:id': 'c'})

(X1, X2, ...) => a Python tuple
{a1: b1, a2: b2, ...} => a Python dictionary (similar to Java's Map)
--------------------------------------------

---------------- HTML display method 2 -------------------
But if we only want to display the row key and the id value, we have to do the following.

Add a custom template filter (note that Django normally expects custom filters to live in a templatetags module of the app and to be loaded in the template with {% load %}):

@register.filter
def get_item(dictionary, key):    
     return dictionary.get(key)

and add the following to detail.html:

{% for k, v in messages %}  <-- k and v are bound to the two values of each tuple
    {{ k }}
    {% for id in v %}
        {{ v | get_item:id }} <-- dictionary values can only be accessed this way in a template
    {% endfor %}
{% endfor %}

and it will display
Mary  b
Tom   c
--------------------------------------------

Friday, October 31, 2014

Grep a String That Includes a Tab Character

If you want to grep a document for a string that contains a tab as a separator, you can stand in for the tab with the bracket expression [^I], which matches any single character other than "I" (a tab included).

ex: to search doc.txt for a string like "I    love    you" (separated by tabs):

     $ cat doc.txt | grep 'I[^I]love[^I]you'
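
If you want to match the tab character itself rather than "any character except I", GNU grep's Perl-compatible mode accepts \t (assuming a grep build with -P support):

     $ grep -P 'I\tlove\tyou' doc.txt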

Tuesday, October 21, 2014

Binary Log File (hex) to TSV

/*
 *  This program transfers a binary log file to a TSV file (using tabs to split columns).
 *  Usage:
 *    $ scalac ProberLogTSV.scala
 *    $ scala -classpath . ProberLogTSV /home/hduser/log00001.dat
 *  Result:
 *    $ cat /home/hduser/testfile.tsv
 */
import scala.io.Source
import scala.io.Codec

import java.nio.charset.CodingErrorAction
import java.nio.file.{Paths, Files}
import java.nio.charset.StandardCharsets
import java.io._
import java.lang._

object ProberLogTSV {
  def main(args: Array[String]) {

    //val logFileDir   = args(0)
    //val totalFiles   = args(1).toInt
    //var fileDirArray = new Array[String](totalFiles)
    val logFile = args(0)

    // Convert the binary file (ex: hex file) to an ASCII string with lines
    val logData = binaryToString(logFile)

    // Convert the string to tab-separated lines
    val tabFile = convertToTabFile(logData)

    Files.write(Paths.get("/home/hduser/testfile.tsv"), tabFile.getBytes(StandardCharsets.UTF_8))
    //print(tabFile)
  }

  def binaryToString(logFile: String) = {
    val src = scala.io.Source.fromFile(logFile, "ISO-8859-1")
    val charArray = src.toArray // Translate Source to Char Array
    src.close()
    var strBuf = new StringBuffer()
    for (i <- 0 to charArray.length - 1) {
      if (charArray(i) == '\u0000') { // Replace '\0' with '\n' to split lines
        charArray(i) = '\n'
      }
      strBuf.append(charArray(i))
    }
    strBuf.toString
  }

  def convertToTabFile(logData: String) = {
    val lines: Array[String] = logData.split(System.getProperty("line.separator"))
    for (i <- 0 until lines.length) {
      var charArray = lines(i).toCharArray()
      var count = 1
      for (j <- 0 to charArray.length - 1) {
        if (charArray(j) == ' ' && count <= 3) {
          charArray(j) = '\t'
          count += 1
        }
      }
      lines(i) = new String(charArray)
    }
    var strBuf = new StringBuffer()
    for (str <- lines) {
      strBuf.append(str).append('\n')
    }
    strBuf.toString()
  }

}

Thursday, October 16, 2014

Hive output result to file

[Problem]
We want to output the result of a Hive computation or aggregation to HDFS or to the local disk.

[Method]  INSERT OVERWRITE
Reference:
http://stackoverflow.com/questions/18129581/how-do-i-output-the-results-of-a-hiveql-query-to-csv

ex: (insert overwrite directory writes to an HDFS path by default)
hive> insert overwrite directory '/user/hduser/temp' select Avg(times) from hivetableerr;

Result:
$ hdfs dfs -cat /user/hduser/temp/000000_0
47.0

To output to the local disk instead, use

hive> insert overwrite local directory '/home/hduser/temp' select Avg(times) from hivetableerr;

Another way to write to the local disk, without entering the Hive shell, is to run the query directly as a bash command:

$ hive -e 'select Avg(times) from hivetableerr;' > /home/hduser/temp


SQL Function


You can issue SQL functions through Hive to run computations over the HBase data, for example AVG():

ex: 

hive>  select AVG(times) from hivetableerr;


[Result]

Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 1
2014-10-16 15:45:47,503 Stage-1 map = 0%,  reduce = 0%
2014-10-16 15:46:05,165 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 2.19 sec
2014-10-16 15:46:19,847 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 3.49 sec
MapReduce Total cumulative CPU time: 3 seconds 490 msec
Ended Job = job_1412160247541_0074
MapReduce Jobs Launched:
Job 0: Map: 1  Reduce: 1   Cumulative CPU: 3.49 sec   HDFS Read: 255 HDFS Write: 5 SUCCESS
Total MapReduce CPU Time Spent: 3 seconds 490 msec
OK
47.0
Time taken: 44.88 seconds, Fetched: 1 row(s)

HBase Select with Join via Hive

[Problem]
We have created the following two tables on HBase, and we want to use them to find Mary's error count (errorInfo:times).

"hivetable" 
cf
RowKey
id
id2
Jack
1423
Mary
1425
1745

 "hivetableErr"
errorInfo
RowKey
times
1423
43
1425
51


[Method]
Step 1: create two Hive tables linked to the two HBase tables above
create "hivetable"
create external table hivetable(name string, id int, id2 int)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties("hbase.columns.mapping" = "cf:id, cf,id2")
tblproperties("hbase.table.name" = "hivetable");

create "hivetableerr"
create external table hivetableerr(id int, times int)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
with serdeproperties("hbase.columns.mapping" = "errorInfo:times")
tblproperties("hbase.table.name" = "hivetableErr");

The query is as follows:

hive > select times from hivetableerr join hivetable on(hivetable.id = hivetableerr.id) where hivetable.name = "Mary";

SELECT FROM: select times from hivetableerr
JOIN ON: join the two tables, matching hivetableerr's row key id to hivetable's id column
WHERE: restrict the lookup to the person whose error count we want, Mary

[Result]

Total jobs = 1
14/10/16 14:41:01 WARN conf.Configuration: file:/tmp/hduser/hive_2014-10-16_14-40-59_385_3743927975833041-1/-local-10006/jobconf.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.retry.interval;  Ignoring.
14/10/16 14:41:01 WARN conf.Configuration: file:/tmp/hduser/hive_2014-10-16_14-40-59_385_3743927975833041-1/-local-10006/jobconf.xml:an attempt to override final parameter: mapreduce.job.end-notification.max.attempts;  Ignoring.
Execution log at: /tmp/hduser/hduser_20141016144040_963a7821-4ec5-4343-9a53-fc4a413057c1.log
2014-10-16 02:41:02     Starting to launch local task to process map join;      maximum memory = 477102080
2014-10-16 02:41:04     Dump the side-table into file: file:/tmp/hduser/hive_2014-10-16_14-40-59_385_3743927975833041-1/-local-10003/HashTable-Stage-3/MapJoin-mapfile10--.hashtable
2014-10-16 02:41:04     Uploaded 1 File to: file:/tmp/hduser/hive_2014-10-16_14-40-59_385_3743927975833041-1/-local-10003/HashTable-Stage-3/MapJoin-mapfile10--.hashtable (282 bytes)
2014-10-16 02:41:04     End of local task; Time Taken: 1.425 sec.
Execution completed successfully
MapredLocal task succeeded
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_1412160247541_0072, Tracking URL = http://master:8088/proxy/application_1412160247541_0072/
Kill Command = /usr/local/hadoop/bin/hadoop job  -kill job_1412160247541_0072
Hadoop job information for Stage-3: number of mappers: 1; number of reducers: 0
2014-10-16 14:41:17,420 Stage-3 map = 0%,  reduce = 0%
2014-10-16 14:41:25,978 Stage-3 map = 100%,  reduce = 0%, Cumulative CPU 1.99 sec
MapReduce Total cumulative CPU time: 1 seconds 990 msec
Ended Job = job_1412160247541_0072
MapReduce Jobs Launched:
Job 0: Map: 1   Cumulative CPU: 1.99 sec   HDFS Read: 258 HDFS Write: 3 SUCCESS
Total MapReduce CPU Time Spent: 1 seconds 990 msec
OK
51
Time taken: 27.719 seconds, Fetched: 1 row(s)

Wednesday, October 15, 2014

Hive connect to HBase Table

[Problem]
We already know how to create a table on HBase through Hive,
but what if we want Hive to connect directly to a table that already exists on HBase?



[Table Connection]
Reference: http://item.iqadd.com/item/hive-hbase-integration

Hive Code
CREATE EXTERNAL TABLE hivetable(key String, value int)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES("hbase.columns.mapping" = "cf:id")
TBLPROPERTIES("hbase.table.name" = "hivetable");

Explanation

CREATE EXTERNAL TABLE hivetable(key String, value int)
Creates an external table named "hivetable": the table itself lives in another database, and Hive only keeps the table's metadata while operating directly on the linked table in the external database.

STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
Declares that the previous statement accesses HBase through the HBase storage handler that Hive provides; Hive also offers storage handlers for other databases and file systems.

WITH SERDEPROPERTIES("hbase.columns.mapping" = "cf:id")
Maps the Hive table to a specific slice of the HBase table (you can pick particular columns); in this example the Hive table only maps the cf:id column of the HBase table (to map several columns, use e.g. "cf:id,cf:id2,cf2:name" ...).

TBLPROPERTIES("hbase.table.name" = "hivetable");
Specifies the name of the HBase table to connect to; in this example it connects to the "hivetable" table on HBase.

This way, whenever the data in "hivetable" on HBase changes, the change can be observed directly from the Hive table "hivetable".
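
For example, a row written on the HBase side (sketched here with the pre-1.0 HBase Java client API; the row key and value are hypothetical, and the default column mapping stores values as UTF-8 strings) becomes visible in a subsequent select on the Hive table:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class PutToHiveTable {
    public static void main(String[] args) throws Exception {
        // Picks up hbase-site.xml (ZooKeeper quorum, etc.) from the classpath
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "hivetable");

        // Row key "John", column cf:id = "1423"; Hive should read this back as value = 1423
        Put put = new Put(Bytes.toBytes("John"));
        put.add(Bytes.toBytes("cf"), Bytes.toBytes("id"), Bytes.toBytes("1423"));
        table.put(put);
        table.close();
    }
}

Running hive> select * from hivetable; afterwards should then also return the new ("John", 1423) row.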

Monday, October 13, 2014

Java TCP Server receiving data from the Linux nc command

[Problem]

The Linux nc (netcat) command can send data or messages over TCP/UDP, but I currently have no way to build a multithreaded TCP server with nc or other Linux commands; nc only handles one-to-one transfers,
as in Put Data into Remote HBase by using Linux Bash Script. So I wrote a multithreaded server in Java instead.

------------------------------------------------------------------------------------------------------------
[Concept]


Reference:
How to write RAW data to a file using Java? e.g same as: nc -l 8000 > capture.raw
Multithreaded Server in Java
-------------------------------------------------------------------------------------------------------------
[Code]

tcpserver.java  (Main Class)

package tcpserver;

public class tcpserver {
    public static void main(String args[]){
        MultiThreadedServer server = new MultiThreadedServer(9075);
        System.out.println("***************************");
        System.out.println("** TCP Collection Server **");
        System.out.println("***************************");
        new Thread(server).start();
    }
}

MultiThreadedServer.java

package tcpserver;

import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

public class MultiThreadedServer implements Runnable{
    protected int          serverPort   = 50200;
    protected ServerSocket serverSocket = null;
    protected boolean      isStopped    = false;
    protected Thread       runningThread= null;

    public MultiThreadedServer(int port){
        this.serverPort = port;
        System.out.println("System port setting : " + port);
    }

    public void run(){

        synchronized(this){
            this.runningThread = Thread.currentThread(); // remember the thread running this server
        }

        openServerSocket();

        System.out.println("[INFO] Start Listening!");
        while(!isStopped()){
            Socket clientSocket = null;
            try {
                clientSocket = this.serverSocket.accept(); // wait for a client
            } catch (IOException e) {
                if(isStopped()) {
                    System.out.println("[ERROR] Server Stopped.");
                    return;
                }
                throw new RuntimeException("[ERROR] Error accepting client connection", e);
            }
            // Create a new thread for each client
            new Thread(
                new WorkerRunnable(clientSocket)
            ).start();
        }
        System.out.println("Server Stopped.");
    }

    private synchronized boolean isStopped() {
        return this.isStopped;
    }

    public synchronized void stop(){
        this.isStopped = true;
        try {
            this.serverSocket.close();
        } catch (IOException e) {
            throw new RuntimeException("Error closing server", e);
        }
    }

    private void openServerSocket() {
        try {
            this.serverSocket = new ServerSocket(this.serverPort);
            System.out.println("[INFO] Create a ServerSocket");
        } catch (IOException e) {
            throw new RuntimeException("Cannot open port " + this.serverPort, e);
        }
    }
}

WorkerRunnable.java

package tcpserver;

import java.io.*;
import java.net.*;

public class WorkerRunnable implements Runnable{
    protected Socket clientSocket = null;

    public WorkerRunnable(Socket clientSocket){
        this.clientSocket = clientSocket;
    }

    public void run(){
        try{
            byte[] buff = new byte[1024];
            int bytes_read = 0;
            InputStream data = clientSocket.getInputStream(); // get input data
            while(( bytes_read = data.read(buff)) != -1){     // print only the bytes actually read
                System.out.println(new String(buff, 0, bytes_read));
                //String str = new String(buff, 0, bytes_read);
            }
            data.close();
        }
        catch(IOException e){
            e.printStackTrace();
        }
    }
}
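
To exercise the server you can pipe text to it with nc (e.g. $ echo hello | nc 192.168.0.7 9075), or use a small client; below is a minimal Java sketch (the host, port, and message are only placeholders for this example):

import java.io.OutputStream;
import java.net.Socket;

public class TestClient {
    public static void main(String[] args) throws Exception {
        // Connect to the multithreaded server started above (placeholder host/port)
        try (Socket socket = new Socket("192.168.0.7", 9075)) {
            OutputStream out = socket.getOutputStream();
            out.write("hello from TestClient\n".getBytes("UTF-8"));
            out.flush();
        }
    }
}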

-------------------------------------------------------------------------------------------------------------
[Result]

Sunday, October 12, 2014

Put Data into Remote HBase by using Linux Bash Script


1. Start a TCP listening server on 192.168.0.7:5002,
and pipe the received commands to the HBase shell

start_collection.sh
while true;
do nc -l 192.168.0.7 5002
done | hbase shell

[What the script does]
nc -l : opens port 5002 on IP 192.168.0.7 and listens for any TCP packets sent to that port,
(use nc -lu to listen for UDP packets instead)
while true; ... do ... done : nc exits after handling one transfer, so this loop is needed to keep receiving,
| hbase shell : pipes the messages that nc receives (HBase shell operations) into the hbase shell to execute

--------------------------------------------------------------------------------------------------------------------
2. Send Info to Server

tcollector.sh
#!/bin/bash
set -e
while true;
do awk -v now=`date +%s` \
'{ print "put " $1", " now", " $2", " $3}' HBase_Info
sleep 15
done | nc -w 30 192.168.0.7 5002

[What the script does]
set -e : abort the whole pipeline ( | ) if any command in it fails

awk -v now=`date +%s` \
'{ print "put " $1", " now", " $2", " $3}' HBase_Info
    - sets the variable now to the current time (date +%s ==> epoch seconds)
    - reads the HBase_Info file line by line, where
                  the first field is  $1 = 'hiveTableonHB'
                  the second field is $2 = 'cf:time'
                  the third field is  $3 = 'test'

nc -w 30 192.168.0.7 5002 : sends everything the awk command above prints to 192.168.0.7:5002

HBase_Info
'hiveTableonHB' 'cf:time' 'test'

--------------------------------------------------------------------------------------------------------------------
[RESULT]

1.8.7-p357 :004 > scan 'hiveTableonHB'
ROW                      COLUMN+CELL
 1413177758              column=cf:time, timestamp=1413177976111, value=test
 1413180302              column=cf:time, timestamp=1413180310949, value=test
 1413180317              column=cf:time, timestamp=1413180324753, value=test
6 row(s) in 0.0270 seconds