diff --git a/remote/src/test/scala/org/apache/pekko/remote/artery/tcp/ssl/RotatingKeysSSLEngineProviderSpec.scala b/remote/src/test/scala/org/apache/pekko/remote/artery/tcp/ssl/RotatingKeysSSLEngineProviderSpec.scala index 52b35e94580..a4c16e14f02 100644 --- a/remote/src/test/scala/org/apache/pekko/remote/artery/tcp/ssl/RotatingKeysSSLEngineProviderSpec.scala +++ b/remote/src/test/scala/org/apache/pekko/remote/artery/tcp/ssl/RotatingKeysSSLEngineProviderSpec.scala @@ -302,7 +302,11 @@ abstract class RotatingKeysSSLEngineProviderSpec(extraConfig: String) val ref = targetRef.getOrElse( fail(s"Timed out waiting for ActorIdentity from $toPath after $maxAttempts attempts")) ref.tell("ping-1", senderOnSource.ref) - senderOnSource.expectMsg("ping-1") + senderOnSource.fishForMessage(identifyTimeout, hint = "waiting for ping-1") { + case "ping-1" => true + case ActorIdentity(_, _) => false + case _ => false + } } /** diff --git a/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/InputStreamSourceTest.scala b/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/InputStreamSourceTest.scala index bc54abba439..1c2dad2ba4a 100644 --- a/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/InputStreamSourceTest.scala +++ b/stream-tests-tck/src/test/scala/org/apache/pekko/stream/tck/InputStreamSourceTest.scala @@ -25,17 +25,20 @@ import org.reactivestreams.Publisher class InputStreamSourceTest extends PekkoPublisherVerification[ByteString] { def createPublisher(elements: Long): Publisher[ByteString] = { + def inputStream = new InputStream { + private var remaining = elements + override def read(): Int = { + if (remaining > 0) { + remaining -= 1 + 1 + } else -1 + } + } + StreamConverters - .fromInputStream(() => - new InputStream { - @volatile var num = 0 - override def read(): Int = { - num += 1 - num - } - }) + // The TCK counts publisher elements, so emit one byte per ByteString. + .fromInputStream(() => inputStream, chunkSize = 1) .withAttributes(ActorAttributes.dispatcher("pekko.test.stream-dispatcher")) - .take(elements) .runWith(Sink.asPublisher(false)) } } diff --git a/stream-tests/src/test/scala/org/apache/pekko/stream/io/TlsGraphStageEdgeCasesSpec.scala b/stream-tests/src/test/scala/org/apache/pekko/stream/io/TlsGraphStageEdgeCasesSpec.scala index 2e9e3216bb1..718867a9b83 100644 --- a/stream-tests/src/test/scala/org/apache/pekko/stream/io/TlsGraphStageEdgeCasesSpec.scala +++ b/stream-tests/src/test/scala/org/apache/pekko/stream/io/TlsGraphStageEdgeCasesSpec.scala @@ -68,14 +68,24 @@ class TlsGraphStageEdgeCasesSpec extends StreamSpec(TlsGraphStageEdgeCasesSpec.c private def collectExactly( stream: Source[SslTlsInbound, NotUsed], expectedBytes: Int, - timeout: FiniteDuration = 30.seconds): ByteString = - Await.result( - stream - .collect { case SessionBytes(_, b) => b } - .scan(ByteString.empty)(_ ++ _) - .dropWhile(_.size < expectedBytes) - .runWith(Sink.headOption), - timeout.dilated).getOrElse(ByteString.empty) + timeout: FiniteDuration = 30.seconds): ByteString = { + val ((killSwitch, streamDone), result) = stream + .viaMat(KillSwitches.single)(Keep.right) + .watchTermination(Keep.both) + .toMat( + Flow[SslTlsInbound] + .collect { case SessionBytes(_, b) => b } + .scan(ByteString.empty)(_ ++ _) + .dropWhile(_.size < expectedBytes) + .toMat(Sink.headOption)(Keep.right))(Keep.both) + .run() + + try Await.result(result, timeout.dilated).getOrElse(ByteString.empty) + finally { + killSwitch.shutdown() + Await.result(streamDone, timeout.dilated) + } + } "TlsGraphStage" must {