diff --git a/rocketmq-v5-client-spring-boot-samples/pom.xml b/rocketmq-v5-client-spring-boot-samples/pom.xml index ae931911..d8559c9a 100644 --- a/rocketmq-v5-client-spring-boot-samples/pom.xml +++ b/rocketmq-v5-client-spring-boot-samples/pom.xml @@ -22,7 +22,7 @@ org.apache.rocketmq rocketmq-v5-client-spring-boot-samples pom - 2.3.2-SNAPSHOT + 2.3.6-SNAPSHOT rocketmq-v5-client-spring-boot-samples rocketmq-v5-client-spring-boot-samples @@ -35,7 +35,9 @@ rocketmq-v5-client-producer-simple-demo rocketmq-v5-client-consumer-simple-demo rocketmq-v5-client-consumer-push-simple-demo - + rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo + rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo + 1.8 diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/pom.xml index a73027b6..cc600607 100644 --- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/pom.xml +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-acl-demo/pom.xml @@ -21,7 +21,7 @@ org.apache.rocketmq rocketmq-v5-client-spring-boot-samples - 2.3.2-SNAPSHOT + 2.3.6-SNAPSHOT rocketmq-v5-client-consume-acl-demo diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/pom.xml index 0b2c891b..2b5d46a8 100644 --- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/pom.xml +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-demo/pom.xml @@ -21,7 +21,7 @@ org.apache.rocketmq rocketmq-v5-client-spring-boot-samples - 2.3.2-SNAPSHOT + 2.3.6-SNAPSHOT rocketmq-v5-client-consume-demo diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/pom.xml new file mode 100644 index 00000000..af13654f --- /dev/null +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/pom.xml @@ -0,0 +1,36 @@ + + + + 4.0.0 + + org.apache.rocketmq + rocketmq-v5-client-spring-boot-samples + 2.3.6-SNAPSHOT + + + rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo + + + 8 + 8 + UTF-8 + + + \ No newline at end of file diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/java/org/apache/rocketmq/springboot/ExtRocketMQTemplate.java b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/java/org/apache/rocketmq/springboot/ExtRocketMQTemplate.java new file mode 100644 index 00000000..6911b9ef --- /dev/null +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/java/org/apache/rocketmq/springboot/ExtRocketMQTemplate.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.rocketmq.springboot; + +import org.apache.rocketmq.client.annotation.ExtConsumerResetConfiguration; +import org.apache.rocketmq.client.core.RocketMQClientTemplate; + +@ExtConsumerResetConfiguration(subscriptionExpressions = { + @ExtConsumerResetConfiguration.FilterExpression(topic = "demo-topic", tag = "tagA", filterExpressionType = "tag"), + @ExtConsumerResetConfiguration.FilterExpression(topic = "demo-topic2", tag = "tagB", filterExpressionType = "tag") +}) +public class ExtRocketMQTemplate extends RocketMQClientTemplate { +} \ No newline at end of file diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/java/org/apache/rocketmq/springboot/V5SimpleConsumerConsumerApplication.java b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/java/org/apache/rocketmq/springboot/V5SimpleConsumerConsumerApplication.java new file mode 100644 index 00000000..bd933772 --- /dev/null +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/java/org/apache/rocketmq/springboot/V5SimpleConsumerConsumerApplication.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.rocketmq.springboot; + +import org.apache.rocketmq.client.apis.message.MessageView; +import org.apache.rocketmq.client.core.RocketMQClientTemplate; +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +import javax.annotation.Resource; +import java.time.Duration; +import java.util.List; + +@SpringBootApplication +public class V5SimpleConsumerConsumerApplication implements CommandLineRunner { + @Resource + private RocketMQClientTemplate rocketMQClientTemplate; + + @Resource + private ExtRocketMQTemplate extRocketMQTemplate; + + public static void main(String[] args) { + SpringApplication.run(V5SimpleConsumerConsumerApplication.class, args); + } + + @Override + public void run(String... args) throws Exception { + while (true){ + List messageList = extRocketMQTemplate.receive(10, Duration.ofSeconds(10)); + System.out.println(messageList); + + messageList = rocketMQClientTemplate.receive(10, Duration.ofSeconds(10)); + System.out.println(messageList); + } + } +} diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/resources/application.properties b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/resources/application.properties new file mode 100644 index 00000000..d625fdb4 --- /dev/null +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consume-simple-subscribe-muliti-topic-demo/src/main/resources/application.properties @@ -0,0 +1,25 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +rocketmq.simple-consumer.endpoints=localhost:8080 +rocketmq.simple-consumer.consumer-group=test-group +rocketmq.simple-consumer.subscription-expressions.demo-topic.tag=tagA +rocketmq.simple-consumer.subscription-expressions.demo-topic.filter-expression-type=tag +rocketmq.simple-consumer.subscription-expressions.demo-topic2.tag=tagB +rocketmq.simple-consumer.subscription-expressions.demo-topic2.filter-expression-type=tag +#rocketmq.simple-consumer.access-key= +#rocketmq.simple-consumer.secret-key= +#rocketmq.simple-consumer.namespace= + diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consumer-push-simple-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consumer-push-simple-demo/pom.xml index 1a20a74c..b67eeead 100644 --- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consumer-push-simple-demo/pom.xml +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consumer-push-simple-demo/pom.xml @@ -22,7 +22,7 @@ org.apache.rocketmq rocketmq-v5-client-spring-boot-samples - 2.3.2-SNAPSHOT + 2.3.6-SNAPSHOT rocketmq-v5-client-consumer-push-simple-demo diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consumer-simple-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consumer-simple-demo/pom.xml index c997133f..8008c7ec 100644 --- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consumer-simple-demo/pom.xml +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-consumer-simple-demo/pom.xml @@ -22,7 +22,7 @@ org.apache.rocketmq rocketmq-v5-client-spring-boot-samples - 2.3.2-SNAPSHOT + 2.3.6-SNAPSHOT rocketmq-v5-client-consumer-simple-demo diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-acl-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-acl-demo/pom.xml index 22b190cd..cd97b8bf 100644 --- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-acl-demo/pom.xml +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-acl-demo/pom.xml @@ -21,7 +21,7 @@ org.apache.rocketmq rocketmq-v5-client-spring-boot-samples - 2.3.2-SNAPSHOT + 2.3.6-SNAPSHOT rocketmq-v5-client-producer-acl-demo diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-demo/pom.xml index 6c23b307..a4e155a5 100644 --- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-demo/pom.xml +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-demo/pom.xml @@ -21,7 +21,7 @@ org.apache.rocketmq rocketmq-v5-client-spring-boot-samples - 2.3.2-SNAPSHOT + 2.3.6-SNAPSHOT rocketmq-v5-client-producer-demo diff --git a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-simple-demo/pom.xml b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-simple-demo/pom.xml index b6769604..e72039fb 100644 --- a/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-simple-demo/pom.xml +++ b/rocketmq-v5-client-spring-boot-samples/rocketmq-v5-client-producer-simple-demo/pom.xml @@ -22,7 +22,7 @@ org.apache.rocketmq rocketmq-v5-client-spring-boot-samples - 2.3.2-SNAPSHOT + 2.3.6-SNAPSHOT rocketmq-v5-client-producer-simple-demo diff --git a/rocketmq-v5-client-spring-boot/pom.xml b/rocketmq-v5-client-spring-boot/pom.xml index 202b7256..e4d910af 100644 --- a/rocketmq-v5-client-spring-boot/pom.xml +++ b/rocketmq-v5-client-spring-boot/pom.xml @@ -93,6 +93,10 @@ junit test + + com.alibaba + fastjson + diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java index f0b942f5..6212735b 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/annotation/ExtConsumerResetConfiguration.java @@ -91,4 +91,28 @@ * The namespace of consumer. */ String namespace() default ""; + + /** + * subscribing to multiple topics + */ + FilterExpression[] subscriptionExpressions() default {}; + + @Retention(RetentionPolicy.RUNTIME) + @Target({}) + @interface FilterExpression { + /** + * Topic name of consumer. + */ + String topic(); + + /** + * Tag of consumer. + */ + String tag() default "*"; + + /** + * The type of filter expression + */ + String filterExpressionType() default "tag"; + } } diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java index a501e64e..46bb399b 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/ExtConsumerResetConfiguration.java @@ -16,6 +16,7 @@ */ package org.apache.rocketmq.client.autoconfigure; +import com.alibaba.fastjson.JSON; import org.apache.rocketmq.client.support.RocketMQMessageConverter; import org.apache.rocketmq.client.support.RocketMQUtil; import org.apache.rocketmq.client.apis.ClientConfiguration; @@ -40,7 +41,7 @@ import org.springframework.util.StringUtils; import java.time.Duration; -import java.util.Collections; +import java.util.HashMap; import java.util.Map; import java.util.Objects; import java.util.stream.Collectors; @@ -110,30 +111,44 @@ private SimpleConsumerInfo createConsumer( SimpleConsumerBuilder simpleConsumerBuilder) { RocketMQProperties.SimpleConsumer simpleConsumer = rocketMQProperties.getSimpleConsumer(); String consumerGroupName = resolvePlaceholders(annotation.consumerGroup(), simpleConsumer.getConsumerGroup()); - String topicName = resolvePlaceholders(annotation.topic(), simpleConsumer.getTopic()); String accessKey = resolvePlaceholders(annotation.accessKey(), simpleConsumer.getAccessKey()); String secretKey = resolvePlaceholders(annotation.secretKey(), simpleConsumer.getSecretKey()); String endPoints = resolvePlaceholders(annotation.endpoints(), simpleConsumer.getEndpoints()); String namespace = resolvePlaceholders(annotation.namespace(), simpleConsumer.getNamespace()); - String tag = resolvePlaceholders(annotation.tag(), simpleConsumer.getTag()); - String filterExpressionType = resolvePlaceholders(annotation.filterExpressionType(), simpleConsumer.getFilterExpressionType()); Duration requestTimeout = Duration.ofSeconds(annotation.requestTimeout()); int awaitDuration = annotation.awaitDuration(); Boolean sslEnabled = simpleConsumer.isSslEnabled(); - Assert.hasText(topicName, "[topic] must not be null"); ClientConfiguration clientConfiguration = RocketMQUtil.createClientConfiguration(accessKey, secretKey, endPoints, requestTimeout, sslEnabled, namespace); - FilterExpression filterExpression = RocketMQUtil.createFilterExpression(tag, filterExpressionType); Duration duration = Duration.ofSeconds(awaitDuration); simpleConsumerBuilder.setClientConfiguration(clientConfiguration); if (StringUtils.hasLength(consumerGroupName)) { simpleConsumerBuilder.setConsumerGroup(consumerGroupName); } simpleConsumerBuilder.setAwaitDuration(duration); - if (Objects.nonNull(filterExpression)) { - simpleConsumerBuilder.setSubscriptionExpressions(Collections.singletonMap(topicName, filterExpression)); + + Map subscriptionExpressions = new HashMap<>(); + org.apache.rocketmq.client.annotation.ExtConsumerResetConfiguration.FilterExpression[] filterExpressions = annotation.subscriptionExpressions(); + if (filterExpressions.length > 0) { + for (org.apache.rocketmq.client.annotation.ExtConsumerResetConfiguration.FilterExpression expression : filterExpressions) { + Assert.hasText(expression.topic(), "[topic] must not be null"); + FilterExpression filterExpression = RocketMQUtil.createFilterExpression(expression.tag(), expression.filterExpressionType()); + if (Objects.nonNull(filterExpression)) { + subscriptionExpressions.put(expression.topic(), filterExpression); + } + } + } else { + String topicName = resolvePlaceholders(annotation.topic(), simpleConsumer.getTopic()); + Assert.hasText(topicName, "[topic] must not be null"); + String tag = resolvePlaceholders(annotation.tag(), simpleConsumer.getTag()); + String filterExpressionType = resolvePlaceholders(annotation.filterExpressionType(), simpleConsumer.getFilterExpressionType()); + FilterExpression filterExpression = RocketMQUtil.createFilterExpression(tag, filterExpressionType); + if (Objects.nonNull(filterExpression)) { + subscriptionExpressions.put(topicName, filterExpression); + } } + simpleConsumerBuilder.setSubscriptionExpressions(subscriptionExpressions); - return new SimpleConsumerInfo(consumerGroupName, topicName, endPoints, namespace, tag, filterExpressionType, requestTimeout, awaitDuration, sslEnabled); + return new SimpleConsumerInfo(consumerGroupName, endPoints, namespace, requestTimeout, awaitDuration, sslEnabled, subscriptionExpressions); } private String resolvePlaceholders(String text, String defaultValue) { @@ -144,47 +159,40 @@ private String resolvePlaceholders(String text, String defaultValue) { static class SimpleConsumerInfo { String consumerGroup; - String topicName; - String endPoints; String namespace; - String tag; - - String filterExpressionType; - Duration requestTimeout; int awaitDuration; Boolean sslEnabled; - public SimpleConsumerInfo(String consumerGroupName, String topicName, String endPoints, String namespace, - String tag, String filterExpressionType, Duration requestTimeout, int awaitDuration, Boolean sslEnabled) { + Map subscriptionExpressions; + + public SimpleConsumerInfo(String consumerGroupName, String endPoints, String namespace, Duration requestTimeout, + int awaitDuration, Boolean sslEnabled, Map subscriptionExpressions) { this.consumerGroup = consumerGroupName; - this.topicName = topicName; this.endPoints = endPoints; this.namespace = namespace; - this.tag = tag; - this.filterExpressionType = filterExpressionType; this.requestTimeout = requestTimeout; this.awaitDuration = awaitDuration; this.sslEnabled = sslEnabled; + this.subscriptionExpressions = subscriptionExpressions; } - @Override public String toString() { + @Override + public String toString() { return "SimpleConsumerInfo{" + - "consumerGroup='" + consumerGroup + '\'' + - ", topicName='" + topicName + '\'' + - ", endPoints='" + endPoints + '\'' + - ", namespace='" + namespace + '\'' + - ", tag='" + tag + '\'' + - ", filterExpressionType='" + filterExpressionType + '\'' + - ", requestTimeout(seconds)=" + requestTimeout.getSeconds() + - ", awaitDuration=" + awaitDuration + - ", sslEnabled=" + sslEnabled + - '}'; + "consumerGroup='" + consumerGroup + '\'' + + ", endPoints='" + endPoints + '\'' + + ", namespace='" + namespace + '\'' + + ", requestTimeout=" + requestTimeout + + ", awaitDuration=" + awaitDuration + + ", sslEnabled=" + sslEnabled + + ", subscriptionExpressions=" + JSON.toJSONString(subscriptionExpressions) + + '}'; } } } diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java index 346311f2..83748903 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQAutoConfiguration.java @@ -42,10 +42,12 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; import org.springframework.util.Assert; +import org.springframework.util.CollectionUtils; import org.springframework.util.StringUtils; import java.time.Duration; import java.util.Collections; +import java.util.Map; import java.util.Objects; @Configuration @@ -105,7 +107,6 @@ public SimpleConsumerBuilder simpleConsumerBuilder(RocketMQProperties rocketMQPr RocketMQProperties.SimpleConsumer simpleConsumer = rocketMQProperties.getSimpleConsumer(); final ClientServiceProvider provider = ClientServiceProvider.loadService(); String consumerGroup = simpleConsumer.getConsumerGroup(); - FilterExpression filterExpression = RocketMQUtil.createFilterExpression(simpleConsumer.getTag(), simpleConsumer.getFilterExpressionType()); ClientConfiguration clientConfiguration = RocketMQUtil.createConsumerClientConfiguration(simpleConsumer); SimpleConsumerBuilder simpleConsumerBuilder = provider.newSimpleConsumerBuilder() .setClientConfiguration(clientConfiguration); @@ -116,9 +117,16 @@ public SimpleConsumerBuilder simpleConsumerBuilder(RocketMQProperties rocketMQPr if (StringUtils.hasLength(consumerGroup)) { simpleConsumerBuilder.setConsumerGroup(consumerGroup); } + // Set the subscription for the consumer. - if (Objects.nonNull(filterExpression)) { - simpleConsumerBuilder.setSubscriptionExpressions(Collections.singletonMap(simpleConsumer.getTopic(), filterExpression)); + if (CollectionUtils.isEmpty(simpleConsumer.getSubscriptionExpressions())) { + FilterExpression filterExpression = RocketMQUtil.createFilterExpression(simpleConsumer.getTag(), simpleConsumer.getFilterExpressionType()); + if (Objects.nonNull(filterExpression)) { + simpleConsumerBuilder.setSubscriptionExpressions(Collections.singletonMap(simpleConsumer.getTopic(), filterExpression)); + } + } else { + Map subscriptionExpressions = RocketMQUtil.createSubscriptionExpressions(simpleConsumer.getSubscriptionExpressions()); + simpleConsumerBuilder.setSubscriptionExpressions(subscriptionExpressions); } return simpleConsumerBuilder; } diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java index 8f3d4941..6e5be06c 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/autoconfigure/RocketMQProperties.java @@ -18,6 +18,8 @@ import org.springframework.boot.context.properties.ConfigurationProperties; +import java.util.Map; + @SuppressWarnings("WeakerAccess") @ConfigurationProperties(prefix = "rocketmq") public class RocketMQProperties { @@ -211,6 +213,11 @@ public static class SimpleConsumer { private String namespace = ""; + /** + * key is topic + */ + private Map subscriptionExpressions; + public String getAccessKey() { return accessKey; } @@ -299,6 +306,14 @@ public void setNamespace(String namespace) { this.namespace = namespace; } + public Map getSubscriptionExpressions() { + return subscriptionExpressions; + } + + public void setSubscriptionExpressions(Map subscriptionExpressions) { + this.subscriptionExpressions = subscriptionExpressions; + } + @Override public String toString() { return "SimpleConsumer{" + @@ -315,4 +330,25 @@ public String toString() { } } + public static class FilterExpression { + private String tag; + + private String filterExpressionType; + + public String getTag() { + return tag; + } + + public void setTag(String tag) { + this.tag = tag; + } + + public String getFilterExpressionType() { + return filterExpressionType; + } + + public void setFilterExpressionType(String filterExpressionType) { + this.filterExpressionType = filterExpressionType; + } + } } diff --git a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java index 55f3948f..187f399c 100644 --- a/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java +++ b/rocketmq-v5-client-spring-boot/src/main/java/org/apache/rocketmq/client/support/RocketMQUtil.java @@ -34,6 +34,8 @@ import java.nio.charset.Charset; import java.time.Duration; +import java.util.HashMap; +import java.util.Map; import java.util.Objects; public class RocketMQUtil { @@ -184,4 +186,14 @@ public static FilterExpression createFilterExpression(String tag, String type) { FilterExpression filterExpression = new FilterExpression(tag, filterExpressionType); return filterExpression; } + + public static Map createSubscriptionExpressions(Map map) { + Map subscriptionExpressions = new HashMap<>(); + map.forEach((topic, expression) -> { + FilterExpressionType filterExpressionType = "tag".equalsIgnoreCase(expression.getFilterExpressionType()) ? FilterExpressionType.TAG : FilterExpressionType.SQL92; + FilterExpression filterExpression = new FilterExpression(expression.getTag(), filterExpressionType); + subscriptionExpressions.put(topic, filterExpression); + }); + return subscriptionExpressions; + } }