Spring Cloud Stream 整合 RocketMQ 的基本步骤如下:
- 在项目的pom.xml中添加Spring Cloud Stream和RocketMQ Binder的依赖:
<dependencies>
<!-- Spring Cloud Stream -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
<!-- 其他依赖 -->
</dependencies>
- 在application.yml或application.properties中配置RocketMQ的连接信息:
spring:
cloud:
stream:
rocketmq:
binder:
namesrv-addr: 127.0.0.1:9876 # RocketMQ NameServer地址
bindings:
output:
destination: test-topic # 指定发送到的topic
content-type: text/plain # 设置消息类型
input:
destination: test-topic # 指定订阅的topic
content-type: text/plain # 设置消息类型
group: test-group # 设置消费组
- 创建发送和接收消息的服务类:
@EnableBinding(value = {Processor.class})
public class RocketMQService {
@Autowired
private MessageChannel output;
public void send(String content) {
output.send(MessageBuilder.withPayload(content).build());
}
@StreamListener(Processor.INPUT)
public void receive(String payload) {
System.out.println("Received: " + payload);
}
}
在上述代码中,@EnableBinding(Processor.class)
注解启用了Spring Cloud Stream的Processor绑定,这允许我们发送和接收消息。send
方法用于发送消息,receive
方法用@StreamListener
注解标记,以接收消息。
确保RocketMQ的服务器正在运行并可以正常访问,然后就可以通过调用send
方法发送消息,并在接收端打印出接收到的消息内容。