MapReduce example

Neptune uses a distributed/parallel computing platform to analyze the data stored in Neptune tables. By default, Neptune uses the Hadoop MapReduce computing platform.
You can use the AbstractTabletInputFormat class as a MapReduce InputFormat. AbstractTabletInputFormat has one abstract method, which you must implement:
public abstract AbstractTabletInputFormat implements InputFormat {
  //return input table's detail info
  public abstract InputTableInfo[] getInputTableInfos(JobConf jobConf);
  ...
}
You choose the scan type with InputTableInfo.setRowScan(). If it is set to true, the map method is defined as "public void map(Row.Key key, Row value, ...)".
If it is set to false, the map method is defined as "public void map(Row.Key key, ScanCell value, ...)".
DefaultTabletInputFormat is the default implementation of AbstractTabletInputFormat. To use DefaultTabletInputFormat, you must set the following properties in the JobConf:
  • AbstractTabletInputFormat.INPUT_TABLE
  • AbstractTabletInputFormat.INPUT_COLUMN_LIST: a comma (",") delimited list
To run a MapReduce job with Neptune, you must set the classpath. There are several ways to do this.
The first approach is to add the following directory and jar files to HADOOP_CLASSPATH in ${HADOOP_HOME}/conf/hadoop-env.sh:
${NEPTUNE_HOME}/conf:${NEPTUNE_HOME}/neptune-1.0-dev.jar:${NEPTUNE_HOME}/lib/zookeeper-xxx.jar
This approach is simple, but you must restart the Hadoop cluster.
The second approach is to use the DistributedCache. To use the DistributedCache, you must upload the jar files and XML files to HDFS before running the job. The NeptuneMapReduceUtil class helps with the upload, as shown in the following example:
/**
 * Example MapReduce job that builds an inverted table from a Neptune table.
 *
 * <p>The mapper scans the input table and emits (cell key, row key) for every
 * cell of the row's first column. The reducer writes one row per cell key into
 * the output table, adding one cell per row key it received.
 */
public class FirstMapReduce {
  /**
   * Table scanned by the mapper; its tablet count also sizes the reduce phase.
   * NOTE(review): the original example used "SimpleTable" for the scan but
   * "SampleTable1" for the tablet count; confirm which table name is intended.
   */
  static final String INPUT_TABLE_NAME = "SimpleTable";

  /** Table receiving the inverted rows; created by main() if it does not exist. */
  static final String OUTPUT_TABLE_NAME = "InvertedTable";

  /** Column filtered from the input table and written to the output table. */
  static final String COLUMN_NAME = "Column1";

  /**
   * Creates the output table if needed, then configures and runs the job.
   *
   * @param args unused
   * @throws Exception if table setup or the job itself fails
   */
  public static void main(String[] args) throws Exception {
    NConfiguration conf = new NConfiguration();

    // Create the output table. The schema must carry the table name,
    // otherwise createTable() has no name to create the table under.
    if (!NTable.existsTable(conf, OUTPUT_TABLE_NAME)) {
      TableSchema tableSchema = new TableSchema(OUTPUT_TABLE_NAME);
      tableSchema.addColumn(COLUMN_NAME);
      NTable.createTable(conf, tableSchema);
    }

    JobConf jobConf = new JobConf(FirstMapReduce.class);
    jobConf.setJobName("FirstMapReduce");

    // Uploads Neptune's jar/XML files for the job; the returned directory is
    // removed again by clearMapReduce() once the job has finished.
    String libDir = NeptuneMapReduceUtil.initMapReduce(jobConf);

    // Map phase: scan the input table, emit (cell key, row key) as Text pairs.
    jobConf.setMapperClass(FirstMapReduceMapper.class);
    jobConf.setInputFormat(FirstMapReduceInputFormat.class);
    jobConf.setMapOutputKeyClass(Text.class);
    jobConf.setMapOutputValueClass(Text.class);

    // Reduce phase: the reducer puts rows directly into the output table and
    // never calls output.collect(), so the file output path only holds empty
    // part files and is deleted after the job.
    String outputPath = "temp/FirstMapReduce";
    jobConf.setOutputPath(new Path(outputPath));
    jobConf.setReducerClass(FirstMapReduceReducer.class);
    jobConf.setOutputKeyClass(Text.class);
    jobConf.setOutputValueClass(Text.class);

    NTable inputTable = NTable.openTable(conf, INPUT_TABLE_NAME);
    if (inputTable == null) {
      throw new IOException("No " + INPUT_TABLE_NAME + " table");
    }
    TabletInfo[] tabletInfos = inputTable.listTabletInfos();
    jobConf.setNumReduceTasks(tabletInfos.length);
    // NOTE(review): kept from the original example; confirm how the Hadoop
    // version in use interprets 0 (the intent appears to be "no retries").
    jobConf.setMaxReduceAttempts(0);

    try {
      JobClient.runJob(jobConf);
    } finally {
      // Nested so the uploaded lib dir is cleaned up even if delete() fails.
      try {
        NeptuneFileSystem fs = NeptuneFileSystem.get(conf);
        fs.delete(new GPath(outputPath), true);
      } finally {
        NeptuneMapReduceUtil.clearMapReduce(libDir);
      }
    }
  }

  /** Reads only COLUMN_NAME of INPUT_TABLE_NAME, delivering whole rows to the mapper. */
  static class FirstMapReduceInputFormat extends AbstractTabletInputFormat {
    public FirstMapReduceInputFormat() throws IOException {
      super();
    }

    @Override
    public InputTableInfo[] getInputTableInfos(JobConf jobConf) {
      RowFilter rowFilter = new RowFilter();
      rowFilter.addCellFilter(new CellFilter(COLUMN_NAME));

      InputTableInfo inputTableInfo = new InputTableInfo();
      inputTableInfo.setTable(INPUT_TABLE_NAME, rowFilter);
      // FirstMapReduceMapper is written as map(Row.Key, Row, ...), which
      // requires a row scan rather than a cell scan.
      inputTableInfo.setRowScan(true);

      return new InputTableInfo[] {inputTableInfo};
    }
  }

  /** Emits (cell key, row key) for every cell in the first column of each row. */
  static class FirstMapReduceMapper implements Mapper<Row.Key, Row, Text, Text> {
    @Override
    public void map(Row.Key key, Row value, OutputCollector<Text, Text> output,
        Reporter reporter) throws IOException {
      // Same value for every cell of this row, so build it once.
      Text rowKey = new Text(key.toString());
      for (Cell eachCell : value.getFirstColumnCells()) {
        output.collect(new Text(eachCell.getKey().toString()), rowKey);
      }
    }

    @Override
    public void configure(JobConf job) {
    }

    @Override
    public void close() throws IOException {
    }
  }

  /**
   * Writes one output-table row per cell key, with one COLUMN_NAME cell per
   * row key received for it.
   */
  static class FirstMapReduceReducer implements Reducer<Text, Text, Text, Text> {
    private NTable ntable;
    // configure() cannot throw checked exceptions, so a failure to open the
    // output table is remembered here and rethrown from reduce().
    private IOException exception;

    @Override
    public void reduce(Text key, Iterator<Text> values,
        OutputCollector<Text, Text> output, Reporter reporter)
        throws IOException {
      if (exception != null) {
        throw exception;
      }
      Row row = new Row(new Row.Key(key.toString()));
      while (values.hasNext()) {
        Text text = values.next();
        row.addCell(COLUMN_NAME, new Cell(new Cell.Key(text.toString()), null));
      }

      ntable.put(row);
    }

    @Override
    public void configure(JobConf job) {
      try {
        NConfiguration conf = new NConfiguration();
        ntable = NTable.openTable(conf, OUTPUT_TABLE_NAME);
        if (ntable == null) {
          throw new IOException("No " + OUTPUT_TABLE_NAME + " table");
        }
      } catch (IOException e) {
        exception = e;
      }
    }

    @Override
    public void close() throws IOException {
    }
  }
}