【实战】Spring Cloud Stream3.0 整合RocketMq
在Spring Cloud Stream中整合RocketMQ,首先需要添加相关依赖,并配置RocketMQ的Binder。以下是一个简化的例子:
- 添加依赖到你的
pom.xml
:
<dependencies>
<!-- Spring Cloud Stream -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
<!-- 其他依赖... -->
</dependencies>
- 配置application.yml文件:
spring:
cloud:
stream:
rocketmq:
binder:
name-server: 127.0.0.1:9876 # RocketMQ NameServer地址
bindings:
input:
destination: test-topic # 输入主题
content-type: text/plain
group: test-consumer-group
output:
destination: test-topic # 输出主题
content-type: text/plain
group: test-producer-group
- 创建接收和发送消息的接口:
public interface MyStream {
String INPUT = "input";
String OUTPUT = "output";
@Input(INPUT)
SubscribableChannel input();
@Output(OUTPUT)
MessageChannel output();
}
- 发送和接收消息的服务:
@EnableBinding(MyStream.class)
public class MessageService {
@Autowired
private MyStream myStream;
public void sendMessage(String message) {
myStream.output().send(MessageBuilder.withPayload(message).build());
}
@StreamListener(MyStream.INPUT)
public void receiveMessage(String payload) {
System.out.println("Received: " + payload);
}
}
- 启动类添加@EnableBinding注解:
@SpringBootApplication
public class StreamRocketmqApplication {
public static void main(String[] args) {
SpringApplication.run(StreamRocketmqApplication.class, args);
}
}
以上代码展示了如何在Spring Cloud Stream中使用RocketMQ。你需要替换掉NameServer地址和主题,并确保RocketMQ服务器运行在相应的地址。这个例子中,MessageService
类包含了发送消息到RocketMQ和接收消息的逻辑。通过MyStream
接口,你可以定义输入和输出通道的名称。
评论已关闭