java - Extending Hadoop's TableInputFormat to scan with a prefix used for distribution of timestamp keys

I have an HBase table whose key is a timestamp with a one-byte random prefix, used to distribute the keys so scans don't hotspot a single region. I'm trying to extend TableInputFormat so that I can run a single MapReduce over the table with a timestamp range, prefixing the range with all 256 possible prefix bytes so that every salted range within the specified timestamps is scanned. My solution isn't working, though: it always seems to scan the last prefix (127) 256 times. Something must be shared across all the scans.
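For context, the write path salts the keys roughly like this (an illustrative sketch, not my actual code; the helper name is mine). The salt is random, which is exactly why reads can't reconstruct it and the job has to fan out over all 256 prefixes:

import java.util.Random;

// Illustrative write-path helper: prepend one random byte so sequential
// timestamp keys spread across 256 key ranges instead of hammering one region.
public class KeySalter {
  private static final Random RANDOM = new Random();

  public static byte[] salt(byte[] timestampKey) {
    byte[] salted = new byte[timestampKey.length + 1];
    salted[0] = (byte) RANDOM.nextInt(256);  // one-byte prefix: -128..127
    System.arraycopy(timestampKey, 0, salted, 1, timestampKey.length);
    return salted;
  }
}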

My attempt at the InputFormat is below. Any ideas?

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;

public class PrefixedTableInputFormat extends TableInputFormat {

  @Override
  public List<InputSplit> getSplits(JobContext context)
      throws IOException {
    List<InputSplit> splits = new ArrayList<InputSplit>();
    Scan scan = getScan();
    // Copy the configured start/stop rows into buffers that leave
    // room for a one-byte prefix at position 0.
    byte[] startRow = scan.getStartRow(), stopRow = scan.getStopRow();
    byte[] prefixedStartRow = new byte[startRow.length + 1];
    byte[] prefixedStopRow = new byte[stopRow.length + 1];
    System.arraycopy(startRow, 0, prefixedStartRow, 1, startRow.length);
    System.arraycopy(stopRow, 0, prefixedStopRow, 1, stopRow.length);

    // Generate splits for each of the 256 possible prefix bytes.
    for (int prefix = -128; prefix < 128; prefix++) {
      prefixedStartRow[0] = (byte) prefix;
      prefixedStopRow[0] = (byte) prefix;
      scan.setStartRow(prefixedStartRow);
      scan.setStopRow(prefixedStopRow);
      setScan(scan);
      splits.addAll(super.getSplits(context));
    }

    return splits;
  }
}

and the driver setup, which passes the unprefixed timestamp range (the InputFormat is supposed to take care of the prefixes):

Configuration config = HBaseConfiguration.create();
Job job = new Job(config, "Aggregate");
job.setJarByClass(Aggregate.class);

// The scan carries the unprefixed timestamp range; PrefixedTableInputFormat
// expands it to all 256 prefixed ranges in getSplits().
Scan scan = new Scan();
scan.setStartRow("20120630".getBytes());
scan.setStopRow("20120701".getBytes());
scan.setCaching(500);
scan.setCacheBlocks(false);  // recommended off for MapReduce scans

TableMapReduceUtil.initTableMapperJob(
    "event",
    scan,
    Mapper.class,
    ImmutableBytesWritable.class,
    ImmutableBytesWritable.class,
    job,
    true,
    PrefixedTableInputFormat.class);
TableMapReduceUtil.initTableReducerJob("event", Reducer.class, job);

1 Reply


You're going to need to make a deep copy of each split in every iteration. super.getSplits() returns TableSplits that can reference the very row arrays you keep mutating, so by the time the job runs, every split describes the last prefix (127), which matches the symptom you're seeing:

for (int prefix = -128; prefix < 128; prefix++) {
  prefixedStartRow[0] = (byte) prefix;
  prefixedStopRow[0] = (byte) prefix;
  scan.setStartRow(prefixedStartRow);
  scan.setStopRow(prefixedStopRow);
  setScan(scan);

  for (InputSplit subSplit : super.getSplits(context)) {
    // ReflectionUtils.copy serializes the split and deserializes it into a
    // fresh TableSplit, snapshotting the row range before the next mutation.
    splits.add((InputSplit) ReflectionUtils.copy(getConf(),
        (TableSplit) subSplit, new TableSplit()));
  }
}
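Putting the question's class and that fix together, a complete version might look like this sketch (same HBase/Hadoop APIs as above; getConf() is inherited via TableInputFormat's Configurable base, and ReflectionUtils is org.apache.hadoop.util.ReflectionUtils):

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableSplit;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.util.ReflectionUtils;

public class PrefixedTableInputFormat extends TableInputFormat {

  @Override
  public List<InputSplit> getSplits(JobContext context) throws IOException {
    List<InputSplit> splits = new ArrayList<InputSplit>();
    Scan scan = getScan();
    byte[] startRow = scan.getStartRow(), stopRow = scan.getStopRow();
    byte[] prefixedStartRow = new byte[startRow.length + 1];
    byte[] prefixedStopRow = new byte[stopRow.length + 1];
    System.arraycopy(startRow, 0, prefixedStartRow, 1, startRow.length);
    System.arraycopy(stopRow, 0, prefixedStopRow, 1, stopRow.length);

    for (int prefix = -128; prefix < 128; prefix++) {
      prefixedStartRow[0] = (byte) prefix;
      prefixedStopRow[0] = (byte) prefix;
      scan.setStartRow(prefixedStartRow);
      scan.setStopRow(prefixedStopRow);
      setScan(scan);

      // Snapshot each split before the next iteration mutates the row arrays.
      for (InputSplit subSplit : super.getSplits(context)) {
        splits.add((InputSplit) ReflectionUtils.copy(getConf(),
            (TableSplit) subSplit, new TableSplit()));
      }
    }

    return splits;
  }
}

The copy works because TableSplit is a Writable: ReflectionUtils.copy round-trips it through serialization, so each stored split keeps its own snapshot of the prefixed row range rather than a reference that the next iteration overwrites.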
