Spring Cloud Stream 整合 RocketMQ 的基本步骤如下:
- 在项目的pom.xml中添加Spring Cloud Stream和RocketMQ Binder的依赖:
 
<dependencies>
    <!-- Spring Cloud Stream -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
    </dependency>
    <!-- 其他依赖 -->
</dependencies>- 在application.yml或application.properties中配置RocketMQ的连接信息:
 
spring:
  cloud:
    stream:
      rocketmq:
        binder:
          namesrv-addr: 127.0.0.1:9876 # RocketMQ NameServer地址
      bindings:
        output:
          destination: test-topic # 指定发送到的topic
          content-type: text/plain # 设置消息类型
        input:
          destination: test-topic # 指定订阅的topic
          content-type: text/plain # 设置消息类型
          group: test-group # 设置消费组- 创建发送和接收消息的服务类:
 
@EnableBinding(value = {Processor.class})
public class RocketMQService {
 
    @Autowired
    private MessageChannel output;
 
    public void send(String content) {
        output.send(MessageBuilder.withPayload(content).build());
    }
 
    @StreamListener(Processor.INPUT)
    public void receive(String payload) {
        System.out.println("Received: " + payload);
    }
}在上述代码中,@EnableBinding(Processor.class)注解启用了Spring Cloud Stream的Processor绑定,这允许我们发送和接收消息。send方法用于发送消息,receive方法用@StreamListener注解标记,以接收消息。
确保RocketMQ的服务器正在运行并可以正常访问,然后就可以通过调用send方法发送消息,并在接收端打印出接收到的消息内容。