-
Notifications
You must be signed in to change notification settings - Fork 599
HDDS-14768. Fix lock leak during snapshot cache cleanup and handle eviction race appropriately. #9869
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
HDDS-14768. Fix lock leak during snapshot cache cleanup and handle eviction race appropriately. #9869
Changes from all commits
89f51cc
abb4660
aa5528d
e90a965
0d130af
c1678c0
0d4ebb9
c84828d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -25,6 +25,7 @@ | |
| import com.google.common.annotations.VisibleForTesting; | ||
| import com.google.common.cache.CacheLoader; | ||
| import java.io.IOException; | ||
| import java.util.Collections; | ||
| import java.util.Set; | ||
| import java.util.UUID; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
|
|
@@ -135,6 +136,11 @@ ConcurrentHashMap<UUID, ReferenceCounted<OmSnapshot>> getDbMap() { | |
| return dbMap; | ||
| } | ||
|
|
||
| @VisibleForTesting | ||
| Set<UUID> getPendingEvictionQueue() { | ||
| return Collections.unmodifiableSet(pendingEvictionQueue); | ||
| } | ||
|
|
||
| /** | ||
| * @return number of DB instances currently held in cache. | ||
| */ | ||
|
|
@@ -158,6 +164,7 @@ public void invalidate(UUID key) { | |
| } | ||
| omMetrics.decNumSnapshotCacheSize(); | ||
| } | ||
| pendingEvictionQueue.remove(k); | ||
| return null; | ||
| }); | ||
| } | ||
|
|
@@ -323,8 +330,17 @@ private UncheckedAutoCloseableSupplier<OMLockDetails> lock(Supplier<OMLockDetail | |
|
|
||
| AtomicReference<OMLockDetails> lockDetails = new AtomicReference<>(emptyLockFunction.get()); | ||
| if (lockDetails.get().isLockAcquired()) { | ||
| if (!cleanupFunction.get()) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. The lock is released by calling emptyUnlockFunction.get() if the cleanup operation is not successful or if it throws a Throwable. |
||
| lockDetails.set(emptyUnlockFunction.get()); | ||
| try { | ||
| if (!cleanupFunction.get()) { | ||
| throw new IllegalStateException("Failed to acquire lock as cleanup did not drain the cache."); | ||
| } | ||
| } catch (Throwable t) { | ||
| try { | ||
| lockDetails.set(emptyUnlockFunction.get()); | ||
| } catch (Throwable unlockThrowable) { | ||
| t.addSuppressed(unlockThrowable); | ||
| } | ||
| throw t; | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -377,26 +393,25 @@ private synchronized Void cleanup(UUID evictionKey, boolean expectKeyToBePresent | |
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This means the entry in the SnapshotCache was removed before cleanup() was called. It should not happen during Snapshot Purge Response, because that explicitly invalidates the snapshot cache. |
||
|
|
||
| dbMap.compute(evictionKey, (k, v) -> { | ||
| pendingEvictionQueue.remove(k); | ||
| ReferenceCounted<OmSnapshot> result = null; | ||
| if (v == null) { | ||
| throw new IllegalStateException("SnapshotId '" + k + "' does not exist in cache. The RocksDB " + | ||
| "instance of the Snapshot may not be closed properly."); | ||
| } | ||
|
|
||
| if (v.getTotalRefCount() > 0) { | ||
| LOG.info("SnapshotId '{}' does not exist in cache during cleanup; " | ||
| + "it may have already been invalidated, closed, and removed.", k); | ||
| } else if (v.getTotalRefCount() > 0) { | ||
| LOG.debug("SnapshotId {} is still being referenced ({}), skipping its clean up.", k, v.getTotalRefCount()); | ||
| return v; | ||
| result = v; | ||
| } else { | ||
| LOG.debug("Closing SnapshotId {}. It is not being referenced anymore.", k); | ||
| // Close the instance, which also closes its DB handle. | ||
| try { | ||
| v.get().close(); | ||
| } catch (IOException ex) { | ||
| throw new IllegalStateException("Error while closing snapshot DB.", ex); | ||
| throw new IllegalStateException("Error while closing snapshot DB for snapshotId " + k, ex); | ||
| } | ||
| omMetrics.decNumSnapshotCacheSize(); | ||
| return null; | ||
| } | ||
| pendingEvictionQueue.remove(k); | ||
| return result; | ||
| }); | ||
| return null; | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -19,6 +19,7 @@ | |
|
|
||
| import static org.apache.hadoop.ozone.om.lock.DAGLeveledResource.SNAPSHOT_DB_LOCK; | ||
| import static org.apache.hadoop.ozone.om.lock.OzoneManagerLock.LeveledResource.VOLUME_LOCK; | ||
| import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; | ||
| import static org.junit.jupiter.api.Assertions.assertEquals; | ||
| import static org.junit.jupiter.api.Assertions.assertFalse; | ||
| import static org.junit.jupiter.api.Assertions.assertInstanceOf; | ||
|
|
@@ -46,6 +47,7 @@ | |
| import java.util.concurrent.TimeoutException; | ||
| import java.util.concurrent.atomic.AtomicBoolean; | ||
| import org.apache.hadoop.hdds.conf.OzoneConfiguration; | ||
| import org.apache.hadoop.hdds.utils.db.DBStore; | ||
| import org.apache.hadoop.hdds.utils.db.Table; | ||
| import org.apache.hadoop.ozone.om.OMMetadataManager; | ||
| import org.apache.hadoop.ozone.om.OMMetrics; | ||
|
|
@@ -511,4 +513,137 @@ void testSnapshotOperationsNotBlockedDuringCompaction() throws IOException, Inte | |
| verify(store1, times(1)).compactTable("table2"); | ||
| verify(store1, times(0)).compactTable("keyTable"); | ||
| } | ||
|
|
||
| private static IOzoneManagerLock newAcquiringLock() { | ||
| IOzoneManagerLock acquiringLock = mock(IOzoneManagerLock.class); | ||
| when(acquiringLock.acquireReadLock(eq(SNAPSHOT_DB_LOCK), any(String[].class))) | ||
| .thenReturn(OMLockDetails.EMPTY_DETAILS_LOCK_ACQUIRED); | ||
| when(acquiringLock.releaseReadLock(eq(SNAPSHOT_DB_LOCK), any(String[].class))) | ||
| .thenReturn(OMLockDetails.EMPTY_DETAILS_LOCK_NOT_ACQUIRED); | ||
| when(acquiringLock.acquireResourceWriteLock(eq(SNAPSHOT_DB_LOCK))) | ||
| .thenReturn(OMLockDetails.EMPTY_DETAILS_LOCK_ACQUIRED); | ||
| when(acquiringLock.releaseResourceWriteLock(eq(SNAPSHOT_DB_LOCK))) | ||
| .thenReturn(OMLockDetails.EMPTY_DETAILS_LOCK_NOT_ACQUIRED); | ||
| when(acquiringLock.acquireWriteLock(eq(SNAPSHOT_DB_LOCK), any(String[].class))) | ||
| .thenReturn(OMLockDetails.EMPTY_DETAILS_LOCK_ACQUIRED); | ||
| when(acquiringLock.releaseWriteLock(eq(SNAPSHOT_DB_LOCK), any(String[].class))) | ||
| .thenReturn(OMLockDetails.EMPTY_DETAILS_LOCK_NOT_ACQUIRED); | ||
| return acquiringLock; | ||
| } | ||
|
|
||
| private OmSnapshot mockSnapshot(UUID snapshotId) { | ||
| final OmSnapshot omSnapshot = mock(OmSnapshot.class); | ||
| when(omSnapshot.getSnapshotTableKey()).thenReturn(snapshotId.toString()); | ||
| when(omSnapshot.getSnapshotID()).thenReturn(snapshotId); | ||
|
|
||
| return omSnapshot; | ||
| } | ||
|
|
||
| @Test | ||
| @DisplayName("Stale eviction key (invalidate + late close) is cleaned up without throwing") | ||
| void testStaleEvictionKeyDuringCleanup() throws IOException { | ||
| snapshotCache = new SnapshotCache(cacheLoader, CACHE_SIZE_LIMIT, omMetrics, 0, true, newAcquiringLock()); | ||
| final UUID snapshotId = UUID.randomUUID(); | ||
|
|
||
| // Acquire a snapshot handle so it is ref-counted in the cache. | ||
| UncheckedAutoCloseableSupplier<OmSnapshot> handle = snapshotCache.get(snapshotId); | ||
| assertEquals(1, snapshotCache.size()); | ||
|
|
||
| // Invalidate removes the dbMap entry. The handle still exists and will later hit refcount=0. | ||
| snapshotCache.invalidate(snapshotId); | ||
| assertEquals(0, snapshotCache.size()); | ||
|
|
||
| // Late close triggers ReferenceCounted callback which can re-add snapshotId to pendingEvictionQueue. | ||
| handle.close(); | ||
| assertTrue(snapshotCache.getPendingEvictionQueue().contains(snapshotId)); | ||
|
|
||
| // cleanup(true) is invoked by lock(); it should remove the stale key and not throw. | ||
| assertDoesNotThrow(() -> { | ||
| try (UncheckedAutoCloseableSupplier<OMLockDetails> lockDetails = snapshotCache.lock()) { | ||
| assertTrue(lockDetails.get().isLockAcquired()); | ||
| } | ||
| }); | ||
| assertFalse(snapshotCache.getPendingEvictionQueue().contains(snapshotId)); | ||
| } | ||
|
|
||
| @Test | ||
| @DisplayName("Close failure keeps snapshot in eviction queue for retry") | ||
| void testCloseFailureRetriesSnapshot() throws Exception { | ||
|
|
||
| IOzoneManagerLock acquiringLock = newAcquiringLock(); | ||
| snapshotCache = new SnapshotCache(cacheLoader, CACHE_SIZE_LIMIT, omMetrics, 0, true, acquiringLock); | ||
| final UUID snapshotId = UUID.randomUUID(); | ||
|
|
||
| final AtomicBoolean failCloseOnce = new AtomicBoolean(true); | ||
| final OmSnapshot failingSnapshot = mockSnapshot(snapshotId); | ||
|
|
||
| OMMetadataManager metadataManager = mock(OMMetadataManager.class); | ||
| DBStore store = mock(DBStore.class); | ||
| when(failingSnapshot.getMetadataManager()).thenReturn(metadataManager); | ||
| when(metadataManager.getStore()).thenReturn(store); | ||
| when(store.listTables()).thenReturn(new ArrayList<>()); | ||
|
|
||
| doAnswer(invocation -> { | ||
| if (failCloseOnce.getAndSet(false)) { | ||
| throw new IOException("close failed"); | ||
| } | ||
| return null; | ||
| }).when(failingSnapshot).close(); | ||
|
|
||
| when(cacheLoader.load(eq(snapshotId))).thenReturn(failingSnapshot); | ||
|
|
||
| // Load + close handle so refcount transitions to 0 and snapshotId is queued for eviction. | ||
| try (UncheckedAutoCloseableSupplier<OmSnapshot> ignored = snapshotCache.get(snapshotId)) { | ||
| assertEquals(1, snapshotCache.size()); | ||
| assertEquals(1, omMetrics.getNumSnapshotCacheSize()); | ||
| } | ||
| assertEquals(0L, snapshotCache.getDbMap().get(snapshotId).getTotalRefCount()); | ||
| assertTrue(snapshotCache.getPendingEvictionQueue().contains(snapshotId)); | ||
|
|
||
| // First cleanup attempt fails to close; entry should remain in dbMap and key should stay queued for retry. | ||
| assertThrows(IllegalStateException.class, () -> snapshotCache.lock()); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. +1 |
||
| verify(acquiringLock, times(1)).acquireResourceWriteLock(eq(SNAPSHOT_DB_LOCK)); | ||
| verify(acquiringLock, times(1)).releaseResourceWriteLock(eq(SNAPSHOT_DB_LOCK)); | ||
| assertTrue(snapshotCache.getDbMap().containsKey(snapshotId)); | ||
| assertTrue(snapshotCache.getPendingEvictionQueue().contains(snapshotId)); | ||
| assertEquals(1, omMetrics.getNumSnapshotCacheSize()); | ||
|
|
||
| // Second cleanup attempt should succeed (close no longer throws), removing entry and eviction key. | ||
| try (UncheckedAutoCloseableSupplier<OMLockDetails> lockDetails = snapshotCache.lock()) { | ||
| assertTrue(lockDetails.get().isLockAcquired()); | ||
| } | ||
| assertFalse(snapshotCache.getDbMap().containsKey(snapshotId)); | ||
| assertFalse(snapshotCache.getPendingEvictionQueue().contains(snapshotId)); | ||
| assertEquals(0, omMetrics.getNumSnapshotCacheSize()); | ||
| } | ||
|
|
||
| @Test | ||
| @DisplayName("lock supplier releases write lock if cleanup throws an exception") | ||
| void testLockSupplierReleasesWriteLockOnCleanupException() throws Exception { | ||
| IOzoneManagerLock acquiringLock = newAcquiringLock(); | ||
| snapshotCache = new SnapshotCache(cacheLoader, CACHE_SIZE_LIMIT, omMetrics, 0, true, acquiringLock); | ||
|
|
||
| final UUID snapshotId = UUID.randomUUID(); | ||
| final OmSnapshot failingSnapshot = mockSnapshot(snapshotId); | ||
|
|
||
| OMMetadataManager metadataManager = mock(OMMetadataManager.class); | ||
| DBStore store = mock(DBStore.class); | ||
| when(failingSnapshot.getMetadataManager()).thenReturn(metadataManager); | ||
| when(metadataManager.getStore()).thenReturn(store); | ||
| // Trigger an unchecked exception during compaction, which is not caught by cleanup(). | ||
| when(store.listTables()).thenThrow(new RuntimeException("listTables failed")); | ||
|
|
||
| when(cacheLoader.load(eq(snapshotId))).thenReturn(failingSnapshot); | ||
|
|
||
| // Load the snapshot and close so it is enqueued for eviction (refcount reaches 0). | ||
| try (UncheckedAutoCloseableSupplier<OmSnapshot> ignored = snapshotCache.get(snapshotId)) { | ||
| assertEquals(1, snapshotCache.size()); | ||
| } | ||
| assertTrue(snapshotCache.getPendingEvictionQueue().contains(snapshotId)); | ||
|
|
||
| // cleanup(true) will throw -> lock() should release the resource write lock before rethrowing. | ||
| assertThrows(RuntimeException.class, () -> snapshotCache.lock()); | ||
| verify(acquiringLock, times(1)).acquireResourceWriteLock(eq(SNAPSHOT_DB_LOCK)); | ||
| verify(acquiringLock, times(1)).releaseResourceWriteLock(eq(SNAPSHOT_DB_LOCK)); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pendingEvictionQueue may still contain the entry k if the snapshot purge response happens after all references to the snapshot have been decremented (releasing the SnapshotCache lock of the key), but before the periodic cleanup thread kicks in.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note: this case should not happen during checkpointing, because checkpointing holds a write lock on the snapshot cache and, once that lock is released, it invokes cleanup() immediately.