Welcome to OStack Knowledge Sharing Community for programmer and developer-Open, Learning and Share
Welcome To Ask or Share your Answers For Others

java - Extending Hadoop's TableInputFormat to scan with a prefix used for distribution of timestamp keys

I have an HBase table whose key is a timestamp with a one-byte random prefix, which distributes the keys so that scans don't hotspot. I'm trying to extend TableInputFormat so that I can run a single MapReduce job over a timestamp range on this table, applying each of the 256 possible prefixes so that every key range covering the specified timestamps is scanned. My solution isn't working, though: it always seems to scan the last prefix (127) 256 times. Something must be shared across all scans.

My code is below. Any ideas?

public class PrefixedTableInputFormat extends TableInputFormat {

  @Override
  public List<InputSplit> getSplits(JobContext context)
    throws IOException {
    List<InputSplit> splits = new ArrayList<InputSplit>();
    Scan scan = getScan();
    byte startRow[] = scan.getStartRow(), stopRow[] = scan.getStopRow();
    byte prefixedStartRow[] = new byte[startRow.length+1];
    byte prefixedStopRow[] = new byte[stopRow.length+1];
    System.arraycopy(startRow, 0, prefixedStartRow, 1, startRow.length);
    System.arraycopy(stopRow, 0, prefixedStopRow, 1, stopRow.length);

    for (int prefix = -128; prefix < 128; prefix++) {
      prefixedStartRow[0] = (byte) prefix;
      prefixedStopRow[0] = (byte) prefix;
      scan.setStartRow(prefixedStartRow);
      scan.setStopRow(prefixedStopRow);
      setScan(scan);
      splits.addAll(super.getSplits(context));
    }

    return splits;
  }
}

and

  Configuration config = HBaseConfiguration.create();
  Job job = new Job(config, "Aggregate");
  job.setJarByClass(Aggregate.class);

  Scan scan = new Scan();
  scan.setStartRow("20120630".getBytes());
  scan.setStopRow("20120701".getBytes());
  scan.setCaching(500);
  scan.setCacheBlocks(false);

  TableMapReduceUtil.initTableMapperJob(
      "event",
      scan,
      Mapper.class,
      ImmutableBytesWritable.class,
      ImmutableBytesWritable.class,
      job,
      true,
      PrefixedTableInputFormat.class);
  TableMapReduceUtil.initTableReducerJob("event", Reducer.class, job);

1 Answer

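You're going to need to make a deep copy of each split in every iteration. The loop reuses the same prefixedStartRow and prefixedStopRow arrays and only overwrites their first byte, so if the splits returned by super.getSplits() keep references to those arrays (which the symptom suggests), every split ends up showing the last prefix written, 127: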

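// ReflectionUtils.copy needs the job configuration to look up a serializer.
Configuration conf = context.getConfiguration();

for (int prefix = -128; prefix < 128; prefix++) {
  prefixedStartRow[0] = (byte) prefix;
  prefixedStopRow[0] = (byte) prefix;
  scan.setStartRow(prefixedStartRow);
  scan.setStopRow(prefixedStopRow);
  setScan(scan);

  for (InputSplit subSplit : super.getSplits(context)) {
    // Copy each split so it no longer depends on state reused by the next iteration.
    splits.add(ReflectionUtils.copy(conf,
          (TableSplit) subSplit, new TableSplit()));
  }
}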
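
To see why copying helps, here is a minimal stand-alone sketch of the aliasing problem in plain Java. It does not use any Hadoop or HBase classes; the class name SharedArrayDemo and its helper methods are hypothetical and exist only for illustration. It assumes the splits behave like the list below, that is, they store a reference to the row-key array rather than a snapshot of it.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone demo; no Hadoop or HBase classes are involved.
final class SharedArrayDemo {

  // Builds a key buffer: one free leading byte for the prefix, then the row bytes.
  static byte[] withPrefixSlot(byte[] row) {
    byte[] prefixed = new byte[row.length + 1];
    System.arraycopy(row, 0, prefixed, 1, row.length);
    return prefixed;
  }

  // Mirrors the question: one buffer is mutated and stored 256 times.
  static List<byte[]> collectShared(byte[] row) {
    List<byte[]> keys = new ArrayList<>();
    byte[] prefixed = withPrefixSlot(row);
    for (int prefix = -128; prefix < 128; prefix++) {
      prefixed[0] = (byte) prefix;
      keys.add(prefixed); // stores a reference, not a snapshot
    }
    return keys;
  }

  // Same loop, but every stored key is an independent copy.
  static List<byte[]> collectCopied(byte[] row) {
    List<byte[]> keys = new ArrayList<>();
    byte[] prefixed = withPrefixSlot(row);
    for (int prefix = -128; prefix < 128; prefix++) {
      prefixed[0] = (byte) prefix;
      keys.add(prefixed.clone()); // snapshot of the current prefix
    }
    return keys;
  }

  public static void main(String[] args) {
    byte[] row = "20120630".getBytes(StandardCharsets.UTF_8);
    // prints "shared first prefix: 127"
    System.out.println("shared first prefix: " + collectShared(row).get(0)[0]);
    // prints "copied first prefix: -128"
    System.out.println("copied first prefix: " + collectCopied(row).get(0)[0]);
  }
}
```

In the shared version all 256 entries point at the same array, so they all report the prefix from the final iteration. If the same thing is happening inside your input format, allocating fresh start and stop arrays inside the loop may work as an alternative to copying the splits, though I have not verified that against a particular HBase version.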
