diff --git a/rocketmq-v5-client-spring-boot-samples/pom.xml b/rocketmq-v5-client-spring-boot-samples/pom.xml
index ae931911..d8559c9a 100644
--- a/rocketmq-v5-client-spring-boot-samples/pom.xml
+++ b/rocketmq-v5-client-spring-boot-samples/pom.xml
@@ -22,7 +22,7 @@
org.apache.rocketmq
rocketmq-v5-client-spring-boot-samples
pom
- 2.3.2-SNAPSHOT
+ 2.3.6-SNAPSHOT
rocketmq-v5-client-spring-boot-samples
rocketmq-v5-client-spring-boot-samples
@@ -35,7 +35,9 @@
rocketmq-v5-client-producer-simple-demo
rocketmq-v5-client-consumer-simple-demo
rocketmq-v5-client-consumer-push-simple-demo
-
+ rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo
+ rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo
+
1.8
diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/pom.xml
index a73027b6..cc600607 100644
--- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/pom.xml
+++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/pom.xml
@@ -21,7 +21,7 @@
org.apache.rocketmq
rocketmq-v5-client-spring-boot-samples
- 2.3.2-SNAPSHOT
+ 2.3.6-SNAPSHOT
rocketmq-v5-client-consume-acl-demo
diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/pom.xml
index 0b2c891b..2b5d46a8 100644
--- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/pom.xml
+++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/pom.xml
@@ -21,7 +21,7 @@
org.apache.rocketmq
rocketmq-v5-client-spring-boot-samples
- 2.3.2-SNAPSHOT
+ 2.3.6-SNAPSHOT
rocketmq-v5-client-consume-demo
diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/pom.xml
new file mode 100644
index 00000000..af13654f
--- /dev/null
+++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/pom.xml
@@ -0,0 +1,36 @@
+
+
+
+ 4.0.0
+
+ org.apache.rocketmq
+ rocketmq-v5-client-spring-boot-samples
+ 2.3.6-SNAPSHOT
+
+
+ rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo
+
+
+ 8
+ 8
+ UTF-8
+
+
+
\ No newline at end of file
diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/java/org/apache/rocketmq/springboot/ExtRocketMQTemplate.java b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/java/org/apache/rocketmq/springboot/ExtRocketMQTemplate.java
new file mode 100644
index 00000000..6911b9ef
--- /dev/null
+++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/java/org/apache/rocketmq/springboot/ExtRocketMQTemplate.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.rocketmq.springboot;
+
+import org.apache.rocketmq.client.annotation.ExtConsumerResetConfiguration;
+import org.apache.rocketmq.client.core.RocketMQClientTemplate;
+
+@ExtConsumerResetConfiguration(subscriptionExpressions = {
+ @ExtConsumerResetConfiguration.FilterExpression(topic = "demo-topic", tag = "tagA", filterExpressionType = "tag"),
+ @ExtConsumerResetConfiguration.FilterExpression(topic = "demo-topic2", tag = "tagB", filterExpressionType = "tag")
+})
+public class ExtRocketMQTemplate extends RocketMQClientTemplate {
+}
\ No newline at end of file
diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/java/org/apache/rocketmq/springboot/V5SimpleConsumerConsumerApplication.java b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/java/org/apache/rocketmq/springboot/V5SimpleConsumerConsumerApplication.java
new file mode 100644
index 00000000..bd933772
--- /dev/null
+++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/java/org/apache/rocketmq/springboot/V5SimpleConsumerConsumerApplication.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.rocketmq.springboot;
+
+import org.apache.rocketmq.client.apis.message.MessageView;
+import org.apache.rocketmq.client.core.RocketMQClientTemplate;
+import org.springframework.boot.CommandLineRunner;
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+import javax.annotation.Resource;
+import java.time.Duration;
+import java.util.List;
+
+@SpringBootApplication
+public class V5SimpleConsumerConsumerApplication implements CommandLineRunner {
+ @Resource
+ private RocketMQClientTemplate rocketMQClientTemplate;
+
+ @Resource
+ private ExtRocketMQTemplate extRocketMQTemplate;
+
+ public static void main(String[] args) {
+ SpringApplication.run(V5SimpleConsumerConsumerApplication.class, args);
+ }
+
+ @Override
+ public void run(String... args) throws Exception {
+ while (true){
+ List messageList = extRocketMQTemplate.receive(10, Duration.ofSeconds(10));
+ System.out.println(messageList);
+
+ messageList = rocketMQClientTemplate.receive(10, Duration.ofSeconds(10));
+ System.out.println(messageList);
+ }
+ }
+}
diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/resources/application.properties b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/resources/application.properties
new file mode 100644
index 00000000..d625fdb4
--- /dev/null
+++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/resources/application.properties
@@ -0,0 +1,25 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+rocketmq.simple-consumer.endpoints=localhost:8080
+rocketmq.simple-consumer.consumer-group=test-group
+rocketmq.simple-consumer.subscription-expressions.demo-topic.tag=tagA
+rocketmq.simple-consumer.subscription-expressions.demo-topic.filter-expression-type=tag
+rocketmq.simple-consumer.subscription-expressions.demo-topic2.tag=tagB
+rocketmq.simple-consumer.subscription-expressions.demo-topic2.filter-expression-type=tag
+#rocketmq.simple-consumer.access-key=
+#rocketmq.simple-consumer.secret-key=
+#rocketmq.simple-consumer.namespace=
+
diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consumer-push-simple-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consumer-push-simple-demo/pom.xml
index 1a20a74c..b67eeead 100644
--- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consumer-push-simple-demo/pom.xml
+++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consumer-push-simple-demo/pom.xml
@@ -22,7 +22,7 @@
org.apache.rocketmq
rocketmq-v5-client-spring-boot-samples
- 2.3.2-SNAPSHOT
+ 2.3.6-SNAPSHOT
rocketmq-v5-client-consumer-push-simple-demo
diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consumer-simple-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consumer-simple-demo/pom.xml
index c997133f..8008c7ec 100644
--- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consumer-simple-demo/pom.xml
+++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consumer-simple-demo/pom.xml
@@ -22,7 +22,7 @@
org.apache.rocketmq
rocketmq-v5-client-spring-boot-samples
- 2.3.2-SNAPSHOT
+ 2.3.6-SNAPSHOT
rocketmq-v5-client-consumer-simple-demo
diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-acl-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-acl-demo/pom.xml
index 22b190cd..cd97b8bf 100644
--- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-acl-demo/pom.xml
+++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-acl-demo/pom.xml
@@ -21,7 +21,7 @@
org.apache.rocketmq
rocketmq-v5-client-spring-boot-samples
- 2.3.2-SNAPSHOT
+ 2.3.6-SNAPSHOT
rocketmq-v5-client-producer-acl-demo
diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-demo/pom.xml
index 6c23b307..a4e155a5 100644
--- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-demo/pom.xml
+++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-demo/pom.xml
@@ -21,7 +21,7 @@
org.apache.rocketmq
rocketmq-v5-client-spring-boot-samples
- 2.3.2-SNAPSHOT
+ 2.3.6-SNAPSHOT
rocketmq-v5-client-producer-demo
diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-simple-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-simple-demo/pom.xml
index b6769604..e72039fb 100644
--- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-simple-demo/pom.xml
+++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-simple-demo/pom.xml
@@ -22,7 +22,7 @@
org.apache.rocketmq
rocketmq-v5-client-spring-boot-samples
- 2.3.2-SNAPSHOT
+ 2.3.6-SNAPSHOT
rocketmq-v5-client-producer-simple-demo
diff --git a/rocketmq-v5-client-spring-boot/pom.xml b/rocketmq-v5-client-spring-boot/pom.xml
index 202b7256..e4d910af 100644
--- a/rocketmq-v5-client-spring-boot/pom.xml
+++ b/rocketmq-v5-client-spring-boot/pom.xml
@@ -93,6 +93,10 @@
junit
test
+
+ com.alibaba
+ fastjson
+
diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java
index f0b942f5..6212735b 100644
--- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java
+++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java
@@ -91,4 +91,28 @@
* The namespace of consumer.
*/
String namespace() default "";
+
+ /**
+ * subscribing to multiple topics
+ */
+ FilterExpression[] subscriptionExpressions() default {};
+
+ @Retention(RetentionPolicy.RUNTIME)
+ @Target({})
+ @interface FilterExpression {
+ /**
+ * Topic name of consumer.
+ */
+ String topic();
+
+ /**
+ * Tag of consumer.
+ */
+ String tag() default "*";
+
+ /**
+ * The type of filter expression
+ */
+ String filterExpressionType() default "tag";
+ }
}
diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java
index a501e64e..46bb399b 100644
--- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java
+++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java
@@ -16,6 +16,7 @@
*/
package org.apache.rocketmq.client.autoconfigure;
+import com.alibaba.fastjson.JSON;
import org.apache.rocketmq.client.support.RocketMQMessageConverter;
import org.apache.rocketmq.client.support.RocketMQUtil;
import org.apache.rocketmq.client.apis.ClientConfiguration;
@@ -40,7 +41,7 @@
import org.springframework.util.StringUtils;
import java.time.Duration;
-import java.util.Collections;
+import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;
@@ -110,30 +111,44 @@ private SimpleConsumerInfo createConsumer(
SimpleConsumerBuilder simpleConsumerBuilder) {
RocketMQProperties.SimpleConsumer simpleConsumer = rocketMQProperties.getSimpleConsumer();
String consumerGroupName = resolvePlaceholders(annotation.consumerGroup(), simpleConsumer.getConsumerGroup());
- String topicName = resolvePlaceholders(annotation.topic(), simpleConsumer.getTopic());
String accessKey = resolvePlaceholders(annotation.accessKey(), simpleConsumer.getAccessKey());
String secretKey = resolvePlaceholders(annotation.secretKey(), simpleConsumer.getSecretKey());
String endPoints = resolvePlaceholders(annotation.endpoints(), simpleConsumer.getEndpoints());
String namespace = resolvePlaceholders(annotation.namespace(), simpleConsumer.getNamespace());
- String tag = resolvePlaceholders(annotation.tag(), simpleConsumer.getTag());
- String filterExpressionType = resolvePlaceholders(annotation.filterExpressionType(), simpleConsumer.getFilterExpressionType());
Duration requestTimeout = Duration.ofSeconds(annotation.requestTimeout());
int awaitDuration = annotation.awaitDuration();
Boolean sslEnabled = simpleConsumer.isSslEnabled();
- Assert.hasText(topicName, "[topic] must not be null");
ClientConfiguration clientConfiguration = RocketMQUtil.createClientConfiguration(accessKey, secretKey, endPoints, requestTimeout, sslEnabled, namespace);
- FilterExpression filterExpression = RocketMQUtil.createFilterExpression(tag, filterExpressionType);
Duration duration = Duration.ofSeconds(awaitDuration);
simpleConsumerBuilder.setClientConfiguration(clientConfiguration);
if (StringUtils.hasLength(consumerGroupName)) {
simpleConsumerBuilder.setConsumerGroup(consumerGroupName);
}
simpleConsumerBuilder.setAwaitDuration(duration);
- if (Objects.nonNull(filterExpression)) {
- simpleConsumerBuilder.setSubscriptionExpressions(Collections.singletonMap(topicName, filterExpression));
+
+ Map subscriptionExpressions = new HashMap<>();
+ org.apache.rocketmq.client.annotation.ExtConsumerResetConfiguration.FilterExpression[] filterExpressions = annotation.subscriptionExpressions();
+ if (filterExpressions.length > 0) {
+ for (org.apache.rocketmq.client.annotation.ExtConsumerResetConfiguration.FilterExpression expression : filterExpressions) {
+ Assert.hasText(expression.topic(), "[topic] must not be null");
+ FilterExpression filterExpression = RocketMQUtil.createFilterExpression(expression.tag(), expression.filterExpressionType());
+ if (Objects.nonNull(filterExpression)) {
+ subscriptionExpressions.put(expression.topic(), filterExpression);
+ }
+ }
+ } else {
+ String topicName = resolvePlaceholders(annotation.topic(), simpleConsumer.getTopic());
+ Assert.hasText(topicName, "[topic] must not be null");
+ String tag = resolvePlaceholders(annotation.tag(), simpleConsumer.getTag());
+ String filterExpressionType = resolvePlaceholders(annotation.filterExpressionType(), simpleConsumer.getFilterExpressionType());
+ FilterExpression filterExpression = RocketMQUtil.createFilterExpression(tag, filterExpressionType);
+ if (Objects.nonNull(filterExpression)) {
+ subscriptionExpressions.put(topicName, filterExpression);
+ }
}
+ simpleConsumerBuilder.setSubscriptionExpressions(subscriptionExpressions);
- return new SimpleConsumerInfo(consumerGroupName, topicName, endPoints, namespace, tag, filterExpressionType, requestTimeout, awaitDuration, sslEnabled);
+ return new SimpleConsumerInfo(consumerGroupName, endPoints, namespace, requestTimeout, awaitDuration, sslEnabled, subscriptionExpressions);
}
private String resolvePlaceholders(String text, String defaultValue) {
@@ -144,47 +159,40 @@ private String resolvePlaceholders(String text, String defaultValue) {
static class SimpleConsumerInfo {
String consumerGroup;
- String topicName;
-
String endPoints;
String namespace;
- String tag;
-
- String filterExpressionType;
-
Duration requestTimeout;
int awaitDuration;
Boolean sslEnabled;
- public SimpleConsumerInfo(String consumerGroupName, String topicName, String endPoints, String namespace,
- String tag, String filterExpressionType, Duration requestTimeout, int awaitDuration, Boolean sslEnabled) {
+ Map subscriptionExpressions;
+
+ public SimpleConsumerInfo(String consumerGroupName, String endPoints, String namespace, Duration requestTimeout,
+ int awaitDuration, Boolean sslEnabled, Map subscriptionExpressions) {
this.consumerGroup = consumerGroupName;
- this.topicName = topicName;
this.endPoints = endPoints;
this.namespace = namespace;
- this.tag = tag;
- this.filterExpressionType = filterExpressionType;
this.requestTimeout = requestTimeout;
this.awaitDuration = awaitDuration;
this.sslEnabled = sslEnabled;
+ this.subscriptionExpressions = subscriptionExpressions;
}
- @Override public String toString() {
+ @Override
+ public String toString() {
return "SimpleConsumerInfo{" +
- "consumerGroup='" + consumerGroup + '\'' +
- ", topicName='" + topicName + '\'' +
- ", endPoints='" + endPoints + '\'' +
- ", namespace='" + namespace + '\'' +
- ", tag='" + tag + '\'' +
- ", filterExpressionType='" + filterExpressionType + '\'' +
- ", requestTimeout(seconds)=" + requestTimeout.getSeconds() +
- ", awaitDuration=" + awaitDuration +
- ", sslEnabled=" + sslEnabled +
- '}';
+ "consumerGroup='" + consumerGroup + '\'' +
+ ", endPoints='" + endPoints + '\'' +
+ ", namespace='" + namespace + '\'' +
+ ", requestTimeout=" + requestTimeout +
+ ", awaitDuration=" + awaitDuration +
+ ", sslEnabled=" + sslEnabled +
+ ", subscriptionExpressions=" + JSON.toJSONString(subscriptionExpressions) +
+ '}';
}
}
}
diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java
index 346311f2..83748903 100644
--- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java
+++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java
@@ -42,10 +42,12 @@
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.util.Assert;
+import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import java.time.Duration;
import java.util.Collections;
+import java.util.Map;
import java.util.Objects;
@Configuration
@@ -105,7 +107,6 @@ public SimpleConsumerBuilder simpleConsumerBuilder(RocketMQProperties rocketMQPr
RocketMQProperties.SimpleConsumer simpleConsumer = rocketMQProperties.getSimpleConsumer();
final ClientServiceProvider provider = ClientServiceProvider.loadService();
String consumerGroup = simpleConsumer.getConsumerGroup();
- FilterExpression filterExpression = RocketMQUtil.createFilterExpression(simpleConsumer.getTag(), simpleConsumer.getFilterExpressionType());
ClientConfiguration clientConfiguration = RocketMQUtil.createConsumerClientConfiguration(simpleConsumer);
SimpleConsumerBuilder simpleConsumerBuilder = provider.newSimpleConsumerBuilder()
.setClientConfiguration(clientConfiguration);
@@ -116,9 +117,16 @@ public SimpleConsumerBuilder simpleConsumerBuilder(RocketMQProperties rocketMQPr
if (StringUtils.hasLength(consumerGroup)) {
simpleConsumerBuilder.setConsumerGroup(consumerGroup);
}
+
// Set the subscription for the consumer.
- if (Objects.nonNull(filterExpression)) {
- simpleConsumerBuilder.setSubscriptionExpressions(Collections.singletonMap(simpleConsumer.getTopic(), filterExpression));
+ if (CollectionUtils.isEmpty(simpleConsumer.getSubscriptionExpressions())) {
+ FilterExpression filterExpression = RocketMQUtil.createFilterExpression(simpleConsumer.getTag(), simpleConsumer.getFilterExpressionType());
+ if (Objects.nonNull(filterExpression)) {
+ simpleConsumerBuilder.setSubscriptionExpressions(Collections.singletonMap(simpleConsumer.getTopic(), filterExpression));
+ }
+ } else {
+ Map subscriptionExpressions = RocketMQUtil.createSubscriptionExpressions(simpleConsumer.getSubscriptionExpressions());
+ simpleConsumerBuilder.setSubscriptionExpressions(subscriptionExpressions);
}
return simpleConsumerBuilder;
}
diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java
index 8f3d4941..6e5be06c 100644
--- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java
+++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java
@@ -18,6 +18,8 @@
import org.springframework.boot.context.properties.ConfigurationProperties;
+import java.util.Map;
+
@SuppressWarnings("WeakerAccess")
@ConfigurationProperties(prefix = "rocketmq")
public class RocketMQProperties {
@@ -211,6 +213,11 @@ public static class SimpleConsumer {
private String namespace = "";
+ /**
+ * key is topic
+ */
+ private Map subscriptionExpressions;
+
public String getAccessKey() {
return accessKey;
}
@@ -299,6 +306,14 @@ public void setNamespace(String namespace) {
this.namespace = namespace;
}
+ public Map getSubscriptionExpressions() {
+ return subscriptionExpressions;
+ }
+
+ public void setSubscriptionExpressions(Map subscriptionExpressions) {
+ this.subscriptionExpressions = subscriptionExpressions;
+ }
+
@Override
public String toString() {
return "SimpleConsumer{" +
@@ -315,4 +330,25 @@ public String toString() {
}
}
+ public static class FilterExpression {
+ private String tag;
+
+ private String filterExpressionType;
+
+ public String getTag() {
+ return tag;
+ }
+
+ public void setTag(String tag) {
+ this.tag = tag;
+ }
+
+ public String getFilterExpressionType() {
+ return filterExpressionType;
+ }
+
+ public void setFilterExpressionType(String filterExpressionType) {
+ this.filterExpressionType = filterExpressionType;
+ }
+ }
}
diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java
index 55f3948f..187f399c 100644
--- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java
+++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java
@@ -34,6 +34,8 @@
import java.nio.charset.Charset;
import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Objects;
public class RocketMQUtil {
@@ -184,4 +186,14 @@ public static FilterExpression createFilterExpression(String tag, String type) {
FilterExpression filterExpression = new FilterExpression(tag, filterExpressionType);
return filterExpression;
}
+
+ public static Map createSubscriptionExpressions(Map map) {
+ Map subscriptionExpressions = new HashMap<>();
+ map.forEach((topic, expression) -> {
+ FilterExpressionType filterExpressionType = "tag".equalsIgnoreCase(expression.getFilterExpressionType()) ? FilterExpressionType.TAG : FilterExpressionType.SQL92;
+ FilterExpression filterExpression = new FilterExpression(expression.getTag(), filterExpressionType);
+ subscriptionExpressions.put(topic, filterExpression);
+ });
+ return subscriptionExpressions;
+ }
}