Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.iotdb.commons.schema.column.ColumnHeaderConstant;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.env.cluster.env.SimpleEnv;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;
Expand All @@ -41,19 +42,28 @@
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.apache.iotdb.commons.schema.column.ColumnHeaderConstant.COLUMN_TTL;
import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS;
import static org.apache.iotdb.consensus.ConsensusFactory.RATIS_CONSENSUS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
Expand All @@ -79,7 +89,7 @@ public void tearDown() throws Exception {
EnvFactory.getEnv().cleanClusterEnvironment();
}

private String getAuthorization(String username, String password) {
public static String getAuthorization(String username, String password) {
return Base64.getEncoder()
.encodeToString((username + ":" + password).getBytes(StandardCharsets.UTF_8));
}
Expand Down Expand Up @@ -129,7 +139,7 @@ public void ping() {
}
}

private HttpPost getHttpPost(String url) {
public static HttpPost getHttpPost(String url) {
HttpPost httpPost = new HttpPost(url);
httpPost.addHeader("Content-type", "application/json; charset=utf-8");
httpPost.setHeader("Accept", "application/json");
Expand Down Expand Up @@ -243,6 +253,101 @@ public void errorInsertRecords(CloseableHttpClient httpClient, String json, Http
}
}

@Ignore // Flaky test
@Test
public void errorInsertRecords() throws SQLException, InterruptedException {
SimpleEnv simpleEnv = new SimpleEnv();
simpleEnv
.getConfig()
.getCommonConfig()
.setSchemaRegionConsensusProtocolClass(RATIS_CONSENSUS)
.setSchemaReplicationFactor(3)
.setDataRegionConsensusProtocolClass(IOT_CONSENSUS)
.setDataReplicationFactor(2);
simpleEnv.getConfig().getDataNodeConfig().setEnableRestService(true);
simpleEnv.initClusterEnvironment(1, 3);

CloseableHttpResponse response = null;
CloseableHttpClient httpClient = HttpClientBuilder.create().build();
try {
HttpPost httpPost =
getHttpPost(
"http://"
+ simpleEnv.getDataNodeWrapper(0).getIp()
+ ":"
+ simpleEnv.getDataNodeWrapper(0).getRestServicePort()
+ "/rest/v2/insertRecords");
String json =
"{\"timestamps\":[1635232113960,1635232151960,1635232143960,1635232143960],\"measurements_list\":[[\"s33\",\"s44\"],[\"s55\",\"s66\"],[\"s77\",\"s88\"],[\"s771\",\"s881\"]],\"data_types_list\":[[\"INT32\",\"INT64\"],[\"FLOAT\",\"DOUBLE\"],[\"FLOAT\",\"DOUBLE\"],[\"BOOLEAN\",\"TEXT\"]],\"values_list\":[[1,false],[2.1,2],[4,6],[false,\"cccccc\"]],\"is_aligned\":false,\"devices\":[\"root.s1\",\"root.s1\",\"root.s1\",\"root.s3\"]}";
httpPost.setEntity(new StringEntity(json, Charset.defaultCharset()));
for (int i = 0; i < 30; i++) {
try {
response = httpClient.execute(httpPost);
break;
} catch (Exception e) {
if (i == 29) {
throw e;
}
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
}

HttpEntity responseEntity = response.getEntity();
String message = EntityUtils.toString(responseEntity, "utf-8");
JsonObject result = JsonParser.parseString(message).getAsJsonObject();
assertEquals(507, Integer.parseInt(result.get("code").toString()));
} catch (IOException e) {
e.printStackTrace();
fail(e.getMessage());
} finally {
try {
if (response != null) {
response.close();
}
} catch (IOException e) {
e.printStackTrace();
fail(e.getMessage());
}
}
TimeUnit.SECONDS.sleep(5);

try {
for (DataNodeWrapper dataNodeWrapper : simpleEnv.getDataNodeWrapperList()) {
dataNodeWrapper.stop();
try (Connection connectionAfterNodeDown = simpleEnv.getAvailableConnection();
Statement statementAfterNodeDown = connectionAfterNodeDown.createStatement()) {
int count = 0;
try (ResultSet resultSet =
statementAfterNodeDown.executeQuery(
"select s88, s77, s66, s55, s44, s33 from root.s1")) {
ResultSetMetaData metaData = resultSet.getMetaData();
while (resultSet.next()) {
StringBuilder row = new StringBuilder();
for (int i = 0; i < metaData.getColumnCount(); i++) {
row.append(resultSet.getString(i + 1)).append(",");
}
System.out.println(row);
count++;
}
}
assertEquals(3, count);
}
dataNodeWrapper.start();
TimeUnit.SECONDS.sleep(1);
}
} catch (SQLException e) {
if (!e.getMessage().contains("Maybe server is down")) {
throw e;
}
} finally {
simpleEnv.cleanClusterEnvironment();
}
}

public void rightInsertTablet(CloseableHttpClient httpClient, String json, HttpPost httpPost) {
CloseableHttpResponse response = null;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,17 @@
import org.apache.iotdb.pipe.it.dual.treemodel.auto.AbstractPipeDualTreeModelAutoIT;
import org.apache.iotdb.rpc.TSStatusCode;

import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.nio.charset.Charset;
import java.sql.Connection;
import java.sql.Statement;
import java.util.Arrays;
Expand All @@ -44,6 +49,8 @@
import java.util.Map;
import java.util.function.Consumer;

import static org.apache.iotdb.db.it.IoTDBRestServiceIT.getHttpPost;

@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2DualTreeAutoBasic.class})
public class IoTDBPipeDataSinkIT extends AbstractPipeDualTreeModelAutoIT {
Expand All @@ -54,8 +61,14 @@ public void setUp() {
super.setUp();
}

@Override
protected void setupConfig() {
super.setupConfig();
senderEnv.getConfig().getDataNodeConfig().setEnableRestService(true);
}

@Test
public void testThriftConnectorWithRealtimeFirstDisabled() throws Exception {
public void testThriftSinkWithRealtimeFirstDisabled() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

final String receiverIp = receiverDataNode.getIp();
Expand Down Expand Up @@ -207,7 +220,7 @@ private void testSinkFormat(final String format) throws Exception {
}

@Test
public void testLegacyConnector() throws Exception {
public void testLegacySink() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

final String receiverIp = receiverDataNode.getIp();
Expand Down Expand Up @@ -514,4 +527,53 @@ private void testLoadTsFileWithoutVerify(final String loadTsFileStrategy) throws
Collections.unmodifiableSet(new HashSet<>(Arrays.asList("1,1.0,", "2,1.0,"))));
}
}

@Test
public void testSpecialPartialInsert() throws Exception {
try (final Connection connection = senderEnv.getConnection();
final Statement statement = connection.createStatement()) {
statement.execute(
String.format(
"create pipe a2b with sink ('node-urls'='%s')",
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()));
}

CloseableHttpClient httpClient = HttpClientBuilder.create().build();

HttpPost httpPost =
getHttpPost(
"http://"
+ senderEnv.getDataNodeWrapper(0).getIp()
+ ":"
+ senderEnv.getDataNodeWrapper(0).getRestServicePort()
+ "/rest/v2/insertRecords");
String json =
"{\"timestamps\":[1635232113960,1635232151960,1635232143960,1635232143960],\"measurements_list\":[[\"s33\",\"s44\"],[\"s55\",\"s66\"],[\"s77\",\"s88\"],[\"s771\",\"s881\"]],\"data_types_list\":[[\"INT32\",\"INT64\"],[\"FLOAT\",\"DOUBLE\"],[\"FLOAT\",\"DOUBLE\"],[\"BOOLEAN\",\"TEXT\"]],\"values_list\":[[1,false],[2.1,2],[4,6],[false,\"cccccc\"]],\"is_aligned\":false,\"devices\":[\"root.s1\",\"root.s1\",\"root.s1\",\"root.s3\"]}";
httpPost.setEntity(new StringEntity(json, Charset.defaultCharset()));
for (int i = 0; i < 30; i++) {
try {
httpClient.execute(httpPost);
break;
} catch (final Exception e) {
if (i == 29) {
throw e;
}
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
}

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"select s88, s77, s66, s55, s44, s33 from root.s1",
"Time,root.s1.s88,root.s1.s77,root.s1.s66,root.s1.s55,root.s1.s33,",
new HashSet<>(
Arrays.asList(
"1635232113960,null,null,null,null,1,",
"1635232151960,null,null,2.0,2.1,null,",
"1635232143960,6.0,4.0,null,null,null,")));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@
}

@Override
public PlanNode visitQuery(QueryStatement queryStatement, MPPQueryContext context) {

Check warning on line 148 in iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/LogicalPlanVisitor.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

A "Brain Method" was detected. Refactor it to reduce at least one of the following metrics: LOC from 92 to 64, Complexity from 16 to 14, Nesting Level from 3 to 2, Number of Variables from 7 to 6.

See more on https://sonarcloud.io/project/issues?id=apache_iotdb&issues=AZ0fPL1OyvgWYOgM4t7h&open=AZ0fPL1OyvgWYOgM4t7h&pullRequest=17340
if (analysis.allDevicesInOneTemplate()) {
return new TemplatedLogicalPlan(analysis, queryStatement, context).visitQuery();
}
Expand Down Expand Up @@ -792,6 +792,11 @@
insertRowStatement.getTime(),
insertRowStatement.getValues(),
insertRowStatement.isNeedInferType());
if (insertRowStatement.getFailedMeasurementInfoMap() != null) {
for (Integer index : insertRowStatement.getFailedMeasurementInfoMap().keySet()) {
insertRowNode.markFailedMeasurement(index);
}
}
insertRowNode.setFailedMeasurementNumber(insertRowStatement.getFailedMeasurementNumber());
insertRowsNode.addOneInsertRowNode(insertRowNode, i);
}
Expand Down
Loading