自定义 filter 需要继承抽象类 Filter,或者继承 FilterBase。
@InterfaceAudience.Public@InterfaceStability.Stablepublic abstract class Filter {//返回码public enum ReturnCode {INCLUDE, //结果中包含着一样INCLUDE_AND_NEXT_COL, //包含着这样一行,跳到下一行比较SKIP, //跳到下一个keyvalue 并进行处理NEXT_COL, //跳过当前一colNEXT_ROW, //跳过当前一行SEEK_NEXT_USING_HINT, //跳到下一个满足地方,需要调用getNextKeyHint()}protected transient boolean reversed;abstract public void reset() throws IOException;//判断行健是否满足,不满足可以跳过,避免其他检查:比如前缀过滤器abstract public boolean filterRowKey(byte[] buffer, int offset, int length) throws IOException;//这个过滤器可以提前结束abstract public boolean filterAllRemaining() throws IOException;- //对cell处理,
abstract public ReturnCode filterKeyValue(final Cell v) throws IOException;- abstract public Cell transformCell(final Cell v) throws IOException;
@Deprecated // use Cell transformCell(final Cell)abstract public KeyValue transform(final KeyValue currentKV) throws IOException;//经过前面处理后,如果还有数据,将对当前行一起处理, 比如依赖过去器abstract public void filterRowCells(List<Cell> kvs) throws IOException;abstract public boolean hasFilterRow();//经过这么多流程如果还有数据,会去检查一下数据的要求。比如pagefilter 是否已经够一页了abstract public boolean filterRow() throws IOException;@Deprecatedabstract public KeyValue getNextKeyHint(final KeyValue currentKV) throws IOException;abstract public Cell getNextCellHint(final Cell currentKV) throws IOException;abstract public boolean isFamilyEssential(byte[] name) throws IOException;abstract public byte[] toByteArray() throws IOException;public static Filter parseFrom(final byte [] pbBytes) throws DeserializationException {throw new DeserializationException("parseFrom called on base Filter, but should be called on derived type");}abstract boolean areSerializedFieldsEqual(Filter other);public void setReversed(boolean reversed) {this.reversed = reversed;}- public boolean isReversed() {
return this.reversed;}}
流程如下:
scan,或者get是调用的入口
基本流程如下面的代码所示,但是其中没有看到 filterKeyValue 的调用,如果你们找到了,告诉我一声。
private boolean nextInternal(List<Cell> results, ScannerContext scannerContext)throws IOException {while (true) {- boolean stopRow = isStopRow(currentRow, offset, length);
boolean hasFilterRow = this.filter != null && this.filter.hasFilterRow();if (hasFilterRow) {if (LOG.isTraceEnabled()) {LOG.trace("filter#hasFilterRow is true which prevents partial results from being "+ " formed. Changing scope of limits that may create partials");}scannerContext.setSizeLimitScope(LimitScope.BETWEEN_ROWS);scannerContext.setTimeLimitScope(LimitScope.BETWEEN_ROWS);}if (filterRowKey(currentRow, offset, length)) {incrementCountOfRowsFilteredMetric(scannerContext);// early check, see HBASE-16296- //filterAllRemaining 实际调用
if (isFilterDoneInternal()) {return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();}- incrementCountOfRowsScannedMetric(scannerContext);
- //里面会调用filter.reset();
boolean moreRows = nextRow(scannerContext, currentRow, offset, length);if (!moreRows) {return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();}results.clear();continue;}populateResult(results, this.storeHeap, scannerContext, currentRow, offset, length);- Cell nextKv = this.storeHeap.peek();
stopRow = nextKv == null ||isStopRow(nextKv.getRowArray(), nextKv.getRowOffset(), nextKv.getRowLength());final boolean isEmptyRow = results.isEmpty();-
- FilterWrapper.FilterRowRetCode ret = FilterWrapper.FilterRowRetCode.NOT_CALLED;
if (hasFilterRow) {-
//会调用filterRowCells(List) 和filterRowCells(cell) ret = filter.filterRowCellsWithRet(results);-
long timeProgress = scannerContext.getTimeProgress(); if (scannerContext.getKeepProgress()) {scannerContext.setProgress(initialBatchProgress, initialSizeProgress,initialTimeProgress);} else {scannerContext.clearProgress();}scannerContext.setTimeProgress(timeProgress);scannerContext.incrementBatchProgress(results.size());for (Cell cell : results) {scannerContext.incrementSizeProgress(CellUtil.estimatedHeapSizeOfWithoutTags(cell));}}if (isEmptyRow || ret == FilterWrapper.FilterRowRetCode.EXCLUDE || filterRow()) {incrementCountOfRowsFilteredMetric(scannerContext);results.clear();boolean moreRows = nextRow(scannerContext, currentRow, offset, length);if (!moreRows) {return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();}-
if (!stopRow) continue; return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();}-
if (this.joinedHeap != null) { boolean mayHaveData = joinedHeapMayHaveData(currentRow, offset, length);if (mayHaveData) {joinedContinuationRow = current;populateFromJoinedHeap(results, scannerContext);if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {return true;}}}} else {-
populateFromJoinedHeap(results, scannerContext); if (scannerContext.checkAnyLimitReached(LimitScope.BETWEEN_CELLS)) {return true;}}if (stopRow) {return scannerContext.setScannerState(NextState.NO_MORE_VALUES).hasMoreValues();} else {return scannerContext.setScannerState(NextState.MORE_VALUES).hasMoreValues();}}}
就这样。
自定义完成后,打成 jar,然后需要 export HBASE_CLASSPATH(指向该 jar 的路径),
或者将jar 放到hbase 的安装目录的lib下面,重启hbase