diff --git a/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/InputStreamSourceTest.scala b/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/InputStreamSourceTest.scala index bc54abba439..1c2dad2ba4a 100644 --- a/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/InputStreamSourceTest.scala +++ b/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/InputStreamSourceTest.scala @@ -25,17 +25,20 @@ import org.reactivestreams.Publisher class InputStreamSourceTest extends PekkoPublisherVerification[ByteString] { def createPublisher(elements: Long): Publisher[ByteString] = { + def inputStream = new InputStream { + private var remaining = elements + override def read(): Int = { + if (remaining > 0) { + remaining -= 1 + 1 + } else -1 + } + } + StreamConverters - .fromInputStream(() => - new InputStream { - @volatile var num = 0 - override def read(): Int = { - num += 1 - num - } - }) + // The TCK counts publisher elements, so emit one byte per ByteString. + .fromInputStream(() => inputStream, chunkSize = 1) .withAttributes(ActorAttributes.dispatcher("pekko.test.stream-dispatcher")) - .take(elements) .runWith(Sink.asPublisher(false)) } }