diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/PipeTsFileDeletionBarrier.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/PipeTsFileDeletionBarrier.java
new file mode 100644
index 0000000000000..954821dea0278
--- /dev/null
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/consensus/deletion/PipeTsFileDeletionBarrier.java
@@ -0,0 +1,334 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.consensus.deletion;
+
+import org.apache.iotdb.commons.utils.TestOnly;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.NavigableSet;
+import java.util.Objects;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * An ordered barrier for coordinating delete materialization and TsFile transfer.
+ *
+ *
Delete materialization and realtime TsFile transfer are observed by followers through
+ * replicateIndex order, but the leader locally materializes delete mods after publishing delete
+ * events. This barrier ensures that a TsFile snapshot:
+ *
+ *
1. includes every delete that entered the delete path before the TsFile got its replicate
+ * index, and
+ *
+ *
2. excludes deletes that entered after that replicate index.
+ */
+public class PipeTsFileDeletionBarrier {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(PipeTsFileDeletionBarrier.class);
+
+ private static final long INITIAL_DELETE_SEQ = 0L;
+
+ private final ConcurrentMap regionId2DeletionState =
+ new ConcurrentHashMap<>();
+ private final ConcurrentMap tsFilePath2BarrierState =
+ new ConcurrentHashMap<>();
+
+ public long beginDeletion(final int regionId) {
+ return getOrCreateRegionDeletionState(regionId).beginDeletion();
+ }
+
+ public void resolveDeletionTargets(
+ final int regionId, final long deleteSeq, final Collection tsFilePaths) {
+ if (deleteSeq <= INITIAL_DELETE_SEQ) {
+ return;
+ }
+
+ final RegionDeletionState regionDeletionState = getOrCreateRegionDeletionState(regionId);
+ final Set sanitizedTsFilePaths = sanitizeTsFilePaths(tsFilePaths);
+
+ synchronized (regionDeletionState.monitor) {
+ for (final String tsFilePath : sanitizedTsFilePaths) {
+ getOrCreateTsFileBarrierState(tsFilePath).registerPendingDeletion(deleteSeq);
+ }
+ regionDeletionState.resolveDeletion(deleteSeq);
+ }
+ }
+
+ public long beginSnapshot(final int regionId, final String tsFilePath) {
+ if (Objects.isNull(tsFilePath)) {
+ return INITIAL_DELETE_SEQ;
+ }
+
+ final RegionDeletionState regionDeletionState = getOrCreateRegionDeletionState(regionId);
+ synchronized (regionDeletionState.monitor) {
+ final long snapshotUpperBound = regionDeletionState.getMaxAllocatedDeletionSeq();
+ getOrCreateTsFileBarrierState(tsFilePath).registerSnapshot(snapshotUpperBound);
+ return snapshotUpperBound;
+ }
+ }
+
+ public void awaitDeletionResolutionUpTo(final int regionId, final long deleteSeqUpperBound)
+ throws InterruptedException {
+ if (deleteSeqUpperBound <= INITIAL_DELETE_SEQ) {
+ return;
+ }
+
+ getOrCreateRegionDeletionState(regionId).awaitDeletionResolutionUpTo(deleteSeqUpperBound);
+ }
+
+ public void awaitPendingDeletionsUpTo(final String tsFilePath, final long deleteSeqUpperBound)
+ throws InterruptedException {
+ if (Objects.isNull(tsFilePath) || deleteSeqUpperBound <= INITIAL_DELETE_SEQ) {
+ return;
+ }
+
+ final TsFileBarrierState barrierState = tsFilePath2BarrierState.get(tsFilePath);
+ if (Objects.isNull(barrierState)) {
+ return;
+ }
+
+ barrierState.awaitPendingDeletionsUpTo(deleteSeqUpperBound);
+ }
+
+ public void awaitSnapshotsBeforeMaterialization(final String tsFilePath, final long deleteSeq)
+ throws InterruptedException {
+ if (Objects.isNull(tsFilePath) || deleteSeq <= INITIAL_DELETE_SEQ) {
+ return;
+ }
+
+ final TsFileBarrierState barrierState = tsFilePath2BarrierState.get(tsFilePath);
+ if (Objects.isNull(barrierState)) {
+ return;
+ }
+
+ barrierState.awaitSnapshotsBeforeMaterialization(deleteSeq);
+ }
+
+ public void finishSnapshot(final String tsFilePath, final long snapshotUpperBound) {
+ if (Objects.isNull(tsFilePath)) {
+ return;
+ }
+
+ final TsFileBarrierState barrierState = tsFilePath2BarrierState.get(tsFilePath);
+ if (Objects.isNull(barrierState)) {
+ return;
+ }
+
+ barrierState.finishSnapshot(snapshotUpperBound);
+ cleanupTsFileBarrierStateIfEmpty(tsFilePath, barrierState);
+ }
+
+ public void finishDeletion(final long deleteSeq, final Collection tsFilePaths) {
+ if (deleteSeq <= INITIAL_DELETE_SEQ) {
+ return;
+ }
+
+ final Set sanitizedTsFilePaths = sanitizeTsFilePaths(tsFilePaths);
+ for (final String tsFilePath : sanitizedTsFilePaths) {
+ final TsFileBarrierState barrierState = tsFilePath2BarrierState.get(tsFilePath);
+ if (Objects.isNull(barrierState)) {
+ continue;
+ }
+
+ barrierState.finishDeletion(deleteSeq);
+ cleanupTsFileBarrierStateIfEmpty(tsFilePath, barrierState);
+ }
+ }
+
+ @TestOnly
+ public void clear() {
+ regionId2DeletionState.clear();
+ tsFilePath2BarrierState.clear();
+ }
+
+ private RegionDeletionState getOrCreateRegionDeletionState(final int regionId) {
+ return regionId2DeletionState.computeIfAbsent(regionId, ignored -> new RegionDeletionState());
+ }
+
+ private TsFileBarrierState getOrCreateTsFileBarrierState(final String tsFilePath) {
+ return tsFilePath2BarrierState.computeIfAbsent(tsFilePath, ignored -> new TsFileBarrierState());
+ }
+
+ private Set sanitizeTsFilePaths(final Collection tsFilePaths) {
+ if (Objects.isNull(tsFilePaths) || tsFilePaths.isEmpty()) {
+ return Collections.emptySet();
+ }
+
+ final Set sanitizedTsFilePaths = new HashSet<>();
+ for (final String tsFilePath : tsFilePaths) {
+ if (Objects.nonNull(tsFilePath)) {
+ sanitizedTsFilePaths.add(tsFilePath);
+ }
+ }
+ return sanitizedTsFilePaths;
+ }
+
+ private void cleanupTsFileBarrierStateIfEmpty(
+ final String tsFilePath, final TsFileBarrierState barrierState) {
+ tsFilePath2BarrierState.computeIfPresent(
+ tsFilePath,
+ (ignored, state) -> {
+ if (state != barrierState) {
+ return state;
+ }
+ synchronized (state.monitor) {
+ return state.isEmptyUnsafe() ? null : state;
+ }
+ });
+ }
+
+ private static class RegionDeletionState {
+ private final Object monitor = new Object();
+ private final NavigableSet unresolvedDeletionSeqs = new TreeSet<>();
+
+ private long maxAllocatedDeletionSeq = INITIAL_DELETE_SEQ;
+ private long maxResolvedDeletionSeq = INITIAL_DELETE_SEQ;
+
+ private long beginDeletion() {
+ synchronized (monitor) {
+ final long deleteSeq = ++maxAllocatedDeletionSeq;
+ unresolvedDeletionSeqs.add(deleteSeq);
+ return deleteSeq;
+ }
+ }
+
+ private long getMaxAllocatedDeletionSeq() {
+ return maxAllocatedDeletionSeq;
+ }
+
+ private void resolveDeletion(final long deleteSeq) {
+ synchronized (monitor) {
+ if (deleteSeq > maxAllocatedDeletionSeq || deleteSeq <= INITIAL_DELETE_SEQ) {
+ LOGGER.warn(
+ "PipeTsFileDeletionBarrier: try to resolve invalid delete seq {}, max allocated {}.",
+ deleteSeq,
+ maxAllocatedDeletionSeq);
+ return;
+ }
+
+ unresolvedDeletionSeqs.remove(deleteSeq);
+ while (maxResolvedDeletionSeq < maxAllocatedDeletionSeq
+ && !unresolvedDeletionSeqs.contains(maxResolvedDeletionSeq + 1)) {
+ maxResolvedDeletionSeq++;
+ }
+ monitor.notifyAll();
+ }
+ }
+
+ private void awaitDeletionResolutionUpTo(final long deleteSeqUpperBound)
+ throws InterruptedException {
+ synchronized (monitor) {
+ while (maxResolvedDeletionSeq < deleteSeqUpperBound) {
+ monitor.wait();
+ }
+ }
+ }
+ }
+
+ private static class TsFileBarrierState {
+ private final Object monitor = new Object();
+ private final NavigableSet pendingDeletionSeqs = new TreeSet<>();
+ private final NavigableMap snapshotUpperBound2ReferenceCount = new TreeMap<>();
+
+ private void registerPendingDeletion(final long deleteSeq) {
+ synchronized (monitor) {
+ pendingDeletionSeqs.add(deleteSeq);
+ monitor.notifyAll();
+ }
+ }
+
+ private void finishDeletion(final long deleteSeq) {
+ synchronized (monitor) {
+ pendingDeletionSeqs.remove(deleteSeq);
+ monitor.notifyAll();
+ }
+ }
+
+ private void registerSnapshot(final long snapshotUpperBound) {
+ synchronized (monitor) {
+ snapshotUpperBound2ReferenceCount.merge(snapshotUpperBound, 1, Integer::sum);
+ monitor.notifyAll();
+ }
+ }
+
+ private void finishSnapshot(final long snapshotUpperBound) {
+ synchronized (monitor) {
+ snapshotUpperBound2ReferenceCount.computeIfPresent(
+ snapshotUpperBound, (ignored, count) -> count == 1 ? null : count - 1);
+ monitor.notifyAll();
+ }
+ }
+
+ private void awaitPendingDeletionsUpTo(final long deleteSeqUpperBound)
+ throws InterruptedException {
+ synchronized (monitor) {
+ while (!pendingDeletionSeqs.headSet(deleteSeqUpperBound, true).isEmpty()) {
+ monitor.wait();
+ }
+ }
+ }
+
+ private void awaitSnapshotsBeforeMaterialization(final long deleteSeq)
+ throws InterruptedException {
+ synchronized (monitor) {
+ while (hasSnapshotEarlierThan(deleteSeq)) {
+ monitor.wait();
+ }
+ }
+ }
+
+ private boolean hasSnapshotEarlierThan(final long deleteSeq) {
+ final Map.Entry firstSnapshot = snapshotUpperBound2ReferenceCount.firstEntry();
+ return Objects.nonNull(firstSnapshot) && firstSnapshot.getKey() < deleteSeq;
+ }
+
+ private boolean isEmptyUnsafe() {
+ return pendingDeletionSeqs.isEmpty() && snapshotUpperBound2ReferenceCount.isEmpty();
+ }
+ }
+
+ /////////////////////////////// singleton ///////////////////////////////
+
+ private static class PipeTsFileDeletionBarrierHolder {
+ private static final PipeTsFileDeletionBarrier INSTANCE = new PipeTsFileDeletionBarrier();
+
+ private PipeTsFileDeletionBarrierHolder() {
+ // empty constructor
+ }
+ }
+
+ public static PipeTsFileDeletionBarrier getInstance() {
+ return PipeTsFileDeletionBarrierHolder.INSTANCE;
+ }
+
+ private PipeTsFileDeletionBarrier() {
+ // empty constructor
+ }
+}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java
index 343c8d8932931..47eb15a27aeec 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeCompactedTsFileInsertionEvent.java
@@ -53,6 +53,8 @@ public PipeCompactedTsFileInsertionEvent(
anyOfOriginalEvents.getTreeModelDatabaseName(),
tsFileResource,
null,
+ null,
+ true,
bindIsWithMod(originalEvents),
bindIsLoaded(originalEvents),
bindIsGeneratedByHistoricalExtractor(originalEvents),
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
index 316b728a278a9..5fdf8e22ec164 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/event/common/tsfile/PipeTsFileInsertionEvent.java
@@ -35,6 +35,7 @@
import org.apache.iotdb.commons.pipe.datastructure.pattern.TreePattern;
import org.apache.iotdb.commons.pipe.resource.ref.PipePhantomReferenceManager.PipeEventResource;
import org.apache.iotdb.db.auth.AuthorityChecker;
+import org.apache.iotdb.db.pipe.consensus.deletion.PipeTsFileDeletionBarrier;
import org.apache.iotdb.db.pipe.event.ReferenceTrackableEvent;
import org.apache.iotdb.db.pipe.event.common.PipeInsertionEvent;
import org.apache.iotdb.db.pipe.event.common.tablet.PipeRawTabletInsertionEvent;
@@ -48,6 +49,7 @@
import org.apache.iotdb.db.pipe.source.dataregion.realtime.assigner.PipeTsFileEpochProgressIndexKeeper;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
import org.apache.iotdb.db.storageengine.dataregion.memtable.TsFileProcessor;
+import org.apache.iotdb.db.storageengine.dataregion.modification.ModificationFile;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.pipe.api.event.dml.insertion.TabletInsertionEvent;
import org.apache.iotdb.pipe.api.event.dml.insertion.TsFileInsertionEvent;
@@ -81,10 +83,20 @@ public class PipeTsFileInsertionEvent extends PipeInsertionEvent
private File tsFile;
private long extractTime = 0;
+ // Whether this event should transfer mod files if they exist.
+ private boolean shouldTransferModFile;
+ // Whether the assigner-side first retain should delay mod snapshotting until replicateIndex is
+ // assigned to the pipe-specific copy.
+ private boolean shouldDelayModSnapshotUntilReplicateIndex;
+ // Whether this event should consult the live TsFileResource for mod visibility before pinning.
+ private boolean shouldRefreshModFileStateFromResource;
+ // Whether the current modFile is already pinned and should no longer be overwritten by refresh.
+ private boolean isModPinned;
// This is true iff the modFile exists and should be transferred
private boolean isWithMod;
private File modFile;
private final File sharedModFile;
+ private long snapshotUpperBoundForDelayedMod = NO_COMMIT_ID;
protected final boolean isLoaded;
protected final boolean isGeneratedByPipe;
@@ -114,6 +126,8 @@ public PipeTsFileInsertionEvent(
databaseNameFromDataRegion,
resource,
null,
+ null,
+ true,
true,
isLoaded,
false,
@@ -128,7 +142,8 @@ public PipeTsFileInsertionEvent(
null,
true,
Long.MIN_VALUE,
- Long.MAX_VALUE);
+ Long.MAX_VALUE,
+ false);
}
public PipeTsFileInsertionEvent(
@@ -136,6 +151,8 @@ public PipeTsFileInsertionEvent(
final String databaseNameFromDataRegion,
final TsFileResource resource,
final File tsFile,
+ final File modFile,
+ final boolean shouldRefreshModFileStateFromResource,
final boolean isWithMod,
final boolean isLoaded,
final boolean isGeneratedByHistoricalExtractor,
@@ -151,6 +168,54 @@ public PipeTsFileInsertionEvent(
final boolean skipIfNoPrivileges,
final long startTime,
final long endTime) {
+ this(
+ isTableModelEvent,
+ databaseNameFromDataRegion,
+ resource,
+ tsFile,
+ modFile,
+ shouldRefreshModFileStateFromResource,
+ isWithMod,
+ isLoaded,
+ isGeneratedByHistoricalExtractor,
+ tableNames,
+ pipeName,
+ creationTime,
+ pipeTaskMeta,
+ treePattern,
+ tablePattern,
+ userId,
+ userName,
+ cliHostname,
+ skipIfNoPrivileges,
+ startTime,
+ endTime,
+ false);
+ }
+
+ private PipeTsFileInsertionEvent(
+ final Boolean isTableModelEvent,
+ final String databaseNameFromDataRegion,
+ final TsFileResource resource,
+ final File tsFile,
+ final File modFile,
+ final boolean shouldRefreshModFileStateFromResource,
+ final boolean isWithMod,
+ final boolean isLoaded,
+ final boolean isGeneratedByHistoricalExtractor,
+ final Set tableNames,
+ final String pipeName,
+ final long creationTime,
+ final PipeTaskMeta pipeTaskMeta,
+ final TreePattern treePattern,
+ final TablePattern tablePattern,
+ final String userId,
+ final String userName,
+ final String cliHostname,
+ final boolean skipIfNoPrivileges,
+ final long startTime,
+ final long endTime,
+ final boolean shouldDelayModSnapshotUntilReplicateIndex) {
super(
pipeName,
creationTime,
@@ -174,8 +239,20 @@ public PipeTsFileInsertionEvent(
// hard-link it to each pipe dir
this.tsFile = Objects.isNull(tsFile) ? resource.getTsFile() : tsFile;
- this.isWithMod = isWithMod && resource.anyModFileExists();
- this.modFile = this.isWithMod ? resource.getExclusiveModFile().getFile() : null;
+ this.shouldTransferModFile = isWithMod;
+ this.shouldDelayModSnapshotUntilReplicateIndex = shouldDelayModSnapshotUntilReplicateIndex;
+ this.shouldRefreshModFileStateFromResource =
+ this.shouldTransferModFile && shouldRefreshModFileStateFromResource;
+ this.modFile = modFile;
+ this.isModPinned =
+ this.shouldTransferModFile
+ && !this.shouldRefreshModFileStateFromResource
+ && Objects.nonNull(this.modFile);
+ if (this.shouldRefreshModFileStateFromResource) {
+ refreshModFileState();
+ } else {
+ this.isWithMod = this.shouldTransferModFile && Objects.nonNull(this.modFile);
+ }
// TODO: process the shared mod file
this.sharedModFile =
resource.getSharedModFile() != null ? resource.getSharedModFile().getFile() : null;
@@ -276,6 +353,7 @@ public File getTsFile() {
}
public File getModFile() {
+ refreshModFileState();
return modFile;
}
@@ -284,13 +362,21 @@ public File getSharedModFile() {
}
public boolean isWithMod() {
+ refreshModFileState();
return isWithMod;
}
- // If the previous "isWithMod" is false, the modFile has been set to "null", then the isWithMod
- // can't be set to true
- public void disableMod4NonTransferPipes(final boolean isWithMod) {
- this.isWithMod = isWithMod && this.isWithMod;
+ public void disableMod4NonTransferPipes(final boolean shouldTransferModFile) {
+ this.shouldTransferModFile = shouldTransferModFile && this.shouldTransferModFile;
+ refreshModFileState();
+ }
+
+ public void enableDelayModSnapshotUntilReplicateIndex() {
+ shouldDelayModSnapshotUntilReplicateIndex = true;
+ }
+
+ public boolean isModPinned() {
+ return isModPinned;
}
public boolean isLoaded() {
@@ -319,17 +405,33 @@ public long getExtractTime() {
/////////////////////////// EnrichedEvent ///////////////////////////
+ @Override
+ public void setReplicateIndexForIoTV2(final long replicateIndexForIoTV2) {
+ super.setReplicateIndexForIoTV2(replicateIndexForIoTV2);
+
+ if (!shouldDelayModSnapshotUntilReplicateIndex
+ || !shouldTransferModFile
+ || Objects.isNull(resource)
+ || Objects.isNull(pipeName)
+ || snapshotUpperBoundForDelayedMod != NO_COMMIT_ID) {
+ return;
+ }
+
+ snapshotUpperBoundForDelayedMod =
+ PipeTsFileDeletionBarrier.getInstance()
+ .beginSnapshot(Integer.parseInt(resource.getDataRegionId()), resource.getTsFilePath());
+ }
+
@Override
public boolean internallyIncreaseResourceReferenceCount(final String holderMessage) {
extractTime = System.nanoTime();
try {
- tsFile = PipeDataNodeResourceManager.tsfile().increaseFileReference(tsFile, true, pipeName);
- if (isWithMod) {
- modFile =
- PipeDataNodeResourceManager.tsfile().increaseFileReference(modFile, false, pipeName);
- }
+ pinSnapshotForFirstReference();
return true;
} catch (final Exception e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
LOGGER.warn(
String.format(
"Increase reference count for TsFile %s or modFile %s error. Holder Message: %s",
@@ -348,7 +450,7 @@ public boolean internallyIncreaseResourceReferenceCount(final String holderMessa
public boolean internallyDecreaseResourceReferenceCount(final String holderMessage) {
try {
PipeDataNodeResourceManager.tsfile().decreaseFileReference(tsFile, pipeName);
- if (isWithMod) {
+ if (isModPinned && Objects.nonNull(modFile)) {
PipeDataNodeResourceManager.tsfile().decreaseFileReference(modFile, pipeName);
}
close();
@@ -423,7 +525,9 @@ public PipeTsFileInsertionEvent shallowCopySelfAndBindPipeTaskMetaForProgressRep
getSourceDatabaseNameFromDataRegion(),
resource,
tsFile,
- isWithMod,
+ modFile,
+ shouldRefreshModFileStateFromResource,
+ shouldTransferModFile,
isLoaded,
isGeneratedByHistoricalExtractor,
tableNames,
@@ -437,7 +541,8 @@ public PipeTsFileInsertionEvent shallowCopySelfAndBindPipeTaskMetaForProgressRep
cliHostname,
skipIfNoPrivileges,
startTime,
- endTime);
+ endTime,
+ shouldDelayModSnapshotUntilReplicateIndex);
}
@Override
@@ -754,6 +859,7 @@ public boolean isGeneratedByHistoricalExtractor() {
private TsFileInsertionEventParser initEventParser() {
try {
+ refreshModFileState();
eventParser.compareAndSet(
null,
new TsFileInsertionEventParserProvider(
@@ -848,23 +954,220 @@ public void trackResource() {
@Override
public PipeEventResource eventResourceBuilder() {
+ refreshModFileState();
return new PipeTsFileInsertionEventResource(
this.isReleased,
this.referenceCount,
this.pipeName,
this.tsFile,
- this.isWithMod,
+ this.isModPinned,
this.modFile,
- this.sharedModFile,
this.eventParser);
}
- private static class PipeTsFileInsertionEventResource extends PipeEventResource {
+ private void refreshModFileState() {
+ if (!shouldTransferModFile) {
+ isWithMod = false;
+ if (!isModPinned) {
+ modFile = null;
+ }
+ return;
+ }
+
+ if (isModPinned || !shouldRefreshModFileStateFromResource) {
+ isWithMod = Objects.nonNull(modFile);
+ return;
+ }
+
+ if (Objects.isNull(resource)) {
+ isWithMod = Objects.nonNull(modFile);
+ return;
+ }
+
+ isWithMod = resource.anyModFileExists();
+ modFile = isWithMod ? resource.getExclusiveModFile().getFile() : null;
+ }
+
+ private void pinSnapshotForFirstReference() throws Exception {
+ final SnapshotPinResult snapshotPinResult =
+ Objects.nonNull(resource)
+ ? pinResourceBackedSnapshotForFirstReference()
+ : pinStandaloneSnapshotForFirstReference();
+
+ tsFile = snapshotPinResult.tsFile;
+ modFile = snapshotPinResult.modFile;
+ isWithMod = snapshotPinResult.isWithMod;
+ isModPinned = snapshotPinResult.isModPinned;
+ shouldRefreshModFileStateFromResource = snapshotPinResult.shouldRefreshModFileStateFromResource;
+ }
+
+ private SnapshotPinResult pinResourceBackedSnapshotForFirstReference() throws Exception {
+ final File pinnedTsFile = pinTsFileUnderResourceReadLock();
+ final boolean shouldFinishSnapshotBarrier = shouldDelayModSnapshotForPipeCopy();
+ boolean modSnapshotAttempted = false;
+ try {
+ if (shouldOnlyPinTsFileOnFirstReference()) {
+ refreshModFileState();
+ return new SnapshotPinResult(
+ pinnedTsFile, modFile, isWithMod, false, shouldRefreshModFileStateFromResource);
+ }
+ if (shouldFinishSnapshotBarrier) {
+ awaitDelayedDeletionMaterializationBeforeModSnapshot();
+ }
+
+ modSnapshotAttempted = true;
+ return pinModSnapshotUnderResourceReadLock(pinnedTsFile);
+ } catch (final Exception e) {
+ if (!modSnapshotAttempted) {
+ releasePinnedSnapshotQuietly(pinnedTsFile, null);
+ }
+ throw e;
+ } finally {
+ if (shouldFinishSnapshotBarrier) {
+ finishDelayedSnapshotBarrier();
+ }
+ }
+ }
+
+ private SnapshotPinResult pinStandaloneSnapshotForFirstReference() throws IOException {
+ final File pinnedTsFile =
+ PipeDataNodeResourceManager.tsfile().increaseFileReference(tsFile, true, pipeName);
+
+ if (Objects.isNull(modFile)) {
+ return new SnapshotPinResult(
+ pinnedTsFile, null, false, false, shouldRefreshModFileStateFromResource);
+ }
+
+ File pinnedModFile = null;
+ try {
+ pinnedModFile =
+ PipeDataNodeResourceManager.tsfile().increaseFileReference(modFile, false, pipeName);
+ return new SnapshotPinResult(pinnedTsFile, pinnedModFile, true, true, false);
+ } catch (final Exception e) {
+ releasePinnedSnapshotQuietly(pinnedTsFile, pinnedModFile);
+ throw e;
+ }
+ }
+
+ private File pinTsFileUnderResourceReadLock() throws IOException {
+ resource.readLock();
+ try {
+ return PipeDataNodeResourceManager.tsfile().increaseFileReference(tsFile, true, pipeName);
+ } finally {
+ resource.readUnlock();
+ }
+ }
+
+ private void awaitDelayedDeletionMaterializationBeforeModSnapshot() throws InterruptedException {
+ final int regionId = Integer.parseInt(resource.getDataRegionId());
+ final PipeTsFileDeletionBarrier barrier = PipeTsFileDeletionBarrier.getInstance();
+ barrier.awaitDeletionResolutionUpTo(regionId, snapshotUpperBoundForDelayedMod);
+ barrier.awaitPendingDeletionsUpTo(resource.getTsFilePath(), snapshotUpperBoundForDelayedMod);
+ }
+
+ private SnapshotPinResult pinModSnapshotUnderResourceReadLock(final File pinnedTsFile)
+ throws IOException {
+ File pinnedModFile = null;
+ boolean withMod = false;
+
+ resource.readLock();
+ try {
+ final ModificationFile resourceExclusiveModFile = resource.getExclusiveModFile();
+ resourceExclusiveModFile.writeLock();
+ try {
+ if (!shouldTransferModFile) {
+ return new SnapshotPinResult(pinnedTsFile, null, false, false, false);
+ }
+
+ if (!shouldRefreshModFileStateFromResource && Objects.nonNull(modFile)) {
+ pinnedModFile =
+ PipeDataNodeResourceManager.tsfile().increaseFileReference(modFile, false, pipeName);
+ withMod = true;
+ } else if (resourceExclusiveModFile.exists()) {
+ pinnedModFile =
+ PipeDataNodeResourceManager.tsfile()
+ .increaseFileReference(resourceExclusiveModFile.getFile(), false, pipeName);
+ withMod = true;
+ }
+ } finally {
+ resourceExclusiveModFile.writeUnlock();
+ }
+ } catch (final Exception e) {
+ releasePinnedSnapshotQuietly(pinnedTsFile, pinnedModFile);
+ throw e;
+ } finally {
+ resource.readUnlock();
+ }
+
+ return new SnapshotPinResult(
+ pinnedTsFile, pinnedModFile, withMod, Objects.nonNull(pinnedModFile), false);
+ }
+
+ private boolean shouldOnlyPinTsFileOnFirstReference() {
+ return shouldDelayModSnapshotUntilReplicateIndex && Objects.isNull(pipeName);
+ }
+
+ private boolean shouldDelayModSnapshotForPipeCopy() {
+ return shouldDelayModSnapshotUntilReplicateIndex
+ && Objects.nonNull(pipeName)
+ && snapshotUpperBoundForDelayedMod != NO_COMMIT_ID;
+ }
+
+ private void finishDelayedSnapshotBarrier() {
+ if (!shouldDelayModSnapshotForPipeCopy()) {
+ return;
+ }
+
+ PipeTsFileDeletionBarrier.getInstance()
+ .finishSnapshot(resource.getTsFilePath(), snapshotUpperBoundForDelayedMod);
+ snapshotUpperBoundForDelayedMod = NO_COMMIT_ID;
+ }
+
+ private void releasePinnedSnapshotQuietly(final File pinnedTsFile, final File pinnedModFile) {
+ try {
+ if (Objects.nonNull(pinnedModFile)) {
+ PipeDataNodeResourceManager.tsfile().decreaseFileReference(pinnedModFile, pipeName);
+ }
+ } catch (final Exception e) {
+ LOGGER.warn("Decrease reference count for modFile {} error.", pinnedModFile, e);
+ }
+
+ try {
+ if (Objects.nonNull(pinnedTsFile)) {
+ PipeDataNodeResourceManager.tsfile().decreaseFileReference(pinnedTsFile, pipeName);
+ }
+ } catch (final Exception e) {
+ LOGGER.warn("Decrease reference count for TsFile {} error.", pinnedTsFile, e);
+ }
+ }
+
+ private static class SnapshotPinResult {
private final File tsFile;
+ private final File modFile;
private final boolean isWithMod;
+ private final boolean isModPinned;
+ private final boolean shouldRefreshModFileStateFromResource;
+
+ private SnapshotPinResult(
+ final File tsFile,
+ final File modFile,
+ final boolean isWithMod,
+ final boolean isModPinned,
+ final boolean shouldRefreshModFileStateFromResource) {
+ this.tsFile = tsFile;
+ this.modFile = modFile;
+ this.isWithMod = isWithMod;
+ this.isModPinned = isModPinned;
+ this.shouldRefreshModFileStateFromResource = shouldRefreshModFileStateFromResource;
+ }
+ }
+
+ private static class PipeTsFileInsertionEventResource extends PipeEventResource {
+
+ private final File tsFile;
+ private final boolean isModPinned;
private final File modFile;
- private final File sharedModFile; // unused now
private final AtomicReference eventParser;
private final String pipeName;
@@ -873,16 +1176,14 @@ private PipeTsFileInsertionEventResource(
final AtomicInteger referenceCount,
final String pipeName,
final File tsFile,
- final boolean isWithMod,
+ final boolean isModPinned,
final File modFile,
- final File sharedModFile,
final AtomicReference eventParser) {
super(isReleased, referenceCount);
this.pipeName = pipeName;
this.tsFile = tsFile;
- this.isWithMod = isWithMod;
+ this.isModPinned = isModPinned;
this.modFile = modFile;
- this.sharedModFile = sharedModFile;
this.eventParser = eventParser;
}
@@ -891,7 +1192,7 @@ protected void finalizeResource() {
try {
// decrease reference count
PipeDataNodeResourceManager.tsfile().decreaseFileReference(tsFile, pipeName);
- if (isWithMod) {
+ if (isModPinned && Objects.nonNull(modFile)) {
PipeDataNodeResourceManager.tsfile().decreaseFileReference(modFile, pipeName);
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2SyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2SyncSink.java
index 5866a5ceeaa6c..76bebb976bbcd 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2SyncSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/IoTConsensusV2SyncSink.java
@@ -382,7 +382,7 @@ private void doTransfer(final PipeTsFileInsertionEvent pipeTsFileInsertionEvent)
new TConsensusGroupId(TConsensusGroupType.DataRegion, consensusGroupId);
// 1. Transfer tsFile, and mod file if exists
- if (pipeTsFileInsertionEvent.isWithMod()) {
+ if (modFile != null) {
transferFilePieces(
modFile, syncIoTConsensusV2ServiceClient, true, tCommitId, tConsensusGroupId);
transferFilePieces(
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TsFileInsertionEventHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TsFileInsertionEventHandler.java
index 28850bc543dd9..90cdac84b896f 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TsFileInsertionEventHandler.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/iotconsensusv2/handler/IoTConsensusV2TsFileInsertionEventHandler.java
@@ -46,7 +46,6 @@
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.Arrays;
-import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;
public class IoTConsensusV2TsFileInsertionEventHandler
@@ -101,17 +100,14 @@ public IoTConsensusV2TsFileInsertionEventHandler(
tsFile = event.getTsFile();
modFile = event.getModFile();
- transferMod = event.isWithMod();
+ transferMod = modFile != null;
currentFile = transferMod ? modFile : tsFile;
readFileBufferSize = PipeConfig.getInstance().getPipeSinkReadFileBufferSize();
readBuffer = new byte[readFileBufferSize];
position = 0;
- reader =
- Objects.nonNull(modFile)
- ? new RandomAccessFile(modFile, "r")
- : new RandomAccessFile(tsFile, "r");
+ reader = transferMod ? new RandomAccessFile(modFile, "r") : new RandomAccessFile(tsFile, "r");
isSealSignalSent = new AtomicBoolean(false);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
index f8d0b104096f1..a10abf8272713 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
@@ -404,6 +404,9 @@ private boolean transferWithoutCheck(final TsFileInsertionEvent tsFileInsertionE
throw new FileNotFoundException(pipeTsFileInsertionEvent.getTsFile().getAbsolutePath());
}
+ final boolean supportMod = clientManager.supportModsIfIsDataNodeReceiver();
+ final File modFile = supportMod ? pipeTsFileInsertionEvent.getModFile() : null;
+
final PipeTransferTsFileHandler pipeTransferTsFileHandler =
new PipeTransferTsFileHandler(
this,
@@ -416,9 +419,8 @@ private boolean transferWithoutCheck(final TsFileInsertionEvent tsFileInsertionE
new AtomicInteger(1),
new AtomicBoolean(false),
pipeTsFileInsertionEvent.getTsFile(),
- pipeTsFileInsertionEvent.getModFile(),
- pipeTsFileInsertionEvent.isWithMod()
- && clientManager.supportModsIfIsDataNodeReceiver(),
+ modFile,
+ modFile != null,
pipeTsFileInsertionEvent.isTableModelEvent()
? pipeTsFileInsertionEvent.getTableModelDatabaseName()
: null);
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
index 809b40f6eba02..76b4dd1f30b30 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/historical/PipeHistoricalDataRegionTsFileAndDeletionSource.java
@@ -856,6 +856,8 @@ private Event supplyTsFileEvent(final TsFileResource resource) {
resource.getDatabaseName(),
resource,
null,
+ null,
+ true,
shouldTransferModFile,
false,
true,
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
index 9c7182f051c74..7041d723bd4d4 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/assigner/PipeDataRegionAssigner.java
@@ -93,13 +93,19 @@ public PipeDataRegionAssigner(final int dataRegionId) {
}
public void publishToAssign(final PipeRealtimeEvent event) {
+ final EnrichedEvent innerEvent = event.getEvent();
+ if (innerEvent instanceof PipeTsFileInsertionEvent
+ && DataRegionConsensusImpl.getInstance() instanceof IoTConsensusV2
+ && IoTConsensusV2Processor.isShouldReplicate(innerEvent)) {
+ ((PipeTsFileInsertionEvent) innerEvent).enableDelayModSnapshotUntilReplicateIndex();
+ }
+
if (!event.increaseReferenceCount(PipeDataRegionAssigner.class.getName())) {
LOGGER.warn(
"The reference count of the realtime event {} cannot be increased, skipping it.", event);
return;
}
- final EnrichedEvent innerEvent = event.getEvent();
eventCounter.increaseEventCount(innerEvent);
if (innerEvent instanceof PipeHeartbeatEvent) {
((PipeHeartbeatEvent) innerEvent).onPublished();
@@ -174,6 +180,12 @@ private void assignToSource(
source.getRealtimeDataExtractionStartTime(),
source.getRealtimeDataExtractionEndTime());
final EnrichedEvent innerEvent = copiedEvent.getEvent();
+ if (innerEvent instanceof PipeTsFileInsertionEvent) {
+ final PipeTsFileInsertionEvent tsFileInsertionEvent =
+ (PipeTsFileInsertionEvent) innerEvent;
+ tsFileInsertionEvent.disableMod4NonTransferPipes(source.isShouldTransferModFile());
+ }
+
// if using IoTV2, assign a replicateIndex for this realtime event
if (DataRegionConsensusImpl.getInstance() instanceof IoTConsensusV2
&& IoTConsensusV2Processor.isShouldReplicate(innerEvent)) {
@@ -187,12 +199,6 @@ private void assignToSource(
innerEvent);
}
- if (innerEvent instanceof PipeTsFileInsertionEvent) {
- final PipeTsFileInsertionEvent tsFileInsertionEvent =
- (PipeTsFileInsertionEvent) innerEvent;
- tsFileInsertionEvent.disableMod4NonTransferPipes(source.isShouldTransferModFile());
- }
-
if (innerEvent instanceof PipeDeleteDataNodeEvent) {
final PipeDeleteDataNodeEvent deleteDataNodeEvent =
(PipeDeleteDataNodeEvent) innerEvent;
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
index 3cce521a51e0b..541b3432272f6 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/source/dataregion/realtime/listener/PipeInsertionDataNodeListener.java
@@ -164,7 +164,6 @@ public DeletionResource listenToDeleteData(
}
assigner.publishToAssign(PipeRealtimeEventFactory.createRealtimeEvent(node));
-
return deletionResource;
}
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
index eb37cb1d5d21b..3c09dd37d17f9 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/storageengine/dataregion/DataRegion.java
@@ -67,6 +67,7 @@
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResource;
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResource.Status;
import org.apache.iotdb.db.pipe.consensus.deletion.DeletionResourceManager;
+import org.apache.iotdb.db.pipe.consensus.deletion.PipeTsFileDeletionBarrier;
import org.apache.iotdb.db.pipe.consensus.deletion.persist.PageCacheDeletionBuffer;
import org.apache.iotdb.db.pipe.resource.memory.PipeMemoryWeightUtil;
import org.apache.iotdb.db.pipe.source.dataregion.realtime.listener.PipeInsertionDataNodeListener;
@@ -2830,6 +2831,88 @@ private void getTwoKindsOfTsFiles(
.forEach(unsealedResource::add);
}
+ private DeleteMaterializationBarrierContext beginDeleteMaterializationBarrierIfNecessary(
+ final boolean shouldUseBarrier) {
+ if (!shouldUseBarrier) {
+ return null;
+ }
+
+ return new DeleteMaterializationBarrierContext(
+ dataRegionId.getId(),
+ PipeTsFileDeletionBarrier.getInstance().beginDeletion(dataRegionId.getId()));
+ }
+
+ private Set collectImpactedTsFilePaths(
+ final Collection sealedTsFiles, final ModEntry deletion) {
+ return sealedTsFiles.stream()
+ .filter(resource -> !canSkipDelete(resource, deletion))
+ .map(TsFileResource::getTsFilePath)
+ .collect(Collectors.toSet());
+ }
+
+ private void resolveDeleteMaterializationTargets(
+ final DeleteMaterializationBarrierContext barrierContext,
+ final Collection tsFilePaths) {
+ if (Objects.isNull(barrierContext) || barrierContext.resolved) {
+ return;
+ }
+
+ barrierContext.tsFilePaths.addAll(tsFilePaths);
+ PipeTsFileDeletionBarrier.getInstance()
+ .resolveDeletionTargets(
+ barrierContext.regionId, barrierContext.deleteSeq, barrierContext.tsFilePaths);
+ barrierContext.resolved = true;
+ }
+
+ private void awaitSnapshotsBeforeMaterializing(
+ final DeleteMaterializationBarrierContext barrierContext,
+ final Collection sealedTsFiles,
+ final ModEntry deletion)
+ throws InterruptedException {
+ if (Objects.isNull(barrierContext)) {
+ return;
+ }
+
+ final PipeTsFileDeletionBarrier barrier = PipeTsFileDeletionBarrier.getInstance();
+ for (final TsFileResource sealedTsFile : sealedTsFiles) {
+ if (!canSkipDelete(sealedTsFile, deletion)) {
+ barrier.awaitSnapshotsBeforeMaterialization(
+ sealedTsFile.getTsFilePath(), barrierContext.deleteSeq);
+ }
+ }
+ }
+
+ private void finishDeleteMaterializationBarrier(
+ final DeleteMaterializationBarrierContext barrierContext) {
+ if (Objects.isNull(barrierContext) || barrierContext.finished) {
+ return;
+ }
+
+ if (!barrierContext.resolved) {
+ PipeTsFileDeletionBarrier.getInstance()
+ .resolveDeletionTargets(
+ barrierContext.regionId, barrierContext.deleteSeq, Collections.emptySet());
+ barrierContext.resolved = true;
+ }
+
+ PipeTsFileDeletionBarrier.getInstance()
+ .finishDeletion(barrierContext.deleteSeq, barrierContext.tsFilePaths);
+ barrierContext.finished = true;
+ }
+
+ private static class DeleteMaterializationBarrierContext {
+ private final int regionId;
+ private final long deleteSeq;
+ private final Set tsFilePaths = new HashSet<>();
+ private boolean resolved = false;
+ private boolean finished = false;
+
+ private DeleteMaterializationBarrierContext(final int regionId, final long deleteSeq) {
+ this.regionId = regionId;
+ this.deleteSeq = deleteSeq;
+ }
+ }
+
public void deleteByDevice(final MeasurementPath pattern, final DeleteDataNode node)
throws IOException {
if (SettleService.getINSTANCE().getFilesToBeSettledCount().get() != 0) {
@@ -2863,6 +2946,9 @@ public void deleteByDevice(final MeasurementPath pattern, final DeleteDataNode n
}
ModEntry deletion = new TreeDeletionEntry(pattern, startTime, endTime);
+ final DeleteMaterializationBarrierContext barrierContext =
+ beginDeleteMaterializationBarrierIfNecessary(
+ DeletionResource.isDeleteNodeGeneratedInLocalByIoTV2(node));
List sealedTsFileResource = new ArrayList<>();
List unsealedTsFileResource = new ArrayList<>();
@@ -2870,19 +2956,29 @@ public void deleteByDevice(final MeasurementPath pattern, final DeleteDataNode n
// deviceMatchInfo is used for filter the matched deviceId in TsFileResource
// deviceMatchInfo contains the DeviceId means this device matched the pattern
deleteDataInUnsealedFiles(unsealedTsFileResource, deletion, sealedTsFileResource);
- // capture deleteDataNode and wait it to be persisted to DAL.
- DeletionResource deletionResource =
- PipeInsertionDataNodeListener.getInstance()
- .listenToDeleteData(dataRegionId.getId(), node);
- // just get result. We have already waited for result in `listenToDeleteData`
- if (deletionResource != null && deletionResource.waitForResult() == Status.FAILURE) {
- throw deletionResource.getCause();
- }
- writeUnlock();
- hasReleasedLock = true;
+ resolveDeleteMaterializationTargets(
+ barrierContext, collectImpactedTsFilePaths(sealedTsFileResource, deletion));
+ try {
+ // capture deleteDataNode and wait it to be persisted to DAL.
+ final DeletionResource deletionResource =
+ PipeInsertionDataNodeListener.getInstance()
+ .listenToDeleteData(dataRegionId.getId(), node);
+ // just get result. We have already waited for result in `listenToDeleteData`
+ if (deletionResource != null && deletionResource.waitForResult() == Status.FAILURE) {
+ throw deletionResource.getCause();
+ }
+ writeUnlock();
+ hasReleasedLock = true;
- deleteDataInSealedFiles(sealedTsFileResource, deletion);
+ awaitSnapshotsBeforeMaterializing(barrierContext, sealedTsFileResource, deletion);
+ deleteDataInSealedFiles(sealedTsFileResource, deletion);
+ } finally {
+ finishDeleteMaterializationBarrier(barrierContext);
+ }
} catch (Exception e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
throw new IOException(e);
} finally {
if (!hasReleasedLock) {
@@ -2959,6 +3055,9 @@ public void deleteByTable(RelationalDeleteDataNode node) throws IOException {
}
}
+ final DeleteMaterializationBarrierContext barrierContext =
+ beginDeleteMaterializationBarrierIfNecessary(
+ DeletionResource.isDeleteNodeGeneratedInLocalByIoTV2(node));
List> sealedTsFileResourceLists = new ArrayList<>(modEntries.size());
for (TableDeletionEntry modEntry : modEntries) {
List sealedTsFileResource = new ArrayList<>();
@@ -2974,22 +3073,37 @@ public void deleteByTable(RelationalDeleteDataNode node) throws IOException {
sealedTsFileResourceLists.add(sealedTsFileResource);
}
- // capture deleteDataNode and wait it to be persisted to DAL.
- DeletionResource deletionResource =
- PipeInsertionDataNodeListener.getInstance()
- .listenToDeleteData(dataRegionId.getId(), node);
- // just get result. We have already waited for result in `listenToDeleteData`
- if (deletionResource != null && deletionResource.waitForResult() == Status.FAILURE) {
- throw deletionResource.getCause();
+ final Set impactedTsFilePaths = new HashSet<>();
+ for (int i = 0; i < modEntries.size(); i++) {
+ impactedTsFilePaths.addAll(
+ collectImpactedTsFilePaths(sealedTsFileResourceLists.get(i), modEntries.get(i)));
}
+ resolveDeleteMaterializationTargets(barrierContext, impactedTsFilePaths);
+ try {
+ // capture deleteDataNode and wait it to be persisted to DAL.
+ final DeletionResource deletionResource =
+ PipeInsertionDataNodeListener.getInstance()
+ .listenToDeleteData(dataRegionId.getId(), node);
+ // just get result. We have already waited for result in `listenToDeleteData`
+ if (deletionResource != null && deletionResource.waitForResult() == Status.FAILURE) {
+ throw deletionResource.getCause();
+ }
- writeUnlock();
- hasReleasedLock = true;
+ writeUnlock();
+ hasReleasedLock = true;
- for (int i = 0; i < modEntries.size(); i++) {
- deleteDataInSealedFiles(sealedTsFileResourceLists.get(i), modEntries.get(i));
+ for (int i = 0; i < modEntries.size(); i++) {
+ awaitSnapshotsBeforeMaterializing(
+ barrierContext, sealedTsFileResourceLists.get(i), modEntries.get(i));
+ deleteDataInSealedFiles(sealedTsFileResourceLists.get(i), modEntries.get(i));
+ }
+ } finally {
+ finishDeleteMaterializationBarrier(barrierContext);
}
} catch (Exception e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
throw new IOException(e);
} finally {
if (!hasReleasedLock) {
@@ -3028,22 +3142,35 @@ public void deleteDataDirectly(MeasurementPath pathToDelete, DeleteDataNode node
}
}
TreeDeletionEntry deletion = new TreeDeletionEntry(pathToDelete, startTime, endTime);
+ final DeleteMaterializationBarrierContext barrierContext =
+ beginDeleteMaterializationBarrierIfNecessary(
+ DeletionResource.isDeleteNodeGeneratedInLocalByIoTV2(node));
List sealedTsFileResource = new ArrayList<>();
List unsealedTsFileResource = new ArrayList<>();
getTwoKindsOfTsFiles(sealedTsFileResource, unsealedTsFileResource, startTime, endTime);
deleteDataDirectlyInFile(unsealedTsFileResource, deletion);
- // capture deleteDataNode and wait it to be persisted to DAL.
- DeletionResource deletionResource =
- PipeInsertionDataNodeListener.getInstance()
- .listenToDeleteData(dataRegionId.getId(), node);
- // just get result. We have already waited for result in `listenToDeleteData`
- if (deletionResource != null && deletionResource.waitForResult() == Status.FAILURE) {
- throw deletionResource.getCause();
+ resolveDeleteMaterializationTargets(
+ barrierContext, collectImpactedTsFilePaths(sealedTsFileResource, deletion));
+ try {
+ // capture deleteDataNode and wait it to be persisted to DAL.
+ final DeletionResource deletionResource =
+ PipeInsertionDataNodeListener.getInstance()
+ .listenToDeleteData(dataRegionId.getId(), node);
+ // just get result. We have already waited for result in `listenToDeleteData`
+ if (deletionResource != null && deletionResource.waitForResult() == Status.FAILURE) {
+ throw deletionResource.getCause();
+ }
+ writeUnlock();
+ releasedLock = true;
+ awaitSnapshotsBeforeMaterializing(barrierContext, sealedTsFileResource, deletion);
+ deleteDataDirectlyInFile(sealedTsFileResource, deletion);
+ } finally {
+ finishDeleteMaterializationBarrier(barrierContext);
}
- writeUnlock();
- releasedLock = true;
- deleteDataDirectlyInFile(sealedTsFileResource, deletion);
} catch (Exception e) {
+ if (e instanceof InterruptedException) {
+ Thread.currentThread().interrupt();
+ }
throw new IOException(e);
} finally {
if (!releasedLock) {
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/deletion/PipeTsFileDeletionBarrierTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/deletion/PipeTsFileDeletionBarrierTest.java
new file mode 100644
index 0000000000000..5a7a4e8a0a115
--- /dev/null
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/consensus/deletion/PipeTsFileDeletionBarrierTest.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.pipe.consensus.deletion;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+
+public class PipeTsFileDeletionBarrierTest {
+
+ private static final int TEST_REGION_ID = 1;
+ private static final String TEST_TSFILE_PATH = "/tmp/test.tsfile";
+
+ @After
+ public void tearDown() {
+ PipeTsFileDeletionBarrier.getInstance().clear();
+ }
+
+ @Test
+ public void testSnapshotWaitsForEarlierDeletionResolution() throws Exception {
+ final PipeTsFileDeletionBarrier barrier = PipeTsFileDeletionBarrier.getInstance();
+ final long deleteSeq = barrier.beginDeletion(TEST_REGION_ID);
+
+ final CountDownLatch started = new CountDownLatch(1);
+ final CountDownLatch finished = new CountDownLatch(1);
+ final AtomicReference failure = new AtomicReference<>(null);
+
+ final Thread thread =
+ new Thread(
+ () -> {
+ try {
+ started.countDown();
+ barrier.awaitDeletionResolutionUpTo(TEST_REGION_ID, deleteSeq);
+ } catch (final Throwable throwable) {
+ failure.set(throwable);
+ } finally {
+ finished.countDown();
+ }
+ },
+ "test-wait-delete-resolution");
+ thread.start();
+
+ Assert.assertTrue(started.await(5, TimeUnit.SECONDS));
+ Assert.assertFalse(finished.await(200, TimeUnit.MILLISECONDS));
+
+ barrier.resolveDeletionTargets(TEST_REGION_ID, deleteSeq, Collections.emptySet());
+
+ Assert.assertTrue(finished.await(5, TimeUnit.SECONDS));
+ if (failure.get() != null) {
+ throw new AssertionError("Snapshot wait should finish successfully", failure.get());
+ }
+ thread.join(TimeUnit.SECONDS.toMillis(5));
+ }
+
+ @Test
+ public void testSnapshotWaitsForPendingDeletionUpToUpperBound() throws Exception {
+ final PipeTsFileDeletionBarrier barrier = PipeTsFileDeletionBarrier.getInstance();
+ final long deleteSeq = barrier.beginDeletion(TEST_REGION_ID);
+ barrier.resolveDeletionTargets(
+ TEST_REGION_ID, deleteSeq, Collections.singleton(TEST_TSFILE_PATH));
+
+ final long snapshotUpperBound = barrier.beginSnapshot(TEST_REGION_ID, TEST_TSFILE_PATH);
+ Assert.assertEquals(deleteSeq, snapshotUpperBound);
+
+ final CountDownLatch started = new CountDownLatch(1);
+ final CountDownLatch finished = new CountDownLatch(1);
+ final AtomicReference failure = new AtomicReference<>(null);
+
+ final Thread thread =
+ new Thread(
+ () -> {
+ try {
+ started.countDown();
+ barrier.awaitPendingDeletionsUpTo(TEST_TSFILE_PATH, snapshotUpperBound);
+ } catch (final Throwable throwable) {
+ failure.set(throwable);
+ } finally {
+ finished.countDown();
+ }
+ },
+ "test-wait-pending-delete");
+ thread.start();
+
+ Assert.assertTrue(started.await(5, TimeUnit.SECONDS));
+ Assert.assertFalse(finished.await(200, TimeUnit.MILLISECONDS));
+
+ barrier.finishDeletion(deleteSeq, Collections.singleton(TEST_TSFILE_PATH));
+
+ Assert.assertTrue(finished.await(5, TimeUnit.SECONDS));
+ if (failure.get() != null) {
+ throw new AssertionError("Pending delete wait should finish successfully", failure.get());
+ }
+
+ barrier.finishSnapshot(TEST_TSFILE_PATH, snapshotUpperBound);
+ thread.join(TimeUnit.SECONDS.toMillis(5));
+ }
+
+ @Test
+ public void testLaterDeletionWaitsForEarlierSnapshot() throws Exception {
+ final PipeTsFileDeletionBarrier barrier = PipeTsFileDeletionBarrier.getInstance();
+ final long snapshotUpperBound = barrier.beginSnapshot(TEST_REGION_ID, TEST_TSFILE_PATH);
+ Assert.assertEquals(0L, snapshotUpperBound);
+
+ final long deleteSeq = barrier.beginDeletion(TEST_REGION_ID);
+ barrier.resolveDeletionTargets(
+ TEST_REGION_ID, deleteSeq, Collections.singleton(TEST_TSFILE_PATH));
+
+ final CountDownLatch started = new CountDownLatch(1);
+ final CountDownLatch finished = new CountDownLatch(1);
+ final AtomicReference failure = new AtomicReference<>(null);
+
+ final Thread thread =
+ new Thread(
+ () -> {
+ try {
+ started.countDown();
+ barrier.awaitSnapshotsBeforeMaterialization(TEST_TSFILE_PATH, deleteSeq);
+ } catch (final Throwable throwable) {
+ failure.set(throwable);
+ } finally {
+ finished.countDown();
+ }
+ },
+ "test-wait-earlier-snapshot");
+ thread.start();
+
+ Assert.assertTrue(started.await(5, TimeUnit.SECONDS));
+ Assert.assertFalse(finished.await(200, TimeUnit.MILLISECONDS));
+
+ barrier.finishSnapshot(TEST_TSFILE_PATH, snapshotUpperBound);
+
+ Assert.assertTrue(finished.await(5, TimeUnit.SECONDS));
+ if (failure.get() != null) {
+ throw new AssertionError("Later deletion wait should finish successfully", failure.get());
+ }
+
+ barrier.finishDeletion(deleteSeq, Collections.singleton(TEST_TSFILE_PATH));
+ thread.join(TimeUnit.SECONDS.toMillis(5));
+ }
+}
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java
index 5ba0843bf8003..90f3d3b002f28 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/PipeTsFileInsertionEventTest.java
@@ -24,11 +24,13 @@
import org.apache.iotdb.commons.auth.entity.PrivilegeType;
import org.apache.iotdb.commons.conf.IoTDBConstant;
import org.apache.iotdb.commons.exception.auth.AccessDeniedException;
+import org.apache.iotdb.commons.path.MeasurementPath;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.commons.pipe.datastructure.pattern.IoTDBTreePattern;
import org.apache.iotdb.commons.pipe.datastructure.pattern.TablePattern;
import org.apache.iotdb.commons.utils.FileUtils;
import org.apache.iotdb.db.auth.AuthorityChecker;
+import org.apache.iotdb.db.pipe.consensus.deletion.PipeTsFileDeletionBarrier;
import org.apache.iotdb.db.pipe.event.common.tsfile.PipeTsFileInsertionEvent;
import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
import org.apache.iotdb.db.queryengine.plan.relational.security.AccessControl;
@@ -36,6 +38,7 @@
import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.RelationalAuthorStatement;
import org.apache.iotdb.db.queryengine.plan.statement.Statement;
import org.apache.iotdb.db.storageengine.dataregion.compaction.utils.CompactionTestFileWriter;
+import org.apache.iotdb.db.storageengine.dataregion.modification.TreeDeletionEntry;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResource;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.TsFileResourceStatus;
import org.apache.iotdb.db.storageengine.dataregion.tsfile.generator.TsFileNameGenerator;
@@ -48,6 +51,7 @@
import org.apache.tsfile.file.metadata.enums.CompressionType;
import org.apache.tsfile.file.metadata.enums.TSEncoding;
import org.apache.tsfile.read.common.TimeRange;
+import org.junit.After;
import org.junit.Assert;
import org.junit.Test;
@@ -56,6 +60,10 @@
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -64,6 +72,11 @@
import static org.apache.iotdb.db.auth.AuthorityChecker.NO_PERMISSION_PROMOTION;
public class PipeTsFileInsertionEventTest {
+ @After
+ public void tearDown() {
+ PipeTsFileDeletionBarrier.getInstance().clear();
+ }
+
@Test
public void testAuthCheck() throws Exception {
final AccessControl oldControl = AuthorityChecker.getAccessControl();
@@ -101,6 +114,8 @@ public void testAuthCheck() throws Exception {
"db",
resource,
null,
+ null,
+ true,
true,
false,
false,
@@ -126,6 +141,8 @@ public void testAuthCheck() throws Exception {
"root.db",
resource,
null,
+ null,
+ true,
true,
false,
false,
@@ -157,6 +174,575 @@ public void testAuthCheck() throws Exception {
}
}
+ @Test
+ public void testLateCreatedModFileCanStillBeObservedAfterShallowCopy() throws Exception {
+ final File baseDir = new File(TestConstant.BASE_OUTPUT_PATH, "late-mod");
+ final File tsFile = new File(baseDir, "late-mod.tsfile");
+ PipeTsFileInsertionEvent originalEvent = null;
+ PipeTsFileInsertionEvent copiedEvent = null;
+ try {
+ Assert.assertTrue(baseDir.mkdirs() || baseDir.exists());
+ Assert.assertTrue(tsFile.createNewFile() || tsFile.exists());
+
+ final TsFileResource resource = new TsFileResource(tsFile);
+ resource.setStatus(TsFileResourceStatus.NORMAL);
+
+ originalEvent =
+ new PipeTsFileInsertionEvent(
+ false,
+ "root.db",
+ resource,
+ null,
+ null,
+ true,
+ true,
+ false,
+ false,
+ null,
+ "testPipe",
+ 1L,
+ null,
+ buildUnionPattern(true, Collections.singletonList(new IoTDBTreePattern(true, null))),
+ new TablePattern(false, null, null),
+ null,
+ null,
+ null,
+ true,
+ Long.MIN_VALUE,
+ Long.MAX_VALUE);
+ copiedEvent =
+ originalEvent.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
+ "testPipeCopy",
+ 2L,
+ null,
+ buildUnionPattern(true, Collections.singletonList(new IoTDBTreePattern(true, null))),
+ new TablePattern(false, null, null),
+ null,
+ null,
+ null,
+ true,
+ Long.MIN_VALUE,
+ Long.MAX_VALUE);
+
+ Assert.assertFalse(originalEvent.isWithMod());
+ Assert.assertFalse(copiedEvent.isWithMod());
+
+ resource
+ .getExclusiveModFile()
+ .write(new TreeDeletionEntry(new MeasurementPath("root.db.d1.s1"), 0, 1));
+ final File modFile = resource.getExclusiveModFile().getFile();
+ Assert.assertTrue(modFile.exists());
+
+ Assert.assertTrue(originalEvent.isWithMod());
+ Assert.assertEquals(modFile, originalEvent.getModFile());
+ Assert.assertTrue(copiedEvent.isWithMod());
+ Assert.assertEquals(modFile, copiedEvent.getModFile());
+
+ copiedEvent.disableMod4NonTransferPipes(false);
+ Assert.assertFalse(copiedEvent.isWithMod());
+ Assert.assertNull(copiedEvent.getModFile());
+ } finally {
+ if (originalEvent != null) {
+ originalEvent.close();
+ }
+ if (copiedEvent != null) {
+ copiedEvent.close();
+ }
+ FileUtils.deleteFileOrDirectory(baseDir);
+ }
+ }
+
+ @Test
+ public void testShallowCopyKeepsPinnedModSnapshotAfterSourceModDisappears() throws Exception {
+ final File tsFile =
+ new File(
+ TsFileNameGenerator.generateNewTsFilePath(
+ TestConstant.BASE_OUTPUT_PATH + IoTDBConstant.SEQUENCE_FOLDER_NAME, 1, 1, 1, 4));
+ PipeTsFileInsertionEvent originalEvent = null;
+ PipeTsFileInsertionEvent copiedEvent = null;
+ try {
+ Assert.assertTrue(tsFile.getParentFile().mkdirs() || tsFile.getParentFile().exists());
+ Assert.assertTrue(tsFile.createNewFile() || tsFile.exists());
+
+ final TsFileResource resource = new TsFileResource(tsFile);
+ resource.setStatus(TsFileResourceStatus.NORMAL);
+ resource
+ .getExclusiveModFile()
+ .write(new TreeDeletionEntry(new MeasurementPath("root.db.d1.s1"), 0, 1));
+ final File originalModFile = resource.getExclusiveModFile().getFile();
+ Assert.assertTrue(originalModFile.exists());
+
+ originalEvent =
+ new PipeTsFileInsertionEvent(
+ false,
+ "root.db",
+ resource,
+ null,
+ null,
+ true,
+ true,
+ false,
+ false,
+ null,
+ null,
+ 1L,
+ null,
+ buildUnionPattern(true, Collections.singletonList(new IoTDBTreePattern(true, null))),
+ new TablePattern(false, null, null),
+ null,
+ null,
+ null,
+ true,
+ Long.MIN_VALUE,
+ Long.MAX_VALUE);
+ Assert.assertTrue(originalEvent.increaseReferenceCount("assigner"));
+
+ final File assignerPinnedModFile = originalEvent.getModFile();
+ Assert.assertNotNull(assignerPinnedModFile);
+ Assert.assertNotEquals(
+ originalModFile.getAbsolutePath(), assignerPinnedModFile.getAbsolutePath());
+
+ Assert.assertTrue(originalModFile.delete());
+ Assert.assertFalse(originalModFile.exists());
+
+ copiedEvent =
+ originalEvent.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
+ "testPipeCopy",
+ 2L,
+ null,
+ buildUnionPattern(true, Collections.singletonList(new IoTDBTreePattern(true, null))),
+ new TablePattern(false, null, null),
+ null,
+ null,
+ null,
+ true,
+ Long.MIN_VALUE,
+ Long.MAX_VALUE);
+
+ Assert.assertTrue(copiedEvent.isWithMod());
+ Assert.assertEquals(
+ assignerPinnedModFile.getAbsolutePath(), copiedEvent.getModFile().getAbsolutePath());
+
+ Assert.assertTrue(copiedEvent.increaseReferenceCount("source"));
+ final File pipePinnedModFile = copiedEvent.getModFile();
+ Assert.assertNotNull(pipePinnedModFile);
+ Assert.assertTrue(pipePinnedModFile.exists());
+ Assert.assertNotEquals(
+ originalModFile.getAbsolutePath(), pipePinnedModFile.getAbsolutePath());
+ } finally {
+ if (copiedEvent != null) {
+ copiedEvent.clearReferenceCount("source");
+ copiedEvent.close();
+ }
+ if (originalEvent != null) {
+ originalEvent.clearReferenceCount("assigner");
+ originalEvent.close();
+ }
+ FileUtils.deleteFileOrDirectory(new File(TestConstant.BASE_OUTPUT_PATH));
+ }
+ }
+
+ @Test
+ public void testPinnedEventDoesNotAdoptFutureModFile() throws Exception {
+ final File tsFile = createTestTsFileUnderDataRegion(1, 1, 1, 5);
+ PipeTsFileInsertionEvent originalEvent = null;
+ PipeTsFileInsertionEvent copiedEvent = null;
+ try {
+ Assert.assertTrue(tsFile.getParentFile().mkdirs() || tsFile.getParentFile().exists());
+ Assert.assertTrue(tsFile.createNewFile() || tsFile.exists());
+
+ final TsFileResource resource = new TsFileResource(tsFile);
+ resource.setStatus(TsFileResourceStatus.NORMAL);
+
+ originalEvent =
+ new PipeTsFileInsertionEvent(
+ false,
+ "root.db",
+ resource,
+ null,
+ null,
+ true,
+ true,
+ false,
+ false,
+ null,
+ "testPipe",
+ 1L,
+ null,
+ buildUnionPattern(true, Collections.singletonList(new IoTDBTreePattern(true, null))),
+ new TablePattern(false, null, null),
+ null,
+ null,
+ null,
+ true,
+ Long.MIN_VALUE,
+ Long.MAX_VALUE);
+
+ Assert.assertFalse(originalEvent.isWithMod());
+ Assert.assertTrue(originalEvent.increaseReferenceCount("assigner"));
+ Assert.assertFalse(originalEvent.isWithMod());
+ Assert.assertNull(originalEvent.getModFile());
+
+ resource
+ .getExclusiveModFile()
+ .write(new TreeDeletionEntry(new MeasurementPath("root.db.d1.s1"), 0, 1));
+ Assert.assertTrue(resource.getExclusiveModFile().getFile().exists());
+
+ Assert.assertFalse(originalEvent.isWithMod());
+ Assert.assertNull(originalEvent.getModFile());
+
+ copiedEvent =
+ originalEvent.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
+ "testPipeCopy",
+ 2L,
+ null,
+ buildUnionPattern(true, Collections.singletonList(new IoTDBTreePattern(true, null))),
+ new TablePattern(false, null, null),
+ null,
+ null,
+ null,
+ true,
+ Long.MIN_VALUE,
+ Long.MAX_VALUE);
+
+ Assert.assertFalse(copiedEvent.isWithMod());
+ Assert.assertNull(copiedEvent.getModFile());
+
+ Assert.assertTrue(copiedEvent.increaseReferenceCount("source"));
+ Assert.assertTrue(copiedEvent.isWithMod());
+ Assert.assertNotNull(copiedEvent.getModFile());
+ Assert.assertNotEquals(
+ resource.getExclusiveModFile().getFile().getAbsolutePath(),
+ copiedEvent.getModFile().getAbsolutePath());
+ } finally {
+ if (copiedEvent != null) {
+ copiedEvent.clearReferenceCount("source");
+ copiedEvent.close();
+ }
+ if (originalEvent != null) {
+ originalEvent.clearReferenceCount("assigner");
+ originalEvent.close();
+ }
+ FileUtils.deleteFileOrDirectory(new File(TestConstant.BASE_OUTPUT_PATH));
+ }
+ }
+
+ @Test
+ public void testPinnedModFilePathIsStableAfterIncreaseReferenceCount() throws Exception {
+ final File tsFile =
+ new File(
+ TsFileNameGenerator.generateNewTsFilePath(
+ TestConstant.BASE_OUTPUT_PATH + IoTDBConstant.SEQUENCE_FOLDER_NAME, 1, 1, 1, 2));
+ PipeTsFileInsertionEvent event = null;
+ try {
+ Assert.assertTrue(tsFile.getParentFile().mkdirs() || tsFile.getParentFile().exists());
+ Assert.assertTrue(tsFile.createNewFile() || tsFile.exists());
+
+ final TsFileResource resource = new TsFileResource(tsFile);
+ resource.setStatus(TsFileResourceStatus.NORMAL);
+
+ event =
+ new PipeTsFileInsertionEvent(
+ false,
+ "root.db",
+ resource,
+ null,
+ null,
+ true,
+ true,
+ false,
+ false,
+ null,
+ "testPipe",
+ 1L,
+ null,
+ buildUnionPattern(true, Collections.singletonList(new IoTDBTreePattern(true, null))),
+ new TablePattern(false, null, null),
+ null,
+ null,
+ null,
+ true,
+ Long.MIN_VALUE,
+ Long.MAX_VALUE);
+
+ Assert.assertFalse(event.isWithMod());
+
+ // Create the mod file after event construction but before pinning.
+ resource
+ .getExclusiveModFile()
+ .write(new TreeDeletionEntry(new MeasurementPath("root.db.d1.s1"), 0, 1));
+ final File originalModFile = resource.getExclusiveModFile().getFile();
+ Assert.assertTrue(originalModFile.exists());
+ Assert.assertTrue(event.isWithMod());
+ Assert.assertEquals(originalModFile.getAbsolutePath(), event.getModFile().getAbsolutePath());
+
+ // Pin the event: mod file should be copied to pipe dir and must not be overwritten by
+ // refresh.
+ Assert.assertTrue(event.increaseReferenceCount("test"));
+ final File pinnedModFile = event.getModFile();
+ Assert.assertNotNull(pinnedModFile);
+ Assert.assertNotEquals(originalModFile.getAbsolutePath(), pinnedModFile.getAbsolutePath());
+
+ // Ensure subsequent refresh calls won't revert it to the original path.
+ Assert.assertTrue(event.isWithMod());
+ Assert.assertEquals(pinnedModFile.getAbsolutePath(), event.getModFile().getAbsolutePath());
+ } finally {
+ if (event != null) {
+ event.clearReferenceCount("test");
+ event.close();
+ }
+ FileUtils.deleteFileOrDirectory(new File(TestConstant.BASE_OUTPUT_PATH));
+ }
+ }
+
+ @Test
+ public void testDelayedAssignerRetainDoesNotFreezeFutureMod() throws Exception {
+ final File tsFile =
+ new File(
+ TsFileNameGenerator.generateNewTsFilePath(
+ TestConstant.BASE_OUTPUT_PATH + IoTDBConstant.SEQUENCE_FOLDER_NAME, 1, 1, 1, 3));
+ PipeTsFileInsertionEvent event = null;
+ try {
+ Assert.assertTrue(tsFile.getParentFile().mkdirs() || tsFile.getParentFile().exists());
+ Assert.assertTrue(tsFile.createNewFile() || tsFile.exists());
+
+ final TsFileResource resource = new TsFileResource(tsFile);
+ resource.setStatus(TsFileResourceStatus.NORMAL);
+
+ event = createTestTsFileInsertionEvent(resource, null);
+ event.enableDelayModSnapshotUntilReplicateIndex();
+
+ Assert.assertTrue(event.increaseReferenceCount("assigner"));
+ Assert.assertFalse(event.isWithMod());
+ Assert.assertFalse(event.isModPinned());
+ Assert.assertNotEquals(tsFile.getAbsolutePath(), event.getTsFile().getAbsolutePath());
+
+ resource
+ .getExclusiveModFile()
+ .write(new TreeDeletionEntry(new MeasurementPath("root.db.d1.s1"), 0, 1));
+ final File originalModFile = resource.getExclusiveModFile().getFile();
+ Assert.assertTrue(originalModFile.exists());
+
+ Assert.assertTrue(event.isWithMod());
+ Assert.assertEquals(originalModFile.getAbsolutePath(), event.getModFile().getAbsolutePath());
+ Assert.assertFalse(event.isModPinned());
+ } finally {
+ if (event != null) {
+ event.clearReferenceCount("assigner");
+ event.close();
+ }
+ FileUtils.deleteFileOrDirectory(new File(TestConstant.BASE_OUTPUT_PATH));
+ }
+ }
+
+ @Test
+ public void testDelayedPipeCopyWaitsForEarlierDeletionBeforeFreezingMod() throws Exception {
+ final File tsFile = createTestTsFileUnderDataRegion(1, 1, 1, 6);
+ PipeTsFileInsertionEvent originalEvent = null;
+ PipeTsFileInsertionEvent copiedEvent = null;
+ Thread t = null;
+ try {
+ Assert.assertTrue(tsFile.getParentFile().mkdirs() || tsFile.getParentFile().exists());
+ Assert.assertTrue(tsFile.createNewFile() || tsFile.exists());
+
+ final TsFileResource resource = new TsFileResource(tsFile);
+ resource.setStatus(TsFileResourceStatus.NORMAL);
+ final PipeTsFileDeletionBarrier barrier = PipeTsFileDeletionBarrier.getInstance();
+ final int regionId = Integer.parseInt(resource.getDataRegionId());
+ final String tsFilePath = resource.getTsFilePath();
+
+ originalEvent = createTestTsFileInsertionEvent(resource, null);
+ originalEvent.enableDelayModSnapshotUntilReplicateIndex();
+ Assert.assertTrue(originalEvent.increaseReferenceCount("assigner"));
+
+ copiedEvent = createTestPipeCopy(originalEvent, "testPipeCopy");
+ final long deleteSeq = barrier.beginDeletion(regionId);
+ barrier.resolveDeletionTargets(regionId, deleteSeq, Collections.singleton(tsFilePath));
+ copiedEvent.setReplicateIndexForIoTV2(100L);
+
+ final CountDownLatch started = new CountDownLatch(1);
+ final CountDownLatch finished = new CountDownLatch(1);
+ final AtomicBoolean increased = new AtomicBoolean(false);
+ final PipeTsFileInsertionEvent finalCopiedEvent = copiedEvent;
+
+ t =
+ new Thread(
+ () -> {
+ started.countDown();
+ increased.set(finalCopiedEvent.increaseReferenceCount("source"));
+ finished.countDown();
+ },
+ "test-tsfile-delete-barrier-wait-earlier-delete");
+ t.start();
+
+ Assert.assertTrue(started.await(5, TimeUnit.SECONDS));
+ Assert.assertFalse(finished.await(200, TimeUnit.MILLISECONDS));
+
+ resource
+ .getExclusiveModFile()
+ .write(new TreeDeletionEntry(new MeasurementPath("root.db.d1.s1"), 0, 1));
+ final File originalModFile = resource.getExclusiveModFile().getFile();
+ Assert.assertTrue(originalModFile.exists());
+
+ barrier.finishDeletion(deleteSeq, Collections.singleton(tsFilePath));
+
+ Assert.assertTrue(finished.await(5, TimeUnit.SECONDS));
+ Assert.assertTrue(increased.get());
+ Assert.assertTrue(copiedEvent.isWithMod());
+ Assert.assertTrue(copiedEvent.isModPinned());
+ Assert.assertNotNull(copiedEvent.getModFile());
+ Assert.assertNotEquals(
+ originalModFile.getAbsolutePath(), copiedEvent.getModFile().getAbsolutePath());
+ } finally {
+ if (t != null) {
+ t.join(TimeUnit.SECONDS.toMillis(5));
+ }
+ if (copiedEvent != null && copiedEvent.getReferenceCount() > 0) {
+ copiedEvent.clearReferenceCount("source");
+ copiedEvent.close();
+ }
+ if (originalEvent != null && originalEvent.getReferenceCount() > 0) {
+ originalEvent.clearReferenceCount("assigner");
+ originalEvent.close();
+ }
+ FileUtils.deleteFileOrDirectory(new File(TestConstant.BASE_OUTPUT_PATH));
+ }
+ }
+
+ @Test
+ public void testDelayedPipeCopyDoesNotFreezeLaterDeletion() throws Exception {
+ final File tsFile = createTestTsFileUnderDataRegion(1, 1, 1, 7);
+ PipeTsFileInsertionEvent originalEvent = null;
+ PipeTsFileInsertionEvent copiedEvent = null;
+ Thread t = null;
+ try {
+ Assert.assertTrue(tsFile.getParentFile().mkdirs() || tsFile.getParentFile().exists());
+ Assert.assertTrue(tsFile.createNewFile() || tsFile.exists());
+
+ final TsFileResource resource = new TsFileResource(tsFile);
+ resource.setStatus(TsFileResourceStatus.NORMAL);
+ final PipeTsFileDeletionBarrier barrier = PipeTsFileDeletionBarrier.getInstance();
+ final int regionId = Integer.parseInt(resource.getDataRegionId());
+ final String tsFilePath = resource.getTsFilePath();
+
+ originalEvent = createTestTsFileInsertionEvent(resource, null);
+ originalEvent.enableDelayModSnapshotUntilReplicateIndex();
+ Assert.assertTrue(originalEvent.increaseReferenceCount("assigner"));
+
+ copiedEvent = createTestPipeCopy(originalEvent, "testPipeCopy");
+ copiedEvent.setReplicateIndexForIoTV2(100L);
+
+ final long deleteSeq = barrier.beginDeletion(regionId);
+ barrier.resolveDeletionTargets(regionId, deleteSeq, Collections.singleton(tsFilePath));
+
+ final CountDownLatch started = new CountDownLatch(1);
+ final CountDownLatch finished = new CountDownLatch(1);
+ final AtomicReference failure = new AtomicReference<>(null);
+ t =
+ new Thread(
+ () -> {
+ try {
+ started.countDown();
+ barrier.awaitSnapshotsBeforeMaterialization(tsFilePath, deleteSeq);
+ resource
+ .getExclusiveModFile()
+ .write(new TreeDeletionEntry(new MeasurementPath("root.db.d1.s1"), 0, 1));
+ barrier.finishDeletion(deleteSeq, Collections.singleton(tsFilePath));
+ } catch (final Throwable throwable) {
+ failure.set(throwable);
+ } finally {
+ finished.countDown();
+ }
+ },
+ "test-tsfile-delete-barrier-wait-later-delete");
+ t.start();
+
+ Assert.assertTrue(started.await(5, TimeUnit.SECONDS));
+ Assert.assertFalse(finished.await(200, TimeUnit.MILLISECONDS));
+
+ Assert.assertTrue(copiedEvent.increaseReferenceCount("source"));
+ Assert.assertFalse(copiedEvent.isWithMod());
+ Assert.assertNull(copiedEvent.getModFile());
+
+ Assert.assertTrue(finished.await(5, TimeUnit.SECONDS));
+ if (failure.get() != null) {
+ throw new AssertionError("Later deletion thread should finish successfully", failure.get());
+ }
+ Assert.assertTrue(resource.getExclusiveModFile().getFile().exists());
+ } finally {
+ if (t != null) {
+ t.join(TimeUnit.SECONDS.toMillis(5));
+ }
+ if (copiedEvent != null && copiedEvent.getReferenceCount() > 0) {
+ copiedEvent.clearReferenceCount("source");
+ copiedEvent.close();
+ }
+ if (originalEvent != null && originalEvent.getReferenceCount() > 0) {
+ originalEvent.clearReferenceCount("assigner");
+ originalEvent.close();
+ }
+ FileUtils.deleteFileOrDirectory(new File(TestConstant.BASE_OUTPUT_PATH));
+ }
+ }
+
+ private PipeTsFileInsertionEvent createTestTsFileInsertionEvent(
+ final TsFileResource resource, final String pipeName) {
+ return new PipeTsFileInsertionEvent(
+ false,
+ "root.db",
+ resource,
+ null,
+ null,
+ true,
+ true,
+ false,
+ false,
+ null,
+ pipeName,
+ 1L,
+ null,
+ buildUnionPattern(true, Collections.singletonList(new IoTDBTreePattern(true, null))),
+ new TablePattern(false, null, null),
+ null,
+ null,
+ null,
+ true,
+ Long.MIN_VALUE,
+ Long.MAX_VALUE);
+ }
+
+ private PipeTsFileInsertionEvent createTestPipeCopy(
+ final PipeTsFileInsertionEvent sourceEvent, final String pipeName) {
+ return sourceEvent.shallowCopySelfAndBindPipeTaskMetaForProgressReport(
+ pipeName,
+ 2L,
+ null,
+ buildUnionPattern(true, Collections.singletonList(new IoTDBTreePattern(true, null))),
+ new TablePattern(false, null, null),
+ null,
+ null,
+ null,
+ true,
+ Long.MIN_VALUE,
+ Long.MAX_VALUE);
+ }
+
+ private File createTestTsFileUnderDataRegion(
+ final long time,
+ final long version,
+ final int innerSpaceCompactionCount,
+ final int crossSpaceCompactionCount) {
+ return new File(
+ new File(
+ new File(
+ new File(
+ TestConstant.BASE_OUTPUT_PATH + IoTDBConstant.SEQUENCE_FOLDER_NAME, "root.db"),
+ "1"),
+ "0"),
+ TsFileNameGenerator.generateNewTsFileName(
+ time, version, innerSpaceCompactionCount, crossSpaceCompactionCount));
+ }
+
static class TestAccessControl implements AccessControl {
@Override
diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
index 8814190755e27..259ec83650db6 100644
--- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
+++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/event/TsFileInsertionEventParserTest.java
@@ -560,6 +560,8 @@ private void testTsFilePointNum(
"",
new TsFileResource(tsFile),
null,
+ null,
+ true,
true,
false,
false,