From fbacd30e484e8e4b1ea038792ea9f2a80f22eadf Mon Sep 17 00:00:00 2001 From: He-Pin Date: Thu, 28 May 2026 11:22:55 +0800 Subject: [PATCH] test: make InputStreamSource TCK publisher finite Motivation: JDK 25 nightly builds abort InputStreamSourceTest when the TCK cancellation scenario waits for completion from an infinite, CPU-busy InputStream publisher. Modification: Make the test publisher finite and emit one byte per ByteString so the TCK element count maps directly to stream elements without relying on take(elements) to stop a busy reader. Result: InputStreamSourceTest completes under JDK 25 nightly-style virtualized stream-dispatcher settings. Tests: - JDK 25 nightly-style virtualized stream-dispatcher flags: stream-tests-tck / Test / testOnly org.apache.pekko.stream.tck.InputStreamSourceTest - scalafmt --mode diff-ref=origin/main --quiet - scalafmt --list --mode diff-ref=origin/main - git diff --check References: Refs #2994 --- .../stream/tck/InputStreamSourceTest.scala | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/InputStreamSourceTest.scala b/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/InputStreamSourceTest.scala index bc54abba439..1c2dad2ba4a 100644 --- a/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/InputStreamSourceTest.scala +++ b/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/InputStreamSourceTest.scala @@ -25,17 +25,20 @@ import org.reactivestreams.Publisher class InputStreamSourceTest extends PekkoPublisherVerification[ByteString] { def createPublisher(elements: Long): Publisher[ByteString] = { + def inputStream = new InputStream { + private var remaining = elements + override def read(): Int = { + if (remaining > 0) { + remaining -= 1 + 1 + } else -1 + } + } + StreamConverters - .fromInputStream(() => - new InputStream { - @volatile var num = 0 - override def read(): Int = { - num += 1 - num - } - }) + // The TCK counts publisher elements, so emit one byte per ByteString. + .fromInputStream(() => inputStream, chunkSize = 1) .withAttributes(ActorAttributes.dispatcher("pekko.test.stream-dispatcher")) - .take(elements) .runWith(Sink.asPublisher(false)) } }