Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -356,7 +356,7 @@ public static Result scanByRegionEncodedName(Connection connection, String regio
throws IOException {
RowFilter rowFilter =
new RowFilter(CompareOperator.EQUAL, new SubstringComparator(regionEncodedName));
Scan scan = getMetaScan(connection.getConfiguration(), 1);
Scan scan = getMetaScan(connection.getConfiguration(), 1, false);
scan.setFilter(rowFilter);
try (Table table = getMetaHTable(connection);
ResultScanner resultScanner = table.getScanner(scan)) {
Expand Down Expand Up @@ -558,13 +558,13 @@ public static Scan getScanForTableName(Configuration conf, TableName tableName)
// Stop key appends the smallest possible char to the table name
byte[] stopKey = getTableStopRowForMeta(tableName, QueryType.REGION);

Scan scan = getMetaScan(conf, -1);
Scan scan = getMetaScan(conf, -1, false);
scan.setStartRow(startKey);
scan.setStopRow(stopKey);
return scan;
}

private static Scan getMetaScan(Configuration conf, int rowUpperLimit) {
private static Scan getMetaScan(Configuration conf, int rowUpperLimit, boolean isPagedScan) {
Scan scan = new Scan();
int scannerCaching = conf.getInt(HConstants.HBASE_META_SCANNER_CACHING,
HConstants.DEFAULT_HBASE_META_SCANNER_CACHING);
Expand All @@ -575,7 +575,14 @@ private static Scan getMetaScan(Configuration conf, int rowUpperLimit) {
scan.setLimit(rowUpperLimit);
scan.setReadType(Scan.ReadType.PREAD);
}
scan.setCaching(scannerCaching);
if (isPagedScan) {
// Caller is doing a bounded paged scan and expects the whole slice back in one ScannerNext
// RPC. Size caching to the slice. Trade-off: a single larger response uses more
// RegionServer heap, fine for meta rows (small).
scan.setCaching(rowUpperLimit);
} else {
scan.setCaching(scannerCaching);
}
scan.setPriority(HConstants.INTERNAL_READ_QOS);
return scan;
}
Expand Down Expand Up @@ -706,6 +713,25 @@ public static void scanMetaForTableRegions(Connection connection, Visitor visito
scanMetaForTableRegions(connection, visitor, tableName, CatalogReplicaMode.NONE);
}

/**
* Scan meta for regions of {@code tableName}, starting at the meta row derived from
* {@code startRow} and returning at most {@code rowLimit} rows. {@code startRow} must be a region
* start-key boundary (e.g. the end key of the previously visited region), or {@code null}/empty
* to start at the first region. The scan is sized so that the whole {@code rowLimit}-row slice
* comes back in a single ScannerNext RPC, regardless of the configured
* {@code hbase.meta.scanner.caching}.
*/
public static void scanMetaForTableRegions(Connection connection, Visitor visitor,
TableName tableName, byte[] startRow, int rowLimit, CatalogReplicaMode metaReplicaMode)
throws IOException {
byte[] metaStart = (startRow == null || startRow.length == 0)
? getTableStartRowForMeta(tableName, QueryType.REGION)
: RegionInfo.createRegionName(tableName, startRow, HConstants.ZEROES, false);
byte[] metaStop = getTableStopRowForMeta(tableName, QueryType.REGION);
scanMeta(connection, metaStart, metaStop, QueryType.REGION, null, rowLimit, true, visitor,
metaReplicaMode);
}

private static void scanMeta(Connection connection, TableName table, QueryType type, int maxRows,
final Visitor visitor, CatalogReplicaMode metaReplicaMode) throws IOException {
scanMeta(connection, getTableStartRowForMeta(table, type), getTableStopRowForMeta(table, type),
Expand Down Expand Up @@ -760,8 +786,15 @@ static void scanMeta(Connection connection, @Nullable final byte[] startRow,
private static void scanMeta(Connection connection, @Nullable final byte[] startRow,
@Nullable final byte[] stopRow, QueryType type, @Nullable Filter filter, int maxRows,
final Visitor visitor, CatalogReplicaMode metaReplicaMode) throws IOException {
scanMeta(connection, startRow, stopRow, type, filter, maxRows, false, visitor, metaReplicaMode);
}

private static void scanMeta(Connection connection, @Nullable final byte[] startRow,
@Nullable final byte[] stopRow, QueryType type, @Nullable Filter filter, int maxRows,
boolean isPagedScan, final Visitor visitor, CatalogReplicaMode metaReplicaMode)
throws IOException {
int rowUpperLimit = maxRows > 0 ? maxRows : Integer.MAX_VALUE;
Scan scan = getMetaScan(connection.getConfiguration(), rowUpperLimit);
Scan scan = getMetaScan(connection.getConfiguration(), rowUpperLimit, isPagedScan);

for (byte[] family : type.getFamilies()) {
scan.addFamily(family);
Expand Down Expand Up @@ -830,7 +863,7 @@ private static void scanMeta(Connection connection, @Nullable final byte[] start
private static RegionInfo getClosestRegionInfo(Connection connection,
@NonNull final TableName tableName, @NonNull final byte[] row) throws IOException {
byte[] searchRow = RegionInfo.createRegionName(tableName, row, HConstants.NINES, false);
Scan scan = getMetaScan(connection.getConfiguration(), 1);
Scan scan = getMetaScan(connection.getConfiguration(), 1, false);
scan.setReversed(true);
scan.withStartRow(searchRow);
try (ResultScanner resultScanner = getMetaHTable(connection).getScanner(scan)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1146,11 +1146,7 @@ rpcControllerFactory, getMetaLookupPool(), connectionConfig.getMetaReadRpcTimeou
}
} finally {
if (lockedUserRegion) {
userRegionLock.unlock();
// update duration of the lock being held
if (metrics != null) {
metrics.updateUserRegionLockHeld(EnvironmentEdgeManager.currentTime() - lockStartTime);
}
releaseUserRegionLock(lockStartTime);
}
}
try {
Expand Down Expand Up @@ -1185,6 +1181,19 @@ void takeUserRegionLock() throws IOException {
}
}

/**
* Release {@link #userRegionLock} previously acquired via {@link #takeUserRegionLock()} and
* record the held duration in metrics.
* @param lockStartTimeMs value of {@link EnvironmentEdgeManager#currentTime()} captured
* immediately after {@link #takeUserRegionLock()} returned
*/
void releaseUserRegionLock(long lockStartTimeMs) {
userRegionLock.unlock();
if (metrics != null) {
metrics.updateUserRegionLockHeld(EnvironmentEdgeManager.currentTime() - lockStartTimeMs);
}
}

/**
* Put a newly discovered HRegionLocation into the cache.
* @param tableName The table name.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.trace.TableSpanBuilder;
import org.apache.hadoop.hbase.trace.TraceUtil;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
Expand Down Expand Up @@ -111,6 +112,63 @@ public List<HRegionLocation> getAllRegionLocations() throws IOException {
}, HRegionLocator::getRegionNames, supplier);
}

@Override
public List<HRegionLocation> getRegionLocationsPage(byte[] startKey, int limit)
throws IOException {
if (TableName.isMetaTableName(tableName)) {
throw new IOException(
"getRegionLocationsPage(startKey, limit) is not supported for hbase:meta;"
+ " use getRegionLocation(EMPTY_START_ROW) instead.");
}
final int effectiveLimit = limit > 0
? limit
: connection.getConfiguration().getInt(HConstants.HBASE_META_SCANNER_CACHING,
HConstants.DEFAULT_HBASE_META_SCANNER_CACHING);
final byte[] effectiveStart = startKey == null ? HConstants.EMPTY_START_ROW : startKey;
final CatalogReplicaMode metaReplicaMode = CatalogReplicaMode.fromString(connection
.getConfiguration().get(LOCATOR_META_REPLICAS_MODE, CatalogReplicaMode.NONE.toString()));

final Supplier<Span> supplier = new TableSpanBuilder(connection)
.setName("HRegionLocator.getRegionLocationsPage").setTableName(tableName);
return tracedLocationFuture(() -> {
final List<HRegionLocation> out = new ArrayList<>(effectiveLimit);
MetaTableAccessor.Visitor visitor = new MetaTableAccessor.TableVisitorBase(tableName) {
@Override
public boolean visitInternal(Result result) throws IOException {
RegionLocations locs = MetaTableAccessor.getRegionLocations(result);
if (locs == null) {
return true;
}
for (HRegionLocation loc : locs.getRegionLocations()) {
if (loc != null) {
out.add(loc);
}
}
RegionLocations cleaned = locs.removeElementsWithNullLocation();
if (cleaned != null) {
connection.cacheLocation(tableName, cleaned);
}
return true;
}
};

boolean locked = false;
long lockStart = 0;
try {
connection.takeUserRegionLock();
lockStart = EnvironmentEdgeManager.currentTime();
locked = true;
MetaTableAccessor.scanMetaForTableRegions(connection, visitor, tableName, effectiveStart,
effectiveLimit, metaReplicaMode);
} finally {
if (locked) {
connection.releaseUserRegionLock(lockStart);
}
}
return out;
}, HRegionLocator::getRegionNames, supplier);
}

private static List<String> getRegionNames(List<HRegionLocation> locations) {
if (CollectionUtils.isEmpty(locations)) {
return Collections.emptyList();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.IOException;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Pair;
Expand Down Expand Up @@ -130,6 +131,54 @@ default List<HRegionLocation> getRegionLocations(byte[] row) throws IOException
*/
List<HRegionLocation> getAllRegionLocations() throws IOException;

/**
* Bulk lookup of region locations from {@code hbase:meta} in a single RPC, starting at
* {@code startKey} (region start-key boundary, inclusive) and returning at most {@code limit}
* regions in start-key order.
* <p/>
* The returned list includes all replicas of each region (matching
* {@link #getAllRegionLocations()}), and the result is also written to the connection's region
* location cache.
* <p/>
* Ordering: regions are returned in ascending region start-key order (the natural order of
* {@code hbase:meta} rows for a single table). Within each region, replicas are returned in
* ascending replica-id order (replica 0, then 1, then 2, ...). Split parents and offline regions
* are filtered out, which may cause a page to contain fewer than {@code limit} regions but never
* disturbs ordering of the survivors.
* <p/>
* To page through all regions of a table, call repeatedly passing
* {@code last.getRegion().getEndKey()} as the next {@code startKey}, where {@code last} is the
* final element of the previous response. All replicas of a region share the same
* {@link RegionInfo}, so the last entry's end key is the correct cursor regardless of which
* replica it is. Pass {@code null} for the first call. Stop paging when the returned list is
* empty or when the last region's end key is {@link HConstants#EMPTY_END_ROW} (zero-length) -
* that signals the end of the table; passing it back in would re-scan from the beginning since by
* convention an empty start key means "from the first region".
* <p/>
* Unlike {@link #getAllRegionLocations()}, this method performs at most one RPC against
* {@code hbase:meta} per invocation, so its latency is bounded by {@code limit} rather than table
* size. Suitable for callers that wrap meta lookups in a lock with a fixed timeout, e.g. for bulk
* region-cache warmup.
* <p/>
* This method is optional. Implementations that cannot support paginated lookups should throw
* {@link UnsupportedOperationException} (the default behavior); callers should fall back to
* {@link #getAllRegionLocations()} in that case.
* @param startKey region start-key to begin scanning from (inclusive); {@code null} or empty
* starts from the first region
* @param limit maximum number of regions to return; if &lt;= 0, falls back to
* {@code hbase.meta.scanner.caching}
* @return up to {@code limit} {@link HRegionLocation}s in start-key order, possibly empty when no
* more regions exist
* @throws IOException if a remote or network exception occurs
* @throws UnsupportedOperationException if this implementation does not support paginated lookups
*/
default List<HRegionLocation> getRegionLocationsPage(byte[] startKey, int limit)
throws IOException {
throw new UnsupportedOperationException(
"getRegionLocationsPage(byte[], int) is not supported by this RegionLocator;"
+ " fall back to getAllRegionLocations()");
}

/**
* Gets the starting row key for every region in the currently open table.
* <p>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.SmallTests;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;

/**
* Asserts the single-RPC promise of the paginated meta-scan path
* ({@link MetaTableAccessor#scanMetaForTableRegions(Connection, MetaTableAccessor.Visitor, TableName, byte[], int, CatalogReplicaMode)})
* by capturing the {@link Scan} dispatched against the meta {@link Table} and asserting
* {@code scan.getCaching() == rowLimit}. ScannerNext RPC count is
* {@code ceil(rowsRequested / scan.getCaching())}, so {@code caching == rowLimit} is sufficient to
* prove a single ScannerNext RPC.
* <p/>
* The configured {@code hbase.meta.scanner.caching} is set to a value smaller than {@code rowLimit}
* so the paged-vs-unbounded branches in {@code MetaTableAccessor#getMetaScan} are distinguishable.
*/
@Tag(ClientTests.TAG)
@Tag(SmallTests.TAG)
public class TestMetaTableAccessorPagedScanCaching {

private static final TableName USER_TABLE = TableName.valueOf("LocatorPaged");
private static final int META_CACHING = 2;
private static final int NUM_REGIONS = 5;

private static final MetaTableAccessor.Visitor NOOP_VISITOR = result -> true;

private Connection connection;
private Table metaTable;

@BeforeEach
public void setUp() throws IOException {
Configuration conf = HBaseConfiguration.create();
conf.setInt(HConstants.HBASE_META_SCANNER_CACHING, META_CACHING);

connection = mock(Connection.class);
metaTable = mock(Table.class);
ResultScanner scanner = mock(ResultScanner.class);

when(connection.getConfiguration()).thenReturn(conf);
when(connection.getTable(TableName.META_TABLE_NAME)).thenReturn(metaTable);
when(metaTable.getScanner(any(Scan.class))).thenReturn(scanner);
when(scanner.next()).thenReturn(null);
}

@Test
public void testPagedScanCachingEqualsLimitWhenLimitWithinCaching() throws IOException {
int rowLimit = META_CACHING;
MetaTableAccessor.scanMetaForTableRegions(connection, NOOP_VISITOR, USER_TABLE, null, rowLimit,
CatalogReplicaMode.NONE);
assertEquals(rowLimit, capturedScan().getCaching());
}

@Test
public void testPagedScanCachingEqualsLimitWhenLimitExceedsCaching() throws IOException {
int rowLimit = NUM_REGIONS;
MetaTableAccessor.scanMetaForTableRegions(connection, NOOP_VISITOR, USER_TABLE, null, rowLimit,
CatalogReplicaMode.NONE);
assertEquals(rowLimit, capturedScan().getCaching());
}

@Test
public void testUnboundedPathStillUsesConfiguredCaching() throws IOException {
MetaTableAccessor.scanMetaForTableRegions(connection, NOOP_VISITOR, USER_TABLE);
Scan scan = capturedScan();
assertEquals(META_CACHING, scan.getCaching());
assertEquals(Integer.MAX_VALUE, scan.getLimit());
}

private Scan capturedScan() throws IOException {
ArgumentCaptor<Scan> scanCaptor = ArgumentCaptor.forClass(Scan.class);
verify(metaTable).getScanner(scanCaptor.capture());
return scanCaptor.getValue();
}
}
Loading