From 0de2ad0e8e900c9be63e052696a72e31b79b6d9b Mon Sep 17 00:00:00 2001 From: Peng Junzhi <201250214@smail.nju.edu.cn> Date: Thu, 2 Apr 2026 21:33:55 +0800 Subject: [PATCH 1/3] Fix pipe consensus compatibility during rolling upgrade --- .../conf/SystemPropertiesUtils.java | 71 ++++++++++++++++--- .../iotdb/consensus/ConsensusFactory.java | 15 +++- .../apache/iotdb/db/conf/IoTDBStartCheck.java | 41 +++++++++-- .../PipeDataRegionProcessorConstructor.java | 3 + .../PipeDataRegionSinkConstructor.java | 6 ++ .../plugin/PipeDataNodePluginAgentTest.java | 31 ++++++++ .../plugin/builtin/BuiltinPipePlugin.java | 9 ++- 7 files changed, 161 insertions(+), 15 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java index 529f15d06cd44..47e592e2a0a2d 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java @@ -27,6 +27,7 @@ import org.apache.iotdb.commons.exception.BadNodeUrlException; import org.apache.iotdb.commons.file.SystemPropertiesHandler; import org.apache.iotdb.commons.utils.NodeUrlUtils; +import org.apache.iotdb.consensus.ConsensusFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -125,9 +126,24 @@ public static void checkSystemProperties() throws IOException { } // Consensus protocol configuration - String configNodeConsensusProtocolClass = + boolean needRewriteConsensusProtocol = false; + + String persistedConfigNodeConsensusProtocolClass = systemProperties.getProperty(CN_CONSENSUS_PROTOCOL, null); - if (!configNodeConsensusProtocolClass.equals(conf.getConfigNodeConsensusProtocolClass())) { + String configNodeConsensusProtocolClass = + ConsensusFactory.normalizeConsensusProtocolClass(persistedConfigNodeConsensusProtocolClass); + if (!Objects.equals( + persistedConfigNodeConsensusProtocolClass, configNodeConsensusProtocolClass)) { + systemProperties.setProperty(CN_CONSENSUS_PROTOCOL, configNodeConsensusProtocolClass); + needRewriteConsensusProtocol = true; + LOGGER.warn( + "[SystemProperties] Normalize {} from {} to {} for compatibility.", + CN_CONSENSUS_PROTOCOL, + persistedConfigNodeConsensusProtocolClass, + configNodeConsensusProtocolClass); + } + if (!Objects.equals( + configNodeConsensusProtocolClass, conf.getConfigNodeConsensusProtocolClass())) { LOGGER.warn( format, CN_CONSENSUS_PROTOCOL, @@ -136,9 +152,22 @@ public static void checkSystemProperties() throws IOException { conf.setConfigNodeConsensusProtocolClass(configNodeConsensusProtocolClass); } - String dataRegionConsensusProtocolClass = + String persistedDataRegionConsensusProtocolClass = systemProperties.getProperty(DATA_CONSENSUS_PROTOCOL, null); - if (!dataRegionConsensusProtocolClass.equals(conf.getDataRegionConsensusProtocolClass())) { + String dataRegionConsensusProtocolClass = + ConsensusFactory.normalizeConsensusProtocolClass(persistedDataRegionConsensusProtocolClass); + if (!Objects.equals( + persistedDataRegionConsensusProtocolClass, dataRegionConsensusProtocolClass)) { + systemProperties.setProperty(DATA_CONSENSUS_PROTOCOL, dataRegionConsensusProtocolClass); + needRewriteConsensusProtocol = true; + LOGGER.warn( + "[SystemProperties] Normalize {} from {} to {} for compatibility.", + DATA_CONSENSUS_PROTOCOL, + persistedDataRegionConsensusProtocolClass, + dataRegionConsensusProtocolClass); + } + if (!Objects.equals( + dataRegionConsensusProtocolClass, conf.getDataRegionConsensusProtocolClass())) { LOGGER.warn( format, DATA_CONSENSUS_PROTOCOL, @@ -147,9 +176,23 @@ public static void checkSystemProperties() throws IOException { conf.setDataRegionConsensusProtocolClass(dataRegionConsensusProtocolClass); } - String schemaRegionConsensusProtocolClass = + String persistedSchemaRegionConsensusProtocolClass = systemProperties.getProperty(SCHEMA_CONSENSUS_PROTOCOL, null); - if (!schemaRegionConsensusProtocolClass.equals(conf.getSchemaRegionConsensusProtocolClass())) { + String schemaRegionConsensusProtocolClass = + ConsensusFactory.normalizeConsensusProtocolClass( + persistedSchemaRegionConsensusProtocolClass); + if (!Objects.equals( + persistedSchemaRegionConsensusProtocolClass, schemaRegionConsensusProtocolClass)) { + systemProperties.setProperty(SCHEMA_CONSENSUS_PROTOCOL, schemaRegionConsensusProtocolClass); + needRewriteConsensusProtocol = true; + LOGGER.warn( + "[SystemProperties] Normalize {} from {} to {} for compatibility.", + SCHEMA_CONSENSUS_PROTOCOL, + persistedSchemaRegionConsensusProtocolClass, + schemaRegionConsensusProtocolClass); + } + if (!Objects.equals( + schemaRegionConsensusProtocolClass, conf.getSchemaRegionConsensusProtocolClass())) { LOGGER.warn( format, SCHEMA_CONSENSUS_PROTOCOL, @@ -157,6 +200,9 @@ public static void checkSystemProperties() throws IOException { schemaRegionConsensusProtocolClass); conf.setSchemaRegionConsensusProtocolClass(schemaRegionConsensusProtocolClass); } + if (needRewriteConsensusProtocol) { + systemPropertiesHandler.overwrite(systemProperties); + } // PartitionSlot configuration if (systemProperties.getProperty(SERIES_PARTITION_SLOT_NUM, null) != null) { @@ -263,11 +309,18 @@ public static void storeSystemParameters() throws IOException { systemProperties.setProperty(CN_CONSENSUS_PORT, String.valueOf(conf.getConsensusPort())); // Consensus protocol configuration - systemProperties.setProperty(CN_CONSENSUS_PROTOCOL, conf.getConfigNodeConsensusProtocolClass()); systemProperties.setProperty( - DATA_CONSENSUS_PROTOCOL, conf.getDataRegionConsensusProtocolClass()); + CN_CONSENSUS_PROTOCOL, + ConsensusFactory.normalizeConsensusProtocolClass( + conf.getConfigNodeConsensusProtocolClass())); + systemProperties.setProperty( + DATA_CONSENSUS_PROTOCOL, + ConsensusFactory.normalizeConsensusProtocolClass( + conf.getDataRegionConsensusProtocolClass())); systemProperties.setProperty( - SCHEMA_CONSENSUS_PROTOCOL, conf.getSchemaRegionConsensusProtocolClass()); + SCHEMA_CONSENSUS_PROTOCOL, + ConsensusFactory.normalizeConsensusProtocolClass( + conf.getSchemaRegionConsensusProtocolClass())); // PartitionSlot configuration systemProperties.setProperty( diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java index 896a877e3e776..b303a6d5de396 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java @@ -37,6 +37,8 @@ public class ConsensusFactory { public static final String SIMPLE_CONSENSUS = "org.apache.iotdb.consensus.simple.SimpleConsensus"; public static final String RATIS_CONSENSUS = "org.apache.iotdb.consensus.ratis.RatisConsensus"; public static final String IOT_CONSENSUS = "org.apache.iotdb.consensus.iot.IoTConsensus"; + public static final String LEGACY_IOT_CONSENSUS_V2 = + "org.apache.iotdb.consensus.pipe.PipeConsensus"; public static final String REAL_IOT_CONSENSUS_V2 = "org.apache.iotdb.consensus.pipe.IoTConsensusV2"; public static final String IOT_CONSENSUS_V2 = "org.apache.iotdb.consensus.iot.IoTConsensusV2"; @@ -49,11 +51,22 @@ private ConsensusFactory() { throw new IllegalStateException("Utility class ConsensusFactory"); } + public static String normalizeConsensusProtocolClass(String className) { + if (className == null) { + return null; + } + if (LEGACY_IOT_CONSENSUS_V2.equals(className) || REAL_IOT_CONSENSUS_V2.equals(className)) { + return IOT_CONSENSUS_V2; + } + return className; + } + public static Optional getConsensusImpl( String className, ConsensusConfig config, IStateMachine.Registry registry) { try { + className = normalizeConsensusProtocolClass(className); // special judge for IoTConsensusV2 - if (className.equals(IOT_CONSENSUS_V2)) { + if (IOT_CONSENSUS_V2.equals(className)) { className = REAL_IOT_CONSENSUS_V2; // initialize iotConsensusV2's thrift component IoTV2GlobalComponentContainer.build(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java index e2aa1950af6a5..8451a120d1c14 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java @@ -282,13 +282,46 @@ private void checkImmutableSystemProperties() throws IOException { if (properties.containsKey(CLUSTER_ID)) { config.setClusterId(properties.getProperty(CLUSTER_ID)); } + boolean needRewriteConsensusProtocol = false; if (properties.containsKey(SCHEMA_REGION_CONSENSUS_PROTOCOL)) { - config.setSchemaRegionConsensusProtocolClass( - properties.getProperty(SCHEMA_REGION_CONSENSUS_PROTOCOL)); + final String persistedSchemaRegionConsensusProtocolClass = + properties.getProperty(SCHEMA_REGION_CONSENSUS_PROTOCOL); + final String schemaRegionConsensusProtocolClass = + ConsensusFactory.normalizeConsensusProtocolClass( + persistedSchemaRegionConsensusProtocolClass); + if (!Objects.equals( + persistedSchemaRegionConsensusProtocolClass, schemaRegionConsensusProtocolClass)) { + properties.setProperty( + SCHEMA_REGION_CONSENSUS_PROTOCOL, schemaRegionConsensusProtocolClass); + needRewriteConsensusProtocol = true; + logger.warn( + "[SystemProperties] Normalize {} from {} to {} for compatibility.", + SCHEMA_REGION_CONSENSUS_PROTOCOL, + persistedSchemaRegionConsensusProtocolClass, + schemaRegionConsensusProtocolClass); + } + config.setSchemaRegionConsensusProtocolClass(schemaRegionConsensusProtocolClass); } if (properties.containsKey(DATA_REGION_CONSENSUS_PROTOCOL)) { - config.setDataRegionConsensusProtocolClass( - properties.getProperty(DATA_REGION_CONSENSUS_PROTOCOL)); + final String persistedDataRegionConsensusProtocolClass = + properties.getProperty(DATA_REGION_CONSENSUS_PROTOCOL); + final String dataRegionConsensusProtocolClass = + ConsensusFactory.normalizeConsensusProtocolClass( + persistedDataRegionConsensusProtocolClass); + if (!Objects.equals( + persistedDataRegionConsensusProtocolClass, dataRegionConsensusProtocolClass)) { + properties.setProperty(DATA_REGION_CONSENSUS_PROTOCOL, dataRegionConsensusProtocolClass); + needRewriteConsensusProtocol = true; + logger.warn( + "[SystemProperties] Normalize {} from {} to {} for compatibility.", + DATA_REGION_CONSENSUS_PROTOCOL, + persistedDataRegionConsensusProtocolClass, + dataRegionConsensusProtocolClass); + } + config.setDataRegionConsensusProtocolClass(dataRegionConsensusProtocolClass); + } + if (needRewriteConsensusProtocol) { + systemPropertiesHandler.overwrite(properties); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java index 5bd2ef17509d8..2991bb066caa6 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java @@ -69,6 +69,9 @@ protected void initConstructors() { pluginConstructors.put( BuiltinPipePlugin.IOT_CONSENSUS_V2_PROCESSOR.getPipePluginName(), IoTConsensusV2Processor::new); + pluginConstructors.put( + BuiltinPipePlugin.PIPE_CONSENSUS_PROCESSOR.getPipePluginName(), + IoTConsensusV2Processor::new); pluginConstructors.put( BuiltinPipePlugin.RENAME_DATABASE_PROCESSOR.getPipePluginName(), RenameDatabaseProcessor::new); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java index cbd77ebc9025e..db7995c9be1a3 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java @@ -56,6 +56,9 @@ protected void initConstructors() { pluginConstructors.put( BuiltinPipePlugin.IOT_CONSENSUS_V2_ASYNC_CONNECTOR.getPipePluginName(), IoTConsensusV2AsyncSink::new); + pluginConstructors.put( + BuiltinPipePlugin.PIPE_CONSENSUS_ASYNC_CONNECTOR.getPipePluginName(), + IoTConsensusV2AsyncSink::new); pluginConstructors.put( BuiltinPipePlugin.IOTDB_LEGACY_PIPE_CONNECTOR.getPipePluginName(), IoTDBLegacyPipeSink::new); @@ -97,5 +100,8 @@ protected void initConstructors() { pluginConstructors.put( BuiltinPipePlugin.IOT_CONSENSUS_V2_ASYNC_SINK.getPipePluginName(), IoTConsensusV2AsyncSink::new); + pluginConstructors.put( + BuiltinPipePlugin.PIPE_CONSENSUS_ASYNC_SINK.getPipePluginName(), + IoTConsensusV2AsyncSink::new); } } diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java index 5d4d6a640b668..16f96b25d5724 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java @@ -28,6 +28,8 @@ import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant; import org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink; import org.apache.iotdb.db.pipe.source.dataregion.IoTDBDataRegionSource; +import org.apache.iotdb.db.pipe.processor.iotconsensusv2.IoTConsensusV2Processor; +import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.IoTConsensusV2AsyncSink; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.junit.After; @@ -118,6 +120,20 @@ public void testPipePluginAgent() { } })) .getClass()); + Assert.assertEquals( + IoTConsensusV2Processor.class, + agent + .dataRegion() + .reflectProcessor( + new PipeParameters( + new HashMap() { + { + put( + PipeProcessorConstant.PROCESSOR_KEY, + BuiltinPipePlugin.PIPE_CONSENSUS_PROCESSOR.getPipePluginName()); + } + })) + .getClass()); Assert.assertEquals( IoTDBDataRegionAsyncSink.class, agent @@ -132,5 +148,20 @@ public void testPipePluginAgent() { } })) .getClass()); + Assert.assertEquals( + IoTConsensusV2AsyncSink.class, + agent + .dataRegion() + .reflectSink( + new PipeParameters( + new HashMap() { + { + put( + PipeSinkConstant.CONNECTOR_KEY, + BuiltinPipePlugin.PIPE_CONSENSUS_ASYNC_CONNECTOR + .getPipePluginName()); + } + })) + .getClass()); } } diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java index e95ddb61e1a2c..a73f0ab2ca19a 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java @@ -74,6 +74,7 @@ public enum BuiltinPipePlugin { STANDARD_STATISTICS_PROCESSOR("standard-statistics-processor", StandardStatisticsProcessor.class), TUMBLING_WINDOWING_PROCESSOR("tumbling-windowing-processor", TumblingWindowingProcessor.class), IOT_CONSENSUS_V2_PROCESSOR("iot-consensus-v2-processor", IoTConsensusV2Processor.class), + PIPE_CONSENSUS_PROCESSOR("pipe-consensus-processor", IoTConsensusV2Processor.class), RENAME_DATABASE_PROCESSOR("rename-database-processor", RenameDatabaseProcessor.class), // connectors @@ -86,6 +87,8 @@ public enum BuiltinPipePlugin { IOTDB_AIR_GAP_CONNECTOR("iotdb-air-gap-connector", IoTDBAirGapSink.class), IOT_CONSENSUS_V2_ASYNC_CONNECTOR( "iot-consensus-v2-async-connector", IoTConsensusV2AsyncSink.class), + PIPE_CONSENSUS_ASYNC_CONNECTOR( + "pipe-consensus-async-connector", IoTConsensusV2AsyncSink.class), WEBSOCKET_CONNECTOR("websocket-connector", WebSocketSink.class), OPC_UA_CONNECTOR("opc-ua-connector", OpcUaSink.class), @@ -105,6 +108,7 @@ public enum BuiltinPipePlugin { WRITE_BACK_SINK("write-back-sink", WriteBackSink.class), SUBSCRIPTION_SINK("subscription-sink", DoNothingSink.class), IOT_CONSENSUS_V2_ASYNC_SINK("iot-consensus-v2-async-sink", IoTConsensusV2AsyncSink.class), + PIPE_CONSENSUS_ASYNC_SINK("pipe-consensus-async-sink", IoTConsensusV2AsyncSink.class), ; private final String pipePluginName; @@ -158,6 +162,7 @@ public String getClassName() { STANDARD_STATISTICS_PROCESSOR.getPipePluginName().toUpperCase(), TUMBLING_WINDOWING_PROCESSOR.getPipePluginName().toUpperCase(), IOT_CONSENSUS_V2_PROCESSOR.getPipePluginName().toUpperCase(), + PIPE_CONSENSUS_PROCESSOR.getPipePluginName().toUpperCase(), RENAME_DATABASE_PROCESSOR.getPipePluginName().toUpperCase(), // Connectors DO_NOTHING_CONNECTOR.getPipePluginName().toUpperCase(), @@ -172,6 +177,7 @@ public String getClassName() { OPC_DA_CONNECTOR.getPipePluginName().toUpperCase(), WRITE_BACK_CONNECTOR.getPipePluginName().toUpperCase(), IOT_CONSENSUS_V2_ASYNC_CONNECTOR.getPipePluginName().toUpperCase(), + PIPE_CONSENSUS_ASYNC_CONNECTOR.getPipePluginName().toUpperCase(), // Sinks IOTDB_THRIFT_SYNC_SINK.getPipePluginName().toUpperCase(), IOTDB_THRIFT_ASYNC_SINK.getPipePluginName().toUpperCase(), @@ -180,5 +186,6 @@ public String getClassName() { OPC_UA_SINK.getPipePluginName().toUpperCase(), OPC_DA_SINK.getPipePluginName().toUpperCase(), SUBSCRIPTION_SINK.getPipePluginName().toUpperCase(), - IOT_CONSENSUS_V2_ASYNC_SINK.getPipePluginName().toUpperCase()))); + IOT_CONSENSUS_V2_ASYNC_SINK.getPipePluginName().toUpperCase(), + PIPE_CONSENSUS_ASYNC_SINK.getPipePluginName().toUpperCase()))); } From ec3f8a35255e1cc2fb29226dd15054edd036f62a Mon Sep 17 00:00:00 2001 From: Peng Junzhi <201250214@smail.nju.edu.cn> Date: Thu, 2 Apr 2026 21:38:42 +0800 Subject: [PATCH 2/3] Apply spotless formatting --- .../db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java | 7 +++---- .../pipe/agent/plugin/builtin/BuiltinPipePlugin.java | 3 +-- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java index 16f96b25d5724..b0f0b34e92a51 100644 --- a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java +++ b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/pipe/agent/plugin/PipeDataNodePluginAgentTest.java @@ -26,10 +26,10 @@ import org.apache.iotdb.commons.pipe.config.constant.PipeProcessorConstant; import org.apache.iotdb.commons.pipe.config.constant.PipeSinkConstant; import org.apache.iotdb.commons.pipe.config.constant.PipeSourceConstant; -import org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink; -import org.apache.iotdb.db.pipe.source.dataregion.IoTDBDataRegionSource; import org.apache.iotdb.db.pipe.processor.iotconsensusv2.IoTConsensusV2Processor; import org.apache.iotdb.db.pipe.sink.protocol.iotconsensusv2.IoTConsensusV2AsyncSink; +import org.apache.iotdb.db.pipe.sink.protocol.thrift.async.IoTDBDataRegionAsyncSink; +import org.apache.iotdb.db.pipe.source.dataregion.IoTDBDataRegionSource; import org.apache.iotdb.pipe.api.customizer.parameter.PipeParameters; import org.junit.After; @@ -158,8 +158,7 @@ public void testPipePluginAgent() { { put( PipeSinkConstant.CONNECTOR_KEY, - BuiltinPipePlugin.PIPE_CONSENSUS_ASYNC_CONNECTOR - .getPipePluginName()); + BuiltinPipePlugin.PIPE_CONSENSUS_ASYNC_CONNECTOR.getPipePluginName()); } })) .getClass()); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java index a73f0ab2ca19a..40033d8a9a779 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java @@ -87,8 +87,7 @@ public enum BuiltinPipePlugin { IOTDB_AIR_GAP_CONNECTOR("iotdb-air-gap-connector", IoTDBAirGapSink.class), IOT_CONSENSUS_V2_ASYNC_CONNECTOR( "iot-consensus-v2-async-connector", IoTConsensusV2AsyncSink.class), - PIPE_CONSENSUS_ASYNC_CONNECTOR( - "pipe-consensus-async-connector", IoTConsensusV2AsyncSink.class), + PIPE_CONSENSUS_ASYNC_CONNECTOR("pipe-consensus-async-connector", IoTConsensusV2AsyncSink.class), WEBSOCKET_CONNECTOR("websocket-connector", WebSocketSink.class), OPC_UA_CONNECTOR("opc-ua-connector", OpcUaSink.class), From d30267a5daf74cdaeb9a19f9d4abbd3401dc8f67 Mon Sep 17 00:00:00 2001 From: Peng Junzhi <201250214@smail.nju.edu.cn> Date: Thu, 2 Apr 2026 22:00:01 +0800 Subject: [PATCH 3/3] Fix pipe consensus upgrade compatibility --- .../conf/SystemPropertiesUtils.java | 40 +++---------------- .../iotdb/consensus/ConsensusFactory.java | 4 ++ .../apache/iotdb/db/conf/IoTDBStartCheck.java | 21 ++-------- .../PipeDataRegionProcessorConstructor.java | 1 + .../PipeDataRegionSinkConstructor.java | 2 + .../plugin/builtin/BuiltinPipePlugin.java | 3 ++ 6 files changed, 20 insertions(+), 51 deletions(-) diff --git a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java index 47e592e2a0a2d..a3ee036f6ab05 100644 --- a/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java +++ b/iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/conf/SystemPropertiesUtils.java @@ -125,23 +125,13 @@ public static void checkSystemProperties() throws IOException { } } + // Only the data region protocol could have been persisted as the old PipeConsensus name + // during a jar-only upgrade, so only that field needs compatibility normalization. // Consensus protocol configuration boolean needRewriteConsensusProtocol = false; - String persistedConfigNodeConsensusProtocolClass = - systemProperties.getProperty(CN_CONSENSUS_PROTOCOL, null); String configNodeConsensusProtocolClass = - ConsensusFactory.normalizeConsensusProtocolClass(persistedConfigNodeConsensusProtocolClass); - if (!Objects.equals( - persistedConfigNodeConsensusProtocolClass, configNodeConsensusProtocolClass)) { - systemProperties.setProperty(CN_CONSENSUS_PROTOCOL, configNodeConsensusProtocolClass); - needRewriteConsensusProtocol = true; - LOGGER.warn( - "[SystemProperties] Normalize {} from {} to {} for compatibility.", - CN_CONSENSUS_PROTOCOL, - persistedConfigNodeConsensusProtocolClass, - configNodeConsensusProtocolClass); - } + systemProperties.getProperty(CN_CONSENSUS_PROTOCOL, null); if (!Objects.equals( configNodeConsensusProtocolClass, conf.getConfigNodeConsensusProtocolClass())) { LOGGER.warn( @@ -176,21 +166,8 @@ public static void checkSystemProperties() throws IOException { conf.setDataRegionConsensusProtocolClass(dataRegionConsensusProtocolClass); } - String persistedSchemaRegionConsensusProtocolClass = - systemProperties.getProperty(SCHEMA_CONSENSUS_PROTOCOL, null); String schemaRegionConsensusProtocolClass = - ConsensusFactory.normalizeConsensusProtocolClass( - persistedSchemaRegionConsensusProtocolClass); - if (!Objects.equals( - persistedSchemaRegionConsensusProtocolClass, schemaRegionConsensusProtocolClass)) { - systemProperties.setProperty(SCHEMA_CONSENSUS_PROTOCOL, schemaRegionConsensusProtocolClass); - needRewriteConsensusProtocol = true; - LOGGER.warn( - "[SystemProperties] Normalize {} from {} to {} for compatibility.", - SCHEMA_CONSENSUS_PROTOCOL, - persistedSchemaRegionConsensusProtocolClass, - schemaRegionConsensusProtocolClass); - } + systemProperties.getProperty(SCHEMA_CONSENSUS_PROTOCOL, null); if (!Objects.equals( schemaRegionConsensusProtocolClass, conf.getSchemaRegionConsensusProtocolClass())) { LOGGER.warn( @@ -309,18 +286,13 @@ public static void storeSystemParameters() throws IOException { systemProperties.setProperty(CN_CONSENSUS_PORT, String.valueOf(conf.getConsensusPort())); // Consensus protocol configuration - systemProperties.setProperty( - CN_CONSENSUS_PROTOCOL, - ConsensusFactory.normalizeConsensusProtocolClass( - conf.getConfigNodeConsensusProtocolClass())); + systemProperties.setProperty(CN_CONSENSUS_PROTOCOL, conf.getConfigNodeConsensusProtocolClass()); systemProperties.setProperty( DATA_CONSENSUS_PROTOCOL, ConsensusFactory.normalizeConsensusProtocolClass( conf.getDataRegionConsensusProtocolClass())); systemProperties.setProperty( - SCHEMA_CONSENSUS_PROTOCOL, - ConsensusFactory.normalizeConsensusProtocolClass( - conf.getSchemaRegionConsensusProtocolClass())); + SCHEMA_CONSENSUS_PROTOCOL, conf.getSchemaRegionConsensusProtocolClass()); // PartitionSlot configuration systemProperties.setProperty( diff --git a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java index b303a6d5de396..d24955aba7396 100644 --- a/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java +++ b/iotdb-core/consensus/src/main/java/org/apache/iotdb/consensus/ConsensusFactory.java @@ -37,6 +37,8 @@ public class ConsensusFactory { public static final String SIMPLE_CONSENSUS = "org.apache.iotdb.consensus.simple.SimpleConsensus"; public static final String RATIS_CONSENSUS = "org.apache.iotdb.consensus.ratis.RatisConsensus"; public static final String IOT_CONSENSUS = "org.apache.iotdb.consensus.iot.IoTConsensus"; + // Keep the pre-rename class name for stale system properties / snapshots restored after a + // jar-only upgrade. public static final String LEGACY_IOT_CONSENSUS_V2 = "org.apache.iotdb.consensus.pipe.PipeConsensus"; public static final String REAL_IOT_CONSENSUS_V2 = @@ -51,6 +53,8 @@ private ConsensusFactory() { throw new IllegalStateException("Utility class ConsensusFactory"); } + // Downstream code compares against IOT_CONSENSUS_V2 directly, so persisted legacy names must be + // normalized to the canonical constant before they fan out. public static String normalizeConsensusProtocolClass(String className) { if (className == null) { return null; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java index 8451a120d1c14..d85444a56d454 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBStartCheck.java @@ -282,25 +282,12 @@ private void checkImmutableSystemProperties() throws IOException { if (properties.containsKey(CLUSTER_ID)) { config.setClusterId(properties.getProperty(CLUSTER_ID)); } + // Only the data region protocol could have been persisted as the old PipeConsensus name + // during a jar-only upgrade, so only that field needs compatibility normalization. boolean needRewriteConsensusProtocol = false; if (properties.containsKey(SCHEMA_REGION_CONSENSUS_PROTOCOL)) { - final String persistedSchemaRegionConsensusProtocolClass = - properties.getProperty(SCHEMA_REGION_CONSENSUS_PROTOCOL); - final String schemaRegionConsensusProtocolClass = - ConsensusFactory.normalizeConsensusProtocolClass( - persistedSchemaRegionConsensusProtocolClass); - if (!Objects.equals( - persistedSchemaRegionConsensusProtocolClass, schemaRegionConsensusProtocolClass)) { - properties.setProperty( - SCHEMA_REGION_CONSENSUS_PROTOCOL, schemaRegionConsensusProtocolClass); - needRewriteConsensusProtocol = true; - logger.warn( - "[SystemProperties] Normalize {} from {} to {} for compatibility.", - SCHEMA_REGION_CONSENSUS_PROTOCOL, - persistedSchemaRegionConsensusProtocolClass, - schemaRegionConsensusProtocolClass); - } - config.setSchemaRegionConsensusProtocolClass(schemaRegionConsensusProtocolClass); + config.setSchemaRegionConsensusProtocolClass( + properties.getProperty(SCHEMA_REGION_CONSENSUS_PROTOCOL)); } if (properties.containsKey(DATA_REGION_CONSENSUS_PROTOCOL)) { final String persistedDataRegionConsensusProtocolClass = diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java index 2991bb066caa6..a6160e6b90088 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionProcessorConstructor.java @@ -69,6 +69,7 @@ protected void initConstructors() { pluginConstructors.put( BuiltinPipePlugin.IOT_CONSENSUS_V2_PROCESSOR.getPipePluginName(), IoTConsensusV2Processor::new); + // Keep the pre-rename plugin name wired to the new implementation for stale PipeMeta. pluginConstructors.put( BuiltinPipePlugin.PIPE_CONSENSUS_PROCESSOR.getPipePluginName(), IoTConsensusV2Processor::new); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java index db7995c9be1a3..c8e87890afd69 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/agent/plugin/dataregion/PipeDataRegionSinkConstructor.java @@ -56,6 +56,7 @@ protected void initConstructors() { pluginConstructors.put( BuiltinPipePlugin.IOT_CONSENSUS_V2_ASYNC_CONNECTOR.getPipePluginName(), IoTConsensusV2AsyncSink::new); + // Keep the pre-rename plugin name wired to the new implementation for stale PipeMeta. pluginConstructors.put( BuiltinPipePlugin.PIPE_CONSENSUS_ASYNC_CONNECTOR.getPipePluginName(), IoTConsensusV2AsyncSink::new); @@ -100,6 +101,7 @@ protected void initConstructors() { pluginConstructors.put( BuiltinPipePlugin.IOT_CONSENSUS_V2_ASYNC_SINK.getPipePluginName(), IoTConsensusV2AsyncSink::new); + // Keep the pre-rename plugin name wired to the new implementation for stale PipeMeta. pluginConstructors.put( BuiltinPipePlugin.PIPE_CONSENSUS_ASYNC_SINK.getPipePluginName(), IoTConsensusV2AsyncSink::new); diff --git a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java index 40033d8a9a779..76b45d1e79895 100644 --- a/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java +++ b/iotdb-core/node-commons/src/main/java/org/apache/iotdb/commons/pipe/agent/plugin/builtin/BuiltinPipePlugin.java @@ -74,6 +74,7 @@ public enum BuiltinPipePlugin { STANDARD_STATISTICS_PROCESSOR("standard-statistics-processor", StandardStatisticsProcessor.class), TUMBLING_WINDOWING_PROCESSOR("tumbling-windowing-processor", TumblingWindowingProcessor.class), IOT_CONSENSUS_V2_PROCESSOR("iot-consensus-v2-processor", IoTConsensusV2Processor.class), + // Legacy alias for stale PipeMeta written before the PipeConsensus -> IoTConsensusV2 rename. PIPE_CONSENSUS_PROCESSOR("pipe-consensus-processor", IoTConsensusV2Processor.class), RENAME_DATABASE_PROCESSOR("rename-database-processor", RenameDatabaseProcessor.class), @@ -87,6 +88,7 @@ public enum BuiltinPipePlugin { IOTDB_AIR_GAP_CONNECTOR("iotdb-air-gap-connector", IoTDBAirGapSink.class), IOT_CONSENSUS_V2_ASYNC_CONNECTOR( "iot-consensus-v2-async-connector", IoTConsensusV2AsyncSink.class), + // Legacy alias for stale PipeMeta written before the PipeConsensus -> IoTConsensusV2 rename. PIPE_CONSENSUS_ASYNC_CONNECTOR("pipe-consensus-async-connector", IoTConsensusV2AsyncSink.class), WEBSOCKET_CONNECTOR("websocket-connector", WebSocketSink.class), @@ -107,6 +109,7 @@ public enum BuiltinPipePlugin { WRITE_BACK_SINK("write-back-sink", WriteBackSink.class), SUBSCRIPTION_SINK("subscription-sink", DoNothingSink.class), IOT_CONSENSUS_V2_ASYNC_SINK("iot-consensus-v2-async-sink", IoTConsensusV2AsyncSink.class), + // Legacy alias for stale PipeMeta written before the PipeConsensus -> IoTConsensusV2 rename. PIPE_CONSENSUS_ASYNC_SINK("pipe-consensus-async-sink", IoTConsensusV2AsyncSink.class), ;