From 6abd814ae9c9f2d2006899eb38e996a190c95cf5 Mon Sep 17 00:00:00 2001 From: He-Pin Date: Thu, 28 May 2026 11:38:47 +0800 Subject: [PATCH] test: select the active FileSource actor in dispatcher checks Motivation: FileSourceSpec dispatcher checks can select a stale fileSource child left briefly by a previous materialization, which makes the dispatcher override assertion flaky. Modification: Capture existing fileSource children before starting each dispatcher-check stream, wait for exactly the newly materialized child, and watch it terminate after cancellation. Result: The checks assert the dispatcher of the stream started by the current test instead of a leftover child from an earlier materialization. Tests: - JDK 17 Scala 3.3.6: stream-tests / Test / testOnly org.apache.pekko.stream.io.FileSourceSpec - scalafmt --mode diff-ref=origin/main --quiet - scalafmt --list --mode diff-ref=origin/main - git diff --check References: Refs #2994 --- .../pekko/stream/io/FileSourceSpec.scala | 44 ++++++++++++++----- 1 file changed, 32 insertions(+), 12 deletions(-) diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/io/FileSourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/io/FileSourceSpec.scala index 8f6a417568c..17da5ead664 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/io/FileSourceSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/io/FileSourceSpec.scala @@ -24,6 +24,7 @@ import scala.concurrent.duration._ import com.google.common.jimfs.{ Configuration, Jimfs } import org.apache.pekko +import pekko.actor.ActorRef import pekko.stream._ import pekko.stream.IOResult._ import pekko.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor } @@ -81,6 +82,23 @@ class FileSourceSpec extends StreamSpec(UnboundedMailboxConfig) { f } + def fileSourceChildren(): Set[ActorRef] = { + SystemMaterializer(system).materializer + .asInstanceOf[PhasedFusingActorMaterializer] + .supervisor + .tell(StreamSupervisor.GetChildren, testActor) + expectMsgType[Children].children.filter(_.path.toString contains "fileSource").toSet + } + + def expectNewFileSource(before: Set[ActorRef]): ActorRef = + awaitAssert { + val newChildren = fileSourceChildren().diff(before) + withClue(newChildren) { + newChildren.size shouldBe 1 + newChildren.head + } + } + "FileSource" must { "read contents from a file" in { val chunkSize = 512 @@ -258,30 +276,32 @@ class FileSourceSpec extends StreamSpec(UnboundedMailboxConfig) { } "use dedicated blocking-io-dispatcher by default" in { + val before = fileSourceChildren() val p = FileIO.fromPath(manyLines).runWith(TestSink()) + val ref = expectNewFileSource(before) - SystemMaterializer(system).materializer - .asInstanceOf[PhasedFusingActorMaterializer] - .supervisor - .tell(StreamSupervisor.GetChildren, testActor) - val ref = expectMsgType[Children].children.find(_.path.toString contains "fileSource").get try assertDispatcher(ref, ActorAttributes.IODispatcher.dispatcher) - finally p.cancel() + finally { + watch(ref) + p.cancel() + expectTerminated(ref) + } } "allow overriding the dispatcher using Attributes" in { + val before = fileSourceChildren() val p = FileIO .fromPath(manyLines) .addAttributes(ActorAttributes.dispatcher("pekko.actor.default-dispatcher")) .runWith(TestSink()) + val ref = expectNewFileSource(before) - SystemMaterializer(system).materializer - .asInstanceOf[PhasedFusingActorMaterializer] - .supervisor - .tell(StreamSupervisor.GetChildren, testActor) - val ref = expectMsgType[Children].children.find(_.path.toString contains "fileSource").get try assertDispatcher(ref, "pekko.actor.default-dispatcher") - finally p.cancel() + finally { + watch(ref) + p.cancel() + expectTerminated(ref) + } } "not signal onComplete more than once" in {