diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index dd46f8697..adfdd54fb 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -51,9 +51,7 @@ jobs: uses: actions/setup-java@v4 with: distribution: "temurin" - java-version: | - 8 - 21 + java-version: 21 cache: "maven" - name: Build and install libraries run: mvn --batch-mode --no-transfer-progress --show-version --strict-checksums --threads 2 -Dmaven.wagon.rto=30000 -Dj8 -DskipITs install diff --git a/clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseTestClient.java b/clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseTestClient.java index 7484565b3..3dc2ff2e3 100644 --- a/clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseTestClient.java +++ b/clickhouse-client/src/test/java/com/clickhouse/client/ClickHouseTestClient.java @@ -7,7 +7,7 @@ public class ClickHouseTestClient implements ClickHouseClient { @Override public boolean accept(ClickHouseProtocol protocol) { - return true; + return protocol == ClickHouseProtocol.MYSQL; // to avoid taking this client in other tests } @Override diff --git a/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java b/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java index bc23f7aac..960e416eb 100644 --- a/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java +++ b/clickhouse-client/src/test/java/com/clickhouse/client/ClientIntegrationTest.java @@ -95,6 +95,39 @@ import java.util.zip.GZIPOutputStream; public abstract class ClientIntegrationTest extends BaseIntegrationTest { + + public static final ClickHouseOption CUSTOM_HTTP_PARAMS_OPT = new ClickHouseOption() { + @Override + public Serializable getDefaultValue() { + return null; + } + + @Override + public String getDescription() { + return ""; + } + + @Override + public String getKey() { + return "custom_http_params"; + } + + @Override + public Class getValueType() { + return String.class; + } + + @Override + public boolean isSensitive() { + return false; + } + + @Override + public String name() { + return "custom_http_params"; + } + }; + protected void checkRowCount(String queryOrTableName, int expectedRowCount) throws ClickHouseException { try (ClickHouseClient client = getClient()) { checkRowCount(newRequest(client, getServer()).format(ClickHouseFormat.RowBinaryWithNamesAndTypes), @@ -1869,7 +1902,7 @@ public void testLoadRawData() throws ClickHouseException, IOException { try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance() .createPipedOutputStream(config)) { // start the worker thread which transfer data from the input into ClickHouse - future = request.data(stream.getInputStream()).execute(); + future = request.data(stream.getInputStream()).set("async_insert", "0").execute(); // write bytes into the piped stream for (int i = 0; i < rows; i++) { BinaryStreamUtils.writeInt64(stream, i); @@ -1999,7 +2032,7 @@ public void testInsertRawDataSimple(int numberOfRecords) throws Exception { CompletableFuture future; try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance().createPipedOutputStream(config)) { // start the worker thread which transfer data from the input into ClickHouse - future = request.data(stream.getInputStream()).execute(); + future = request.data(stream.getInputStream()).set("async_insert", "0").execute(); for (int i = 0; i < numberOfRecords; i++) { BinaryStreamUtils.writeBytes(stream, String.format("{\"i\": %s, \"\": \"JSON\"}", i).getBytes(StandardCharsets.UTF_8)); } @@ -2208,7 +2241,7 @@ public void testAbortTransaction() throws ClickHouseException { throw new SkipException("Transaction was supported since 22.7"); } - ClickHouseRequest txRequest = newRequest(client, server).transaction(); + ClickHouseRequest txRequest = newRequest(client, server).set("async_insert", "0").transaction(); try (ClickHouseResponse response = txRequest.query("insert into " + tableName + " values(1)(2)(3)") .executeAndWait()) { // ignore @@ -2349,7 +2382,7 @@ public void testRollbackTransaction() throws ClickHouseException { throw new SkipException("Transaction was supported since 22.7"); } - ClickHouseRequest request = newRequest(client, server).transaction(); + ClickHouseRequest request = newRequest(client, server).set("async_insert", "0").transaction(); ClickHouseTransaction tx = request.getTransaction(); try (ClickHouseResponse response = newRequest(client, server) .query("insert into " + tableName + " values(0, '?')").executeAndWait()) { @@ -2367,7 +2400,7 @@ public void testRollbackTransaction() throws ClickHouseException { rows += 3; checkRowCount(request, tableName, rows); - ClickHouseRequest otherRequest = newRequest(client, server).transaction(tx); + ClickHouseRequest otherRequest = newRequest(client, server).set("async_insert", "0").transaction(tx); checkRowCount(otherRequest, tableName, rows); checkRowCount(tableName, rows); @@ -2413,8 +2446,8 @@ public void testTransactionSnapshot() throws ClickHouseException { throw new SkipException("Transaction was supported since 22.7"); } - ClickHouseRequest req1 = newRequest(client, server).transaction(); - ClickHouseRequest req2 = newRequest(client, server).transaction(); + ClickHouseRequest req1 = newRequest(client, server).set("async_insert", "0").transaction(); + ClickHouseRequest req2 = newRequest(client, server).set("async_insert", "0").transaction(); try (ClickHouseResponse response = req1.query("insert into " + tableName + " values(1)").executeAndWait()) { // ignore } @@ -2557,6 +2590,7 @@ public void testTransactionTimeout() throws ClickHouseException { checkRowCount(tableName, 0); request.transaction(1); try (ClickHouseResponse response = request.write().query("insert into " + tableName + " values(1)(2)(3)") + .set("async_insert", "0") .executeAndWait()) { // ignore } @@ -2591,7 +2625,7 @@ public void testImplicitTransaction() throws ClickHouseException { if (!checkServerVersion(client, server, "[22.7,)")) { throw new SkipException("Transaction was supported since 22.7"); } - ClickHouseRequest request = newRequest(client, server); + ClickHouseRequest request = newRequest(client, server).set("async_insert", "0"); ClickHouseTransaction.setImplicitTransaction(request, true); try (ClickHouseResponse response = request.query("insert into " + tableName + " values(1)") .executeAndWait()) { @@ -2640,7 +2674,7 @@ public void testRowBinaryWithDefaults() throws ClickHouseException, IOException, try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance() .createPipedOutputStream(config)) { // start the worker thread which transfer data from the input into ClickHouse - future = request.data(stream.getInputStream()).execute(); + future = request.data(stream.getInputStream()).set("async_insert", "0").execute(); // write bytes into the piped stream LongStream.range(0, numRows).forEachOrdered( n -> { diff --git a/clickhouse-http-client/src/test/java/com/clickhouse/client/http/ApacheHttpConnectionImplTest.java b/clickhouse-http-client/src/test/java/com/clickhouse/client/http/ApacheHttpConnectionImplTest.java index 2b2a54397..7a5e99de4 100644 --- a/clickhouse-http-client/src/test/java/com/clickhouse/client/http/ApacheHttpConnectionImplTest.java +++ b/clickhouse-http-client/src/test/java/com/clickhouse/client/http/ApacheHttpConnectionImplTest.java @@ -83,7 +83,7 @@ public void testConnection() throws Exception { try (ClickHouseClient client = ClickHouseClient.newInstance()) { - ClickHouseRequest req1 = newRequest(client, server); + ClickHouseRequest req1 = newRequest(client, server).option(ClickHouseHttpOption.CUSTOM_PARAMS, "async_insert=0"); try (ClickHouseResponse resp = req1.query("select 1").executeAndWait()) { Assert.assertEquals(resp.firstRecord().getValue(0).asString(), "1"); } diff --git a/clickhouse-http-client/src/test/java/com/clickhouse/client/http/ClickHouseHttpClientTest.java b/clickhouse-http-client/src/test/java/com/clickhouse/client/http/ClickHouseHttpClientTest.java index 239a3313b..dd26a1ecd 100644 --- a/clickhouse-http-client/src/test/java/com/clickhouse/client/http/ClickHouseHttpClientTest.java +++ b/clickhouse-http-client/src/test/java/com/clickhouse/client/http/ClickHouseHttpClientTest.java @@ -535,7 +535,7 @@ public void testDecompressWithLargeChunk() throws ClickHouseException, IOExcepti try (ClickHousePipedOutputStream stream = ClickHouseDataStreamFactory.getInstance() .createPipedOutputStream(config)) { // start the worker thread which transfer data from the input into ClickHouse - future = request.data(stream.getInputStream()).execute(); + future = request.data(stream.getInputStream()).set("async_insert", "0").execute(); // write bytes into the piped stream LongStream.range(0, numRows).forEachOrdered( n -> { diff --git a/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHouseConnectionTest.java b/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHouseConnectionTest.java index 5fdeb3a82..04074f84a 100644 --- a/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHouseConnectionTest.java +++ b/clickhouse-jdbc/src/test/java/com/clickhouse/jdbc/ClickHouseConnectionTest.java @@ -30,6 +30,10 @@ public void setV1() { } @Override public ClickHouseConnection newConnection(Properties properties) throws SQLException { + if (properties == null) { + properties = new Properties(); + } + properties.setProperty("custom_http_params", "async_insert=0"); return (ClickHouseConnection) newDataSource(properties).getConnection(); } diff --git a/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/connection/ClickHouseConnectionFactoryProvider.java b/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/connection/ClickHouseConnectionFactoryProvider.java index c466b4f32..86656d390 100644 --- a/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/connection/ClickHouseConnectionFactoryProvider.java +++ b/clickhouse-r2dbc/src/main/java/com/clickhouse/r2dbc/connection/ClickHouseConnectionFactoryProvider.java @@ -5,9 +5,9 @@ import com.clickhouse.client.ClickHouseProtocol; import com.clickhouse.client.config.ClickHouseClientOption; import com.clickhouse.client.config.ClickHouseDefaults; +import com.clickhouse.client.http.config.ClickHouseHttpOption; import com.clickhouse.config.ClickHouseOption; import com.clickhouse.data.ClickHouseUtils; - import io.r2dbc.spi.ConnectionFactory; import io.r2dbc.spi.ConnectionFactoryOptions; import io.r2dbc.spi.Option; @@ -22,7 +22,14 @@ import java.util.ServiceLoader; import java.util.Set; -import static io.r2dbc.spi.ConnectionFactoryOptions.*; +import static io.r2dbc.spi.ConnectionFactoryOptions.DATABASE; +import static io.r2dbc.spi.ConnectionFactoryOptions.DRIVER; +import static io.r2dbc.spi.ConnectionFactoryOptions.HOST; +import static io.r2dbc.spi.ConnectionFactoryOptions.PASSWORD; +import static io.r2dbc.spi.ConnectionFactoryOptions.PORT; +import static io.r2dbc.spi.ConnectionFactoryOptions.PROTOCOL; +import static io.r2dbc.spi.ConnectionFactoryOptions.SSL; +import static io.r2dbc.spi.ConnectionFactoryOptions.USER; public class ClickHouseConnectionFactoryProvider implements io.r2dbc.spi.ConnectionFactoryProvider { @@ -53,6 +60,7 @@ public class ClickHouseConnectionFactoryProvider implements io.r2dbc.spi.Connect } catch (Exception e) { // ignore } + allOptions.add(Option.valueOf("custom_http_params")); connQueryParams = Collections.unmodifiableList(new ArrayList<>(allOptions)); } diff --git a/clickhouse-r2dbc/src/test/java/com/clickhouse/r2dbc/spi/test/R2DBCTestKitImplTest.java b/clickhouse-r2dbc/src/test/java/com/clickhouse/r2dbc/spi/test/R2DBCTestKitImplTest.java index 510bd614f..d082d6a37 100644 --- a/clickhouse-r2dbc/src/test/java/com/clickhouse/r2dbc/spi/test/R2DBCTestKitImplTest.java +++ b/clickhouse-r2dbc/src/test/java/com/clickhouse/r2dbc/spi/test/R2DBCTestKitImplTest.java @@ -54,7 +54,7 @@ public static void setup() throws Exception { ClickHouseServerForTest.beforeSuite(); connectionFactory = ConnectionFactories.get( - format("r2dbc:clickhouse:%s://%s:%s@%s/%s?falan=filan&%s#tag1", DEFAULT_PROTOCOL, USER, PASSWORD, + format("r2dbc:clickhouse:%s://%s:%s@%s/%s?falan=filan&custom_http_params=async_insert=0&%s#tag1", DEFAULT_PROTOCOL, USER, PASSWORD, getClickHouseAddress(DEFAULT_PROTOCOL, false), DATABASE, EXTRA_PARAM)); jdbcTemplate = jdbcTemplate(null); } @@ -85,10 +85,10 @@ private static JdbcTemplate jdbcTemplate(String database) throws SQLException { Driver driver = new ClickHouseDriver(); DriverManager.registerDriver(driver); if (database == null) { - source.setJdbcUrl(format("jdbc:clickhouse:%s://%s?%s", DEFAULT_PROTOCOL, + source.setJdbcUrl(format("jdbc:clickhouse:%s://%s?custom_http_params=async_insert=0%s", DEFAULT_PROTOCOL, getClickHouseAddress(DEFAULT_PROTOCOL, false), EXTRA_PARAM)); } else { - source.setJdbcUrl(format("jdbc:clickhouse:%s://%s/%s?%s", DEFAULT_PROTOCOL, + source.setJdbcUrl(format("jdbc:clickhouse:%s://%s/%s?custom_http_params=async_insert=0&%s", DEFAULT_PROTOCOL, getClickHouseAddress(DEFAULT_PROTOCOL, false), DATABASE, EXTRA_PARAM)); } diff --git a/client-v2/src/test/java/com/clickhouse/client/ClientTests.java b/client-v2/src/test/java/com/clickhouse/client/ClientTests.java index 08cd233f6..d63f9b2cb 100644 --- a/client-v2/src/test/java/com/clickhouse/client/ClientTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/ClientTests.java @@ -466,18 +466,21 @@ public void testServerSettings() throws Exception { InsertSettings insertSettings = new InsertSettings() .setQueryId(queryId) .serverSetting(ServerSettings.ASYNC_INSERT, "1") - .serverSetting(ServerSettings.WAIT_ASYNC_INSERT, "1"); + .serverSetting(ServerSettings.WAIT_ASYNC_INSERT, "1") + .serverSetting(ServerSettings.INPUT_FORMAT_BINARY_READ_JSON_AS_STRING, "1"); String csvData = "0.33\n0.44\n0.55\n"; client.insert("server_settings_test_table", new ByteArrayInputStream(csvData.getBytes()), ClickHouseFormat.CSV, insertSettings).get().close(); client.execute("SYSTEM FLUSH LOGS").get().close(); - List logRecords = client.queryAll("SELECT * FROM clusterAllReplicas('default', system.query_log) WHERE query_id = '" + queryId + "' AND type = 'QueryFinish'"); + List logRecords = client.queryAll("SELECT Settings, ProfileEvents['AsyncInsertQuery'] as was_async FROM clusterAllReplicas('default', system.query_log) WHERE query_id = '" + queryId + "' AND type = 'QueryFinish'"); GenericRecord record = logRecords.get(0); + Assert.assertTrue(record.getBoolean("was_async")); String settings = record.getString(record.getSchema().nameToColumnIndex("Settings")); - Assert.assertTrue(settings.contains(ServerSettings.ASYNC_INSERT + "=1")); + Assert.assertTrue(settings.contains("input_format_binary_read_json_as_string=1")); +// Assert.assertTrue(settings.contains(ServerSettings.ASYNC_INSERT + "=1")); // async settings are not reflected in query log any more // Assert.assertTrue(settings.contains(ServerSettings.WAIT_ASYNC_INSERT + "=1")); // uncomment after server fix } } diff --git a/client-v2/src/test/java/com/clickhouse/client/ProxyTests.java b/client-v2/src/test/java/com/clickhouse/client/ProxyTests.java index e5de2eeab..2c1d5d697 100644 --- a/client-v2/src/test/java/com/clickhouse/client/ProxyTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/ProxyTests.java @@ -6,6 +6,8 @@ import com.clickhouse.client.api.enums.Protocol; import com.clickhouse.client.api.enums.ProxyType; import com.clickhouse.client.api.insert.InsertResponse; +import com.clickhouse.client.api.insert.InsertSettings; +import com.clickhouse.client.api.internal.ServerSettings; import com.clickhouse.client.api.query.GenericRecord; import com.clickhouse.client.insert.SamplePOJO; import com.github.tomakehurst.wiremock.WireMockServer; @@ -83,7 +85,8 @@ public void testInsert() throws Exception { } try { - InsertResponse response = client.get().insert(tableName, simplePOJOs).get(120, TimeUnit.SECONDS); + InsertResponse response = client.get().insert(tableName, simplePOJOs, new InsertSettings() + .serverSetting(ServerSettings.ASYNC_INSERT, "0")).get(120, TimeUnit.SECONDS); Assert.assertEquals(response.getWrittenRows(), 1000); } catch (Exception e) { fail("Should not have thrown exception.", e); diff --git a/client-v2/src/test/java/com/clickhouse/client/insert/InsertClientContentCompressionTests.java b/client-v2/src/test/java/com/clickhouse/client/insert/InsertClientContentCompressionTests.java index 1a2d263be..67d7f23ef 100644 --- a/client-v2/src/test/java/com/clickhouse/client/insert/InsertClientContentCompressionTests.java +++ b/client-v2/src/test/java/com/clickhouse/client/insert/InsertClientContentCompressionTests.java @@ -7,6 +7,7 @@ import com.clickhouse.client.api.data_formats.ClickHouseBinaryFormatReader; import com.clickhouse.client.api.insert.InsertResponse; import com.clickhouse.client.api.insert.InsertSettings; +import com.clickhouse.client.api.internal.ServerSettings; import com.clickhouse.client.api.query.GenericRecord; import com.clickhouse.client.api.query.QueryResponse; import org.apache.commons.lang3.RandomStringUtils; @@ -48,7 +49,8 @@ public void testInsertAndReadBackWithSecureConnection() { client.register(SamplePOJO.class, client.getTableSchema(tableName)); InsertSettings settings = new InsertSettings() .setDeduplicationToken(RandomStringUtils.randomAlphabetic(36)) - .setQueryId(String.valueOf(UUID.randomUUID())); + .setQueryId(String.valueOf(UUID.randomUUID())) + .serverSetting(ServerSettings.ASYNC_INSERT, "0"); System.out.println("Inserting POJO: " + pojo); try (InsertResponse response = client.insert(tableName, Collections.singletonList(pojo), settings).get(10, TimeUnit.SECONDS)) { Assert.assertEquals(response.getWrittenRows(), 1); diff --git a/jdbc-v2/src/test/java/com/clickhouse/jdbc/metadata/DatabaseMetaDataTest.java b/jdbc-v2/src/test/java/com/clickhouse/jdbc/metadata/DatabaseMetaDataTest.java index 68025b640..76248845e 100644 --- a/jdbc-v2/src/test/java/com/clickhouse/jdbc/metadata/DatabaseMetaDataTest.java +++ b/jdbc-v2/src/test/java/com/clickhouse/jdbc/metadata/DatabaseMetaDataTest.java @@ -315,7 +315,7 @@ public void testGetColumnsWithTable() throws Exception { List columnNames = Arrays.asList("id", "huge_integer", "name", "float1", "fixed_string1", "decimal_1", "nullable_column", "date", "datetime"); List columnTypes = Arrays.asList("Int64", "UInt128", "String", "Float32", "FixedString(10)", "Decimal(10, 2)", "Nullable(Decimal(5, 4))", "Date", "DateTime"); List columnSizes = Arrays.asList(8, 16, 0, 4, 10, 10, 5, 2, 0); - List columnJDBCDataTypes = Arrays.asList(Types.BIGINT, Types.OTHER, Types.VARCHAR, Types.FLOAT, Types.VARCHAR, Types.DECIMAL, Types.DECIMAL, Types.DATE, Types.TIMESTAMP); + List columnJDBCDataTypes = Arrays.asList(Types.BIGINT, Types.NUMERIC, Types.VARCHAR, Types.FLOAT, Types.VARCHAR, Types.DECIMAL, Types.DECIMAL, Types.DATE, Types.TIMESTAMP); List columnTypeNames = Arrays.asList("Int64", "UInt128", "String", "Float32", "FixedString(10)", "Decimal(10, 2)", "Nullable(Decimal(5, 4))", "Date", "DateTime"); List columnNullable = Arrays.asList(false, false, false, false, false, false, true, false, false); List columnDecimalDigits = Arrays.asList(null, null, null, null, null, 2, 4, null, null);