2014年9月30日 星期二

HBase Count Table Rows (Using Java Jar File)

[Software]
    Hadoop2.4.1
    Eclipse IDE for Java Developers Luna Release (4.4.0)
    HBase0.98.5

/*
 * Version:
 * v1 : count the rows of a specified table; map-only job; output: counter "Rows"
 */
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat;

import org.apache.hadoop.util.GenericOptionsParser;

public class HbaseGet {
private static byte[] tablename;
private static byte[] familyname;
private static byte[] columnname;

public static class GetMap
extends TableMapper<Text, LongWritable> {//in Java: Text=>String, LongWritable=>long

public static enum Counters {Rows, Times};

@Override
public void map(ImmutableBytesWritable rowkey, Result result, Context context)
throws IOException {
byte[] b = result.getColumnLatest(Bytes.toBytes("m0"),  Bytes.toBytes("Tj.00")).getValue();
String msg = Bytes.toString(b);
if(msg != null && !msg.isEmpty())
context.getCounter(Counters.Rows).increment(1);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = HBaseConfiguration.create();
String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
if(otherArgs.length !=3){
System.err.println("Wrong number of arguments:"+ otherArgs.length);
System.err.println("Usage: hadoop jar HBaseGet.jar HbaseGet <tablename> <CF> <CN>");
System.exit(-1);
}
tablename  = Bytes.toBytes(otherArgs[0]);
familyname = Bytes.toBytes(otherArgs[1]);
columnname = Bytes.toBytes(otherArgs[2]);

Job job = new Job(conf, otherArgs[0]);
job.setJarByClass(HbaseGet.class);

Scan scan = new Scan();
scan.addColumn(familyname,columnname);
TableMapReduceUtil.initTableMapperJob(
Bytes.toString(tablename),
scan,
GetMap.class,
ImmutableBytesWritable.class,
Result.class, //Single row result of a Get or Scan query
job);
job.setOutputFormatClass(NullOutputFormat.class);
job.setNumReduceTasks(0);
System.exit(job.waitForCompletion(true)?0:1);
}
}

沒有留言:

張貼留言