From 8fc91c7ef5c26ce5d3451507b390c191841d196f Mon Sep 17 00:00:00 2001 From: He-Pin Date: Mon, 25 May 2026 21:54:56 +0800 Subject: [PATCH 1/3] test: extend actor-system-terminate phase timeout in InputStreamSourceTest for JDK 25 virtualized nightly Motivation: JDK 25 nightly runs abort the stream TCK with `Failed to stop [InputStreamSourceTest] within [40000 milliseconds]` after the CoordinatedShutdown `actor-system-terminate` phase times out at its default 10 seconds. The dump shows two `flow-X-0-take` ActorGraphInterpreter children stuck mid-termination under the StreamSupervisor. The test feeds a CPU-busy `InputStream` whose `read()` always returns a fresh byte without blocking or yielding, so each `onPull` runs up to `chunkSize` synchronous `read()` calls. The nightly JDK 25 build forces `pekko.test.stream-dispatcher.fork-join-executor.virtualize=on`, which is the very dispatcher the test pins via `ActorAttributes.dispatcher(...)`. On a virtualized dispatcher this combination slows cancellation propagation through `take(elements)` enough that the 10 second phase timeout fires before the lingering flow actors finish terminating, even though the outer `ActorSystemLifecycle.shutdownTimeout` is already scaled to 40 seconds by `pekko.test.timefactor`. Modification: Override `additionalConfig` in `InputStreamSourceTest` to extend `pekko.coordinated-shutdown.phases.actor-system-terminate.timeout` to 30 seconds, mirroring the pattern already used in `MixedProtocolClusterSpec` for the same JDK 25 virtualized failure mode. The override layers on top of `PekkoPublisherVerification.additionalConfig` via `withFallback` so existing buffer-size settings are preserved. Result: The phase has enough headroom to drain in-flight cancellation traffic on virtualized dispatchers before the outer shutdown await fires. 
Verified locally on JDK 25 (Oracle OpenJDK 25.0.2) with the same virtualize/timefactor flags as `nightly-builds.yml`: `sbt "project stream-tests-tck" "testOnly org.apache.pekko.stream.tck.InputStreamSourceTest"` reports 26 passing / 0 failing / 12 canceled (TCK optional multi-subscriber specs). References: nightly-builds.yml `jdk-nightly-build` matrix entry javaVersion=25 --- .../pekko/stream/tck/InputStreamSourceTest.scala | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/InputStreamSourceTest.scala b/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/InputStreamSourceTest.scala index bc54abba439..dc604f2bb6d 100644 --- a/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/InputStreamSourceTest.scala +++ b/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/InputStreamSourceTest.scala @@ -15,6 +15,8 @@ package org.apache.pekko.stream.tck import java.io.InputStream +import com.typesafe.config.{ Config, ConfigFactory } + import org.apache.pekko import pekko.stream.ActorAttributes import pekko.stream.scaladsl.{ Sink, StreamConverters } @@ -24,6 +26,17 @@ import org.reactivestreams.Publisher class InputStreamSourceTest extends PekkoPublisherVerification[ByteString] { + // The test's InputStream is CPU-busy (each read() returns a fresh byte without + // blocking or yielding), so cancellation propagation through `take(elements)` can + // be slow when `pekko.test.stream-dispatcher` is virtualized in JDK 21+/JDK 25 + // nightly runs. Extend the actor-system-terminate phase timeout so that the + // CoordinatedShutdown phase has enough headroom for any lingering flow actors to + // finish terminating before the outer shutdown await fires. 
+ override def additionalConfig: Config = + ConfigFactory + .parseString("pekko.coordinated-shutdown.phases.actor-system-terminate.timeout = 30 s") + .withFallback(super.additionalConfig) + def createPublisher(elements: Long): Publisher[ByteString] = { StreamConverters .fromInputStream(() => From 95f6a3626269498ddd70cb18b92240df5c3318c4 Mon Sep 17 00:00:00 2001 From: He-Pin Date: Wed, 27 May 2026 17:28:34 +0800 Subject: [PATCH 2/3] test: stabilize nightly stream and TLS specs Motivation: Recent nightly builds fail repeatedly on JDK 21/25 in stream TCK and TLS rotating-key tests. Modification: Make InputStreamSourceTest model the TCK element count directly by emitting one byte per ByteString without relying on take(elements) cancellation. Allow RotatingKeysSSLEngineProviderSpec.contact to skip late ActorIdentity replies from earlier Identify retries, and any other unexpected message, while waiting for the `ping-1` echo response. Result: The affected specs no longer fail when Identify responses arrive late or when the JDK 25 virtualized test-stream-dispatcher delays scheduling.
Tests: - scalafmt --mode diff-ref=origin/main - scalafmt --list --mode diff-ref=origin/main - git diff --check - sbt with JDK 25 nightly-style virtualized dispatcher flags: stream-tests-tck / Test / testOnly org.apache.pekko.stream.tck.InputStreamSourceTest; remote / Test / testOnly org.apache.pekko.remote.artery.tcp.ssl.RotatingProviderWithChangingKeysSpec - sbt with JDK 21 nightly-style virtualized dispatcher flags: remote / Test / testOnly org.apache.pekko.remote.artery.tcp.ssl.RotatingProviderWithChangingKeysSpec - sbt with JDK 25 nightly-style virtualized test-stream-dispatcher flags: stream-tests-tck / Test / testOnly org.apache.pekko.stream.tck.InputStreamSourceTest References: None - nightly-builds.yml failure analysis --- .../RotatingKeysSSLEngineProviderSpec.scala | 6 +++- .../stream/tck/InputStreamSourceTest.scala | 34 +++++++------------ 2 files changed, 17 insertions(+), 23 deletions(-) diff --git a/remote/src/test/scala/org/apache/pekko/remote/artery/tcp/ssl/RotatingKeysSSLEngineProviderSpec.scala b/remote/src/test/scala/org/apache/pekko/remote/artery/tcp/ssl/RotatingKeysSSLEngineProviderSpec.scala index 52b35e94580..a4c16e14f02 100644 --- a/remote/src/test/scala/org/apache/pekko/remote/artery/tcp/ssl/RotatingKeysSSLEngineProviderSpec.scala +++ b/remote/src/test/scala/org/apache/pekko/remote/artery/tcp/ssl/RotatingKeysSSLEngineProviderSpec.scala @@ -302,7 +302,11 @@ abstract class RotatingKeysSSLEngineProviderSpec(extraConfig: String) val ref = targetRef.getOrElse( fail(s"Timed out waiting for ActorIdentity from $toPath after $maxAttempts attempts")) ref.tell("ping-1", senderOnSource.ref) - senderOnSource.expectMsg("ping-1") + senderOnSource.fishForMessage(identifyTimeout, hint = "waiting for ping-1") { + case "ping-1" => true + case ActorIdentity(_, _) => false + case _ => false + } } /** diff --git a/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/InputStreamSourceTest.scala 
b/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/InputStreamSourceTest.scala index dc604f2bb6d..1c2dad2ba4a 100644 --- a/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/InputStreamSourceTest.scala +++ b/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/InputStreamSourceTest.scala @@ -15,8 +15,6 @@ package org.apache.pekko.stream.tck import java.io.InputStream -import com.typesafe.config.{ Config, ConfigFactory } - import org.apache.pekko import pekko.stream.ActorAttributes import pekko.stream.scaladsl.{ Sink, StreamConverters } @@ -26,29 +24,21 @@ import org.reactivestreams.Publisher class InputStreamSourceTest extends PekkoPublisherVerification[ByteString] { - // The test's InputStream is CPU-busy (each read() returns a fresh byte without - // blocking or yielding), so cancellation propagation through `take(elements)` can - // be slow when `pekko.test.stream-dispatcher` is virtualized in JDK 21+/JDK 25 - // nightly runs. Extend the actor-system-terminate phase timeout so that the - // CoordinatedShutdown phase has enough headroom for any lingering flow actors to - // finish terminating before the outer shutdown await fires. - override def additionalConfig: Config = - ConfigFactory - .parseString("pekko.coordinated-shutdown.phases.actor-system-terminate.timeout = 30 s") - .withFallback(super.additionalConfig) - def createPublisher(elements: Long): Publisher[ByteString] = { + def inputStream = new InputStream { + private var remaining = elements + override def read(): Int = { + if (remaining > 0) { + remaining -= 1 + 1 + } else -1 + } + } + StreamConverters - .fromInputStream(() => - new InputStream { - @volatile var num = 0 - override def read(): Int = { - num += 1 - num - } - }) + // The TCK counts publisher elements, so emit one byte per ByteString. 
+ .fromInputStream(() => inputStream, chunkSize = 1) .withAttributes(ActorAttributes.dispatcher("pekko.test.stream-dispatcher")) - .take(elements) .runWith(Sink.asPublisher(false)) } } From ac996bc47b0218d8668654dda9cf5f28ff63ebc8 Mon Sep 17 00:00:00 2001 From: He-Pin Date: Wed, 27 May 2026 18:20:32 +0800 Subject: [PATCH 3/3] test: clean up TLS edge case streams after early cancellation Motivation: Recent nightly builds repeatedly time out in TlsGraphStageEdgeCasesSpec under JDK 25 while running early-cancellation TLS edge cases. Modification: Have collectExactly materialize a KillSwitch and watch stream termination, then shut the stream down after collecting the expected bytes so repeated early-cancellation tests do not leave previous TLS materializations draining in the same actor system. Result: The TLS edge case suite no longer accumulates lingering TlsGraphStage/headOptionSink actors during repeated early-cancellation checks. Tests: - scalafmt --mode diff-ref=origin/main - scalafmt --list --mode diff-ref=origin/main - git diff --check - sbt with JDK 25 nightly-style virtualized test-stream-dispatcher flags: stream-tests / Test / testOnly org.apache.pekko.stream.io.TlsGraphStageEdgeCasesSpec References: None - nightly-builds.yml failure analysis --- .../io/TlsGraphStageEdgeCasesSpec.scala | 26 +++++++++++++------ 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/io/TlsGraphStageEdgeCasesSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/io/TlsGraphStageEdgeCasesSpec.scala index 2e9e3216bb1..718867a9b83 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/io/TlsGraphStageEdgeCasesSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/io/TlsGraphStageEdgeCasesSpec.scala @@ -68,14 +68,24 @@ class TlsGraphStageEdgeCasesSpec extends StreamSpec(TlsGraphStageEdgeCasesSpec.c private def collectExactly( stream: Source[SslTlsInbound, NotUsed], expectedBytes: Int, - 
timeout: FiniteDuration = 30.seconds): ByteString = - Await.result( - stream - .collect { case SessionBytes(_, b) => b } - .scan(ByteString.empty)(_ ++ _) - .dropWhile(_.size < expectedBytes) - .runWith(Sink.headOption), - timeout.dilated).getOrElse(ByteString.empty) + timeout: FiniteDuration = 30.seconds): ByteString = { + val ((killSwitch, streamDone), result) = stream + .viaMat(KillSwitches.single)(Keep.right) + .watchTermination(Keep.both) + .toMat( + Flow[SslTlsInbound] + .collect { case SessionBytes(_, b) => b } + .scan(ByteString.empty)(_ ++ _) + .dropWhile(_.size < expectedBytes) + .toMat(Sink.headOption)(Keep.right))(Keep.both) + .run() + + try Await.result(result, timeout.dilated).getOrElse(ByteString.empty) + finally { + killSwitch.shutdown() + Await.result(streamDone, timeout.dilated) + } + } "TlsGraphStage" must {