diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java index 547b010a322ac..60ff77aa03c52 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/cluster/env/AbstractEnv.java @@ -440,6 +440,14 @@ public Connection getConnection(String username, String password) throws SQLExce this); } + @Override + public Connection getAvailableConnection(String username, String password) throws SQLException { + return new ClusterTestConnection( + getWriteConnection(null, username, password), + getOneAvailableReadConnection(null, username, password), + this); + } + @Override public Connection getConnection( final DataNodeWrapper dataNodeWrapper, final String username, final String password) @@ -656,6 +664,23 @@ protected List getReadConnections( return readConnRequestDelegate.requestAll(); } + protected List getOneAvailableReadConnection( + final Constant.Version version, final String username, final String password) + throws SQLException { + final List dataNodeWrapperListCopy = new ArrayList<>(dataNodeWrapperList); + Collections.shuffle(dataNodeWrapperListCopy); + SQLException lastException = null; + for (final DataNodeWrapper dataNode : dataNodeWrapperListCopy) { + try { + return getReadConnections(version, dataNode, username, password); + } catch (final SQLException e) { + lastException = e; + } + } + logger.error("Failed to get connection from any DataNode, last exception is ", lastException); + throw lastException; + } + // use this to avoid some runtimeExceptions when try to get jdbc connections. // because it is hard to add retry and handle exception when getting jdbc connections in // getWriteConnectionWithSpecifiedDataNode and getReadConnections. diff --git a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java index efd883e41a9d3..5c4f91cf5c635 100644 --- a/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/it/env/remote/env/RemoteServerEnv.java @@ -136,6 +136,11 @@ public Connection getConnection(String username, String password) throws SQLExce return connection; } + @Override + public Connection getAvailableConnection(String username, String password) throws SQLException { + throw new UnsupportedOperationException(); + } + @Override public Connection getWriteOnlyConnectionWithSpecifiedDataNode( DataNodeWrapper dataNode, String username, String password) { diff --git a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java index c3dc9a3eb3222..bede2b001f709 100644 --- a/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java +++ b/integration-test/src/main/java/org/apache/iotdb/itbase/env/BaseEnv.java @@ -120,6 +120,12 @@ Connection getConnection(Constant.Version version, String username, String passw Connection getConnection(DataNodeWrapper dataNodeWrapper, String username, String password) throws SQLException; + default Connection getAvailableConnection() throws SQLException { + return getAvailableConnection(SessionConfig.DEFAULT_USER, SessionConfig.DEFAULT_PASSWORD); + } + + Connection getAvailableConnection(String username, String password) throws SQLException; + default Connection getWriteOnlyConnectionWithSpecifiedDataNode(DataNodeWrapper dataNode) throws SQLException { return getWriteOnlyConnectionWithSpecifiedDataNode( diff --git a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java index 53b92918877d5..ae587bfea457a 100644 --- a/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/db/it/IoTDBRestServiceIT.java @@ -20,6 +20,7 @@ import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant; import org.apache.iotdb.it.env.EnvFactory; +import org.apache.iotdb.it.env.cluster.env.SimpleEnv; import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper; import org.apache.iotdb.it.framework.IoTDBTestRunner; import org.apache.iotdb.itbase.category.ClusterIT; @@ -48,11 +49,19 @@ import java.io.IOException; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; +import java.sql.Statement; import java.util.ArrayList; import java.util.Base64; import java.util.List; import java.util.Map; +import java.util.concurrent.TimeUnit; +import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS; +import static org.apache.iotdb.consensus.ConsensusFactory.RATIS_CONSENSUS; import static org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.COLUMN_TTL; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -78,7 +87,7 @@ public void tearDown() throws Exception { EnvFactory.getEnv().cleanClusterEnvironment(); } - private String getAuthorization(String username, String password) { + public static String getAuthorization(String username, String password) { return Base64.getEncoder() .encodeToString((username + ":" + password).getBytes(StandardCharsets.UTF_8)); } @@ -128,7 +137,7 @@ public void ping() { } } - private HttpPost getHttpPost(String url) { + public static HttpPost getHttpPost(String url) { HttpPost httpPost = new HttpPost(url); httpPost.addHeader("Content-type", "application/json; charset=utf-8"); httpPost.setHeader("Accept", "application/json"); @@ -242,6 +251,100 @@ public void errorInsertRecords(CloseableHttpClient httpClient, String json, Http } } + @Test + public void errorInsertRecords() throws SQLException, InterruptedException { + SimpleEnv simpleEnv = new SimpleEnv(); + simpleEnv + .getConfig() + .getCommonConfig() + .setSchemaRegionConsensusProtocolClass(RATIS_CONSENSUS) + .setSchemaReplicationFactor(3) + .setDataRegionConsensusProtocolClass(IOT_CONSENSUS) + .setDataReplicationFactor(2); + simpleEnv.getConfig().getDataNodeConfig().setEnableRestService(true); + simpleEnv.initClusterEnvironment(1, 3); + + CloseableHttpResponse response = null; + CloseableHttpClient httpClient = HttpClientBuilder.create().build(); + try { + HttpPost httpPost = + getHttpPost( + "http://" + + simpleEnv.getDataNodeWrapper(0).getIp() + + ":" + + simpleEnv.getDataNodeWrapper(0).getRestServicePort() + + "/rest/v2/insertRecords"); + String json = + "{\"timestamps\":[1635232113960,1635232151960,1635232143960,1635232143960],\"measurements_list\":[[\"s33\",\"s44\"],[\"s55\",\"s66\"],[\"s77\",\"s88\"],[\"s771\",\"s881\"]],\"data_types_list\":[[\"INT32\",\"INT64\"],[\"FLOAT\",\"DOUBLE\"],[\"FLOAT\",\"DOUBLE\"],[\"BOOLEAN\",\"TEXT\"]],\"values_list\":[[1,false],[2.1,2],[4,6],[false,\"cccccc\"]],\"is_aligned\":false,\"devices\":[\"root.s1\",\"root.s1\",\"root.s1\",\"root.s3\"]}"; + httpPost.setEntity(new StringEntity(json, Charset.defaultCharset())); + for (int i = 0; i < 30; i++) { + try { + response = httpClient.execute(httpPost); + break; + } catch (Exception e) { + if (i == 29) { + throw e; + } + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + } + + HttpEntity responseEntity = response.getEntity(); + String message = EntityUtils.toString(responseEntity, "utf-8"); + JsonObject result = JsonParser.parseString(message).getAsJsonObject(); + assertEquals(507, Integer.parseInt(result.get("code").toString())); + } catch (IOException e) { + e.printStackTrace(); + fail(e.getMessage()); + } finally { + try { + if (response != null) { + response.close(); + } + } catch (IOException e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + TimeUnit.SECONDS.sleep(5); + + try { + for (DataNodeWrapper dataNodeWrapper : simpleEnv.getDataNodeWrapperList()) { + dataNodeWrapper.stop(); + try (Connection connectionAfterNodeDown = simpleEnv.getAvailableConnection(); + Statement statementAfterNodeDown = connectionAfterNodeDown.createStatement()) { + int count = 0; + try (ResultSet resultSet = + statementAfterNodeDown.executeQuery( + "select s88, s77, s66, s55, s44, s33 from root.s1")) { + ResultSetMetaData metaData = resultSet.getMetaData(); + while (resultSet.next()) { + StringBuilder row = new StringBuilder(); + for (int i = 0; i < metaData.getColumnCount(); i++) { + row.append(resultSet.getString(i + 1)).append(","); + } + System.out.println(row); + count++; + } + } + assertEquals(3, count); + } + dataNodeWrapper.start(); + TimeUnit.SECONDS.sleep(1); + } + } catch (SQLException e) { + if (!e.getMessage().contains("Maybe server is down")) { + throw e; + } + } finally { + simpleEnv.cleanClusterEnvironment(); + } + } + public void rightInsertTablet(CloseableHttpClient httpClient, String json, HttpPost httpPost) { CloseableHttpResponse response = null; try { diff --git a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java index 5216fe20a27e4..86e534810745b 100644 --- a/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/pipe/it/autocreate/IoTDBPipeDataSinkIT.java @@ -28,11 +28,16 @@ import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema; import org.apache.iotdb.rpc.TSStatusCode; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; import org.junit.Assert; import org.junit.Test; import org.junit.experimental.categories.Category; import org.junit.runner.RunWith; +import java.nio.charset.Charset; import java.sql.Connection; import java.sql.Statement; import java.util.Arrays; @@ -42,11 +47,20 @@ import java.util.Map; import java.util.function.Consumer; +import static org.apache.iotdb.db.it.IoTDBRestServiceIT.getHttpPost; + @RunWith(IoTDBTestRunner.class) @Category({MultiClusterIT2AutoCreateSchema.class}) public class IoTDBPipeDataSinkIT extends AbstractPipeDualAutoIT { + + @Override + protected void setupConfig() { + super.setupConfig(); + senderEnv.getConfig().getDataNodeConfig().setEnableRestService(true); + } + @Test - public void testThriftConnectorWithRealtimeFirstDisabled() throws Exception { + public void testThriftSinkWithRealtimeFirstDisabled() throws Exception { final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); final String receiverIp = receiverDataNode.getIp(); @@ -196,7 +210,7 @@ private void testSinkFormat(final String format) throws Exception { } @Test - public void testLegacyConnector() throws Exception { + public void testLegacySink() throws Exception { final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0); final String receiverIp = receiverDataNode.getIp(); @@ -503,4 +517,53 @@ private void testLoadTsFileWithoutVerify(final String loadTsFileStrategy) throws Collections.unmodifiableSet(new HashSet<>(Arrays.asList("1,1.0,", "2,1.0,")))); } } + + @Test + public void testSpecialPartialInsert() throws Exception { + try (final Connection connection = senderEnv.getConnection(); + final Statement statement = connection.createStatement()) { + statement.execute( + String.format( + "create pipe a2b with sink ('node-urls'='%s')", + receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString())); + } + + CloseableHttpClient httpClient = HttpClientBuilder.create().build(); + + HttpPost httpPost = + getHttpPost( + "http://" + + senderEnv.getDataNodeWrapper(0).getIp() + + ":" + + senderEnv.getDataNodeWrapper(0).getRestServicePort() + + "/rest/v2/insertRecords"); + String json = + "{\"timestamps\":[1635232113960,1635232151960,1635232143960,1635232143960],\"measurements_list\":[[\"s33\",\"s44\"],[\"s55\",\"s66\"],[\"s77\",\"s88\"],[\"s771\",\"s881\"]],\"data_types_list\":[[\"INT32\",\"INT64\"],[\"FLOAT\",\"DOUBLE\"],[\"FLOAT\",\"DOUBLE\"],[\"BOOLEAN\",\"TEXT\"]],\"values_list\":[[1,false],[2.1,2],[4,6],[false,\"cccccc\"]],\"is_aligned\":false,\"devices\":[\"root.s1\",\"root.s1\",\"root.s1\",\"root.s3\"]}"; + httpPost.setEntity(new StringEntity(json, Charset.defaultCharset())); + for (int i = 0; i < 30; i++) { + try { + httpClient.execute(httpPost); + break; + } catch (final Exception e) { + if (i == 29) { + throw e; + } + try { + Thread.sleep(1000); + } catch (InterruptedException ex) { + throw new RuntimeException(ex); + } + } + } + + TestUtils.assertDataEventuallyOnEnv( + receiverEnv, + "select s88, s77, s66, s55, s44, s33 from root.s1", + "Time,root.s1.s88,root.s1.s77,root.s1.s66,root.s1.s55,root.s1.s33,", + new HashSet<>( + Arrays.asList( + "1635232113960,null,null,null,null,1,", + "1635232151960,null,null,2.0,2.1,null,", + "1635232143960,6.0,4.0,null,null,null,"))); + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java index ee2ccc3058662..be0b28a0282ac 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java @@ -714,6 +714,11 @@ public PlanNode visitInsertRows( insertRowStatement.getTime(), insertRowStatement.getValues(), insertRowStatement.isNeedInferType()); + if (insertRowStatement.getFailedMeasurementInfoMap() != null) { + for (Integer index : insertRowStatement.getFailedMeasurementInfoMap().keySet()) { + insertRowNode.markFailedMeasurement(index); + } + } insertRowNode.setFailedMeasurementNumber(insertRowStatement.getFailedMeasurementNumber()); insertRowsNode.addOneInsertRowNode(insertRowNode, i); }