Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,14 @@ public Connection getConnection(String username, String password) throws SQLExce
this);
}

@Override
public Connection getAvailableConnection(String username, String password) throws SQLException {
return new ClusterTestConnection(
getWriteConnection(null, username, password),
getOneAvailableReadConnection(null, username, password),
this);
}

@Override
public Connection getConnection(
final DataNodeWrapper dataNodeWrapper, final String username, final String password)
Expand Down Expand Up @@ -656,6 +664,23 @@ protected List<NodeConnection> getReadConnections(
return readConnRequestDelegate.requestAll();
}

protected List<NodeConnection> getOneAvailableReadConnection(
final Constant.Version version, final String username, final String password)
throws SQLException {
final List<DataNodeWrapper> dataNodeWrapperListCopy = new ArrayList<>(dataNodeWrapperList);
Collections.shuffle(dataNodeWrapperListCopy);
SQLException lastException = null;
for (final DataNodeWrapper dataNode : dataNodeWrapperListCopy) {
try {
return getReadConnections(version, dataNode, username, password);
} catch (final SQLException e) {
lastException = e;
}
}
logger.error("Failed to get connection from any DataNode, last exception is ", lastException);
throw lastException;
}

// use this to avoid some runtimeExceptions when try to get jdbc connections.
// because it is hard to add retry and handle exception when getting jdbc connections in
// getWriteConnectionWithSpecifiedDataNode and getReadConnections.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,11 @@ public Connection getConnection(String username, String password) throws SQLExce
return connection;
}

@Override
public Connection getAvailableConnection(String username, String password) throws SQLException {
throw new UnsupportedOperationException();
}

@Override
public Connection getWriteOnlyConnectionWithSpecifiedDataNode(
DataNodeWrapper dataNode, String username, String password) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ Connection getConnection(Constant.Version version, String username, String passw
Connection getConnection(DataNodeWrapper dataNodeWrapper, String username, String password)
throws SQLException;

default Connection getAvailableConnection() throws SQLException {
return getAvailableConnection(SessionConfig.DEFAULT_USER, SessionConfig.DEFAULT_PASSWORD);
}

Connection getAvailableConnection(String username, String password) throws SQLException;

default Connection getWriteOnlyConnectionWithSpecifiedDataNode(DataNodeWrapper dataNode)
throws SQLException {
return getWriteOnlyConnectionWithSpecifiedDataNode(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant;
import org.apache.iotdb.it.env.EnvFactory;
import org.apache.iotdb.it.env.cluster.env.SimpleEnv;
import org.apache.iotdb.it.env.cluster.node.DataNodeWrapper;
import org.apache.iotdb.it.framework.IoTDBTestRunner;
import org.apache.iotdb.itbase.category.ClusterIT;
Expand Down Expand Up @@ -48,11 +49,19 @@
import java.io.IOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.apache.iotdb.consensus.ConsensusFactory.IOT_CONSENSUS;
import static org.apache.iotdb.consensus.ConsensusFactory.RATIS_CONSENSUS;
import static org.apache.iotdb.db.queryengine.common.header.ColumnHeaderConstant.COLUMN_TTL;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
Expand All @@ -78,7 +87,7 @@ public void tearDown() throws Exception {
EnvFactory.getEnv().cleanClusterEnvironment();
}

private String getAuthorization(String username, String password) {
public static String getAuthorization(String username, String password) {
return Base64.getEncoder()
.encodeToString((username + ":" + password).getBytes(StandardCharsets.UTF_8));
}
Expand Down Expand Up @@ -128,7 +137,7 @@ public void ping() {
}
}

private HttpPost getHttpPost(String url) {
public static HttpPost getHttpPost(String url) {
HttpPost httpPost = new HttpPost(url);
httpPost.addHeader("Content-type", "application/json; charset=utf-8");
httpPost.setHeader("Accept", "application/json");
Expand Down Expand Up @@ -242,6 +251,100 @@ public void errorInsertRecords(CloseableHttpClient httpClient, String json, Http
}
}

@Test
public void errorInsertRecords() throws SQLException, InterruptedException {
SimpleEnv simpleEnv = new SimpleEnv();
simpleEnv
.getConfig()
.getCommonConfig()
.setSchemaRegionConsensusProtocolClass(RATIS_CONSENSUS)
.setSchemaReplicationFactor(3)
.setDataRegionConsensusProtocolClass(IOT_CONSENSUS)
.setDataReplicationFactor(2);
simpleEnv.getConfig().getDataNodeConfig().setEnableRestService(true);
simpleEnv.initClusterEnvironment(1, 3);

CloseableHttpResponse response = null;
CloseableHttpClient httpClient = HttpClientBuilder.create().build();
try {
HttpPost httpPost =
getHttpPost(
"http://"
+ simpleEnv.getDataNodeWrapper(0).getIp()
+ ":"
+ simpleEnv.getDataNodeWrapper(0).getRestServicePort()
+ "/rest/v2/insertRecords");
String json =
"{\"timestamps\":[1635232113960,1635232151960,1635232143960,1635232143960],\"measurements_list\":[[\"s33\",\"s44\"],[\"s55\",\"s66\"],[\"s77\",\"s88\"],[\"s771\",\"s881\"]],\"data_types_list\":[[\"INT32\",\"INT64\"],[\"FLOAT\",\"DOUBLE\"],[\"FLOAT\",\"DOUBLE\"],[\"BOOLEAN\",\"TEXT\"]],\"values_list\":[[1,false],[2.1,2],[4,6],[false,\"cccccc\"]],\"is_aligned\":false,\"devices\":[\"root.s1\",\"root.s1\",\"root.s1\",\"root.s3\"]}";
httpPost.setEntity(new StringEntity(json, Charset.defaultCharset()));
for (int i = 0; i < 30; i++) {
try {
response = httpClient.execute(httpPost);
break;
} catch (Exception e) {
if (i == 29) {
throw e;
}
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
}

HttpEntity responseEntity = response.getEntity();
String message = EntityUtils.toString(responseEntity, "utf-8");
JsonObject result = JsonParser.parseString(message).getAsJsonObject();
assertEquals(507, Integer.parseInt(result.get("code").toString()));
} catch (IOException e) {
e.printStackTrace();
fail(e.getMessage());
} finally {
try {
if (response != null) {
response.close();
}
} catch (IOException e) {
e.printStackTrace();
fail(e.getMessage());
}
}
TimeUnit.SECONDS.sleep(5);

try {
for (DataNodeWrapper dataNodeWrapper : simpleEnv.getDataNodeWrapperList()) {
dataNodeWrapper.stop();
try (Connection connectionAfterNodeDown = simpleEnv.getAvailableConnection();
Statement statementAfterNodeDown = connectionAfterNodeDown.createStatement()) {
int count = 0;
try (ResultSet resultSet =
statementAfterNodeDown.executeQuery(
"select s88, s77, s66, s55, s44, s33 from root.s1")) {
ResultSetMetaData metaData = resultSet.getMetaData();
while (resultSet.next()) {
StringBuilder row = new StringBuilder();
for (int i = 0; i < metaData.getColumnCount(); i++) {
row.append(resultSet.getString(i + 1)).append(",");
}
System.out.println(row);
count++;
}
}
assertEquals(3, count);
}
dataNodeWrapper.start();
TimeUnit.SECONDS.sleep(1);
}
} catch (SQLException e) {
if (!e.getMessage().contains("Maybe server is down")) {
throw e;
}
} finally {
simpleEnv.cleanClusterEnvironment();
}
}

public void rightInsertTablet(CloseableHttpClient httpClient, String json, HttpPost httpPost) {
CloseableHttpResponse response = null;
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,16 @@
import org.apache.iotdb.itbase.category.MultiClusterIT2AutoCreateSchema;
import org.apache.iotdb.rpc.TSStatusCode;

import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.junit.Assert;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;

import java.nio.charset.Charset;
import java.sql.Connection;
import java.sql.Statement;
import java.util.Arrays;
Expand All @@ -42,11 +47,20 @@
import java.util.Map;
import java.util.function.Consumer;

import static org.apache.iotdb.db.it.IoTDBRestServiceIT.getHttpPost;

@RunWith(IoTDBTestRunner.class)
@Category({MultiClusterIT2AutoCreateSchema.class})
public class IoTDBPipeDataSinkIT extends AbstractPipeDualAutoIT {

@Override
protected void setupConfig() {
super.setupConfig();
senderEnv.getConfig().getDataNodeConfig().setEnableRestService(true);
}

@Test
public void testThriftConnectorWithRealtimeFirstDisabled() throws Exception {
public void testThriftSinkWithRealtimeFirstDisabled() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

final String receiverIp = receiverDataNode.getIp();
Expand Down Expand Up @@ -196,7 +210,7 @@ private void testSinkFormat(final String format) throws Exception {
}

@Test
public void testLegacyConnector() throws Exception {
public void testLegacySink() throws Exception {
final DataNodeWrapper receiverDataNode = receiverEnv.getDataNodeWrapper(0);

final String receiverIp = receiverDataNode.getIp();
Expand Down Expand Up @@ -503,4 +517,53 @@ private void testLoadTsFileWithoutVerify(final String loadTsFileStrategy) throws
Collections.unmodifiableSet(new HashSet<>(Arrays.asList("1,1.0,", "2,1.0,"))));
}
}

@Test
public void testSpecialPartialInsert() throws Exception {
try (final Connection connection = senderEnv.getConnection();
final Statement statement = connection.createStatement()) {
statement.execute(
String.format(
"create pipe a2b with sink ('node-urls'='%s')",
receiverEnv.getDataNodeWrapperList().get(0).getIpAndPortString()));
}

CloseableHttpClient httpClient = HttpClientBuilder.create().build();

HttpPost httpPost =
getHttpPost(
"http://"
+ senderEnv.getDataNodeWrapper(0).getIp()
+ ":"
+ senderEnv.getDataNodeWrapper(0).getRestServicePort()
+ "/rest/v2/insertRecords");
String json =
"{\"timestamps\":[1635232113960,1635232151960,1635232143960,1635232143960],\"measurements_list\":[[\"s33\",\"s44\"],[\"s55\",\"s66\"],[\"s77\",\"s88\"],[\"s771\",\"s881\"]],\"data_types_list\":[[\"INT32\",\"INT64\"],[\"FLOAT\",\"DOUBLE\"],[\"FLOAT\",\"DOUBLE\"],[\"BOOLEAN\",\"TEXT\"]],\"values_list\":[[1,false],[2.1,2],[4,6],[false,\"cccccc\"]],\"is_aligned\":false,\"devices\":[\"root.s1\",\"root.s1\",\"root.s1\",\"root.s3\"]}";
httpPost.setEntity(new StringEntity(json, Charset.defaultCharset()));
for (int i = 0; i < 30; i++) {
try {
httpClient.execute(httpPost);
break;
} catch (final Exception e) {
if (i == 29) {
throw e;
}
try {
Thread.sleep(1000);
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}
}

TestUtils.assertDataEventuallyOnEnv(
receiverEnv,
"select s88, s77, s66, s55, s44, s33 from root.s1",
"Time,root.s1.s88,root.s1.s77,root.s1.s66,root.s1.s55,root.s1.s33,",
new HashSet<>(
Arrays.asList(
"1635232113960,null,null,null,null,1,",
"1635232151960,null,null,2.0,2.1,null,",
"1635232143960,6.0,4.0,null,null,null,")));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,11 @@ public PlanNode visitInsertRows(
insertRowStatement.getTime(),
insertRowStatement.getValues(),
insertRowStatement.isNeedInferType());
if (insertRowStatement.getFailedMeasurementInfoMap() != null) {
for (Integer index : insertRowStatement.getFailedMeasurementInfoMap().keySet()) {
insertRowNode.markFailedMeasurement(index);
}
}
insertRowNode.setFailedMeasurementNumber(insertRowStatement.getFailedMeasurementNumber());
insertRowsNode.addOneInsertRowNode(insertRowNode, i);
}
Expand Down
Loading