diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/io/FileSourceSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/io/FileSourceSpec.scala index 8f6a417568c..17da5ead664 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/io/FileSourceSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/io/FileSourceSpec.scala @@ -24,6 +24,7 @@ import scala.concurrent.duration._ import com.google.common.jimfs.{ Configuration, Jimfs } import org.apache.pekko +import pekko.actor.ActorRef import pekko.stream._ import pekko.stream.IOResult._ import pekko.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor } @@ -81,6 +82,23 @@ class FileSourceSpec extends StreamSpec(UnboundedMailboxConfig) { f } + def fileSourceChildren(): Set[ActorRef] = { + SystemMaterializer(system).materializer + .asInstanceOf[PhasedFusingActorMaterializer] + .supervisor + .tell(StreamSupervisor.GetChildren, testActor) + expectMsgType[Children].children.filter(_.path.toString contains "fileSource").toSet + } + + def expectNewFileSource(before: Set[ActorRef]): ActorRef = + awaitAssert { + val newChildren = fileSourceChildren().diff(before) + withClue(newChildren) { + newChildren.size shouldBe 1 + newChildren.head + } + } + "FileSource" must { "read contents from a file" in { val chunkSize = 512 @@ -258,30 +276,32 @@ class FileSourceSpec extends StreamSpec(UnboundedMailboxConfig) { } "use dedicated blocking-io-dispatcher by default" in { + val before = fileSourceChildren() val p = FileIO.fromPath(manyLines).runWith(TestSink()) + val ref = expectNewFileSource(before) - SystemMaterializer(system).materializer - .asInstanceOf[PhasedFusingActorMaterializer] - .supervisor - .tell(StreamSupervisor.GetChildren, testActor) - val ref = expectMsgType[Children].children.find(_.path.toString contains "fileSource").get try assertDispatcher(ref, ActorAttributes.IODispatcher.dispatcher) - finally p.cancel() + finally { + watch(ref) + p.cancel() + expectTerminated(ref) + } } "allow overriding the dispatcher using Attributes" in { + val before = fileSourceChildren() val p = FileIO .fromPath(manyLines) .addAttributes(ActorAttributes.dispatcher("pekko.actor.default-dispatcher")) .runWith(TestSink()) + val ref = expectNewFileSource(before) - SystemMaterializer(system).materializer - .asInstanceOf[PhasedFusingActorMaterializer] - .supervisor - .tell(StreamSupervisor.GetChildren, testActor) - val ref = expectMsgType[Children].children.find(_.path.toString contains "fileSource").get try assertDispatcher(ref, "pekko.actor.default-dispatcher") - finally p.cancel() + finally { + watch(ref) + p.cancel() + expectTerminated(ref) + } } "not signal onComplete more than once" in {