Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,7 @@ jobs:
uses: actions/setup-java@v4
with:
distribution: "temurin"
java-version: |
8
21
java-version: 21
cache: "maven"
- name: Build and install libraries
run: mvn --batch-mode --no-transfer-progress --show-version --strict-checksums --threads 2 -Dmaven.wagon.rto=30000 -Dj8 -DskipITs install
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ public class ClickHouseTestClient implements ClickHouseClient {

@Override
public boolean accept(ClickHouseProtocol protocol) {
return true;
return protocol == ClickHouseProtocol.MYSQL; // to avoid taking this client in other tests
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,39 @@
import java.util.zip.GZIPOutputStream;

public abstract class ClientIntegrationTest extends BaseIntegrationTest {

public static final ClickHouseOption CUSTOM_HTTP_PARAMS_OPT = new ClickHouseOption() {

Check warning on line 99 in clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this use of "ClickHouseOption"; it is deprecated.

See more on https://sonarcloud.io/project/issues?id=ClickHouse_clickhouse-java&issues=AZzj14Y45tXfSMydqK8w&open=AZzj14Y45tXfSMydqK8w&pullRequest=2782
@Override
public Serializable getDefaultValue() {
return null;
}

@Override
public String getDescription() {
return "";
}

@Override
public String getKey() {
return "custom_http_params";
}

@Override
public Class<? extends Serializable> getValueType() {
return String.class;
}

@Override
public boolean isSensitive() {
return false;
}

@Override
public String name() {
return "custom_http_params";
}
};

protected void checkRowCount(String queryOrTableName, int expectedRowCount) throws ClickHouseException {
try (ClickHouseClient client = getClient()) {
checkRowCount(newRequest(client, getServer()).format(ClickHouseFormat.RowBinaryWithNamesAndTypes),
Expand Down Expand Up @@ -1869,7 +1902,7 @@
try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance()
.createPipedOutputStream(config)) {
// start the worker thread which transfer data from the input into ClickHouse
future = request.data(stream.getInputStream()).execute();
future = request.data(stream.getInputStream()).set("async_insert", "0").execute();
// write bytes into the piped stream
for (int i = 0; i < rows; i++) {
BinaryStreamUtils.writeInt64(stream, i);
Expand Down Expand Up @@ -1999,7 +2032,7 @@
CompletableFuture<ClickHouseResponse> future;
try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(config)) {
// start the worker thread which transfer data from the input into ClickHouse
future = request.data(stream.getInputStream()).execute();
future = request.data(stream.getInputStream()).set("async_insert", "0").execute();
for (int i = 0; i < numberOfRecords; i++) {
BinaryStreamUtils.writeBytes(stream, String.format("{\"i\": %s, \"\": \"JSON\"}", i).getBytes(StandardCharsets.UTF_8));
}
Expand Down Expand Up @@ -2208,7 +2241,7 @@
throw new SkipException("Transaction was supported since 22.7");
}

ClickHouseRequest<?> txRequest = newRequest(client, server).transaction();
ClickHouseRequest<?> txRequest = newRequest(client, server).set("async_insert", "0").transaction();

Check warning on line 2244 in clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this use of "ClickHouseRequest"; it is deprecated.

See more on https://sonarcloud.io/project/issues?id=ClickHouse_clickhouse-java&issues=AZzj14Y45tXfSMydqK8x&open=AZzj14Y45tXfSMydqK8x&pullRequest=2782
try (ClickHouseResponse response = txRequest.query("insert into " + tableName + " values(1)(2)(3)")
.executeAndWait()) {
// ignore
Expand Down Expand Up @@ -2349,7 +2382,7 @@
throw new SkipException("Transaction was supported since 22.7");
}

ClickHouseRequest<?> request = newRequest(client, server).transaction();
ClickHouseRequest<?> request = newRequest(client, server).set("async_insert", "0").transaction();

Check warning on line 2385 in clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this use of "ClickHouseRequest"; it is deprecated.

See more on https://sonarcloud.io/project/issues?id=ClickHouse_clickhouse-java&issues=AZzj14Y45tXfSMydqK8y&open=AZzj14Y45tXfSMydqK8y&pullRequest=2782
ClickHouseTransaction tx = request.getTransaction();
try (ClickHouseResponse response = newRequest(client, server)
.query("insert into " + tableName + " values(0, '?')").executeAndWait()) {
Expand All @@ -2367,7 +2400,7 @@
rows += 3;

checkRowCount(request, tableName, rows);
ClickHouseRequest<?> otherRequest = newRequest(client, server).transaction(tx);
ClickHouseRequest<?> otherRequest = newRequest(client, server).set("async_insert", "0").transaction(tx);

Check warning on line 2403 in clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this use of "ClickHouseRequest"; it is deprecated.

See more on https://sonarcloud.io/project/issues?id=ClickHouse_clickhouse-java&issues=AZzj14Y45tXfSMydqK8z&open=AZzj14Y45tXfSMydqK8z&pullRequest=2782
checkRowCount(otherRequest, tableName, rows);
checkRowCount(tableName, rows);

Expand Down Expand Up @@ -2413,8 +2446,8 @@
throw new SkipException("Transaction was supported since 22.7");
}

ClickHouseRequest<?> req1 = newRequest(client, server).transaction();
ClickHouseRequest<?> req2 = newRequest(client, server).transaction();
ClickHouseRequest<?> req1 = newRequest(client, server).set("async_insert", "0").transaction();

Check warning on line 2449 in clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this use of "ClickHouseRequest"; it is deprecated.

See more on https://sonarcloud.io/project/issues?id=ClickHouse_clickhouse-java&issues=AZzj14Y45tXfSMydqK80&open=AZzj14Y45tXfSMydqK80&pullRequest=2782
ClickHouseRequest<?> req2 = newRequest(client, server).set("async_insert", "0").transaction();

Check warning on line 2450 in clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this use of "ClickHouseRequest"; it is deprecated.

See more on https://sonarcloud.io/project/issues?id=ClickHouse_clickhouse-java&issues=AZzj14Y45tXfSMydqK81&open=AZzj14Y45tXfSMydqK81&pullRequest=2782
try (ClickHouseResponse response = req1.query("insert into " + tableName + " values(1)").executeAndWait()) {
// ignore
}
Expand Down Expand Up @@ -2557,6 +2590,7 @@
checkRowCount(tableName, 0);
request.transaction(1);
try (ClickHouseResponse response = request.write().query("insert into " + tableName + " values(1)(2)(3)")
.set("async_insert", "0")
.executeAndWait()) {
// ignore
}
Expand Down Expand Up @@ -2591,7 +2625,7 @@
if (!checkServerVersion(client, server, "[22.7,)")) {
throw new SkipException("Transaction was supported since 22.7");
}
ClickHouseRequest<?> request = newRequest(client, server);
ClickHouseRequest<?> request = newRequest(client, server).set("async_insert", "0");

Check warning on line 2628 in clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this use of "ClickHouseRequest"; it is deprecated.

See more on https://sonarcloud.io/project/issues?id=ClickHouse_clickhouse-java&issues=AZzj14Y45tXfSMydqK82&open=AZzj14Y45tXfSMydqK82&pullRequest=2782
ClickHouseTransaction.setImplicitTransaction(request, true);
try (ClickHouseResponse response = request.query("insert into " + tableName + " values(1)")
.executeAndWait()) {
Expand Down Expand Up @@ -2640,7 +2674,7 @@
try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance()
.createPipedOutputStream(config)) {
// start the worker thread which transfer data from the input into ClickHouse
future = request.data(stream.getInputStream()).execute();
future = request.data(stream.getInputStream()).set("async_insert", "0").execute();
// write bytes into the piped stream
LongStream.range(0, numRows).forEachOrdered(
n -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@

try (ClickHouseClient client = ClickHouseClient.newInstance()) {

ClickHouseRequest<?> req1 = newRequest(client, server);
ClickHouseRequest<?> req1 = newRequest(client, server).option(ClickHouseHttpOption.CUSTOM_PARAMS, "async_insert=0");

Check warning on line 86 in clickhouse-http-client/src/test/java/com/clickhouse/client/http/ApacheHttpConnectionImplTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this use of "CUSTOM_PARAMS"; it is deprecated.

See more on https://sonarcloud.io/project/issues?id=ClickHouse_clickhouse-java&issues=AZzj14cU5tXfSMydqK84&open=AZzj14cU5tXfSMydqK84&pullRequest=2782

Check warning on line 86 in clickhouse-http-client/src/test/java/com/clickhouse/client/http/ApacheHttpConnectionImplTest.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this use of "ClickHouseHttpOption"; it is deprecated.

See more on https://sonarcloud.io/project/issues?id=ClickHouse_clickhouse-java&issues=AZzj14cU5tXfSMydqK83&open=AZzj14cU5tXfSMydqK83&pullRequest=2782
try (ClickHouseResponse resp = req1.query("select 1").executeAndWait()) {
Assert.assertEquals(resp.firstRecord().getValue(0).asString(), "1");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ public void testDecompressWithLargeChunk() throws ClickHouseException, IOExcepti
try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance()
.createPipedOutputStream(config)) {
// start the worker thread which transfer data from the input into ClickHouse
future = request.data(stream.getInputStream()).execute();
future = request.data(stream.getInputStream()).set("async_insert", "0").execute();
// write bytes into the piped stream
LongStream.range(0, numRows).forEachOrdered(
n -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ public void setV1() {
}
@Override
public ClickHouseConnection newConnection(Properties properties) throws SQLException {
if (properties == null) {
properties = new Properties();
}
properties.setProperty("custom_http_params", "async_insert=0");
return (ClickHouseConnection) newDataSource(properties).getConnection();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
import com.clickhouse.client.ClickHouseProtocol;
import com.clickhouse.client.config.ClickHouseClientOption;
import com.clickhouse.client.config.ClickHouseDefaults;
import com.clickhouse.client.http.config.ClickHouseHttpOption;

Check warning on line 8 in clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/connection/ClickHouseConnectionFactoryProvider.java

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Remove this unused import 'com.clickhouse.client.http.config.ClickHouseHttpOption'.

See more on https://sonarcloud.io/project/issues?id=ClickHouse_clickhouse-java&issues=AZzj14c65tXfSMydqK85&open=AZzj14c65tXfSMydqK85&pullRequest=2782
import com.clickhouse.config.ClickHouseOption;
import com.clickhouse.data.ClickHouseUtils;

import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryOptions;
import io.r2dbc.spi.Option;
Expand All @@ -22,7 +22,14 @@
import java.util.ServiceLoader;
import java.util.Set;

import static io.r2dbc.spi.ConnectionFactoryOptions.*;
import static io.r2dbc.spi.ConnectionFactoryOptions.DATABASE;
import static io.r2dbc.spi.ConnectionFactoryOptions.DRIVER;
import static io.r2dbc.spi.ConnectionFactoryOptions.HOST;
import static io.r2dbc.spi.ConnectionFactoryOptions.PASSWORD;
import static io.r2dbc.spi.ConnectionFactoryOptions.PORT;
import static io.r2dbc.spi.ConnectionFactoryOptions.PROTOCOL;
import static io.r2dbc.spi.ConnectionFactoryOptions.SSL;
import static io.r2dbc.spi.ConnectionFactoryOptions.USER;

public class ClickHouseConnectionFactoryProvider implements io.r2dbc.spi.ConnectionFactoryProvider {

Expand Down Expand Up @@ -53,6 +60,7 @@
} catch (Exception e) {
// ignore
}
allOptions.add(Option.valueOf("custom_http_params"));
connQueryParams = Collections.unmodifiableList(new ArrayList<>(allOptions));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public static void setup() throws Exception {
ClickHouseServerForTest.beforeSuite();

connectionFactory = ConnectionFactories.get(
format("r2dbc:clickhouse:%s://%s:%s@%s/%s?falan=filan&%s#tag1", DEFAULT_PROTOCOL, USER, PASSWORD,
format("r2dbc:clickhouse:%s://%s:%s@%s/%s?falan=filan&custom_http_params=async_insert=0&%s#tag1", DEFAULT_PROTOCOL, USER, PASSWORD,
getClickHouseAddress(DEFAULT_PROTOCOL, false), DATABASE, EXTRA_PARAM));
jdbcTemplate = jdbcTemplate(null);
}
Expand Down Expand Up @@ -85,10 +85,10 @@ private static JdbcTemplate jdbcTemplate(String database) throws SQLException {
Driver driver = new ClickHouseDriver();
DriverManager.registerDriver(driver);
if (database == null) {
source.setJdbcUrl(format("jdbc:clickhouse:%s://%s?%s", DEFAULT_PROTOCOL,
source.setJdbcUrl(format("jdbc:clickhouse:%s://%s?custom_http_params=async_insert=0%s", DEFAULT_PROTOCOL,
getClickHouseAddress(DEFAULT_PROTOCOL, false), EXTRA_PARAM));
} else {
source.setJdbcUrl(format("jdbc:clickhouse:%s://%s/%s?%s", DEFAULT_PROTOCOL,
source.setJdbcUrl(format("jdbc:clickhouse:%s://%s/%s?custom_http_params=async_insert=0&%s", DEFAULT_PROTOCOL,
getClickHouseAddress(DEFAULT_PROTOCOL, false), DATABASE, EXTRA_PARAM));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -466,18 +466,21 @@ public void testServerSettings() throws Exception {
InsertSettings insertSettings = new InsertSettings()
.setQueryId(queryId)
.serverSetting(ServerSettings.ASYNC_INSERT, "1")
.serverSetting(ServerSettings.WAIT_ASYNC_INSERT, "1");
.serverSetting(ServerSettings.WAIT_ASYNC_INSERT, "1")
.serverSetting(ServerSettings.INPUT_FORMAT_BINARY_READ_JSON_AS_STRING, "1");

String csvData = "0.33\n0.44\n0.55\n";
client.insert("server_settings_test_table", new ByteArrayInputStream(csvData.getBytes()), ClickHouseFormat.CSV, insertSettings).get().close();

client.execute("SYSTEM FLUSH LOGS").get().close();

List<GenericRecord> logRecords = client.queryAll("SELECT * FROM clusterAllReplicas('default', system.query_log) WHERE query_id = '" + queryId + "' AND type = 'QueryFinish'");
List<GenericRecord> logRecords = client.queryAll("SELECT Settings, ProfileEvents['AsyncInsertQuery'] as was_async FROM clusterAllReplicas('default', system.query_log) WHERE query_id = '" + queryId + "' AND type = 'QueryFinish'");

GenericRecord record = logRecords.get(0);
Assert.assertTrue(record.getBoolean("was_async"));
String settings = record.getString(record.getSchema().nameToColumnIndex("Settings"));
Assert.assertTrue(settings.contains(ServerSettings.ASYNC_INSERT + "=1"));
Assert.assertTrue(settings.contains("input_format_binary_read_json_as_string=1"));
// Assert.assertTrue(settings.contains(ServerSettings.ASYNC_INSERT + "=1")); // async settings are not reflected in query log any more
// Assert.assertTrue(settings.contains(ServerSettings.WAIT_ASYNC_INSERT + "=1")); // uncomment after server fix
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
import com.clickhouse.client.api.enums.Protocol;
import com.clickhouse.client.api.enums.ProxyType;
import com.clickhouse.client.api.insert.InsertResponse;
import com.clickhouse.client.api.insert.InsertSettings;
import com.clickhouse.client.api.internal.ServerSettings;
import com.clickhouse.client.api.query.GenericRecord;
import com.clickhouse.client.insert.SamplePOJO;
import com.github.tomakehurst.wiremock.WireMockServer;
Expand Down Expand Up @@ -83,7 +85,8 @@ public void testInsert() throws Exception {
}

try {
InsertResponse response = client.get().insert(tableName, simplePOJOs).get(120, TimeUnit.SECONDS);
InsertResponse response = client.get().insert(tableName, simplePOJOs, new InsertSettings()
.serverSetting(ServerSettings.ASYNC_INSERT, "0")).get(120, TimeUnit.SECONDS);
Assert.assertEquals(response.getWrittenRows(), 1000);
} catch (Exception e) {
fail("Should not have thrown exception.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.clickhouse.client.api.data_formats.ClickHouseBinaryFormatReader;
import com.clickhouse.client.api.insert.InsertResponse;
import com.clickhouse.client.api.insert.InsertSettings;
import com.clickhouse.client.api.internal.ServerSettings;
import com.clickhouse.client.api.query.GenericRecord;
import com.clickhouse.client.api.query.QueryResponse;
import org.apache.commons.lang3.RandomStringUtils;
Expand Down Expand Up @@ -48,7 +49,8 @@ public void testInsertAndReadBackWithSecureConnection() {
client.register(SamplePOJO.class, client.getTableSchema(tableName));
InsertSettings settings = new InsertSettings()
.setDeduplicationToken(RandomStringUtils.randomAlphabetic(36))
.setQueryId(String.valueOf(UUID.randomUUID()));
.setQueryId(String.valueOf(UUID.randomUUID()))
.serverSetting(ServerSettings.ASYNC_INSERT, "0");
System.out.println("Inserting POJO: " + pojo);
try (InsertResponse response = client.insert(tableName, Collections.singletonList(pojo), settings).get(10, TimeUnit.SECONDS)) {
Assert.assertEquals(response.getWrittenRows(), 1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ public void testGetColumnsWithTable() throws Exception {
List<String> columnNames = Arrays.asList("id", "huge_integer", "name", "float1", "fixed_string1", "decimal_1", "nullable_column", "date", "datetime");
List<String> columnTypes = Arrays.asList("Int64", "UInt128", "String", "Float32", "FixedString(10)", "Decimal(10, 2)", "Nullable(Decimal(5, 4))", "Date", "DateTime");
List<Integer> columnSizes = Arrays.asList(8, 16, 0, 4, 10, 10, 5, 2, 0);
List<Integer> columnJDBCDataTypes = Arrays.asList(Types.BIGINT, Types.OTHER, Types.VARCHAR, Types.FLOAT, Types.VARCHAR, Types.DECIMAL, Types.DECIMAL, Types.DATE, Types.TIMESTAMP);
List<Integer> columnJDBCDataTypes = Arrays.asList(Types.BIGINT, Types.NUMERIC, Types.VARCHAR, Types.FLOAT, Types.VARCHAR, Types.DECIMAL, Types.DECIMAL, Types.DATE, Types.TIMESTAMP);
List<String> columnTypeNames = Arrays.asList("Int64", "UInt128", "String", "Float32", "FixedString(10)", "Decimal(10, 2)", "Nullable(Decimal(5, 4))", "Date", "DateTime");
List<Boolean> columnNullable = Arrays.asList(false, false, false, false, false, false, true, false, false);
List<Integer> columnDecimalDigits = Arrays.asList(null, null, null, null, null, 2, 4, null, null);
Expand Down
Loading