From 628c40dfbcf6f599740dd3594ef707c26ad370c1 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 23 Mar 2026 19:24:59 +0800 Subject: [PATCH 01/11] try --- .../it/autocreate/IoTDBPipeDataSinkIT.java | 29 +++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java index 5216fe20a27e..9b9da301c423 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java @@ -46,7 +46,7 @@ @Category({MultiClusterIT2AutoCreateSchema.class}) public class IoTDBPipeDataSinkIT extends AbstractPipeDualAutoIT { @Test - public void testThriftConnectorWithRealtimeFirstDisabled() throws Exception { + public void testThriftSinkWithRealtimeFirstDisabled() throws Exception { final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); final String receiverIp = receiverDataNode.getIp(); @@ -196,7 +196,7 @@ private void testSinkFormat(final String format) throws Exception { } @Test - public void testLegacyConnector() throws Exception { + public void testLegacySink() throws Exception { final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); final String receiverIp = receiverDataNode.getIp(); @@ -503,4 +503,29 @@ private void testLoadTsFileWithoutVerify(final String loadTsFileStrategy) throws Collections.unmodifiableSet(new HashSet<>(Arrays.asList("1,1.0,", "2,1.0,")))); } } + + @Test + public void testSpecialPartialInsert() throws Exception { + try (final Connection connection = senderEnv.getConnection(); + final Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "create pipe a2b with sink ('node-urls'='%s')", + receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString())); + } + + TestUtils.executeNonQueries( + senderEnv, + Arrays.asList( + "create timeSeries root.vehicle.d0.s1 double", + "create timeSeries root.vehicle.d0.s2 float", + "insert into root.vehicle.d0(time, s1, s2) values (2, 1, abc)"), + null); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "select * from root.vehicle.**", + "Time,root.vehicle.d0.s1,root.vehicle.d0.s2", + Collections.singleton("2,1.0,null")); + } } From e23a3f350d2593871f1436f8329a2239bf240358 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 23 Mar 2026 19:28:11 +0800 Subject: [PATCH 02/11] ' --- .../apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java index 9b9da301c423..06d9096c6abc 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java @@ -519,7 +519,7 @@ public void testSpecialPartialInsert() throws Exception { Arrays.asList( "create timeSeries root.vehicle.d0.s1 double", "create timeSeries root.vehicle.d0.s2 float", - "insert into root.vehicle.d0(time, s1, s2) values (2, 1, abc)"), + "insert into root.vehicle.d0(time, s1, s2) values (2, 1, 'abc')"), null); TestUtils.assertDataEventuallyOnEnv( From 7292e7a152b8d4a7a641914046d697679e566b95 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 23 Mar 2026 20:02:03 +0800 Subject: [PATCH 03/11] no-bomb --- .../it/autocreate/AbstractPipeDualAutoIT.java | 8 +++++-- .../it/autocreate/IoTDBPipeDataSinkIT.java | 24 ++++++++++++------- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/AbstractPipeDualAutoIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/AbstractPipeDualAutoIT.java index 7914c7cb4ffd..07ffde9977f9 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/AbstractPipeDualAutoIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/AbstractPipeDualAutoIT.java @@ -37,8 +37,8 @@ public void setUp() { senderEnv = MultiEnvFactory.getEnv(0); receiverEnv = MultiEnvFactory.getEnv(1); setupConfig(); - senderEnv.initClusterEnvironment(); - receiverEnv.initClusterEnvironment(); + senderEnv.initClusterEnvironment(1, 1); + receiverEnv.initClusterEnvironment(1, 1); } protected void setupConfig() { @@ -50,6 +50,8 @@ protected void setupConfig() { .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false) + .setSchemaReplicationFactor(1) + .setDataReplicationFactor(1) .setPipeAutoSplitFullEnabled(false); receiverEnv .getConfig() @@ -58,6 +60,8 @@ protected void setupConfig() { .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setPipeMemoryManagementEnabled(false) + .setSchemaReplicationFactor(1) + .setDataReplicationFactor(1) .setIsPipeEnableMemoryCheck(false) .setPipeAutoSplitFullEnabled(false); diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java index 06d9096c6abc..a8b483f1e312 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java @@ -514,18 +514,24 @@ public void testSpecialPartialInsert() throws Exception { receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString())); } - TestUtils.executeNonQueries( - senderEnv, - Arrays.asList( - "create timeSeries root.vehicle.d0.s1 double", - "create timeSeries root.vehicle.d0.s2 float", - "insert into root.vehicle.d0(time, s1, s2) values (2, 1, 'abc')"), - null); + try { + TestUtils.executeNonQueries( + senderEnv, + Arrays.asList( + "create timeSeries root.vehicle.d0.s1 double", + "create timeSeries root.vehicle.d0.s2 float", + "insert into root.vehicle.d0(time, s1, s2) values (2, 1, 'abc')"), + null); + } catch (final Exception e) { + Assert.assertEquals( + "org.apache.iotdb.jdbc.IoTDBSQLException: 507: Fail to insert measurements [s2] caused by [data type is not consistent, input 'abc', registered FLOAT]", + e.getMessage()); + } TestUtils.assertDataEventuallyOnEnv( receiverEnv, "select * from root.vehicle.**", - "Time,root.vehicle.d0.s1,root.vehicle.d0.s2", - Collections.singleton("2,1.0,null")); + "Time,root.vehicle.d0.s1,", + Collections.singleton("2,1.0,")); } } From cd104eaf5dd6dcd8d2a86b334c41c12506d2f72b Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 23 Mar 2026 20:07:14 +0800 Subject: [PATCH 04/11] side-effect --- .../iotdb/pipe/it/autocreate/AbstractPipeDualAutoIT.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/AbstractPipeDualAutoIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/AbstractPipeDualAutoIT.java index 07ffde9977f9..7914c7cb4ffd 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/AbstractPipeDualAutoIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/AbstractPipeDualAutoIT.java @@ -37,8 +37,8 @@ public void setUp() { senderEnv = MultiEnvFactory.getEnv(0); receiverEnv = MultiEnvFactory.getEnv(1); setupConfig(); - senderEnv.initClusterEnvironment(1, 1); - receiverEnv.initClusterEnvironment(1, 1); + senderEnv.initClusterEnvironment(); + receiverEnv.initClusterEnvironment(); } protected void setupConfig() { @@ -50,8 +50,6 @@ protected void setupConfig() { .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false) - .setSchemaReplicationFactor(1) - .setDataReplicationFactor(1) .setPipeAutoSplitFullEnabled(false); receiverEnv .getConfig() @@ -60,8 +58,6 @@ protected void setupConfig() { .setConfigNodeConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setPipeMemoryManagementEnabled(false) - .setSchemaReplicationFactor(1) - .setDataReplicationFactor(1) .setIsPipeEnableMemoryCheck(false) .setPipeAutoSplitFullEnabled(false); From 4095372af227b551e83a3e0a566f5da4fdb0e37d Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 23 Mar 2026 20:15:03 +0800 Subject: [PATCH 05/11] Revert "side-effect" This reverts commit cd104eaf5dd6dcd8d2a86b334c41c12506d2f72b. --- .../iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java index a8b483f1e312..122f66582cec 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java @@ -520,7 +520,7 @@ public void testSpecialPartialInsert() throws Exception { Arrays.asList( "create timeSeries root.vehicle.d0.s1 double", "create timeSeries root.vehicle.d0.s2 float", - "insert into root.vehicle.d0(time, s1, s2) values (2, 1, 'abc')"), + "insert into root.vehicle.d0(time, s1, s2) values (2, 1, 'abc'), (3, 1, 2)"), null); } catch (final Exception e) { Assert.assertEquals( @@ -531,7 +531,7 @@ public void testSpecialPartialInsert() throws Exception { TestUtils.assertDataEventuallyOnEnv( receiverEnv, "select * from root.vehicle.**", - "Time,root.vehicle.d0.s1,", - Collections.singleton("2,1.0,")); + "Time,root.vehicle.d0.s1,root.vehicle.d0.s2,", + new HashSet<>(Arrays.asList("2,1.0,null,", "3,1.0,2.0,"))); } } From e4dcfbbf7d5e27962fc320bfab15e717c0cc39a8 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Tue, 24 Mar 2026 11:07:07 +0800 Subject: [PATCH 06/11] under-flow --- .../iotdb/db/it/IoTDBRestServiceIT.java | 4 +- .../it/autocreate/IoTDBPipeDataSinkIT.java | 60 ++++++++++++++----- 2 files changed, 47 insertions(+), 17 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java index 53b92918877d..2af450f3a817 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java @@ -78,7 +78,7 @@ public void tearDown() throws Exception { EnvFactory.getEnv().cleanClusterEnvironment(); } - private String getAuthorization(String username, String password) { + public static String getAuthorization(String username, String password) { return Base64.getEncoder() .encodeToString((username + ":" + password).getBytes(StandardCharsets.UTF_8)); } @@ -128,7 +128,7 @@ public void ping() { } } - private HttpPost getHttpPost(String url) { + public static HttpPost getHttpPost(String url) { HttpPost httpPost = new HttpPost(url); httpPost.addHeader("Content-type", "application/json; charset=utf-8"); httpPost.setHeader("Accept", "application/json"); diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java index 122f66582cec..f864d30134f3 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java @@ -28,11 +28,16 @@ import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; import org.junit.Assert; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; +import java.nio.charset.Charset; import java.sql.Connection; import java.sql.Statement; import java.util.Arrays; @@ -42,9 +47,18 @@ import java.util.Map; import java.util.function.Consumer; +import static org.apache.iotdb.db.it.IoTDBRestServiceIT.getHttpPost; + @RunWith(IoTDBTestRunner.class) @Category({MultiClusterIT2AutoCreateSchema.class}) public class IoTDBPipeDataSinkIT extends AbstractPipeDualAutoIT { + + @Override + protected void setupConfig() { + super.setupConfig(); + senderEnv.getConfig().getDataNodeConfig().setEnableRestService(true); + } + @Test public void testThriftSinkWithRealtimeFirstDisabled() throws Exception { final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); @@ -514,24 +528,40 @@ public void testSpecialPartialInsert() throws Exception { receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString())); } - try { - TestUtils.executeNonQueries( - senderEnv, - Arrays.asList( - "create timeSeries root.vehicle.d0.s1 double", - "create timeSeries root.vehicle.d0.s2 float", - "insert into root.vehicle.d0(time, s1, s2) values (2, 1, 'abc'), (3, 1, 2)"), - null); - } catch (final Exception e) { - Assert.assertEquals( - "org.apache.iotdb.jdbc.IoTDBSQLException: 507: Fail to insert measurements [s2] caused by [data type is not consistent, input 'abc', registered FLOAT]", - e.getMessage()); + CloseableHttpClient httpClient = HttpClientBuilder.create().build(); + + HttpPost httpPost = + getHttpPost( + "http://127.0.0.1:" + + senderEnv.getDataNodeWrapper(0).getRestServicePort() + + "/rest/v2/insertRecords"); + String json = + "{\"timestamps\":[1635232113960,1635232151960,1635232143960,1635232143960],\"measurements_list\":[[\"s33\",\"s44\"],[\"s55\",\"s66\"],[\"s77\",\"s88\"],[\"s771\",\"s881\"]],\"data_types_list\":[[\"INT32\",\"INT64\"],[\"FLOAT\",\"DOUBLE\"],[\"FLOAT\",\"DOUBLE\"],[\"BOOLEAN\",\"TEXT\"]],\"values_list\":[[1,false],[2.1,2],[4,6],[false,\"cccccc\"]],\"is_aligned\":false,\"devices\":[\"root.s1\",\"root.s1\",\"root.s1\",\"root.s3\"]}"; + httpPost.setEntity(new StringEntity(json, Charset.defaultCharset())); + for (int i = 0; i < 30; i++) { + try { + httpClient.execute(httpPost); + break; + } catch (final Exception e) { + if (i == 29) { + throw e; + } + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } } TestUtils.assertDataEventuallyOnEnv( receiverEnv, - "select * from root.vehicle.**", - "Time,root.vehicle.d0.s1,root.vehicle.d0.s2,", - new HashSet<>(Arrays.asList("2,1.0,null,", "3,1.0,2.0,"))); + "select * from root.s1", + "Time,root.s1.s88,root.s1.s77,root.s1.s66,root.s1.s55,root.s1.s44,root.s1.s33,", + new HashSet<>( + Arrays.asList( + "1635232113960,null,null,null,null,null,1,", + "1635232151960,null,null,2.0,2.1,null,null,", + "1635232143960,6.0,4.0,null,null,null,null,"))); } } From 47ae64bbdbb097165f55956520a555e6117d2ac3 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Tue, 24 Mar 2026 11:11:50 +0800 Subject: [PATCH 07/11] fix --- .../protocol/rest/v2/handler/StatementConstructionHandler.java | 1 - 1 file changed, 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/handler/StatementConstructionHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/handler/StatementConstructionHandler.java index c0559b40c2d9..9542d8c9c6d8 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/handler/StatementConstructionHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/handler/StatementConstructionHandler.java @@ -251,7 +251,6 @@ public static InsertRowsStatement createInsertRowsStatement( entry.getValue())); // markFailedMeasurement will set datatype and measurements null // setting them back in order to pass the schema validation - statement.getDataTypes()[index] = dataType; statement.getMeasurements()[index] = measurement; } } From 3ec92fda880fcd7064e7042c765919b5dacb7ca0 Mon Sep 17 00:00:00 2001 From: HTHou Date: Tue, 24 Mar 2026 14:36:22 +0800 Subject: [PATCH 08/11] Fix rest partial insert serde error --- .../rest/v2/handler/StatementConstructionHandler.java | 1 + .../db/queryengine/plan/planner/LogicalPlanVisitor.java | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/handler/StatementConstructionHandler.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/handler/StatementConstructionHandler.java index 9542d8c9c6d8..c0559b40c2d9 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/handler/StatementConstructionHandler.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/rest/v2/handler/StatementConstructionHandler.java @@ -251,6 +251,7 @@ public static InsertRowsStatement createInsertRowsStatement( entry.getValue())); // markFailedMeasurement will set datatype and measurements null // setting them back in order to pass the schema validation + statement.getDataTypes()[index] = dataType; statement.getMeasurements()[index] = measurement; } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java index ee2ccc305866..be0b28a0282a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java @@ -714,6 +714,11 @@ public PlanNode visitInsertRows( insertRowStatement.getTime(), insertRowStatement.getValues(), insertRowStatement.isNeedInferType()); + if (insertRowStatement.getFailedMeasurementInfoMap() != null) { + for (Integer index : insertRowStatement.getFailedMeasurementInfoMap().keySet()) { + insertRowNode.markFailedMeasurement(index); + } + } insertRowNode.setFailedMeasurementNumber(insertRowStatement.getFailedMeasurementNumber()); insertRowsNode.addOneInsertRowNode(insertRowNode, i); } From 804cce09716fee51bf240a55c4fdbc01304867b7 Mon Sep 17 00:00:00 2001 From: HTHou Date: Tue, 24 Mar 2026 15:15:58 +0800 Subject: [PATCH 09/11] fix testSpecialPartialInsert --- .../pipe/it/autocreate/IoTDBPipeDataSinkIT.java | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java index f864d30134f3..86e534810745 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java @@ -532,7 +532,9 @@ public void testSpecialPartialInsert() throws Exception { HttpPost httpPost = getHttpPost( - "http://127.0.0.1:" + "http://" + + senderEnv.getDataNodeWrapper(0).getIp() + + ":" + senderEnv.getDataNodeWrapper(0).getRestServicePort() + "/rest/v2/insertRecords"); String json = @@ -556,12 +558,12 @@ public void testSpecialPartialInsert() throws Exception { TestUtils.assertDataEventuallyOnEnv( receiverEnv, - "select * from root.s1", - "Time,root.s1.s88,root.s1.s77,root.s1.s66,root.s1.s55,root.s1.s44,root.s1.s33,", + "select s88, s77, s66, s55, s44, s33 from root.s1", + "Time,root.s1.s88,root.s1.s77,root.s1.s66,root.s1.s55,root.s1.s33,", new HashSet<>( Arrays.asList( - "1635232113960,null,null,null,null,null,1,", - "1635232151960,null,null,2.0,2.1,null,null,", - "1635232143960,6.0,4.0,null,null,null,null,"))); + "1635232113960,null,null,null,null,1,", + "1635232151960,null,null,2.0,2.1,null,", + "1635232143960,6.0,4.0,null,null,null,"))); } } From ae7e07480f0598a5e6df1fbfc04f7da76bcc59a7 Mon Sep 17 00:00:00 2001 From: HTHou Date: Tue, 24 Mar 2026 16:09:01 +0800 Subject: [PATCH 10/11] add IT --- .../iotdb/db/it/IoTDBRestServiceIT.java | 104 ++++++++++++++++++ 1 file changed, 104 insertions(+) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java index 2af450f3a817..246150b90faa 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java @@ -20,12 +20,14 @@ import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant; import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.env.cluster.env.SimpleEnv; import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.ClusterIT; import org.apache.iotdb.itbase.category.LocalStandaloneIT; import org.apache.iotdb.itbase.category.RemoteIT; import org.apache.iotdb.itbase.env.BaseEnv; +import org.apache.iotdb.itbase.exception.InconsistentDataException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.gson.JsonObject; @@ -48,11 +50,17 @@ import java.io.IOException; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; import java.util.ArrayList; import java.util.Base64; import java.util.List; import java.util.Map; +import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS; import static org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.COLUMN_TTL; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -242,6 +250,102 @@ public void errorInsertRecords(CloseableHttpClient httpClient, String json, Http } } + @Test + public void errorInsertRecords() throws SQLException { + SimpleEnv simpleEnv = new SimpleEnv(); + simpleEnv + .getConfig() + .getCommonConfig() + .setDataRegionConsensusProtocolClass(IOT_CONSENSUS) + .setDataReplicationFactor(3); + simpleEnv.getConfig().getDataNodeConfig().setEnableRestService(true); + simpleEnv.initClusterEnvironment(1, 3); + + CloseableHttpResponse response = null; + CloseableHttpClient httpClient = HttpClientBuilder.create().build(); + try { + HttpPost httpPost = + getHttpPost( + "http://" + + simpleEnv.getDataNodeWrapper(0).getIp() + + ":" + + simpleEnv.getDataNodeWrapper(0).getRestServicePort() + + "/rest/v2/insertRecords"); + String json = + "{\"timestamps\":[1635232113960,1635232151960,1635232143960,1635232143960],\"measurements_list\":[[\"s33\",\"s44\"],[\"s55\",\"s66\"],[\"s77\",\"s88\"],[\"s771\",\"s881\"]],\"data_types_list\":[[\"INT32\",\"INT64\"],[\"FLOAT\",\"DOUBLE\"],[\"FLOAT\",\"DOUBLE\"],[\"BOOLEAN\",\"TEXT\"]],\"values_list\":[[1,false],[2.1,2],[4,6],[false,\"cccccc\"]],\"is_aligned\":false,\"devices\":[\"root.s1\",\"root.s1\",\"root.s1\",\"root.s3\"]}"; + httpPost.setEntity(new StringEntity(json, Charset.defaultCharset())); + for (int i = 0; i < 30; i++) { + try { + response = httpClient.execute(httpPost); + break; + } catch (Exception e) { + if (i == 29) { + throw e; + } + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + } + + HttpEntity responseEntity = response.getEntity(); + String message = EntityUtils.toString(responseEntity, "utf-8"); + JsonObject result = JsonParser.parseString(message).getAsJsonObject(); + assertEquals(507, Integer.parseInt(result.get("code").toString())); + } catch (IOException e) { + e.printStackTrace(); + fail(e.getMessage()); + } finally { + try { + if (response != null) { + response.close(); + } + } catch (IOException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + + try { + for (int d = 0; d < 3; d++) { + DataNodeWrapper dataNodeWrapper = simpleEnv.getDataNodeWrapper(d); + dataNodeWrapper.stop(); + + try (Connection connectionAfterNodeDown = + simpleEnv.getConnectionWithSpecifiedDataNode( + simpleEnv.getDataNodeWrapper(d == 2 ? 0 : d + 1)); + Statement statementAfterNodeDown = connectionAfterNodeDown.createStatement()) { + int count = 0; + try (ResultSet resultSet = + statementAfterNodeDown.executeQuery( + "select s88, s77, s66, s55, s44, s33 from root.s1")) { + ResultSetMetaData metaData = resultSet.getMetaData(); + while (resultSet.next()) { + StringBuilder row = new StringBuilder(); + for (int i = 0; i < metaData.getColumnCount(); i++) { + row.append(resultSet.getString(i + 1)).append(","); + } + System.out.println(row); + count++; + } + } + assertEquals(3, count); + } + dataNodeWrapper.start(); + } + } catch (InconsistentDataException e) { + // ignore + } catch (SQLException e) { + if (!e.getMessage().contains("Maybe server is down")) { + throw e; + } + } finally { + simpleEnv.cleanClusterEnvironment(); + } + } + public void rightInsertTablet(CloseableHttpClient httpClient, String json, HttpPost httpPost) { CloseableHttpResponse response = null; try { From 427634be50e17190a6ed31551cb529d377a24aa5 Mon Sep 17 00:00:00 2001 From: HTHou Date: Tue, 24 Mar 2026 17:22:16 +0800 Subject: [PATCH 11/11] fix IT --- .../iotdb/it/env/cluster/env/AbstractEnv.java | 25 +++++++++++++++++++ .../it/env/remote/env/RemoteServerEnv.java | 5 ++++ .../org/apache/iotdb/itbase/env/BaseEnv.java | 6 +++++ .../iotdb/db/it/IoTDBRestServiceIT.java | 21 ++++++++-------- 4 files changed, 46 insertions(+), 11 deletions(-) diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java index 547b010a322a..60ff77aa03c5 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java @@ -440,6 +440,14 @@ public Connection getConnection(String username, String password) throws SQLExce this); } + @Override + public Connection getAvailableConnection(String username, String password) throws SQLException { + return new ClusterTestConnection( + getWriteConnection(null, username, password), + getOneAvailableReadConnection(null, username, password), + this); + } + @Override public Connection getConnection( final DataNodeWrapper dataNodeWrapper, final String username, final String password) @@ -656,6 +664,23 @@ protected List getReadConnections( return readConnRequestDelegate.requestAll(); } + protected List getOneAvailableReadConnection( + final Constant.Version version, final String username, final String password) + throws SQLException { + final List dataNodeWrapperListCopy = new ArrayList<>(dataNodeWrapperList); + Collections.shuffle(dataNodeWrapperListCopy); + SQLException lastException = null; + for (final DataNodeWrapper dataNode : dataNodeWrapperListCopy) { + try { + return getReadConnections(version, dataNode, username, password); + } catch (final SQLException e) { + lastException = e; + } + } + logger.error("Failed to get connection from any DataNode, last exception is ", lastException); + throw lastException; + } + // use this to avoid some runtimeExceptions when try to get jdbc connections. // because it is hard to add retry and handle exception when getting jdbc connections in // getWriteConnectionWithSpecifiedDataNode and getReadConnections. diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java index efd883e41a9d..5c4f91cf5c63 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java @@ -136,6 +136,11 @@ public Connection getConnection(String username, String password) throws SQLExce return connection; } + @Override + public Connection getAvailableConnection(String username, String password) throws SQLException { + throw new UnsupportedOperationException(); + } + @Override public Connection getWriteOnlyConnectionWithSpecifiedDataNode( DataNodeWrapper dataNode, String username, String password) { diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java index c3dc9a3eb322..bede2b001f70 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java @@ -120,6 +120,12 @@ Connection getConnection(Constant.Version version, String username, String passw Connection getConnection(DataNodeWrapper dataNodeWrapper, String username, String password) throws SQLException; + default Connection getAvailableConnection() throws SQLException { + return getAvailableConnection(SessionConfig.DEFAULT_USER, SessionConfig.DEFAULT_PASSWORD); + } + + Connection getAvailableConnection(String username, String password) throws SQLException; + default Connection getWriteOnlyConnectionWithSpecifiedDataNode(DataNodeWrapper dataNode) throws SQLException { return getWriteOnlyConnectionWithSpecifiedDataNode( diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java index 246150b90faa..ae587bfea457 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java @@ -27,7 +27,6 @@ import org.apache.iotdb.itbase.category.LocalStandaloneIT; import org.apache.iotdb.itbase.category.RemoteIT; import org.apache.iotdb.itbase.env.BaseEnv; -import org.apache.iotdb.itbase.exception.InconsistentDataException; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.gson.JsonObject; @@ -59,8 +58,10 @@ import java.util.Base64; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS; +import static org.apache.iotdb.consensus.ConsensusFactory.RATIS_CONSENSUS; import static org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.COLUMN_TTL; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -251,13 +252,15 @@ public void errorInsertRecords(CloseableHttpClient httpClient, String json, Http } @Test - public void errorInsertRecords() throws SQLException { + public void errorInsertRecords() throws SQLException, InterruptedException { SimpleEnv simpleEnv = new SimpleEnv(); simpleEnv .getConfig() .getCommonConfig() + .setSchemaRegionConsensusProtocolClass(RATIS_CONSENSUS) + .setSchemaReplicationFactor(3) .setDataRegionConsensusProtocolClass(IOT_CONSENSUS) - .setDataReplicationFactor(3); + .setDataReplicationFactor(2); simpleEnv.getConfig().getDataNodeConfig().setEnableRestService(true); simpleEnv.initClusterEnvironment(1, 3); @@ -307,15 +310,12 @@ public void errorInsertRecords() throws SQLException { fail(e.getMessage()); } } + TimeUnit.SECONDS.sleep(5); try { - for (int d = 0; d < 3; d++) { - DataNodeWrapper dataNodeWrapper = simpleEnv.getDataNodeWrapper(d); + for (DataNodeWrapper dataNodeWrapper : simpleEnv.getDataNodeWrapperList()) { dataNodeWrapper.stop(); - - try (Connection connectionAfterNodeDown = - simpleEnv.getConnectionWithSpecifiedDataNode( - simpleEnv.getDataNodeWrapper(d == 2 ? 0 : d + 1)); + try (Connection connectionAfterNodeDown = simpleEnv.getAvailableConnection(); Statement statementAfterNodeDown = connectionAfterNodeDown.createStatement()) { int count = 0; try (ResultSet resultSet = @@ -334,9 +334,8 @@ public void errorInsertRecords() throws SQLException { assertEquals(3, count); } dataNodeWrapper.start(); + TimeUnit.SECONDS.sleep(1); } - } catch (InconsistentDataException e) { - // ignore } catch (SQLException e) { if (!e.getMessage().contains("Maybe server is down")) { throw e;