From 5a01c80b9e6923e8b216e47b882f73b160b680e4 Mon Sep 17 00:00:00 2001 From: Javier Aliaga Date: Mon, 30 Mar 2026 13:34:45 +0200 Subject: [PATCH] test: add integration test reproducing streaming subscription disruption with RabbitMQ (#1701) Adds an integration test using RabbitMQ testcontainer that reproduces the issue where stopping and restarting a streaming subscription causes all other active subscriptions to disconnect with Exception (504). The Dapr sidecar reuses the topic name as the RabbitMQ consumer tag, and RabbitMQ rejects the duplicate tag with a connection-level error that cascades to all consumers on the shared connection. This test currently fails, confirming the bug is in the Dapr runtime. Signed-off-by: Javier Aliaga Signed-off-by: Javier Aliaga --- sdk-tests/pom.xml | 5 + .../pubsub/stream/DaprPubSubStreamIT.java | 216 ++++++++++++++++++ 2 files changed, 221 insertions(+) create mode 100644 sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/stream/DaprPubSubStreamIT.java diff --git a/sdk-tests/pom.xml b/sdk-tests/pom.xml index c7cb44734..3aaf00856 100644 --- a/sdk-tests/pom.xml +++ b/sdk-tests/pom.xml @@ -212,6 +212,11 @@ toxiproxy test + + org.testcontainers + rabbitmq + test + diff --git a/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/stream/DaprPubSubStreamIT.java b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/stream/DaprPubSubStreamIT.java new file mode 100644 index 000000000..4fc42ef92 --- /dev/null +++ b/sdk-tests/src/test/java/io/dapr/it/testcontainers/pubsub/stream/DaprPubSubStreamIT.java @@ -0,0 +1,216 @@ +/* + * Copyright 2025 The Dapr Authors + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.dapr.it.testcontainers.pubsub.stream; + +import io.dapr.client.DaprClient; +import io.dapr.client.DaprClientBuilder; +import io.dapr.client.DaprPreviewClient; +import io.dapr.config.Properties; +import io.dapr.testcontainers.Component; +import io.dapr.testcontainers.DaprContainer; +import io.dapr.testcontainers.DaprLogLevel; +import io.dapr.utils.TypeRef; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.Network; +import org.testcontainers.containers.RabbitMQContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; +import reactor.core.Disposable; + +import java.util.Collections; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.atomic.AtomicReference; + +import static io.dapr.it.Retry.callWithRetry; +import static io.dapr.it.testcontainers.ContainerConstants.DAPR_RUNTIME_IMAGE_TAG; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; + +/** + * Integration test for streaming subscription stop/restart behavior with RabbitMQ. + * + *

Reproduces #1701: + * When a streaming subscription is stopped and restarted, the Dapr sidecar reuses the + * topic name as the RabbitMQ consumer tag. If the previous consumer hasn't been fully + * cleaned up, RabbitMQ rejects the duplicate tag with a connection-level error (504) + * that kills ALL consumers on that connection — not just the one being restarted.

+ */ +@Testcontainers +@Tag("testcontainers") +public class DaprPubSubStreamIT { + + private static final Logger LOG = LoggerFactory.getLogger(DaprPubSubStreamIT.class); + private static final Network DAPR_NETWORK = Network.newNetwork(); + private static final String PUBSUB_NAME = "pubsub"; + private static final int NUM_MESSAGES = 10; + + @Container + private static final RabbitMQContainer RABBITMQ = new RabbitMQContainer( + DockerImageName.parse("rabbitmq:3.7.25-management-alpine")) + .withExposedPorts(5672) + .withNetworkAliases("rabbitmq") + .withNetwork(DAPR_NETWORK); + + @Container + private static final DaprContainer DAPR_CONTAINER = new DaprContainer(DAPR_RUNTIME_IMAGE_TAG) + .withAppName("stream-test-app") + .withNetwork(DAPR_NETWORK) + .withDaprLogLevel(DaprLogLevel.DEBUG) + .withLogConsumer(outputFrame -> LOG.info(outputFrame.getUtf8String())) + .withComponent(new Component(PUBSUB_NAME, "pubsub.rabbitmq", "v1", Map.of( + "connectionString", "amqp://guest:guest@rabbitmq:5672", + "user", "guest", + "password", "guest" + ))) + .dependsOn(RABBITMQ); + + private DaprClientBuilder createClientBuilder() { + return new DaprClientBuilder() + .withPropertyOverride(Properties.HTTP_ENDPOINT, "http://localhost:" + DAPR_CONTAINER.getHttpPort()) + .withPropertyOverride(Properties.GRPC_ENDPOINT, "http://localhost:" + DAPR_CONTAINER.getGrpcPort()); + } + + /** + * Verifies that stopping and restarting a streaming subscription does not + * disrupt other active streaming subscriptions. + * + *

Steps: + *

    + *
  1. Start streaming subscriptions on topic-one and topic-two
  2. + *
  3. Publish messages and verify both subscriptions receive them
  4. + *
  5. Start a subscription on topic-three, then stop it
  6. + *
  7. Restart the subscription on topic-three
  8. + *
  9. Publish more messages to topic-one and topic-two
  10. + *
  11. Verify topic-one and topic-two still receive messages without errors
  12. + *
+ */ + @Test + public void restartingSubscriptionShouldNotDisruptOtherSubscriptions() throws Exception { + var topicOne = "stream-topic-one"; + var topicTwo = "stream-topic-two"; + var topicThree = "stream-topic-three"; + var runId = UUID.randomUUID().toString(); + + try (DaprClient client = createClientBuilder().build(); + DaprPreviewClient previewClient = (DaprPreviewClient) client) { + + Set topicOneMessages = Collections.synchronizedSet(new HashSet<>()); + Set topicTwoMessages = Collections.synchronizedSet(new HashSet<>()); + AtomicReference topicOneError = new AtomicReference<>(); + AtomicReference topicTwoError = new AtomicReference<>(); + + // Step 1: Start streaming subscriptions on topic-one and topic-two + Disposable sub1 = previewClient.subscribeToTopic(PUBSUB_NAME, topicOne, TypeRef.STRING) + .doOnNext(msg -> { + if (msg.contains(runId)) { + topicOneMessages.add(msg); + LOG.info("topic-one received: {}", msg); + } + }) + .doOnError(e -> { + LOG.error("topic-one error: {}", e.getMessage()); + topicOneError.set(e); + }) + .subscribe(); + + Disposable sub2 = previewClient.subscribeToTopic(PUBSUB_NAME, topicTwo, TypeRef.STRING) + .doOnNext(msg -> { + if (msg.contains(runId)) { + topicTwoMessages.add(msg); + LOG.info("topic-two received: {}", msg); + } + }) + .doOnError(e -> { + LOG.error("topic-two error: {}", e.getMessage()); + topicTwoError.set(e); + }) + .subscribe(); + + // Give subscriptions time to establish + Thread.sleep(2000); + + // Step 2: Publish messages and verify both receive them + for (int i = 0; i < NUM_MESSAGES; i++) { + client.publishEvent(PUBSUB_NAME, topicOne, String.format("pre-%s-%d", runId, i)).block(); + client.publishEvent(PUBSUB_NAME, topicTwo, String.format("pre-%s-%d", runId, i)).block(); + } + + callWithRetry(() -> { + LOG.info("topic-one has {} messages, topic-two has {} messages", + topicOneMessages.size(), topicTwoMessages.size()); + assertEquals(NUM_MESSAGES, topicOneMessages.size(), "topic-one should receive all pre-restart messages"); + assertEquals(NUM_MESSAGES, topicTwoMessages.size(), "topic-two should receive all pre-restart messages"); + }, 30000); + + // Step 3: Start and stop a subscription on topic-three + Disposable sub3 = previewClient.subscribeToTopic(PUBSUB_NAME, topicThree, TypeRef.STRING) + .subscribe(); + Thread.sleep(2000); + sub3.dispose(); + LOG.info("Disposed topic-three subscription"); + Thread.sleep(2000); + + // Step 4: Restart the subscription on topic-three + Set topicThreeMessages = Collections.synchronizedSet(new HashSet<>()); + Disposable sub3Restarted = previewClient.subscribeToTopic(PUBSUB_NAME, topicThree, TypeRef.STRING) + .doOnNext(msg -> { + if (msg.contains(runId)) { + topicThreeMessages.add(msg); + LOG.info("topic-three received: {}", msg); + } + }) + .subscribe(); + Thread.sleep(2000); + + // Step 5: Publish more messages to all topics + topicOneMessages.clear(); + topicTwoMessages.clear(); + + for (int i = 0; i < NUM_MESSAGES; i++) { + client.publishEvent(PUBSUB_NAME, topicOne, String.format("post-%s-%d", runId, i)).block(); + client.publishEvent(PUBSUB_NAME, topicTwo, String.format("post-%s-%d", runId, i)).block(); + client.publishEvent(PUBSUB_NAME, topicThree, String.format("post-%s-%d", runId, i)).block(); + } + + // Step 6: Verify topic-one and topic-two still work after topic-three was restarted + callWithRetry(() -> { + LOG.info("Post-restart: topic-one has {} messages, topic-two has {} messages, topic-three has {} messages", + topicOneMessages.size(), topicTwoMessages.size(), topicThreeMessages.size()); + assertEquals(NUM_MESSAGES, topicOneMessages.size(), + "topic-one should still receive messages after topic-three restart"); + assertEquals(NUM_MESSAGES, topicTwoMessages.size(), + "topic-two should still receive messages after topic-three restart"); + assertEquals(NUM_MESSAGES, topicThreeMessages.size(), + "topic-three should receive messages after restart"); + }, 30000); + + // Verify no errors occurred on the active subscriptions + assertNull(topicOneError.get(), + "topic-one should not have received any errors, but got: " + topicOneError.get()); + assertNull(topicTwoError.get(), + "topic-two should not have received any errors, but got: " + topicTwoError.get()); + + // Cleanup + sub1.dispose(); + sub2.dispose(); + sub3Restarted.dispose(); + } + } +}