From 37826e4e9aa4164b40310596626e99b5d8e636ba Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 23 Mar 2026 19:24:59 +0800 Subject: [PATCH 01/10] try --- .../auto/basic/IoTDBPipeDataSinkIT.java | 29 +++++++++++++++++-- 1 file changed, 27 insertions(+), 2 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java index 1fc21fd22b0b..1e3c9427c7ed 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java @@ -55,7 +55,7 @@ public void setUp() { } @Test - public void testThriftConnectorWithRealtimeFirstDisabled() throws Exception { + public void testThriftSinkWithRealtimeFirstDisabled() throws Exception { final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); final String receiverIp = receiverDataNode.getIp(); @@ -207,7 +207,7 @@ private void testSinkFormat(final String format) throws Exception { } @Test - public void testLegacyConnector() throws Exception { + public void testLegacySink() throws Exception { final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); final String receiverIp = receiverDataNode.getIp(); @@ -514,4 +514,29 @@ private void testLoadTsFileWithoutVerify(final String loadTsFileStrategy) throws Collections.unmodifiableSet(new HashSet<>(Arrays.asList("1,1.0,", "2,1.0,")))); } } + + @Test + public void testSpecialPartialInsert() throws Exception { + try (final Connection connection = senderEnv.getConnection(); + final Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "create pipe a2b with sink ('node-urls'='%s')", + receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString())); + } + + TestUtils.executeNonQueries( + senderEnv, + Arrays.asList( + "create timeSeries root.vehicle.d0.s1 double", + "create timeSeries root.vehicle.d0.s2 float", + "insert into root.vehicle.d0(time, s1, s2) values (2, 1, abc)"), + null); + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "select * from root.vehicle.**", + "Time,root.vehicle.d0.s1,root.vehicle.d0.s2", + Collections.singleton("2,1.0,null")); + } } From 8e187793e6ce37f81a42edeab8255502c18538d3 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 23 Mar 2026 19:28:11 +0800 Subject: [PATCH 02/10] ' --- .../pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java index 1e3c9427c7ed..2c6e0b13249a 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java @@ -530,7 +530,7 @@ public void testSpecialPartialInsert() throws Exception { Arrays.asList( "create timeSeries root.vehicle.d0.s1 double", "create timeSeries root.vehicle.d0.s2 float", - "insert into root.vehicle.d0(time, s1, s2) values (2, 1, abc)"), + "insert into root.vehicle.d0(time, s1, s2) values (2, 1, 'abc')"), null); TestUtils.assertDataEventuallyOnEnv( From a66a46d08d999f54cb08c018094b05949bc62a1a Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 23 Mar 2026 20:02:03 +0800 Subject: [PATCH 03/10] no-bomb --- .../AbstractPipeTableModelDualManualIT.java | 8 +++++-- .../auto/basic/IoTDBPipeDataSinkIT.java | 24 ++++++++++++------- 2 files changed, 21 insertions(+), 11 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java index 3b3fae80902f..88614682bec9 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java @@ -37,8 +37,8 @@ public void setUp() { senderEnv = MultiEnvFactory.getEnv(0); receiverEnv = MultiEnvFactory.getEnv(1); setupConfig(); - senderEnv.initClusterEnvironment(); - receiverEnv.initClusterEnvironment(); + senderEnv.initClusterEnvironment(1, 1); + receiverEnv.initClusterEnvironment(1, 1); } protected void setupConfig() { @@ -51,6 +51,8 @@ protected void setupConfig() { .setEnforceStrongPassword(false) .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false) + .setSchemaReplicationFactor(1) + .setDataReplicationFactor(1) .setPipeAutoSplitFullEnabled(false); receiverEnv .getConfig() @@ -60,6 +62,8 @@ protected void setupConfig() { .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setEnforceStrongPassword(false) .setPipeMemoryManagementEnabled(false) + .setSchemaReplicationFactor(1) + .setDataReplicationFactor(1) .setIsPipeEnableMemoryCheck(false) .setPipeAutoSplitFullEnabled(false); diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java index 2c6e0b13249a..e7cbc168caf8 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java @@ -525,18 +525,24 @@ public void testSpecialPartialInsert() throws Exception { receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString())); } - TestUtils.executeNonQueries( - senderEnv, - Arrays.asList( - "create timeSeries root.vehicle.d0.s1 double", - "create timeSeries root.vehicle.d0.s2 float", - "insert into root.vehicle.d0(time, s1, s2) values (2, 1, 'abc')"), - null); + try { + TestUtils.executeNonQueries( + senderEnv, + Arrays.asList( + "create timeSeries root.vehicle.d0.s1 double", + "create timeSeries root.vehicle.d0.s2 float", + "insert into root.vehicle.d0(time, s1, s2) values (2, 1, 'abc')"), + null); + } catch (final Exception e) { + Assert.assertEquals( + "org.apache.iotdb.jdbc.IoTDBSQLException: 507: Fail to insert measurements [s2] caused by [data type is not consistent, input 'abc', registered FLOAT]", + e.getMessage()); + } TestUtils.assertDataEventuallyOnEnv( receiverEnv, "select * from root.vehicle.**", - "Time,root.vehicle.d0.s1,root.vehicle.d0.s2", - Collections.singleton("2,1.0,null")); + "Time,root.vehicle.d0.s1,", + Collections.singleton("2,1.0,")); } } From 7a875b147e6dca5df1afa6ba141e7c282c3243c9 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 23 Mar 2026 20:05:51 +0800 Subject: [PATCH 04/10] fix --- .../manual/AbstractPipeTableModelDualManualIT.java | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java index 88614682bec9..3b3fae80902f 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/tablemodel/manual/AbstractPipeTableModelDualManualIT.java @@ -37,8 +37,8 @@ public void setUp() { senderEnv = MultiEnvFactory.getEnv(0); receiverEnv = MultiEnvFactory.getEnv(1); setupConfig(); - senderEnv.initClusterEnvironment(1, 1); - receiverEnv.initClusterEnvironment(1, 1); + senderEnv.initClusterEnvironment(); + receiverEnv.initClusterEnvironment(); } protected void setupConfig() { @@ -51,8 +51,6 @@ protected void setupConfig() { .setEnforceStrongPassword(false) .setPipeMemoryManagementEnabled(false) .setIsPipeEnableMemoryCheck(false) - .setSchemaReplicationFactor(1) - .setDataReplicationFactor(1) .setPipeAutoSplitFullEnabled(false); receiverEnv .getConfig() @@ -62,8 +60,6 @@ protected void setupConfig() { .setSchemaRegionConsensusProtocolClass(ConsensusFactory.RATIS_CONSENSUS) .setEnforceStrongPassword(false) .setPipeMemoryManagementEnabled(false) - .setSchemaReplicationFactor(1) - .setDataReplicationFactor(1) .setIsPipeEnableMemoryCheck(false) .setPipeAutoSplitFullEnabled(false); From 5b4149981a1c0531627d566b97e0abd96abf5d02 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Mon, 23 Mar 2026 20:15:03 +0800 Subject: [PATCH 05/10] Revert "side-effect" This reverts commit cd104eaf5dd6dcd8d2a86b334c41c12506d2f72b. --- .../it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java index e7cbc168caf8..81777195770b 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java @@ -531,7 +531,7 @@ public void testSpecialPartialInsert() throws Exception { Arrays.asList( "create timeSeries root.vehicle.d0.s1 double", "create timeSeries root.vehicle.d0.s2 float", - "insert into root.vehicle.d0(time, s1, s2) values (2, 1, 'abc')"), + "insert into root.vehicle.d0(time, s1, s2) values (2, 1, 'abc'), (3, 1, 2)"), null); } catch (final Exception e) { Assert.assertEquals( @@ -542,7 +542,7 @@ public void testSpecialPartialInsert() throws Exception { TestUtils.assertDataEventuallyOnEnv( receiverEnv, "select * from root.vehicle.**", - "Time,root.vehicle.d0.s1,", - Collections.singleton("2,1.0,")); + "Time,root.vehicle.d0.s1,root.vehicle.d0.s2,", + new HashSet<>(Arrays.asList("2,1.0,null,", "3,1.0,2.0,"))); } } From 52dbfac2df6bc4922424fd76486523562c48dd71 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Tue, 24 Mar 2026 11:07:07 +0800 Subject: [PATCH 06/10] fix --- .../iotdb/db/it/IoTDBRestServiceIT.java | 4 +- .../auto/basic/IoTDBPipeDataSinkIT.java | 59 ++++++++++++++----- 2 files changed, 46 insertions(+), 17 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java index d84a2493cc25..c8fa13cd7f6a 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java @@ -79,7 +79,7 @@ public void tearDown() throws Exception { EnvFactory.getEnv().cleanClusterEnvironment(); } - private String getAuthorization(String username, String password) { + public static String getAuthorization(String username, String password) { return Base64.getEncoder() .encodeToString((username + ":" + password).getBytes(StandardCharsets.UTF_8)); } @@ -129,7 +129,7 @@ public void ping() { } } - private HttpPost getHttpPost(String url) { + public static HttpPost getHttpPost(String url) { HttpPost httpPost = new HttpPost(url); httpPost.addHeader("Content-type", "application/json; charset=utf-8"); httpPost.setHeader("Accept", "application/json"); diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java index 81777195770b..144accbc5b9d 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java @@ -29,12 +29,17 @@ import org.apache.iotdb.pipe.it.dual.treemodel.auto.AbstractPipeDualTreeModelAutoIT; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; +import java.nio.charset.Charset; import java.sql.Connection; import java.sql.Statement; import java.util.Arrays; @@ -44,6 +49,8 @@ import java.util.Map; import java.util.function.Consumer; +import static org.apache.iotdb.db.it.IoTDBRestServiceIT.getHttpPost; + @RunWith(IoTDBTestRunner.class) @Category({MultiClusterIT2DualTreeAutoBasic.class}) public class IoTDBPipeDataSinkIT extends AbstractPipeDualTreeModelAutoIT { @@ -54,6 +61,12 @@ public void setUp() { super.setUp(); } + @Override + protected void setupConfig() { + super.setupConfig(); + senderEnv.getConfig().getDataNodeConfig().setEnableRestService(true); + } + @Test public void testThriftSinkWithRealtimeFirstDisabled() throws Exception { final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); @@ -525,24 +538,40 @@ public void testSpecialPartialInsert() throws Exception { receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString())); } - try { - TestUtils.executeNonQueries( - senderEnv, - Arrays.asList( - "create timeSeries root.vehicle.d0.s1 double", - "create timeSeries root.vehicle.d0.s2 float", - "insert into root.vehicle.d0(time, s1, s2) values (2, 1, 'abc'), (3, 1, 2)"), - null); - } catch (final Exception e) { - Assert.assertEquals( - "org.apache.iotdb.jdbc.IoTDBSQLException: 507: Fail to insert measurements [s2] caused by [data type is not consistent, input 'abc', registered FLOAT]", - e.getMessage()); + CloseableHttpClient httpClient = HttpClientBuilder.create().build(); + + HttpPost httpPost = + getHttpPost( + "http://127.0.0.1:" + + senderEnv.getDataNodeWrapper(0).getRestServicePort() + + "/rest/v2/insertRecords"); + String json = + "{\"timestamps\":[1635232113960,1635232151960,1635232143960,1635232143960],\"measurements_list\":[[\"s33\",\"s44\"],[\"s55\",\"s66\"],[\"s77\",\"s88\"],[\"s771\",\"s881\"]],\"data_types_list\":[[\"INT32\",\"INT64\"],[\"FLOAT\",\"DOUBLE\"],[\"FLOAT\",\"DOUBLE\"],[\"BOOLEAN\",\"TEXT\"]],\"values_list\":[[1,false],[2.1,2],[4,6],[false,\"cccccc\"]],\"is_aligned\":false,\"devices\":[\"root.s1\",\"root.s1\",\"root.s1\",\"root.s3\"]}"; + httpPost.setEntity(new StringEntity(json, Charset.defaultCharset())); + for (int i = 0; i < 30; i++) { + try { + httpClient.execute(httpPost); + break; + } catch (final Exception e) { + if (i == 29) { + throw e; + } + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } } TestUtils.assertDataEventuallyOnEnv( receiverEnv, - "select * from root.vehicle.**", - "Time,root.vehicle.d0.s1,root.vehicle.d0.s2,", - new HashSet<>(Arrays.asList("2,1.0,null,", "3,1.0,2.0,"))); + "select * from root.s1", + "Time,root.s1.s88,root.s1.s77,root.s1.s66,root.s1.s55,root.s1.s44,root.s1.s33,", + new HashSet<>( + Arrays.asList( + "1635232113960,null,null,null,null,null,1,", + "1635232151960,null,null,2.0,2.1,null,null,", + "1635232143960,6.0,4.0,null,null,null,null,"))); } } From 7091ef754171ce5e04334b6d67ebd6d3a5dbf0ea Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Tue, 24 Mar 2026 11:11:50 +0800 Subject: [PATCH 07/10] fix --- .../rest/protocol/v2/handler/StatementConstructionHandler.java | 1 - 1 file changed, 1 deletion(-) diff --git a/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/v2/handler/StatementConstructionHandler.java b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/v2/handler/StatementConstructionHandler.java index 6668244e9e4f..131d76bb60b5 100644 --- a/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/v2/handler/StatementConstructionHandler.java +++ b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/v2/handler/StatementConstructionHandler.java @@ -253,7 +253,6 @@ public static InsertRowsStatement createInsertRowsStatement( entry.getValue())); // markFailedMeasurement will set datatype and measurements null // setting them back in order to pass the schema validation - statement.getDataTypes()[index] = dataType; statement.getMeasurements()[index] = measurement; } } From d157a828933ecdf4aabf1b154742323422d5ec01 Mon Sep 17 00:00:00 2001 From: HTHou Date: Tue, 24 Mar 2026 17:38:35 +0800 Subject: [PATCH 08/10] fix bug --- .../handler/StatementConstructionHandler.java | 1 + .../iotdb/db/it/IoTDBRestServiceIT.java | 103 ++++++++++++++++++ .../auto/basic/IoTDBPipeDataSinkIT.java | 14 +-- .../plan/planner/LogicalPlanVisitor.java | 5 + 4 files changed, 115 insertions(+), 8 deletions(-) diff --git a/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/v2/handler/StatementConstructionHandler.java b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/v2/handler/StatementConstructionHandler.java index 131d76bb60b5..6668244e9e4f 100644 --- a/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/v2/handler/StatementConstructionHandler.java +++ b/external-service-impl/rest/src/main/java/org/apache/iotdb/rest/protocol/v2/handler/StatementConstructionHandler.java @@ -253,6 +253,7 @@ public static InsertRowsStatement createInsertRowsStatement( entry.getValue())); // markFailedMeasurement will set datatype and measurements null // setting them back in order to pass the schema validation + statement.getDataTypes()[index] = dataType; statement.getMeasurements()[index] = measurement; } } diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java index c8fa13cd7f6a..c85d9dc34f20 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java @@ -20,6 +20,7 @@ import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant; import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.env.cluster.env.SimpleEnv; import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.ClusterIT; @@ -48,12 +49,20 @@ import java.io.IOException; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; import java.util.ArrayList; import java.util.Base64; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; import static org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.COLUMN_TTL; +import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS; +import static org.apache.iotdb.consensus.ConsensusFactory.RATIS_CONSENSUS; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -243,6 +252,100 @@ public void errorInsertRecords(CloseableHttpClient httpClient, String json, Http } } + @Test + public void errorInsertRecords() throws SQLException, InterruptedException { + SimpleEnv simpleEnv = new SimpleEnv(); + simpleEnv + .getConfig() + .getCommonConfig() + .setSchemaRegionConsensusProtocolClass(RATIS_CONSENSUS) + .setSchemaReplicationFactor(3) + .setDataRegionConsensusProtocolClass(IOT_CONSENSUS) + .setDataReplicationFactor(2); + simpleEnv.getConfig().getDataNodeConfig().setEnableRestService(true); + simpleEnv.initClusterEnvironment(1, 3); + + CloseableHttpResponse response = null; + CloseableHttpClient httpClient = HttpClientBuilder.create().build(); + try { + HttpPost httpPost = + getHttpPost( + "http://" + + simpleEnv.getDataNodeWrapper(0).getIp() + + ":" + + simpleEnv.getDataNodeWrapper(0).getRestServicePort() + + "/rest/v2/insertRecords"); + String json = + "{\"timestamps\":[1635232113960,1635232151960,1635232143960,1635232143960],\"measurements_list\":[[\"s33\",\"s44\"],[\"s55\",\"s66\"],[\"s77\",\"s88\"],[\"s771\",\"s881\"]],\"data_types_list\":[[\"INT32\",\"INT64\"],[\"FLOAT\",\"DOUBLE\"],[\"FLOAT\",\"DOUBLE\"],[\"BOOLEAN\",\"TEXT\"]],\"values_list\":[[1,false],[2.1,2],[4,6],[false,\"cccccc\"]],\"is_aligned\":false,\"devices\":[\"root.s1\",\"root.s1\",\"root.s1\",\"root.s3\"]}"; + httpPost.setEntity(new StringEntity(json, Charset.defaultCharset())); + for (int i = 0; i < 30; i++) { + try { + response = httpClient.execute(httpPost); + break; + } catch (Exception e) { + if (i == 29) { + throw e; + } + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + } + + HttpEntity responseEntity = response.getEntity(); + String message = EntityUtils.toString(responseEntity, "utf-8"); + JsonObject result = JsonParser.parseString(message).getAsJsonObject(); + assertEquals(507, Integer.parseInt(result.get("code").toString())); + } catch (IOException e) { + e.printStackTrace(); + fail(e.getMessage()); + } finally { + try { + if (response != null) { + response.close(); + } + } catch (IOException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + TimeUnit.SECONDS.sleep(5); + + try { + for (DataNodeWrapper dataNodeWrapper : simpleEnv.getDataNodeWrapperList()) { + dataNodeWrapper.stop(); + try (Connection connectionAfterNodeDown = simpleEnv.getAvailableConnection(); + Statement statementAfterNodeDown = connectionAfterNodeDown.createStatement()) { + int count = 0; + try (ResultSet resultSet = + statementAfterNodeDown.executeQuery( + "select s88, s77, s66, s55, s44, s33 from root.s1")) { + ResultSetMetaData metaData = resultSet.getMetaData(); + while (resultSet.next()) { + StringBuilder row = new StringBuilder(); + for (int i = 0; i < metaData.getColumnCount(); i++) { + row.append(resultSet.getString(i + 1)).append(","); + } + System.out.println(row); + count++; + } + } + assertEquals(3, count); + } + dataNodeWrapper.start(); + TimeUnit.SECONDS.sleep(1); + } + } catch (SQLException e) { + if (!e.getMessage().contains("Maybe server is down")) { + throw e; + } + } finally { + simpleEnv.cleanClusterEnvironment(); + } + } + public void rightInsertTablet(CloseableHttpClient httpClient, String json, HttpPost httpPost) { CloseableHttpResponse response = null; try { diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java index 144accbc5b9d..8f193ce58c58 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java @@ -542,9 +542,7 @@ public void testSpecialPartialInsert() throws Exception { HttpPost httpPost = getHttpPost( - "http://127.0.0.1:" - + senderEnv.getDataNodeWrapper(0).getRestServicePort() - + "/rest/v2/insertRecords"); + "http://" + senderEnv.getDataNodeWrapper(0).getIp() + ":" + "/rest/v2/insertRecords"); String json = "{\"timestamps\":[1635232113960,1635232151960,1635232143960,1635232143960],\"measurements_list\":[[\"s33\",\"s44\"],[\"s55\",\"s66\"],[\"s77\",\"s88\"],[\"s771\",\"s881\"]],\"data_types_list\":[[\"INT32\",\"INT64\"],[\"FLOAT\",\"DOUBLE\"],[\"FLOAT\",\"DOUBLE\"],[\"BOOLEAN\",\"TEXT\"]],\"values_list\":[[1,false],[2.1,2],[4,6],[false,\"cccccc\"]],\"is_aligned\":false,\"devices\":[\"root.s1\",\"root.s1\",\"root.s1\",\"root.s3\"]}"; httpPost.setEntity(new StringEntity(json, Charset.defaultCharset())); @@ -566,12 +564,12 @@ public void testSpecialPartialInsert() throws Exception { TestUtils.assertDataEventuallyOnEnv( receiverEnv, - "select * from root.s1", - "Time,root.s1.s88,root.s1.s77,root.s1.s66,root.s1.s55,root.s1.s44,root.s1.s33,", + "select s88, s77, s66, s55, s44, s33 from root.s1", + "Time,root.s1.s88,root.s1.s77,root.s1.s66,root.s1.s55,root.s1.s33,", new HashSet<>( Arrays.asList( - "1635232113960,null,null,null,null,null,1,", - "1635232151960,null,null,2.0,2.1,null,null,", - "1635232143960,6.0,4.0,null,null,null,null,"))); + "1635232113960,null,null,null,null,1,", + "1635232151960,null,null,2.0,2.1,null,", + "1635232143960,6.0,4.0,null,null,null,"))); } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java index c2fba05dae69..f69fbd91196a 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java @@ -792,6 +792,11 @@ public PlanNode visitInsertRows( insertRowStatement.getTime(), insertRowStatement.getValues(), insertRowStatement.isNeedInferType()); + if (insertRowStatement.getFailedMeasurementInfoMap() != null) { + for (Integer index : insertRowStatement.getFailedMeasurementInfoMap().keySet()) { + insertRowNode.markFailedMeasurement(index); + } + } insertRowNode.setFailedMeasurementNumber(insertRowStatement.getFailedMeasurementNumber()); insertRowsNode.addOneInsertRowNode(insertRowNode, i); } From 1be8839b729c710ddd8c02ede0f00386c9f1b75e Mon Sep 17 00:00:00 2001 From: HTHou Date: Tue, 24 Mar 2026 18:34:04 +0800 Subject: [PATCH 09/10] fix bug --- .../it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java index 8f193ce58c58..36a0ea0cc92b 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/dual/treemodel/auto/basic/IoTDBPipeDataSinkIT.java @@ -542,7 +542,11 @@ public void testSpecialPartialInsert() throws Exception { HttpPost httpPost = getHttpPost( - "http://" + senderEnv.getDataNodeWrapper(0).getIp() + ":" + "/rest/v2/insertRecords"); + "http://" + + senderEnv.getDataNodeWrapper(0).getIp() + + ":" + + senderEnv.getDataNodeWrapper(0).getRestServicePort() + + "/rest/v2/insertRecords"); String json = "{\"timestamps\":[1635232113960,1635232151960,1635232143960,1635232143960],\"measurements_list\":[[\"s33\",\"s44\"],[\"s55\",\"s66\"],[\"s77\",\"s88\"],[\"s771\",\"s881\"]],\"data_types_list\":[[\"INT32\",\"INT64\"],[\"FLOAT\",\"DOUBLE\"],[\"FLOAT\",\"DOUBLE\"],[\"BOOLEAN\",\"TEXT\"]],\"values_list\":[[1,false],[2.1,2],[4,6],[false,\"cccccc\"]],\"is_aligned\":false,\"devices\":[\"root.s1\",\"root.s1\",\"root.s1\",\"root.s3\"]}"; httpPost.setEntity(new StringEntity(json, Charset.defaultCharset())); From 402b1cd630d5291b52028f2294378469710e7a4a Mon Sep 17 00:00:00 2001 From: HTHou Date: Wed, 25 Mar 2026 09:34:27 +0800 Subject: [PATCH 10/10] ignore flaky test --- .../test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java index c85d9dc34f20..97287dfaba20 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java @@ -42,6 +42,7 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Ignore; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; @@ -252,6 +253,7 @@ public void errorInsertRecords(CloseableHttpClient httpClient, String json, Http } } + @Ignore // Flaky test @Test public void errorInsertRecords() throws SQLException, InterruptedException { SimpleEnv simpleEnv = new SimpleEnv();