HBase MapReduce

- Data locality: HBase's block placement policy writes the first copy of each block to the data node on which the region server runs, so a region's data is local to its server.

- TableInputFormat: divides the table into input splits at region boundaries; each split is defined by a start row and an end row.

static class AnalyzeMapper extends TableMapper<Text, IntWritable> {// extends from TableMapper
    private IntWritable ONE = new IntWritable(1);
    public void map(ImmutableBytesWritable row, Result columns, Context context)
        throws IOException {
        String value = null;
        try {
            for (KeyValue kv : columns.list()) {
                value = Bytes.toStringBinary(kv.getValue());
                JSONObject json = (JSONObject) parser.parse(value);
                String author = (String) json.get("author");
                context.write(new Text(author), ONE);
        } catch (Exception e) {
            System.err.println("Row: " + Bytes.toStringBinary(row.get()) +
            ", JSON: " + value);
// Wires the mapper into the job in one call: sets AnalyzeMapper as the mapper
// class, configures TableInputFormat with the given table and Scan, and
// declares the map output key/value types (Text / IntWritable).
TableMapReduceUtil.initTableMapperJob(table, scan, AnalyzeMapper.class,
    Text.class, IntWritable.class, job); // set mapper class, set TableInputFormat

- TableOutputFormat: writes job output (Put or Delete mutations) back into an HBase table.

// In the mapper: build a Put for the target row and emit it; TableOutputFormat
// performs the actual write to HBase.
Put put = new Put(rowkey); // from mapper class
put.add(family, qualifier, Bytes.toBytes(lineString));
context.write(new ImmutableBytesWritable(rowkey), put);

// Option 1 — manual job wiring for TableOutputFormat:
job.setOutputFormatClass(TableOutputFormat.class); // TableOutputFormat targets HBase
job.getConfiguration().set(TableOutputFormat.OUTPUT_TABLE, table);
job.setOutputKeyClass(ImmutableBytesWritable.class); // key type needed by TableOutputFormat
job.setOutputValueClass(Writable.class); // value is a Put/Delete, i.e. a Writable
job.setNumReduceTasks(0); // map-only job: no reduce task

// Option 2 — let the helper configure the output side instead.
// IdentityTableReducer simply forwards Puts/Deletes to the table; with zero
// reduce tasks, mapper output goes straight to TableOutputFormat.
TableMapReduceUtil.initTableReducerJob(output, IdentityTableReducer.class, job);
job.setNumReduceTasks(0); // reduce task is not necessary