diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/PipeTsFileDeletionBarrier.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/PipeTsFileDeletionBarrier.java new file mode 100644 index 0000000000000..954821dea0278 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/PipeTsFileDeletionBarrier.java @@ -0,0 +1,334 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.consensus.deletion; + +import org.apache.iotdb.commons.utils.TestOnly; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.NavigableMap; +import java.util.NavigableSet; +import java.util.Objects; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +/** + * An ordered barrier for coordinating delete materialization and TsFile transfer. + * + *

<p>Delete materialization and realtime TsFile transfer are observed by followers through + * replicateIndex order, but the leader locally materializes delete mods after publishing delete + * events. This barrier ensures that a TsFile snapshot: + * + *

<p>1. includes every delete that entered the delete path before the TsFile got its replicate + * index, and + * + *
<p>
2. excludes deletes that entered after that replicate index. + */ +public class PipeTsFileDeletionBarrier { + + private static final Logger LOGGER = LoggerFactory.getLogger(PipeTsFileDeletionBarrier.class); + + private static final long INITIAL_DELETE_SEQ = 0L; + + private final ConcurrentMap regionId2DeletionState = + new ConcurrentHashMap<>(); + private final ConcurrentMap tsFilePath2BarrierState = + new ConcurrentHashMap<>(); + + public long beginDeletion(final int regionId) { + return getOrCreateRegionDeletionState(regionId).beginDeletion(); + } + + public void resolveDeletionTargets( + final int regionId, final long deleteSeq, final Collection tsFilePaths) { + if (deleteSeq <= INITIAL_DELETE_SEQ) { + return; + } + + final RegionDeletionState regionDeletionState = getOrCreateRegionDeletionState(regionId); + final Set sanitizedTsFilePaths = sanitizeTsFilePaths(tsFilePaths); + + synchronized (regionDeletionState.monitor) { + for (final String tsFilePath : sanitizedTsFilePaths) { + getOrCreateTsFileBarrierState(tsFilePath).registerPendingDeletion(deleteSeq); + } + regionDeletionState.resolveDeletion(deleteSeq); + } + } + + public long beginSnapshot(final int regionId, final String tsFilePath) { + if (Objects.isNull(tsFilePath)) { + return INITIAL_DELETE_SEQ; + } + + final RegionDeletionState regionDeletionState = getOrCreateRegionDeletionState(regionId); + synchronized (regionDeletionState.monitor) { + final long snapshotUpperBound = regionDeletionState.getMaxAllocatedDeletionSeq(); + getOrCreateTsFileBarrierState(tsFilePath).registerSnapshot(snapshotUpperBound); + return snapshotUpperBound; + } + } + + public void awaitDeletionResolutionUpTo(final int regionId, final long deleteSeqUpperBound) + throws InterruptedException { + if (deleteSeqUpperBound <= INITIAL_DELETE_SEQ) { + return; + } + + getOrCreateRegionDeletionState(regionId).awaitDeletionResolutionUpTo(deleteSeqUpperBound); + } + + public void awaitPendingDeletionsUpTo(final String tsFilePath, final long 
deleteSeqUpperBound) + throws InterruptedException { + if (Objects.isNull(tsFilePath) || deleteSeqUpperBound <= INITIAL_DELETE_SEQ) { + return; + } + + final TsFileBarrierState barrierState = tsFilePath2BarrierState.get(tsFilePath); + if (Objects.isNull(barrierState)) { + return; + } + + barrierState.awaitPendingDeletionsUpTo(deleteSeqUpperBound); + } + + public void awaitSnapshotsBeforeMaterialization(final String tsFilePath, final long deleteSeq) + throws InterruptedException { + if (Objects.isNull(tsFilePath) || deleteSeq <= INITIAL_DELETE_SEQ) { + return; + } + + final TsFileBarrierState barrierState = tsFilePath2BarrierState.get(tsFilePath); + if (Objects.isNull(barrierState)) { + return; + } + + barrierState.awaitSnapshotsBeforeMaterialization(deleteSeq); + } + + public void finishSnapshot(final String tsFilePath, final long snapshotUpperBound) { + if (Objects.isNull(tsFilePath)) { + return; + } + + final TsFileBarrierState barrierState = tsFilePath2BarrierState.get(tsFilePath); + if (Objects.isNull(barrierState)) { + return; + } + + barrierState.finishSnapshot(snapshotUpperBound); + cleanupTsFileBarrierStateIfEmpty(tsFilePath, barrierState); + } + + public void finishDeletion(final long deleteSeq, final Collection tsFilePaths) { + if (deleteSeq <= INITIAL_DELETE_SEQ) { + return; + } + + final Set sanitizedTsFilePaths = sanitizeTsFilePaths(tsFilePaths); + for (final String tsFilePath : sanitizedTsFilePaths) { + final TsFileBarrierState barrierState = tsFilePath2BarrierState.get(tsFilePath); + if (Objects.isNull(barrierState)) { + continue; + } + + barrierState.finishDeletion(deleteSeq); + cleanupTsFileBarrierStateIfEmpty(tsFilePath, barrierState); + } + } + + @TestOnly + public void clear() { + regionId2DeletionState.clear(); + tsFilePath2BarrierState.clear(); + } + + private RegionDeletionState getOrCreateRegionDeletionState(final int regionId) { + return regionId2DeletionState.computeIfAbsent(regionId, ignored -> new RegionDeletionState()); + } + + private 
TsFileBarrierState getOrCreateTsFileBarrierState(final String tsFilePath) { + return tsFilePath2BarrierState.computeIfAbsent(tsFilePath, ignored -> new TsFileBarrierState()); + } + + private Set sanitizeTsFilePaths(final Collection tsFilePaths) { + if (Objects.isNull(tsFilePaths) || tsFilePaths.isEmpty()) { + return Collections.emptySet(); + } + + final Set sanitizedTsFilePaths = new HashSet<>(); + for (final String tsFilePath : tsFilePaths) { + if (Objects.nonNull(tsFilePath)) { + sanitizedTsFilePaths.add(tsFilePath); + } + } + return sanitizedTsFilePaths; + } + + private void cleanupTsFileBarrierStateIfEmpty( + final String tsFilePath, final TsFileBarrierState barrierState) { + tsFilePath2BarrierState.computeIfPresent( + tsFilePath, + (ignored, state) -> { + if (state != barrierState) { + return state; + } + synchronized (state.monitor) { + return state.isEmptyUnsafe() ? null : state; + } + }); + } + + private static class RegionDeletionState { + private final Object monitor = new Object(); + private final NavigableSet unresolvedDeletionSeqs = new TreeSet<>(); + + private long maxAllocatedDeletionSeq = INITIAL_DELETE_SEQ; + private long maxResolvedDeletionSeq = INITIAL_DELETE_SEQ; + + private long beginDeletion() { + synchronized (monitor) { + final long deleteSeq = ++maxAllocatedDeletionSeq; + unresolvedDeletionSeqs.add(deleteSeq); + return deleteSeq; + } + } + + private long getMaxAllocatedDeletionSeq() { + return maxAllocatedDeletionSeq; + } + + private void resolveDeletion(final long deleteSeq) { + synchronized (monitor) { + if (deleteSeq > maxAllocatedDeletionSeq || deleteSeq <= INITIAL_DELETE_SEQ) { + LOGGER.warn( + "PipeTsFileDeletionBarrier: try to resolve invalid delete seq {}, max allocated {}.", + deleteSeq, + maxAllocatedDeletionSeq); + return; + } + + unresolvedDeletionSeqs.remove(deleteSeq); + while (maxResolvedDeletionSeq < maxAllocatedDeletionSeq + && !unresolvedDeletionSeqs.contains(maxResolvedDeletionSeq + 1)) { + maxResolvedDeletionSeq++; + } + 
monitor.notifyAll(); + } + } + + private void awaitDeletionResolutionUpTo(final long deleteSeqUpperBound) + throws InterruptedException { + synchronized (monitor) { + while (maxResolvedDeletionSeq < deleteSeqUpperBound) { + monitor.wait(); + } + } + } + } + + private static class TsFileBarrierState { + private final Object monitor = new Object(); + private final NavigableSet pendingDeletionSeqs = new TreeSet<>(); + private final NavigableMap snapshotUpperBound2ReferenceCount = new TreeMap<>(); + + private void registerPendingDeletion(final long deleteSeq) { + synchronized (monitor) { + pendingDeletionSeqs.add(deleteSeq); + monitor.notifyAll(); + } + } + + private void finishDeletion(final long deleteSeq) { + synchronized (monitor) { + pendingDeletionSeqs.remove(deleteSeq); + monitor.notifyAll(); + } + } + + private void registerSnapshot(final long snapshotUpperBound) { + synchronized (monitor) { + snapshotUpperBound2ReferenceCount.merge(snapshotUpperBound, 1, Integer::sum); + monitor.notifyAll(); + } + } + + private void finishSnapshot(final long snapshotUpperBound) { + synchronized (monitor) { + snapshotUpperBound2ReferenceCount.computeIfPresent( + snapshotUpperBound, (ignored, count) -> count == 1 ? 
null : count - 1); + monitor.notifyAll(); + } + } + + private void awaitPendingDeletionsUpTo(final long deleteSeqUpperBound) + throws InterruptedException { + synchronized (monitor) { + while (!pendingDeletionSeqs.headSet(deleteSeqUpperBound, true).isEmpty()) { + monitor.wait(); + } + } + } + + private void awaitSnapshotsBeforeMaterialization(final long deleteSeq) + throws InterruptedException { + synchronized (monitor) { + while (hasSnapshotEarlierThan(deleteSeq)) { + monitor.wait(); + } + } + } + + private boolean hasSnapshotEarlierThan(final long deleteSeq) { + final Map.Entry firstSnapshot = snapshotUpperBound2ReferenceCount.firstEntry(); + return Objects.nonNull(firstSnapshot) && firstSnapshot.getKey() < deleteSeq; + } + + private boolean isEmptyUnsafe() { + return pendingDeletionSeqs.isEmpty() && snapshotUpperBound2ReferenceCount.isEmpty(); + } + } + + /////////////////////////////// singleton /////////////////////////////// + + private static class PipeTsFileDeletionBarrierHolder { + private static final PipeTsFileDeletionBarrier INSTANCE = new PipeTsFileDeletionBarrier(); + + private PipeTsFileDeletionBarrierHolder() { + // empty constructor + } + } + + public static PipeTsFileDeletionBarrier getInstance() { + return PipeTsFileDeletionBarrierHolder.INSTANCE; + } + + private PipeTsFileDeletionBarrier() { + // empty constructor + } +} diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java index 343c8d8932931..47eb15a27aeec 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java @@ -53,6 +53,8 @@ public PipeCompactedTsFileInsertionEvent( 
anyOfOriginalEvents.getTreeModelDatabaseName(), tsFileResource, null, + null, + true, bindIsWithMod(originalEvents), bindIsLoaded(originalEvents), bindIsGeneratedByHistoricalExtractor(originalEvents), diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java index 316b728a278a9..5fdf8e22ec164 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java @@ -35,6 +35,7 @@ import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern; import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager.PipeEventResource; import org.apache.iotdb.db.auth.AuthorityChecker; +import org.apache.iotdb.db.pipe.consensus.deletion.PipeTsFileDeletionBarrier; import org.apache.iotdb.db.pipe.event.ReferenceTrackableEvent; import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent; import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent; @@ -48,6 +49,7 @@ import org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper; import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName; import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor; +import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent; import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent; @@ -81,10 +83,20 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent private File tsFile; private long extractTime = 0; + // Whether this event should 
transfer mod files if they exist. + private boolean shouldTransferModFile; + // Whether the assigner-side first retain should delay mod snapshotting until replicateIndex is + // assigned to the pipe-specific copy. + private boolean shouldDelayModSnapshotUntilReplicateIndex; + // Whether this event should consult the live TsFileResource for mod visibility before pinning. + private boolean shouldRefreshModFileStateFromResource; + // Whether the current modFile is already pinned and should no longer be overwritten by refresh. + private boolean isModPinned; // This is true iff the modFile exists and should be transferred private boolean isWithMod; private File modFile; private final File sharedModFile; + private long snapshotUpperBoundForDelayedMod = NO_COMMIT_ID; protected final boolean isLoaded; protected final boolean isGeneratedByPipe; @@ -114,6 +126,8 @@ public PipeTsFileInsertionEvent( databaseNameFromDataRegion, resource, null, + null, + true, true, isLoaded, false, @@ -128,7 +142,8 @@ public PipeTsFileInsertionEvent( null, true, Long.MIN_VALUE, - Long.MAX_VALUE); + Long.MAX_VALUE, + false); } public PipeTsFileInsertionEvent( @@ -136,6 +151,8 @@ public PipeTsFileInsertionEvent( final String databaseNameFromDataRegion, final TsFileResource resource, final File tsFile, + final File modFile, + final boolean shouldRefreshModFileStateFromResource, final boolean isWithMod, final boolean isLoaded, final boolean isGeneratedByHistoricalExtractor, @@ -151,6 +168,54 @@ public PipeTsFileInsertionEvent( final boolean skipIfNoPrivileges, final long startTime, final long endTime) { + this( + isTableModelEvent, + databaseNameFromDataRegion, + resource, + tsFile, + modFile, + shouldRefreshModFileStateFromResource, + isWithMod, + isLoaded, + isGeneratedByHistoricalExtractor, + tableNames, + pipeName, + creationTime, + pipeTaskMeta, + treePattern, + tablePattern, + userId, + userName, + cliHostname, + skipIfNoPrivileges, + startTime, + endTime, + false); + } + + private 
PipeTsFileInsertionEvent( + final Boolean isTableModelEvent, + final String databaseNameFromDataRegion, + final TsFileResource resource, + final File tsFile, + final File modFile, + final boolean shouldRefreshModFileStateFromResource, + final boolean isWithMod, + final boolean isLoaded, + final boolean isGeneratedByHistoricalExtractor, + final Set tableNames, + final String pipeName, + final long creationTime, + final PipeTaskMeta pipeTaskMeta, + final TreePattern treePattern, + final TablePattern tablePattern, + final String userId, + final String userName, + final String cliHostname, + final boolean skipIfNoPrivileges, + final long startTime, + final long endTime, + final boolean shouldDelayModSnapshotUntilReplicateIndex) { super( pipeName, creationTime, @@ -174,8 +239,20 @@ public PipeTsFileInsertionEvent( // hard-link it to each pipe dir this.tsFile = Objects.isNull(tsFile) ? resource.getTsFile() : tsFile; - this.isWithMod = isWithMod && resource.anyModFileExists(); - this.modFile = this.isWithMod ? resource.getExclusiveModFile().getFile() : null; + this.shouldTransferModFile = isWithMod; + this.shouldDelayModSnapshotUntilReplicateIndex = shouldDelayModSnapshotUntilReplicateIndex; + this.shouldRefreshModFileStateFromResource = + this.shouldTransferModFile && shouldRefreshModFileStateFromResource; + this.modFile = modFile; + this.isModPinned = + this.shouldTransferModFile + && !this.shouldRefreshModFileStateFromResource + && Objects.nonNull(this.modFile); + if (this.shouldRefreshModFileStateFromResource) { + refreshModFileState(); + } else { + this.isWithMod = this.shouldTransferModFile && Objects.nonNull(this.modFile); + } // TODO: process the shared mod file this.sharedModFile = resource.getSharedModFile() != null ? 
resource.getSharedModFile().getFile() : null; @@ -276,6 +353,7 @@ public File getTsFile() { } public File getModFile() { + refreshModFileState(); return modFile; } @@ -284,13 +362,21 @@ public File getSharedModFile() { } public boolean isWithMod() { + refreshModFileState(); return isWithMod; } - // If the previous "isWithMod" is false, the modFile has been set to "null", then the isWithMod - // can't be set to true - public void disableMod4NonTransferPipes(final boolean isWithMod) { - this.isWithMod = isWithMod && this.isWithMod; + public void disableMod4NonTransferPipes(final boolean shouldTransferModFile) { + this.shouldTransferModFile = shouldTransferModFile && this.shouldTransferModFile; + refreshModFileState(); + } + + public void enableDelayModSnapshotUntilReplicateIndex() { + shouldDelayModSnapshotUntilReplicateIndex = true; + } + + public boolean isModPinned() { + return isModPinned; } public boolean isLoaded() { @@ -319,17 +405,33 @@ public long getExtractTime() { /////////////////////////// EnrichedEvent /////////////////////////// + @Override + public void setReplicateIndexForIoTV2(final long replicateIndexForIoTV2) { + super.setReplicateIndexForIoTV2(replicateIndexForIoTV2); + + if (!shouldDelayModSnapshotUntilReplicateIndex + || !shouldTransferModFile + || Objects.isNull(resource) + || Objects.isNull(pipeName) + || snapshotUpperBoundForDelayedMod != NO_COMMIT_ID) { + return; + } + + snapshotUpperBoundForDelayedMod = + PipeTsFileDeletionBarrier.getInstance() + .beginSnapshot(Integer.parseInt(resource.getDataRegionId()), resource.getTsFilePath()); + } + @Override public boolean internallyIncreaseResourceReferenceCount(final String holderMessage) { extractTime = System.nanoTime(); try { - tsFile = PipeDataNodeResourceManager.tsfile().increaseFileReference(tsFile, true, pipeName); - if (isWithMod) { - modFile = - PipeDataNodeResourceManager.tsfile().increaseFileReference(modFile, false, pipeName); - } + pinSnapshotForFirstReference(); return true; } catch 
(final Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } LOGGER.warn( String.format( "Increase reference count for TsFile %s or modFile %s error. Holder Message: %s", @@ -348,7 +450,7 @@ public boolean internallyIncreaseResourceReferenceCount(final String holderMessa public boolean internallyDecreaseResourceReferenceCount(final String holderMessage) { try { PipeDataNodeResourceManager.tsfile().decreaseFileReference(tsFile, pipeName); - if (isWithMod) { + if (isModPinned && Objects.nonNull(modFile)) { PipeDataNodeResourceManager.tsfile().decreaseFileReference(modFile, pipeName); } close(); @@ -423,7 +525,9 @@ public PipeTsFileInsertionEvent shallowCopySelfAndBindPipeTaskMetaForProgressRep getSourceDatabaseNameFromDataRegion(), resource, tsFile, - isWithMod, + modFile, + shouldRefreshModFileStateFromResource, + shouldTransferModFile, isLoaded, isGeneratedByHistoricalExtractor, tableNames, @@ -437,7 +541,8 @@ public PipeTsFileInsertionEvent shallowCopySelfAndBindPipeTaskMetaForProgressRep cliHostname, skipIfNoPrivileges, startTime, - endTime); + endTime, + shouldDelayModSnapshotUntilReplicateIndex); } @Override @@ -754,6 +859,7 @@ public boolean isGeneratedByHistoricalExtractor() { private TsFileInsertionEventParser initEventParser() { try { + refreshModFileState(); eventParser.compareAndSet( null, new TsFileInsertionEventParserProvider( @@ -848,23 +954,220 @@ public void trackResource() { @Override public PipeEventResource eventResourceBuilder() { + refreshModFileState(); return new PipeTsFileInsertionEventResource( this.isReleased, this.referenceCount, this.pipeName, this.tsFile, - this.isWithMod, + this.isModPinned, this.modFile, - this.sharedModFile, this.eventParser); } - private static class PipeTsFileInsertionEventResource extends PipeEventResource { + private void refreshModFileState() { + if (!shouldTransferModFile) { + isWithMod = false; + if (!isModPinned) { + modFile = null; + } + return; + } + + if 
(isModPinned || !shouldRefreshModFileStateFromResource) { + isWithMod = Objects.nonNull(modFile); + return; + } + + if (Objects.isNull(resource)) { + isWithMod = Objects.nonNull(modFile); + return; + } + + isWithMod = resource.anyModFileExists(); + modFile = isWithMod ? resource.getExclusiveModFile().getFile() : null; + } + + private void pinSnapshotForFirstReference() throws Exception { + final SnapshotPinResult snapshotPinResult = + Objects.nonNull(resource) + ? pinResourceBackedSnapshotForFirstReference() + : pinStandaloneSnapshotForFirstReference(); + + tsFile = snapshotPinResult.tsFile; + modFile = snapshotPinResult.modFile; + isWithMod = snapshotPinResult.isWithMod; + isModPinned = snapshotPinResult.isModPinned; + shouldRefreshModFileStateFromResource = snapshotPinResult.shouldRefreshModFileStateFromResource; + } + + private SnapshotPinResult pinResourceBackedSnapshotForFirstReference() throws Exception { + final File pinnedTsFile = pinTsFileUnderResourceReadLock(); + final boolean shouldFinishSnapshotBarrier = shouldDelayModSnapshotForPipeCopy(); + boolean modSnapshotAttempted = false; + try { + if (shouldOnlyPinTsFileOnFirstReference()) { + refreshModFileState(); + return new SnapshotPinResult( + pinnedTsFile, modFile, isWithMod, false, shouldRefreshModFileStateFromResource); + } + if (shouldFinishSnapshotBarrier) { + awaitDelayedDeletionMaterializationBeforeModSnapshot(); + } + + modSnapshotAttempted = true; + return pinModSnapshotUnderResourceReadLock(pinnedTsFile); + } catch (final Exception e) { + if (!modSnapshotAttempted) { + releasePinnedSnapshotQuietly(pinnedTsFile, null); + } + throw e; + } finally { + if (shouldFinishSnapshotBarrier) { + finishDelayedSnapshotBarrier(); + } + } + } + + private SnapshotPinResult pinStandaloneSnapshotForFirstReference() throws IOException { + final File pinnedTsFile = + PipeDataNodeResourceManager.tsfile().increaseFileReference(tsFile, true, pipeName); + + if (Objects.isNull(modFile)) { + return new 
SnapshotPinResult( + pinnedTsFile, null, false, false, shouldRefreshModFileStateFromResource); + } + + File pinnedModFile = null; + try { + pinnedModFile = + PipeDataNodeResourceManager.tsfile().increaseFileReference(modFile, false, pipeName); + return new SnapshotPinResult(pinnedTsFile, pinnedModFile, true, true, false); + } catch (final Exception e) { + releasePinnedSnapshotQuietly(pinnedTsFile, pinnedModFile); + throw e; + } + } + + private File pinTsFileUnderResourceReadLock() throws IOException { + resource.readLock(); + try { + return PipeDataNodeResourceManager.tsfile().increaseFileReference(tsFile, true, pipeName); + } finally { + resource.readUnlock(); + } + } + + private void awaitDelayedDeletionMaterializationBeforeModSnapshot() throws InterruptedException { + final int regionId = Integer.parseInt(resource.getDataRegionId()); + final PipeTsFileDeletionBarrier barrier = PipeTsFileDeletionBarrier.getInstance(); + barrier.awaitDeletionResolutionUpTo(regionId, snapshotUpperBoundForDelayedMod); + barrier.awaitPendingDeletionsUpTo(resource.getTsFilePath(), snapshotUpperBoundForDelayedMod); + } + + private SnapshotPinResult pinModSnapshotUnderResourceReadLock(final File pinnedTsFile) + throws IOException { + File pinnedModFile = null; + boolean withMod = false; + + resource.readLock(); + try { + final ModificationFile resourceExclusiveModFile = resource.getExclusiveModFile(); + resourceExclusiveModFile.writeLock(); + try { + if (!shouldTransferModFile) { + return new SnapshotPinResult(pinnedTsFile, null, false, false, false); + } + + if (!shouldRefreshModFileStateFromResource && Objects.nonNull(modFile)) { + pinnedModFile = + PipeDataNodeResourceManager.tsfile().increaseFileReference(modFile, false, pipeName); + withMod = true; + } else if (resourceExclusiveModFile.exists()) { + pinnedModFile = + PipeDataNodeResourceManager.tsfile() + .increaseFileReference(resourceExclusiveModFile.getFile(), false, pipeName); + withMod = true; + } + } finally { + 
resourceExclusiveModFile.writeUnlock(); + } + } catch (final Exception e) { + releasePinnedSnapshotQuietly(pinnedTsFile, pinnedModFile); + throw e; + } finally { + resource.readUnlock(); + } + + return new SnapshotPinResult( + pinnedTsFile, pinnedModFile, withMod, Objects.nonNull(pinnedModFile), false); + } + + private boolean shouldOnlyPinTsFileOnFirstReference() { + return shouldDelayModSnapshotUntilReplicateIndex && Objects.isNull(pipeName); + } + + private boolean shouldDelayModSnapshotForPipeCopy() { + return shouldDelayModSnapshotUntilReplicateIndex + && Objects.nonNull(pipeName) + && snapshotUpperBoundForDelayedMod != NO_COMMIT_ID; + } + + private void finishDelayedSnapshotBarrier() { + if (!shouldDelayModSnapshotForPipeCopy()) { + return; + } + + PipeTsFileDeletionBarrier.getInstance() + .finishSnapshot(resource.getTsFilePath(), snapshotUpperBoundForDelayedMod); + snapshotUpperBoundForDelayedMod = NO_COMMIT_ID; + } + + private void releasePinnedSnapshotQuietly(final File pinnedTsFile, final File pinnedModFile) { + try { + if (Objects.nonNull(pinnedModFile)) { + PipeDataNodeResourceManager.tsfile().decreaseFileReference(pinnedModFile, pipeName); + } + } catch (final Exception e) { + LOGGER.warn("Decrease reference count for modFile {} error.", pinnedModFile, e); + } + + try { + if (Objects.nonNull(pinnedTsFile)) { + PipeDataNodeResourceManager.tsfile().decreaseFileReference(pinnedTsFile, pipeName); + } + } catch (final Exception e) { + LOGGER.warn("Decrease reference count for TsFile {} error.", pinnedTsFile, e); + } + } + + private static class SnapshotPinResult { private final File tsFile; + private final File modFile; private final boolean isWithMod; + private final boolean isModPinned; + private final boolean shouldRefreshModFileStateFromResource; + + private SnapshotPinResult( + final File tsFile, + final File modFile, + final boolean isWithMod, + final boolean isModPinned, + final boolean shouldRefreshModFileStateFromResource) { + this.tsFile = tsFile; 
+ this.modFile = modFile; + this.isWithMod = isWithMod; + this.isModPinned = isModPinned; + this.shouldRefreshModFileStateFromResource = shouldRefreshModFileStateFromResource; + } + } + + private static class PipeTsFileInsertionEventResource extends PipeEventResource { + + private final File tsFile; + private final boolean isModPinned; private final File modFile; - private final File sharedModFile; // unused now private final AtomicReference eventParser; private final String pipeName; @@ -873,16 +1176,14 @@ private PipeTsFileInsertionEventResource( final AtomicInteger referenceCount, final String pipeName, final File tsFile, - final boolean isWithMod, + final boolean isModPinned, final File modFile, - final File sharedModFile, final AtomicReference eventParser) { super(isReleased, referenceCount); this.pipeName = pipeName; this.tsFile = tsFile; - this.isWithMod = isWithMod; + this.isModPinned = isModPinned; this.modFile = modFile; - this.sharedModFile = sharedModFile; this.eventParser = eventParser; } @@ -891,7 +1192,7 @@ protected void finalizeResource() { try { // decrease reference count PipeDataNodeResourceManager.tsfile().decreaseFileReference(tsFile, pipeName); - if (isWithMod) { + if (isModPinned && Objects.nonNull(modFile)) { PipeDataNodeResourceManager.tsfile().decreaseFileReference(modFile, pipeName); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2SyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2SyncSink.java index 5866a5ceeaa6c..76bebb976bbcd 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2SyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2SyncSink.java @@ -382,7 +382,7 @@ private void doTransfer(final PipeTsFileInsertionEvent pipeTsFileInsertionEvent) new 
TConsensusGroupId(TConsensusGroupType.DataRegion, consensusGroupId); // 1. Transfer tsFile, and mod file if exists - if (pipeTsFileInsertionEvent.isWithMod()) { + if (modFile != null) { transferFilePieces( modFile, syncIoTConsensusV2ServiceClient, true, tCommitId, tConsensusGroupId); transferFilePieces( diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TsFileInsertionEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TsFileInsertionEventHandler.java index 28850bc543dd9..90cdac84b896f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TsFileInsertionEventHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TsFileInsertionEventHandler.java @@ -46,7 +46,6 @@ import java.io.IOException; import java.io.RandomAccessFile; import java.util.Arrays; -import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; public class IoTConsensusV2TsFileInsertionEventHandler @@ -101,17 +100,14 @@ public IoTConsensusV2TsFileInsertionEventHandler( tsFile = event.getTsFile(); modFile = event.getModFile(); - transferMod = event.isWithMod(); + transferMod = modFile != null; currentFile = transferMod ? modFile : tsFile; readFileBufferSize = PipeConfig.getInstance().getPipeSinkReadFileBufferSize(); readBuffer = new byte[readFileBufferSize]; position = 0; - reader = - Objects.nonNull(modFile) - ? new RandomAccessFile(modFile, "r") - : new RandomAccessFile(tsFile, "r"); + reader = transferMod ? 
new RandomAccessFile(modFile, "r") : new RandomAccessFile(tsFile, "r"); isSealSignalSent = new AtomicBoolean(false); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java index f8d0b104096f1..a10abf8272713 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java @@ -404,6 +404,9 @@ private boolean transferWithoutCheck(final TsFileInsertionEvent tsFileInsertionE throw new FileNotFoundException(pipeTsFileInsertionEvent.getTsFile().getAbsolutePath()); } + final boolean supportMod = clientManager.supportModsIfIsDataNodeReceiver(); + final File modFile = supportMod ? pipeTsFileInsertionEvent.getModFile() : null; + final PipeTransferTsFileHandler pipeTransferTsFileHandler = new PipeTransferTsFileHandler( this, @@ -416,9 +419,8 @@ private boolean transferWithoutCheck(final TsFileInsertionEvent tsFileInsertionE new AtomicInteger(1), new AtomicBoolean(false), pipeTsFileInsertionEvent.getTsFile(), - pipeTsFileInsertionEvent.getModFile(), - pipeTsFileInsertionEvent.isWithMod() - && clientManager.supportModsIfIsDataNodeReceiver(), + modFile, + modFile != null, pipeTsFileInsertionEvent.isTableModelEvent() ? 
pipeTsFileInsertionEvent.getTableModelDatabaseName() : null); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java index 809b40f6eba02..76b4dd1f30b30 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java @@ -856,6 +856,8 @@ private Event supplyTsFileEvent(final TsFileResource resource) { resource.getDatabaseName(), resource, null, + null, + true, shouldTransferModFile, false, true, diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java index 9c7182f051c74..7041d723bd4d4 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java @@ -93,13 +93,19 @@ public PipeDataRegionAssigner(final int dataRegionId) { } public void publishToAssign(final PipeRealtimeEvent event) { + final EnrichedEvent innerEvent = event.getEvent(); + if (innerEvent instanceof PipeTsFileInsertionEvent + && DataRegionConsensusImpl.getInstance() instanceof IoTConsensusV2 + && IoTConsensusV2Processor.isShouldReplicate(innerEvent)) { + ((PipeTsFileInsertionEvent) innerEvent).enableDelayModSnapshotUntilReplicateIndex(); + } + if (!event.increaseReferenceCount(PipeDataRegionAssigner.class.getName())) { LOGGER.warn( 
"The reference count of the realtime event {} cannot be increased, skipping it.", event); return; } - final EnrichedEvent innerEvent = event.getEvent(); eventCounter.increaseEventCount(innerEvent); if (innerEvent instanceof PipeHeartbeatEvent) { ((PipeHeartbeatEvent) innerEvent).onPublished(); @@ -174,6 +180,12 @@ private void assignToSource( source.getRealtimeDataExtractionStartTime(), source.getRealtimeDataExtractionEndTime()); final EnrichedEvent innerEvent = copiedEvent.getEvent(); + if (innerEvent instanceof PipeTsFileInsertionEvent) { + final PipeTsFileInsertionEvent tsFileInsertionEvent = + (PipeTsFileInsertionEvent) innerEvent; + tsFileInsertionEvent.disableMod4NonTransferPipes(source.isShouldTransferModFile()); + } + // if using IoTV2, assign a replicateIndex for this realtime event if (DataRegionConsensusImpl.getInstance() instanceof IoTConsensusV2 && IoTConsensusV2Processor.isShouldReplicate(innerEvent)) { @@ -187,12 +199,6 @@ private void assignToSource( innerEvent); } - if (innerEvent instanceof PipeTsFileInsertionEvent) { - final PipeTsFileInsertionEvent tsFileInsertionEvent = - (PipeTsFileInsertionEvent) innerEvent; - tsFileInsertionEvent.disableMod4NonTransferPipes(source.isShouldTransferModFile()); - } - if (innerEvent instanceof PipeDeleteDataNodeEvent) { final PipeDeleteDataNodeEvent deleteDataNodeEvent = (PipeDeleteDataNodeEvent) innerEvent; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java index 3cce521a51e0b..541b3432272f6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java @@ -164,7 +164,6 @@ public 
DeletionResource listenToDeleteData( } assigner.publishToAssign(PipeRealtimeEventFactory.createRealtimeEvent(node)); - return deletionResource; } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java index eb37cb1d5d21b..3c09dd37d17f9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java @@ -67,6 +67,7 @@ import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResource; import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResource.Status; import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager; +import org.apache.iotdb.db.pipe.consensus.deletion.PipeTsFileDeletionBarrier; import org.apache.iotdb.db.pipe.consensus.deletion.persist.PageCacheDeletionBuffer; import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil; import org.apache.iotdb.db.pipe.source.dataregion.realtime.listener.PipeInsertionDataNodeListener; @@ -2830,6 +2831,88 @@ private void getTwoKindsOfTsFiles( .forEach(unsealedResource::add); } + private DeleteMaterializationBarrierContext beginDeleteMaterializationBarrierIfNecessary( + final boolean shouldUseBarrier) { + if (!shouldUseBarrier) { + return null; + } + + return new DeleteMaterializationBarrierContext( + dataRegionId.getId(), + PipeTsFileDeletionBarrier.getInstance().beginDeletion(dataRegionId.getId())); + } + + private Set collectImpactedTsFilePaths( + final Collection sealedTsFiles, final ModEntry deletion) { + return sealedTsFiles.stream() + .filter(resource -> !canSkipDelete(resource, deletion)) + .map(TsFileResource::getTsFilePath) + .collect(Collectors.toSet()); + } + + private void resolveDeleteMaterializationTargets( + final DeleteMaterializationBarrierContext barrierContext, + final Collection 
tsFilePaths) { + if (Objects.isNull(barrierContext) || barrierContext.resolved) { + return; + } + + barrierContext.tsFilePaths.addAll(tsFilePaths); + PipeTsFileDeletionBarrier.getInstance() + .resolveDeletionTargets( + barrierContext.regionId, barrierContext.deleteSeq, barrierContext.tsFilePaths); + barrierContext.resolved = true; + } + + private void awaitSnapshotsBeforeMaterializing( + final DeleteMaterializationBarrierContext barrierContext, + final Collection sealedTsFiles, + final ModEntry deletion) + throws InterruptedException { + if (Objects.isNull(barrierContext)) { + return; + } + + final PipeTsFileDeletionBarrier barrier = PipeTsFileDeletionBarrier.getInstance(); + for (final TsFileResource sealedTsFile : sealedTsFiles) { + if (!canSkipDelete(sealedTsFile, deletion)) { + barrier.awaitSnapshotsBeforeMaterialization( + sealedTsFile.getTsFilePath(), barrierContext.deleteSeq); + } + } + } + + private void finishDeleteMaterializationBarrier( + final DeleteMaterializationBarrierContext barrierContext) { + if (Objects.isNull(barrierContext) || barrierContext.finished) { + return; + } + + if (!barrierContext.resolved) { + PipeTsFileDeletionBarrier.getInstance() + .resolveDeletionTargets( + barrierContext.regionId, barrierContext.deleteSeq, Collections.emptySet()); + barrierContext.resolved = true; + } + + PipeTsFileDeletionBarrier.getInstance() + .finishDeletion(barrierContext.deleteSeq, barrierContext.tsFilePaths); + barrierContext.finished = true; + } + + private static class DeleteMaterializationBarrierContext { + private final int regionId; + private final long deleteSeq; + private final Set tsFilePaths = new HashSet<>(); + private boolean resolved = false; + private boolean finished = false; + + private DeleteMaterializationBarrierContext(final int regionId, final long deleteSeq) { + this.regionId = regionId; + this.deleteSeq = deleteSeq; + } + } + public void deleteByDevice(final MeasurementPath pattern, final DeleteDataNode node) throws IOException { if 
(SettleService.getINSTANCE().getFilesToBeSettledCount().get() != 0) { @@ -2863,6 +2946,9 @@ public void deleteByDevice(final MeasurementPath pattern, final DeleteDataNode n } ModEntry deletion = new TreeDeletionEntry(pattern, startTime, endTime); + final DeleteMaterializationBarrierContext barrierContext = + beginDeleteMaterializationBarrierIfNecessary( + DeletionResource.isDeleteNodeGeneratedInLocalByIoTV2(node)); List sealedTsFileResource = new ArrayList<>(); List unsealedTsFileResource = new ArrayList<>(); @@ -2870,19 +2956,29 @@ public void deleteByDevice(final MeasurementPath pattern, final DeleteDataNode n // deviceMatchInfo is used for filter the matched deviceId in TsFileResource // deviceMatchInfo contains the DeviceId means this device matched the pattern deleteDataInUnsealedFiles(unsealedTsFileResource, deletion, sealedTsFileResource); - // capture deleteDataNode and wait it to be persisted to DAL. - DeletionResource deletionResource = - PipeInsertionDataNodeListener.getInstance() - .listenToDeleteData(dataRegionId.getId(), node); - // just get result. We have already waited for result in `listenToDeleteData` - if (deletionResource != null && deletionResource.waitForResult() == Status.FAILURE) { - throw deletionResource.getCause(); - } - writeUnlock(); - hasReleasedLock = true; + resolveDeleteMaterializationTargets( + barrierContext, collectImpactedTsFilePaths(sealedTsFileResource, deletion)); + try { + // capture deleteDataNode and wait it to be persisted to DAL. + final DeletionResource deletionResource = + PipeInsertionDataNodeListener.getInstance() + .listenToDeleteData(dataRegionId.getId(), node); + // just get result. 
We have already waited for result in `listenToDeleteData` + if (deletionResource != null && deletionResource.waitForResult() == Status.FAILURE) { + throw deletionResource.getCause(); + } + writeUnlock(); + hasReleasedLock = true; - deleteDataInSealedFiles(sealedTsFileResource, deletion); + awaitSnapshotsBeforeMaterializing(barrierContext, sealedTsFileResource, deletion); + deleteDataInSealedFiles(sealedTsFileResource, deletion); + } finally { + finishDeleteMaterializationBarrier(barrierContext); + } } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } throw new IOException(e); } finally { if (!hasReleasedLock) { @@ -2959,6 +3055,9 @@ public void deleteByTable(RelationalDeleteDataNode node) throws IOException { } } + final DeleteMaterializationBarrierContext barrierContext = + beginDeleteMaterializationBarrierIfNecessary( + DeletionResource.isDeleteNodeGeneratedInLocalByIoTV2(node)); List> sealedTsFileResourceLists = new ArrayList<>(modEntries.size()); for (TableDeletionEntry modEntry : modEntries) { List sealedTsFileResource = new ArrayList<>(); @@ -2974,22 +3073,37 @@ public void deleteByTable(RelationalDeleteDataNode node) throws IOException { sealedTsFileResourceLists.add(sealedTsFileResource); } - // capture deleteDataNode and wait it to be persisted to DAL. - DeletionResource deletionResource = - PipeInsertionDataNodeListener.getInstance() - .listenToDeleteData(dataRegionId.getId(), node); - // just get result. 
We have already waited for result in `listenToDeleteData` - if (deletionResource != null && deletionResource.waitForResult() == Status.FAILURE) { - throw deletionResource.getCause(); + final Set impactedTsFilePaths = new HashSet<>(); + for (int i = 0; i < modEntries.size(); i++) { + impactedTsFilePaths.addAll( + collectImpactedTsFilePaths(sealedTsFileResourceLists.get(i), modEntries.get(i))); } + resolveDeleteMaterializationTargets(barrierContext, impactedTsFilePaths); + try { + // capture deleteDataNode and wait it to be persisted to DAL. + final DeletionResource deletionResource = + PipeInsertionDataNodeListener.getInstance() + .listenToDeleteData(dataRegionId.getId(), node); + // just get result. We have already waited for result in `listenToDeleteData` + if (deletionResource != null && deletionResource.waitForResult() == Status.FAILURE) { + throw deletionResource.getCause(); + } - writeUnlock(); - hasReleasedLock = true; + writeUnlock(); + hasReleasedLock = true; - for (int i = 0; i < modEntries.size(); i++) { - deleteDataInSealedFiles(sealedTsFileResourceLists.get(i), modEntries.get(i)); + for (int i = 0; i < modEntries.size(); i++) { + awaitSnapshotsBeforeMaterializing( + barrierContext, sealedTsFileResourceLists.get(i), modEntries.get(i)); + deleteDataInSealedFiles(sealedTsFileResourceLists.get(i), modEntries.get(i)); + } + } finally { + finishDeleteMaterializationBarrier(barrierContext); } } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } throw new IOException(e); } finally { if (!hasReleasedLock) { @@ -3028,22 +3142,35 @@ public void deleteDataDirectly(MeasurementPath pathToDelete, DeleteDataNode node } } TreeDeletionEntry deletion = new TreeDeletionEntry(pathToDelete, startTime, endTime); + final DeleteMaterializationBarrierContext barrierContext = + beginDeleteMaterializationBarrierIfNecessary( + DeletionResource.isDeleteNodeGeneratedInLocalByIoTV2(node)); List sealedTsFileResource = new 
ArrayList<>(); List unsealedTsFileResource = new ArrayList<>(); getTwoKindsOfTsFiles(sealedTsFileResource, unsealedTsFileResource, startTime, endTime); deleteDataDirectlyInFile(unsealedTsFileResource, deletion); - // capture deleteDataNode and wait it to be persisted to DAL. - DeletionResource deletionResource = - PipeInsertionDataNodeListener.getInstance() - .listenToDeleteData(dataRegionId.getId(), node); - // just get result. We have already waited for result in `listenToDeleteData` - if (deletionResource != null && deletionResource.waitForResult() == Status.FAILURE) { - throw deletionResource.getCause(); + resolveDeleteMaterializationTargets( + barrierContext, collectImpactedTsFilePaths(sealedTsFileResource, deletion)); + try { + // capture deleteDataNode and wait it to be persisted to DAL. + final DeletionResource deletionResource = + PipeInsertionDataNodeListener.getInstance() + .listenToDeleteData(dataRegionId.getId(), node); + // just get result. We have already waited for result in `listenToDeleteData` + if (deletionResource != null && deletionResource.waitForResult() == Status.FAILURE) { + throw deletionResource.getCause(); + } + writeUnlock(); + releasedLock = true; + awaitSnapshotsBeforeMaterializing(barrierContext, sealedTsFileResource, deletion); + deleteDataDirectlyInFile(sealedTsFileResource, deletion); + } finally { + finishDeleteMaterializationBarrier(barrierContext); } - writeUnlock(); - releasedLock = true; - deleteDataDirectlyInFile(sealedTsFileResource, deletion); } catch (Exception e) { + if (e instanceof InterruptedException) { + Thread.currentThread().interrupt(); + } throw new IOException(e); } finally { if (!releasedLock) { diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/deletion/PipeTsFileDeletionBarrierTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/deletion/PipeTsFileDeletionBarrierTest.java new file mode 100644 index 0000000000000..5a7a4e8a0a115 --- /dev/null +++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/deletion/PipeTsFileDeletionBarrierTest.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.iotdb.db.pipe.consensus.deletion; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Collections; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +public class PipeTsFileDeletionBarrierTest { + + private static final int TEST_REGION_ID = 1; + private static final String TEST_TSFILE_PATH = "/tmp/test.tsfile"; + + @After + public void tearDown() { + PipeTsFileDeletionBarrier.getInstance().clear(); + } + + @Test + public void testSnapshotWaitsForEarlierDeletionResolution() throws Exception { + final PipeTsFileDeletionBarrier barrier = PipeTsFileDeletionBarrier.getInstance(); + final long deleteSeq = barrier.beginDeletion(TEST_REGION_ID); + + final CountDownLatch started = new CountDownLatch(1); + final CountDownLatch finished = new CountDownLatch(1); + final AtomicReference failure = new AtomicReference<>(null); + + final Thread thread = + new Thread( + () -> { + try { + 
started.countDown(); + barrier.awaitDeletionResolutionUpTo(TEST_REGION_ID, deleteSeq); + } catch (final Throwable throwable) { + failure.set(throwable); + } finally { + finished.countDown(); + } + }, + "test-wait-delete-resolution"); + thread.start(); + + Assert.assertTrue(started.await(5, TimeUnit.SECONDS)); + Assert.assertFalse(finished.await(200, TimeUnit.MILLISECONDS)); + + barrier.resolveDeletionTargets(TEST_REGION_ID, deleteSeq, Collections.emptySet()); + + Assert.assertTrue(finished.await(5, TimeUnit.SECONDS)); + if (failure.get() != null) { + throw new AssertionError("Snapshot wait should finish successfully", failure.get()); + } + thread.join(TimeUnit.SECONDS.toMillis(5)); + } + + @Test + public void testSnapshotWaitsForPendingDeletionUpToUpperBound() throws Exception { + final PipeTsFileDeletionBarrier barrier = PipeTsFileDeletionBarrier.getInstance(); + final long deleteSeq = barrier.beginDeletion(TEST_REGION_ID); + barrier.resolveDeletionTargets( + TEST_REGION_ID, deleteSeq, Collections.singleton(TEST_TSFILE_PATH)); + + final long snapshotUpperBound = barrier.beginSnapshot(TEST_REGION_ID, TEST_TSFILE_PATH); + Assert.assertEquals(deleteSeq, snapshotUpperBound); + + final CountDownLatch started = new CountDownLatch(1); + final CountDownLatch finished = new CountDownLatch(1); + final AtomicReference failure = new AtomicReference<>(null); + + final Thread thread = + new Thread( + () -> { + try { + started.countDown(); + barrier.awaitPendingDeletionsUpTo(TEST_TSFILE_PATH, snapshotUpperBound); + } catch (final Throwable throwable) { + failure.set(throwable); + } finally { + finished.countDown(); + } + }, + "test-wait-pending-delete"); + thread.start(); + + Assert.assertTrue(started.await(5, TimeUnit.SECONDS)); + Assert.assertFalse(finished.await(200, TimeUnit.MILLISECONDS)); + + barrier.finishDeletion(deleteSeq, Collections.singleton(TEST_TSFILE_PATH)); + + Assert.assertTrue(finished.await(5, TimeUnit.SECONDS)); + if (failure.get() != null) { + throw new 
AssertionError("Pending delete wait should finish successfully", failure.get()); + } + + barrier.finishSnapshot(TEST_TSFILE_PATH, snapshotUpperBound); + thread.join(TimeUnit.SECONDS.toMillis(5)); + } + + @Test + public void testLaterDeletionWaitsForEarlierSnapshot() throws Exception { + final PipeTsFileDeletionBarrier barrier = PipeTsFileDeletionBarrier.getInstance(); + final long snapshotUpperBound = barrier.beginSnapshot(TEST_REGION_ID, TEST_TSFILE_PATH); + Assert.assertEquals(0L, snapshotUpperBound); + + final long deleteSeq = barrier.beginDeletion(TEST_REGION_ID); + barrier.resolveDeletionTargets( + TEST_REGION_ID, deleteSeq, Collections.singleton(TEST_TSFILE_PATH)); + + final CountDownLatch started = new CountDownLatch(1); + final CountDownLatch finished = new CountDownLatch(1); + final AtomicReference failure = new AtomicReference<>(null); + + final Thread thread = + new Thread( + () -> { + try { + started.countDown(); + barrier.awaitSnapshotsBeforeMaterialization(TEST_TSFILE_PATH, deleteSeq); + } catch (final Throwable throwable) { + failure.set(throwable); + } finally { + finished.countDown(); + } + }, + "test-wait-earlier-snapshot"); + thread.start(); + + Assert.assertTrue(started.await(5, TimeUnit.SECONDS)); + Assert.assertFalse(finished.await(200, TimeUnit.MILLISECONDS)); + + barrier.finishSnapshot(TEST_TSFILE_PATH, snapshotUpperBound); + + Assert.assertTrue(finished.await(5, TimeUnit.SECONDS)); + if (failure.get() != null) { + throw new AssertionError("Later deletion wait should finish successfully", failure.get()); + } + + barrier.finishDeletion(deleteSeq, Collections.singleton(TEST_TSFILE_PATH)); + thread.join(TimeUnit.SECONDS.toMillis(5)); + } +} diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java index 5ba0843bf8003..90f3d3b002f28 100644 --- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java @@ -24,11 +24,13 @@ import org.apache.iotdb.commons.auth.entity.PrivilegeType; import org.apache.iotdb.commons.conf.IoTDBConstant; import org.apache.iotdb.commons.exception.auth.AccessDeniedException; +import org.apache.iotdb.commons.path.MeasurementPath; import org.apache.iotdb.commons.path.PartialPath; import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern; import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern; import org.apache.iotdb.commons.utils.FileUtils; import org.apache.iotdb.db.auth.AuthorityChecker; +import org.apache.iotdb.db.pipe.consensus.deletion.PipeTsFileDeletionBarrier; import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent; import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName; import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl; @@ -36,6 +38,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RelationalAuthorStatement; import org.apache.iotdb.db.queryengine.plan.statement.Statement; import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter; +import org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEntry; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource; import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus; import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator; @@ -48,6 +51,7 @@ import org.apache.tsfile.file.metadata.enums.CompressionType; import org.apache.tsfile.file.metadata.enums.TSEncoding; import org.apache.tsfile.read.common.TimeRange; +import org.junit.After; import org.junit.Assert; import org.junit.Test; @@ -56,6 +60,10 @@ import java.util.Collection; import 
java.util.Collections; import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -64,6 +72,11 @@ import static org.apache.iotdb.db.auth.AuthorityChecker.NO_PERMISSION_PROMOTION; public class PipeTsFileInsertionEventTest { + @After + public void tearDown() { + PipeTsFileDeletionBarrier.getInstance().clear(); + } + @Test public void testAuthCheck() throws Exception { final AccessControl oldControl = AuthorityChecker.getAccessControl(); @@ -101,6 +114,8 @@ public void testAuthCheck() throws Exception { "db", resource, null, + null, + true, true, false, false, @@ -126,6 +141,8 @@ public void testAuthCheck() throws Exception { "root.db", resource, null, + null, + true, true, false, false, @@ -157,6 +174,575 @@ public void testAuthCheck() throws Exception { } } + @Test + public void testLateCreatedModFileCanStillBeObservedAfterShallowCopy() throws Exception { + final File baseDir = new File(TestConstant.BASE_OUTPUT_PATH, "late-mod"); + final File tsFile = new File(baseDir, "late-mod.tsfile"); + PipeTsFileInsertionEvent originalEvent = null; + PipeTsFileInsertionEvent copiedEvent = null; + try { + Assert.assertTrue(baseDir.mkdirs() || baseDir.exists()); + Assert.assertTrue(tsFile.createNewFile() || tsFile.exists()); + + final TsFileResource resource = new TsFileResource(tsFile); + resource.setStatus(TsFileResourceStatus.NORMAL); + + originalEvent = + new PipeTsFileInsertionEvent( + false, + "root.db", + resource, + null, + null, + true, + true, + false, + false, + null, + "testPipe", + 1L, + null, + buildUnionPattern(true, Collections.singletonList(new IoTDBTreePattern(true, null))), + new TablePattern(false, null, null), + null, + null, + null, + true, + Long.MIN_VALUE, + Long.MAX_VALUE); + copiedEvent = + 
originalEvent.shallowCopySelfAndBindPipeTaskMetaForProgressReport( + "testPipeCopy", + 2L, + null, + buildUnionPattern(true, Collections.singletonList(new IoTDBTreePattern(true, null))), + new TablePattern(false, null, null), + null, + null, + null, + true, + Long.MIN_VALUE, + Long.MAX_VALUE); + + Assert.assertFalse(originalEvent.isWithMod()); + Assert.assertFalse(copiedEvent.isWithMod()); + + resource + .getExclusiveModFile() + .write(new TreeDeletionEntry(new MeasurementPath("root.db.d1.s1"), 0, 1)); + final File modFile = resource.getExclusiveModFile().getFile(); + Assert.assertTrue(modFile.exists()); + + Assert.assertTrue(originalEvent.isWithMod()); + Assert.assertEquals(modFile, originalEvent.getModFile()); + Assert.assertTrue(copiedEvent.isWithMod()); + Assert.assertEquals(modFile, copiedEvent.getModFile()); + + copiedEvent.disableMod4NonTransferPipes(false); + Assert.assertFalse(copiedEvent.isWithMod()); + Assert.assertNull(copiedEvent.getModFile()); + } finally { + if (originalEvent != null) { + originalEvent.close(); + } + if (copiedEvent != null) { + copiedEvent.close(); + } + FileUtils.deleteFileOrDirectory(baseDir); + } + } + + @Test + public void testShallowCopyKeepsPinnedModSnapshotAfterSourceModDisappears() throws Exception { + final File tsFile = + new File( + TsFileNameGenerator.generateNewTsFilePath( + TestConstant.BASE_OUTPUT_PATH + IoTDBConstant.SEQUENCE_FOLDER_NAME, 1, 1, 1, 4)); + PipeTsFileInsertionEvent originalEvent = null; + PipeTsFileInsertionEvent copiedEvent = null; + try { + Assert.assertTrue(tsFile.getParentFile().mkdirs() || tsFile.getParentFile().exists()); + Assert.assertTrue(tsFile.createNewFile() || tsFile.exists()); + + final TsFileResource resource = new TsFileResource(tsFile); + resource.setStatus(TsFileResourceStatus.NORMAL); + resource + .getExclusiveModFile() + .write(new TreeDeletionEntry(new MeasurementPath("root.db.d1.s1"), 0, 1)); + final File originalModFile = resource.getExclusiveModFile().getFile(); + 
Assert.assertTrue(originalModFile.exists()); + + originalEvent = + new PipeTsFileInsertionEvent( + false, + "root.db", + resource, + null, + null, + true, + true, + false, + false, + null, + null, + 1L, + null, + buildUnionPattern(true, Collections.singletonList(new IoTDBTreePattern(true, null))), + new TablePattern(false, null, null), + null, + null, + null, + true, + Long.MIN_VALUE, + Long.MAX_VALUE); + Assert.assertTrue(originalEvent.increaseReferenceCount("assigner")); + + final File assignerPinnedModFile = originalEvent.getModFile(); + Assert.assertNotNull(assignerPinnedModFile); + Assert.assertNotEquals( + originalModFile.getAbsolutePath(), assignerPinnedModFile.getAbsolutePath()); + + Assert.assertTrue(originalModFile.delete()); + Assert.assertFalse(originalModFile.exists()); + + copiedEvent = + originalEvent.shallowCopySelfAndBindPipeTaskMetaForProgressReport( + "testPipeCopy", + 2L, + null, + buildUnionPattern(true, Collections.singletonList(new IoTDBTreePattern(true, null))), + new TablePattern(false, null, null), + null, + null, + null, + true, + Long.MIN_VALUE, + Long.MAX_VALUE); + + Assert.assertTrue(copiedEvent.isWithMod()); + Assert.assertEquals( + assignerPinnedModFile.getAbsolutePath(), copiedEvent.getModFile().getAbsolutePath()); + + Assert.assertTrue(copiedEvent.increaseReferenceCount("source")); + final File pipePinnedModFile = copiedEvent.getModFile(); + Assert.assertNotNull(pipePinnedModFile); + Assert.assertTrue(pipePinnedModFile.exists()); + Assert.assertNotEquals( + originalModFile.getAbsolutePath(), pipePinnedModFile.getAbsolutePath()); + } finally { + if (copiedEvent != null) { + copiedEvent.clearReferenceCount("source"); + copiedEvent.close(); + } + if (originalEvent != null) { + originalEvent.clearReferenceCount("assigner"); + originalEvent.close(); + } + FileUtils.deleteFileOrDirectory(new File(TestConstant.BASE_OUTPUT_PATH)); + } + } + + @Test + public void testPinnedEventDoesNotAdoptFutureModFile() throws Exception { + final File 
tsFile = createTestTsFileUnderDataRegion(1, 1, 1, 5); + PipeTsFileInsertionEvent originalEvent = null; + PipeTsFileInsertionEvent copiedEvent = null; + try { + Assert.assertTrue(tsFile.getParentFile().mkdirs() || tsFile.getParentFile().exists()); + Assert.assertTrue(tsFile.createNewFile() || tsFile.exists()); + + final TsFileResource resource = new TsFileResource(tsFile); + resource.setStatus(TsFileResourceStatus.NORMAL); + + originalEvent = + new PipeTsFileInsertionEvent( + false, + "root.db", + resource, + null, + null, + true, + true, + false, + false, + null, + "testPipe", + 1L, + null, + buildUnionPattern(true, Collections.singletonList(new IoTDBTreePattern(true, null))), + new TablePattern(false, null, null), + null, + null, + null, + true, + Long.MIN_VALUE, + Long.MAX_VALUE); + + Assert.assertFalse(originalEvent.isWithMod()); + Assert.assertTrue(originalEvent.increaseReferenceCount("assigner")); + Assert.assertFalse(originalEvent.isWithMod()); + Assert.assertNull(originalEvent.getModFile()); + + resource + .getExclusiveModFile() + .write(new TreeDeletionEntry(new MeasurementPath("root.db.d1.s1"), 0, 1)); + Assert.assertTrue(resource.getExclusiveModFile().getFile().exists()); + + Assert.assertFalse(originalEvent.isWithMod()); + Assert.assertNull(originalEvent.getModFile()); + + copiedEvent = + originalEvent.shallowCopySelfAndBindPipeTaskMetaForProgressReport( + "testPipeCopy", + 2L, + null, + buildUnionPattern(true, Collections.singletonList(new IoTDBTreePattern(true, null))), + new TablePattern(false, null, null), + null, + null, + null, + true, + Long.MIN_VALUE, + Long.MAX_VALUE); + + Assert.assertFalse(copiedEvent.isWithMod()); + Assert.assertNull(copiedEvent.getModFile()); + + Assert.assertTrue(copiedEvent.increaseReferenceCount("source")); + Assert.assertTrue(copiedEvent.isWithMod()); + Assert.assertNotNull(copiedEvent.getModFile()); + Assert.assertNotEquals( + resource.getExclusiveModFile().getFile().getAbsolutePath(), + 
copiedEvent.getModFile().getAbsolutePath()); + } finally { + if (copiedEvent != null) { + copiedEvent.clearReferenceCount("source"); + copiedEvent.close(); + } + if (originalEvent != null) { + originalEvent.clearReferenceCount("assigner"); + originalEvent.close(); + } + FileUtils.deleteFileOrDirectory(new File(TestConstant.BASE_OUTPUT_PATH)); + } + } + + @Test + public void testPinnedModFilePathIsStableAfterIncreaseReferenceCount() throws Exception { + final File tsFile = + new File( + TsFileNameGenerator.generateNewTsFilePath( + TestConstant.BASE_OUTPUT_PATH + IoTDBConstant.SEQUENCE_FOLDER_NAME, 1, 1, 1, 2)); + PipeTsFileInsertionEvent event = null; + try { + Assert.assertTrue(tsFile.getParentFile().mkdirs() || tsFile.getParentFile().exists()); + Assert.assertTrue(tsFile.createNewFile() || tsFile.exists()); + + final TsFileResource resource = new TsFileResource(tsFile); + resource.setStatus(TsFileResourceStatus.NORMAL); + + event = + new PipeTsFileInsertionEvent( + false, + "root.db", + resource, + null, + null, + true, + true, + false, + false, + null, + "testPipe", + 1L, + null, + buildUnionPattern(true, Collections.singletonList(new IoTDBTreePattern(true, null))), + new TablePattern(false, null, null), + null, + null, + null, + true, + Long.MIN_VALUE, + Long.MAX_VALUE); + + Assert.assertFalse(event.isWithMod()); + + // Create the mod file after event construction but before pinning. + resource + .getExclusiveModFile() + .write(new TreeDeletionEntry(new MeasurementPath("root.db.d1.s1"), 0, 1)); + final File originalModFile = resource.getExclusiveModFile().getFile(); + Assert.assertTrue(originalModFile.exists()); + Assert.assertTrue(event.isWithMod()); + Assert.assertEquals(originalModFile.getAbsolutePath(), event.getModFile().getAbsolutePath()); + + // Pin the event: mod file should be copied to pipe dir and must not be overwritten by + // refresh. 
+ Assert.assertTrue(event.increaseReferenceCount("test")); + final File pinnedModFile = event.getModFile(); + Assert.assertNotNull(pinnedModFile); + Assert.assertNotEquals(originalModFile.getAbsolutePath(), pinnedModFile.getAbsolutePath()); + + // Ensure subsequent refresh calls won't revert it to the original path. + Assert.assertTrue(event.isWithMod()); + Assert.assertEquals(pinnedModFile.getAbsolutePath(), event.getModFile().getAbsolutePath()); + } finally { + if (event != null) { + event.clearReferenceCount("test"); + event.close(); + } + FileUtils.deleteFileOrDirectory(new File(TestConstant.BASE_OUTPUT_PATH)); + } + } + + @Test + public void testDelayedAssignerRetainDoesNotFreezeFutureMod() throws Exception { + final File tsFile = + new File( + TsFileNameGenerator.generateNewTsFilePath( + TestConstant.BASE_OUTPUT_PATH + IoTDBConstant.SEQUENCE_FOLDER_NAME, 1, 1, 1, 3)); + PipeTsFileInsertionEvent event = null; + try { + Assert.assertTrue(tsFile.getParentFile().mkdirs() || tsFile.getParentFile().exists()); + Assert.assertTrue(tsFile.createNewFile() || tsFile.exists()); + + final TsFileResource resource = new TsFileResource(tsFile); + resource.setStatus(TsFileResourceStatus.NORMAL); + + event = createTestTsFileInsertionEvent(resource, null); + event.enableDelayModSnapshotUntilReplicateIndex(); + + Assert.assertTrue(event.increaseReferenceCount("assigner")); + Assert.assertFalse(event.isWithMod()); + Assert.assertFalse(event.isModPinned()); + Assert.assertNotEquals(tsFile.getAbsolutePath(), event.getTsFile().getAbsolutePath()); + + resource + .getExclusiveModFile() + .write(new TreeDeletionEntry(new MeasurementPath("root.db.d1.s1"), 0, 1)); + final File originalModFile = resource.getExclusiveModFile().getFile(); + Assert.assertTrue(originalModFile.exists()); + + Assert.assertTrue(event.isWithMod()); + Assert.assertEquals(originalModFile.getAbsolutePath(), event.getModFile().getAbsolutePath()); + Assert.assertFalse(event.isModPinned()); + } finally { + if (event != 
null) { + event.clearReferenceCount("assigner"); + event.close(); + } + FileUtils.deleteFileOrDirectory(new File(TestConstant.BASE_OUTPUT_PATH)); + } + } + + @Test + public void testDelayedPipeCopyWaitsForEarlierDeletionBeforeFreezingMod() throws Exception { + final File tsFile = createTestTsFileUnderDataRegion(1, 1, 1, 6); + PipeTsFileInsertionEvent originalEvent = null; + PipeTsFileInsertionEvent copiedEvent = null; + Thread t = null; + try { + Assert.assertTrue(tsFile.getParentFile().mkdirs() || tsFile.getParentFile().exists()); + Assert.assertTrue(tsFile.createNewFile() || tsFile.exists()); + + final TsFileResource resource = new TsFileResource(tsFile); + resource.setStatus(TsFileResourceStatus.NORMAL); + final PipeTsFileDeletionBarrier barrier = PipeTsFileDeletionBarrier.getInstance(); + final int regionId = Integer.parseInt(resource.getDataRegionId()); + final String tsFilePath = resource.getTsFilePath(); + + originalEvent = createTestTsFileInsertionEvent(resource, null); + originalEvent.enableDelayModSnapshotUntilReplicateIndex(); + Assert.assertTrue(originalEvent.increaseReferenceCount("assigner")); + + copiedEvent = createTestPipeCopy(originalEvent, "testPipeCopy"); + final long deleteSeq = barrier.beginDeletion(regionId); + barrier.resolveDeletionTargets(regionId, deleteSeq, Collections.singleton(tsFilePath)); + copiedEvent.setReplicateIndexForIoTV2(100L); + + final CountDownLatch started = new CountDownLatch(1); + final CountDownLatch finished = new CountDownLatch(1); + final AtomicBoolean increased = new AtomicBoolean(false); + final PipeTsFileInsertionEvent finalCopiedEvent = copiedEvent; + + t = + new Thread( + () -> { + started.countDown(); + increased.set(finalCopiedEvent.increaseReferenceCount("source")); + finished.countDown(); + }, + "test-tsfile-delete-barrier-wait-earlier-delete"); + t.start(); + + Assert.assertTrue(started.await(5, TimeUnit.SECONDS)); + Assert.assertFalse(finished.await(200, TimeUnit.MILLISECONDS)); + + resource + 
.getExclusiveModFile() + .write(new TreeDeletionEntry(new MeasurementPath("root.db.d1.s1"), 0, 1)); + final File originalModFile = resource.getExclusiveModFile().getFile(); + Assert.assertTrue(originalModFile.exists()); + + barrier.finishDeletion(deleteSeq, Collections.singleton(tsFilePath)); + + Assert.assertTrue(finished.await(5, TimeUnit.SECONDS)); + Assert.assertTrue(increased.get()); + Assert.assertTrue(copiedEvent.isWithMod()); + Assert.assertTrue(copiedEvent.isModPinned()); + Assert.assertNotNull(copiedEvent.getModFile()); + Assert.assertNotEquals( + originalModFile.getAbsolutePath(), copiedEvent.getModFile().getAbsolutePath()); + } finally { + if (t != null) { + t.join(TimeUnit.SECONDS.toMillis(5)); + } + if (copiedEvent != null && copiedEvent.getReferenceCount() > 0) { + copiedEvent.clearReferenceCount("source"); + copiedEvent.close(); + } + if (originalEvent != null && originalEvent.getReferenceCount() > 0) { + originalEvent.clearReferenceCount("assigner"); + originalEvent.close(); + } + FileUtils.deleteFileOrDirectory(new File(TestConstant.BASE_OUTPUT_PATH)); + } + } + + @Test + public void testDelayedPipeCopyDoesNotFreezeLaterDeletion() throws Exception { + final File tsFile = createTestTsFileUnderDataRegion(1, 1, 1, 7); + PipeTsFileInsertionEvent originalEvent = null; + PipeTsFileInsertionEvent copiedEvent = null; + Thread t = null; + try { + Assert.assertTrue(tsFile.getParentFile().mkdirs() || tsFile.getParentFile().exists()); + Assert.assertTrue(tsFile.createNewFile() || tsFile.exists()); + + final TsFileResource resource = new TsFileResource(tsFile); + resource.setStatus(TsFileResourceStatus.NORMAL); + final PipeTsFileDeletionBarrier barrier = PipeTsFileDeletionBarrier.getInstance(); + final int regionId = Integer.parseInt(resource.getDataRegionId()); + final String tsFilePath = resource.getTsFilePath(); + + originalEvent = createTestTsFileInsertionEvent(resource, null); + originalEvent.enableDelayModSnapshotUntilReplicateIndex(); + 
Assert.assertTrue(originalEvent.increaseReferenceCount("assigner")); + + copiedEvent = createTestPipeCopy(originalEvent, "testPipeCopy"); + copiedEvent.setReplicateIndexForIoTV2(100L); + + final long deleteSeq = barrier.beginDeletion(regionId); + barrier.resolveDeletionTargets(regionId, deleteSeq, Collections.singleton(tsFilePath)); + + final CountDownLatch started = new CountDownLatch(1); + final CountDownLatch finished = new CountDownLatch(1); + final AtomicReference failure = new AtomicReference<>(null); + t = + new Thread( + () -> { + try { + started.countDown(); + barrier.awaitSnapshotsBeforeMaterialization(tsFilePath, deleteSeq); + resource + .getExclusiveModFile() + .write(new TreeDeletionEntry(new MeasurementPath("root.db.d1.s1"), 0, 1)); + barrier.finishDeletion(deleteSeq, Collections.singleton(tsFilePath)); + } catch (final Throwable throwable) { + failure.set(throwable); + } finally { + finished.countDown(); + } + }, + "test-tsfile-delete-barrier-wait-later-delete"); + t.start(); + + Assert.assertTrue(started.await(5, TimeUnit.SECONDS)); + Assert.assertFalse(finished.await(200, TimeUnit.MILLISECONDS)); + + Assert.assertTrue(copiedEvent.increaseReferenceCount("source")); + Assert.assertFalse(copiedEvent.isWithMod()); + Assert.assertNull(copiedEvent.getModFile()); + + Assert.assertTrue(finished.await(5, TimeUnit.SECONDS)); + if (failure.get() != null) { + throw new AssertionError("Later deletion thread should finish successfully", failure.get()); + } + Assert.assertTrue(resource.getExclusiveModFile().getFile().exists()); + } finally { + if (t != null) { + t.join(TimeUnit.SECONDS.toMillis(5)); + } + if (copiedEvent != null && copiedEvent.getReferenceCount() > 0) { + copiedEvent.clearReferenceCount("source"); + copiedEvent.close(); + } + if (originalEvent != null && originalEvent.getReferenceCount() > 0) { + originalEvent.clearReferenceCount("assigner"); + originalEvent.close(); + } + FileUtils.deleteFileOrDirectory(new File(TestConstant.BASE_OUTPUT_PATH)); 
+ } + } + + private PipeTsFileInsertionEvent createTestTsFileInsertionEvent( + final TsFileResource resource, final String pipeName) { + return new PipeTsFileInsertionEvent( + false, + "root.db", + resource, + null, + null, + true, + true, + false, + false, + null, + pipeName, + 1L, + null, + buildUnionPattern(true, Collections.singletonList(new IoTDBTreePattern(true, null))), + new TablePattern(false, null, null), + null, + null, + null, + true, + Long.MIN_VALUE, + Long.MAX_VALUE); + } + + private PipeTsFileInsertionEvent createTestPipeCopy( + final PipeTsFileInsertionEvent sourceEvent, final String pipeName) { + return sourceEvent.shallowCopySelfAndBindPipeTaskMetaForProgressReport( + pipeName, + 2L, + null, + buildUnionPattern(true, Collections.singletonList(new IoTDBTreePattern(true, null))), + new TablePattern(false, null, null), + null, + null, + null, + true, + Long.MIN_VALUE, + Long.MAX_VALUE); + } + + private File createTestTsFileUnderDataRegion( + final long time, + final long version, + final int innerSpaceCompactionCount, + final int crossSpaceCompactionCount) { + return new File( + new File( + new File( + new File( + TestConstant.BASE_OUTPUT_PATH + IoTDBConstant.SEQUENCE_FOLDER_NAME, "root.db"), + "1"), + "0"), + TsFileNameGenerator.generateNewTsFileName( + time, version, innerSpaceCompactionCount, crossSpaceCompactionCount)); + } + static class TestAccessControl implements AccessControl { @Override diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java index 8814190755e27..259ec83650db6 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java @@ -560,6 +560,8 @@ private void testTsFilePointNum( "", new TsFileResource(tsFile), null, 
+ null, + true, true, false, false,