Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import scala.concurrent.duration._
import com.google.common.jimfs.{ Configuration, Jimfs }

import org.apache.pekko
import pekko.actor.ActorRef
import pekko.stream._
import pekko.stream.IOResult._
import pekko.stream.impl.{ PhasedFusingActorMaterializer, StreamSupervisor }
Expand Down Expand Up @@ -81,6 +82,23 @@ class FileSourceSpec extends StreamSpec(UnboundedMailboxConfig) {
f
}

def fileSourceChildren(): Set[ActorRef] = {
SystemMaterializer(system).materializer
.asInstanceOf[PhasedFusingActorMaterializer]
.supervisor
.tell(StreamSupervisor.GetChildren, testActor)
expectMsgType[Children].children.filter(_.path.toString contains "fileSource").toSet
}

def expectNewFileSource(before: Set[ActorRef]): ActorRef =
awaitAssert {
val newChildren = fileSourceChildren().diff(before)
withClue(newChildren) {
newChildren.size shouldBe 1
newChildren.head
}
}

"FileSource" must {
"read contents from a file" in {
val chunkSize = 512
Expand Down Expand Up @@ -258,30 +276,32 @@ class FileSourceSpec extends StreamSpec(UnboundedMailboxConfig) {
}

"use dedicated blocking-io-dispatcher by default" in {
val before = fileSourceChildren()
val p = FileIO.fromPath(manyLines).runWith(TestSink())
val ref = expectNewFileSource(before)

SystemMaterializer(system).materializer
.asInstanceOf[PhasedFusingActorMaterializer]
.supervisor
.tell(StreamSupervisor.GetChildren, testActor)
val ref = expectMsgType[Children].children.find(_.path.toString contains "fileSource").get
try assertDispatcher(ref, ActorAttributes.IODispatcher.dispatcher)
finally p.cancel()
finally {
watch(ref)
p.cancel()
expectTerminated(ref)
}
}

"allow overriding the dispatcher using Attributes" in {
val before = fileSourceChildren()
val p = FileIO
.fromPath(manyLines)
.addAttributes(ActorAttributes.dispatcher("pekko.actor.default-dispatcher"))
.runWith(TestSink())
val ref = expectNewFileSource(before)

SystemMaterializer(system).materializer
.asInstanceOf[PhasedFusingActorMaterializer]
.supervisor
.tell(StreamSupervisor.GetChildren, testActor)
val ref = expectMsgType[Children].children.find(_.path.toString contains "fileSource").get
try assertDispatcher(ref, "pekko.actor.default-dispatcher")
finally p.cancel()
finally {
watch(ref)
p.cancel()
expectTerminated(ref)
}
}

"not signal onComplete more than once" in {
Expand Down