Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
16b7883
be generous for should_evict_down_node_metrics_when_timeout_fires
SiyaoIsHiding Jun 24, 2025
6f31d18
mavenbundle start level 1
SiyaoIsHiding Jun 25, 2025
ff0cc04
should_evict_down_node_metrics_when_timeout_fires to 40 seconds
SiyaoIsHiding Jun 25, 2025
6594f3b
udt codec IT simple statement raise time out to 20 seconds
SiyaoIsHiding Jun 25, 2025
faeda6e
check schema agreement for PreparedStatementCachingIT
SiyaoIsHiding Jun 25, 2025
935cf34
DefaultReactiveResultSetIT raise METADATA_SCHEMA_REQUEST_TIMEOUT?
SiyaoIsHiding Jun 25, 2025
ab7b1c1
DefaultReactiveResultSetIT raise METADATA_SCHEMA_REQUEST_TIMEOUT?
SiyaoIsHiding Jun 30, 2025
7ac257a
change timeout to 120s or sth.
SiyaoIsHiding Jun 30, 2025
5593f5b
change table and type names
SiyaoIsHiding Jul 1, 2025
646ffc9
DefaultReactiveResultSetIT not paralell
SiyaoIsHiding Jul 1, 2025
02dedff
empty
SiyaoIsHiding Jul 1, 2025
e2b5062
compilation warning
SiyaoIsHiding Jul 2, 2025
eb8822b
empty
SiyaoIsHiding Jul 2, 2025
a7ff308
Merge remote-tracking branch 'upstream/4.x' into flaky-test
SiyaoIsHiding Jul 2, 2025
d7165f2
add logging
SiyaoIsHiding Jul 8, 2025
ec7a107
add logging
SiyaoIsHiding Jul 8, 2025
cf054fa
empty
SiyaoIsHiding Jul 9, 2025
9045265
empty
SiyaoIsHiding Jul 9, 2025
933bafb
fmt
SiyaoIsHiding Jul 9, 2025
6ec3c73
error
SiyaoIsHiding Jul 10, 2025
9ee5a8b
add logging
SiyaoIsHiding Jul 10, 2025
3289fda
register listener first, create prepare later
SiyaoIsHiding Jul 10, 2025
98eb757
strong values of cache
SiyaoIsHiding Jul 11, 2025
c0d2f74
Merge remote-tracking branch 'upstream/4.x' into flaky-test
SiyaoIsHiding Jul 15, 2025
4f22bd6
CancellationIT use strong values cache without cache removal callback
SiyaoIsHiding Jul 16, 2025
9ba8203
empty
SiyaoIsHiding Jul 16, 2025
647d262
Address PR review
SiyaoIsHiding Mar 5, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
import com.datastax.oss.driver.internal.core.session.RequestProcessor;
import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures;
import com.datastax.oss.driver.internal.core.util.concurrent.RunOrSchedule;
import com.datastax.oss.driver.shaded.guava.common.base.Functions;
import com.datastax.oss.driver.shaded.guava.common.cache.Cache;
import com.datastax.oss.driver.shaded.guava.common.cache.CacheBuilder;
import com.datastax.oss.driver.shaded.guava.common.collect.Iterables;
Expand Down Expand Up @@ -64,14 +63,17 @@ public CqlPrepareAsyncProcessor() {
}

public CqlPrepareAsyncProcessor(@NonNull Optional<? extends DefaultDriverContext> context) {
this(context, Functions.identity());
this(context, CacheBuilder::weakValues);
}

protected CqlPrepareAsyncProcessor(
Optional<? extends DefaultDriverContext> context,
Function<CacheBuilder<Object, Object>, CacheBuilder<Object, Object>> decorator) {

CacheBuilder<Object, Object> baseCache = CacheBuilder.newBuilder().weakValues();
// CASSJAVA-104
// Note that the base cache does NOT use weak values like the one-arg constructor it previously
// does!
CacheBuilder<Object, Object> baseCache = CacheBuilder.newBuilder();
this.cache = decorator.apply(baseCache).build();
context.ifPresent(
(ctx) -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,8 @@ private void invalidationResultSetTest(
Consumer<CqlSession> setupTestSchema, Set<String> expectedChangedTypes) {
invalidationTestInner(
setupTestSchema,
"select f from test_table_1 where e = ?",
"select h from test_table_2 where g = ?",
"select f from test_table_caching_1 where e = ?",
"select h from test_table_caching_2 where g = ?",
expectedChangedTypes);
}

Expand All @@ -206,8 +206,8 @@ private void invalidationVariableDefsTest(
String condition = isCollection ? "contains ?" : "= ?";
invalidationTestInner(
setupTestSchema,
String.format("select e from test_table_1 where f %s allow filtering", condition),
String.format("select g from test_table_2 where h %s allow filtering", condition),
String.format("select e from test_table_caching_1 where f %s allow filtering", condition),
String.format("select g from test_table_caching_2 where h %s allow filtering", condition),
expectedChangedTypes);
}

Expand Down Expand Up @@ -263,16 +263,18 @@ private void invalidationTestInner(
preparedStmtCacheRemoveLatch.countDown();
});

// alter test_type_2 to trigger cache invalidation and above events
session.execute("ALTER TYPE test_type_2 add i blob");
// alter test_type_caching_2 to trigger cache invalidation and above events
session.execute("ALTER TYPE test_type_caching_2 add i blob");

session.checkSchemaAgreement();

// wait for latches and fail if they don't reach zero before timeout
assertThat(
Uninterruptibles.awaitUninterruptibly(
preparedStmtCacheRemoveLatch, 10, TimeUnit.SECONDS))
preparedStmtCacheRemoveLatch, 120, TimeUnit.SECONDS))
.withFailMessage("preparedStmtCacheRemoveLatch did not trigger before timeout")
.isTrue();
assertThat(Uninterruptibles.awaitUninterruptibly(typeChangeEventLatch, 10, TimeUnit.SECONDS))
assertThat(Uninterruptibles.awaitUninterruptibly(typeChangeEventLatch, 20, TimeUnit.SECONDS))
.withFailMessage("typeChangeEventLatch did not trigger before timeout")
.isTrue();

Expand All @@ -295,17 +297,20 @@ private void invalidationTestInner(

Consumer<CqlSession> setupCacheEntryTestBasic =
(session) -> {
session.execute("CREATE TYPE test_type_1 (a text, b int)");
session.execute("CREATE TYPE test_type_2 (c int, d text)");
session.execute("CREATE TABLE test_table_1 (e int primary key, f frozen<test_type_1>)");
session.execute("CREATE TABLE test_table_2 (g int primary key, h frozen<test_type_2>)");
session.execute("CREATE TYPE test_type_caching_1 (a text, b int)");
session.execute("CREATE TYPE test_type_caching_2 (c int, d text)");
session.execute(
"CREATE TABLE test_table_caching_1 (e int primary key, f frozen<test_type_caching_1>)");
session.execute(
"CREATE TABLE test_table_caching_2 (g int primary key, h frozen<test_type_caching_2>)");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good job, former me 🤦

Should the names of the tables and types created here not follow the basic/collection/tuple/nested split we use for setupCacheEntryTest* method names throughout? Seems like this should be:

        session.execute("CREATE TYPE test_type_basic_1 (a text, b int)");
        session.execute("CREATE TYPE test_type_basic_2 (c int, d text)");
        session.execute("CREATE TABLE test_table_basic_1 (e int primary key, f frozen<test_type_1>)");
        session.execute("CREATE TABLE test_table_basic_2 (g int primary key, h frozen<test_type_2>)");

and so on if we really want to avoid collisions.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

invalidationResultSetTest, invalidationVariableDefsTest, and invalidationTestInner are shared among basic/collection/tuple/nested. They need to be refactored if we want to change the table names to specify basic/collection/tuple/nested. I don't think it's worth it, cuz test methods inside of one test suite won't run in parallel anyway

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But don't we have to change the table name since these tests don't clean up after themselves? Once you execute the tests for, say, basic you've already created test_type_1, test_type_2 and all the rest. Won't you have a conflict then if you try to create those types against the same Cassandra backend (since they were never removed)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This test suite has a @Rule instead of a @ClassRule, that means for every test method, CCM bridge creates a separate keyspace and session.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait, is that how that works? I remembered that working differently. My recollection was that CcmBridge just controlled the ccm interaction and that the session rule was the entity that constrained ops to a given keyspace.

Yeah, as I read this SessionRule can create a new keyspace if the user specifies it in the builder but this IT does not currently do so and this PR doesn't change that behaviour.

I do agree we need a new keyspace per test (since without that the create type calls will definitely stomp on each other) but I don't think we're quite there yet are we?

};

@Test
public void should_invalidate_cache_entry_on_basic_udt_change_result_set() {
SchemaChangeSynchronizer.withLock(
() -> {
invalidationResultSetTest(setupCacheEntryTestBasic, ImmutableSet.of("test_type_2"));
invalidationResultSetTest(
setupCacheEntryTestBasic, ImmutableSet.of("test_type_caching_2"));
});
}

Expand All @@ -314,25 +319,26 @@ public void should_invalidate_cache_entry_on_basic_udt_change_variable_defs() {
SchemaChangeSynchronizer.withLock(
() -> {
invalidationVariableDefsTest(
setupCacheEntryTestBasic, false, ImmutableSet.of("test_type_2"));
setupCacheEntryTestBasic, false, ImmutableSet.of("test_type_caching_2"));
});
}

Consumer<CqlSession> setupCacheEntryTestCollection =
(session) -> {
session.execute("CREATE TYPE test_type_1 (a text, b int)");
session.execute("CREATE TYPE test_type_2 (c int, d text)");
session.execute("CREATE TYPE test_type_caching_1 (a text, b int)");
session.execute("CREATE TYPE test_type_caching_2 (c int, d text)");
session.execute(
"CREATE TABLE test_table_1 (e int primary key, f list<frozen<test_type_1>>)");
"CREATE TABLE test_table_caching_1 (e int primary key, f list<frozen<test_type_caching_1>>)");
session.execute(
"CREATE TABLE test_table_2 (g int primary key, h list<frozen<test_type_2>>)");
"CREATE TABLE test_table_caching_2 (g int primary key, h list<frozen<test_type_caching_2>>)");
};

@Test
public void should_invalidate_cache_entry_on_collection_udt_change_result_set() {
SchemaChangeSynchronizer.withLock(
() -> {
invalidationResultSetTest(setupCacheEntryTestCollection, ImmutableSet.of("test_type_2"));
invalidationResultSetTest(
setupCacheEntryTestCollection, ImmutableSet.of("test_type_caching_2"));
});
}

Expand All @@ -341,25 +347,26 @@ public void should_invalidate_cache_entry_on_collection_udt_change_variable_defs
SchemaChangeSynchronizer.withLock(
() -> {
invalidationVariableDefsTest(
setupCacheEntryTestCollection, true, ImmutableSet.of("test_type_2"));
setupCacheEntryTestCollection, true, ImmutableSet.of("test_type_caching_2"));
});
}

Consumer<CqlSession> setupCacheEntryTestTuple =
(session) -> {
session.execute("CREATE TYPE test_type_1 (a text, b int)");
session.execute("CREATE TYPE test_type_2 (c int, d text)");
session.execute("CREATE TYPE test_type_caching_1 (a text, b int)");
session.execute("CREATE TYPE test_type_caching_2 (c int, d text)");
session.execute(
"CREATE TABLE test_table_1 (e int primary key, f tuple<int, test_type_1, text>)");
"CREATE TABLE test_table_caching_1 (e int primary key, f tuple<int, test_type_caching_1, text>)");
session.execute(
"CREATE TABLE test_table_2 (g int primary key, h tuple<text, test_type_2, int>)");
"CREATE TABLE test_table_caching_2 (g int primary key, h tuple<text, test_type_caching_2, int>)");
};

@Test
public void should_invalidate_cache_entry_on_tuple_udt_change_result_set() {
SchemaChangeSynchronizer.withLock(
() -> {
invalidationResultSetTest(setupCacheEntryTestTuple, ImmutableSet.of("test_type_2"));
invalidationResultSetTest(
setupCacheEntryTestTuple, ImmutableSet.of("test_type_caching_2"));
});
}

Expand All @@ -368,26 +375,29 @@ public void should_invalidate_cache_entry_on_tuple_udt_change_variable_defs() {
SchemaChangeSynchronizer.withLock(
() -> {
invalidationVariableDefsTest(
setupCacheEntryTestTuple, false, ImmutableSet.of("test_type_2"));
setupCacheEntryTestTuple, false, ImmutableSet.of("test_type_caching_2"));
});
}

Consumer<CqlSession> setupCacheEntryTestNested =
(session) -> {
session.execute("CREATE TYPE test_type_1 (a text, b int)");
session.execute("CREATE TYPE test_type_2 (c int, d text)");
session.execute("CREATE TYPE test_type_3 (e frozen<test_type_1>, f int)");
session.execute("CREATE TYPE test_type_4 (g int, h frozen<test_type_2>)");
session.execute("CREATE TABLE test_table_1 (e int primary key, f frozen<test_type_3>)");
session.execute("CREATE TABLE test_table_2 (g int primary key, h frozen<test_type_4>)");
session.execute("CREATE TYPE test_type_caching_1 (a text, b int)");
session.execute("CREATE TYPE test_type_caching_2 (c int, d text)");
session.execute("CREATE TYPE test_type_caching_3 (e frozen<test_type_caching_1>, f int)");
session.execute("CREATE TYPE test_type_caching_4 (g int, h frozen<test_type_caching_2>)");
session.execute(
"CREATE TABLE test_table_caching_1 (e int primary key, f frozen<test_type_caching_3>)");
session.execute(
"CREATE TABLE test_table_caching_2 (g int primary key, h frozen<test_type_caching_4>)");
};

@Test
public void should_invalidate_cache_entry_on_nested_udt_change_result_set() {
SchemaChangeSynchronizer.withLock(
() -> {
invalidationResultSetTest(
setupCacheEntryTestNested, ImmutableSet.of("test_type_2", "test_type_4"));
setupCacheEntryTestNested,
ImmutableSet.of("test_type_caching_2", "test_type_caching_4"));
});
}

Expand All @@ -396,7 +406,9 @@ public void should_invalidate_cache_entry_on_nested_udt_change_variable_defs() {
SchemaChangeSynchronizer.withLock(
() -> {
invalidationVariableDefsTest(
setupCacheEntryTestNested, false, ImmutableSet.of("test_type_2", "test_type_4"));
setupCacheEntryTestNested,
false,
ImmutableSet.of("test_type_caching_2", "test_type_caching_4"));
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,34 @@
import static org.junit.Assert.fail;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.api.core.context.DriverContext;
import com.datastax.oss.driver.api.core.cql.PrepareRequest;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.session.ProgrammaticArguments;
import com.datastax.oss.driver.api.core.session.SessionBuilder;
import com.datastax.oss.driver.api.testinfra.ccm.CustomCcmRule;
import com.datastax.oss.driver.api.testinfra.session.SessionRule;
import com.datastax.oss.driver.api.testinfra.session.SessionUtils;
import com.datastax.oss.driver.categories.IsolatedTests;
import com.datastax.oss.driver.internal.core.context.DefaultDriverContext;
import com.datastax.oss.driver.internal.core.cql.CqlPrepareAsyncProcessor;
import com.datastax.oss.driver.internal.core.cql.CqlPrepareSyncProcessor;
import com.datastax.oss.driver.internal.core.session.BuiltInRequestProcessors;
import com.datastax.oss.driver.internal.core.session.RequestProcessor;
import com.datastax.oss.driver.internal.core.session.RequestProcessorRegistry;
import com.datastax.oss.driver.shaded.guava.common.base.Predicates;
import com.datastax.oss.driver.shaded.guava.common.cache.Cache;
import com.datastax.oss.driver.shaded.guava.common.collect.Iterables;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
Expand All @@ -50,6 +64,69 @@ public class PreparedStatementCancellationIT {

@Rule public TestRule chain = RuleChain.outerRule(ccmRule).around(sessionRule);

private static class TestCqlPrepareAsyncProcessor extends CqlPrepareAsyncProcessor {

public TestCqlPrepareAsyncProcessor(@NonNull Optional<DefaultDriverContext> context) {
// Default CqlPrepareAsyncProcessor uses weak values here as well. We avoid doing so
// to prevent cache entries from unexpectedly disappearing mid-test.
super(context, Function.identity());
}
}

private static class TestDefaultDriverContext extends DefaultDriverContext {
public TestDefaultDriverContext(
DriverConfigLoader configLoader, ProgrammaticArguments programmaticArguments) {
super(configLoader, programmaticArguments);
}

@Override
protected RequestProcessorRegistry buildRequestProcessorRegistry() {
// Re-create the processor cache to insert the TestCqlPrepareAsyncProcessor with it's strong
// prepared statement cache, see JAVA-3062
List<RequestProcessor<?, ?>> processors =
BuiltInRequestProcessors.createDefaultProcessors(this);
processors.removeIf((processor) -> processor instanceof CqlPrepareAsyncProcessor);
processors.removeIf((processor) -> processor instanceof CqlPrepareSyncProcessor);
CqlPrepareAsyncProcessor asyncProcessor =
new PreparedStatementCancellationIT.TestCqlPrepareAsyncProcessor(Optional.of(this));
processors.add(2, asyncProcessor);
processors.add(3, new CqlPrepareSyncProcessor(asyncProcessor));
return new RequestProcessorRegistry(
getSessionName(), processors.toArray(new RequestProcessor[0]));
}
}

private static class TestSessionBuilder extends SessionBuilder {

@Override
protected Object wrap(@NonNull CqlSession defaultSession) {
return defaultSession;
}

@Override
protected DriverContext buildContext(
DriverConfigLoader configLoader, ProgrammaticArguments programmaticArguments) {
return new PreparedStatementCancellationIT.TestDefaultDriverContext(
configLoader, programmaticArguments);
}
}

@BeforeClass
public static void setupBeforeClass() {
System.setProperty(
SessionUtils.SESSION_BUILDER_CLASS_PROPERTY,
PreparedStatementCancellationIT.class.getName());
}

@AfterClass
public static void teardownAfterClass() {
System.clearProperty(SessionUtils.SESSION_BUILDER_CLASS_PROPERTY);
}

public static SessionBuilder builder() {
return new PreparedStatementCancellationIT.TestSessionBuilder();
}

@Before
public void setup() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,10 @@
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.testinfra.ccm.CcmRule;
import com.datastax.oss.driver.api.testinfra.ccm.SchemaChangeSynchronizer;
import com.datastax.oss.driver.api.testinfra.session.SessionRule;
import com.datastax.oss.driver.categories.ParallelizableTests;
import com.datastax.oss.driver.internal.core.cql.EmptyColumnDefinitions;
import com.tngtech.java.junit.dataprovider.DataProvider;
import com.tngtech.java.junit.dataprovider.DataProviderRunner;
Expand All @@ -47,13 +47,11 @@
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.rules.RuleChain;
import org.junit.rules.TestRule;
import org.junit.runner.RunWith;

@RunWith(DataProviderRunner.class)
@Category(ParallelizableTests.class)
public class DefaultReactiveResultSetIT {

private static CcmRule ccmRule = CcmRule.getInstance();
Expand All @@ -67,19 +65,15 @@ public static void initialize() {
CqlSession session = sessionRule.session();
SchemaChangeSynchronizer.withLock(
() -> {
session.execute("DROP TABLE IF EXISTS test_reactive_read");
session.execute("DROP TABLE IF EXISTS test_reactive_write");
session.execute(createSlowStatement("DROP TABLE IF EXISTS test_reactive_read"));
session.execute(createSlowStatement("DROP TABLE IF EXISTS test_reactive_write"));
session.checkSchemaAgreement();
session.execute(
SimpleStatement.builder(
"CREATE TABLE test_reactive_read (pk int, cc int, v int, PRIMARY KEY ((pk), cc))")
.setExecutionProfile(sessionRule.slowProfile())
.build());
createSlowStatement(
"CREATE TABLE test_reactive_read (pk int, cc int, v int, PRIMARY KEY ((pk), cc))"));
session.execute(
SimpleStatement.builder(
"CREATE TABLE test_reactive_write (pk int, cc int, v int, PRIMARY KEY ((pk), cc))")
.setExecutionProfile(sessionRule.slowProfile())
.build());
createSlowStatement(
"CREATE TABLE test_reactive_write (pk int, cc int, v int, PRIMARY KEY ((pk), cc))"));
session.checkSchemaAgreement();
});
for (int i = 0; i < 1000; i++) {
Expand All @@ -92,6 +86,12 @@ public static void initialize() {
}
}

static Statement<?> createSlowStatement(String statement) {
return SimpleStatement.builder(statement)
.setExecutionProfile(sessionRule.slowProfile())
.build();
}

@Before
public void truncateTables() throws Exception {
CqlSession session = sessionRule.session();
Expand Down
Loading