spring-boot redis stream消息队列demo-及死信简单处理
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.stream.StreamListener;
import org.springframework.data.redis.stream.Subscription;
import org.springframework.data.redis.stream.config.StreamListenerContainerFactory;
import org.springframework.data.redis.stream.listener.RedisMessageListenerContainer;
@SpringBootApplication
public class RedisStreamDemoApplication {
public static void main(String[] args) {
SpringApplication.run(RedisStreamDemoApplication.class, args);
}
@Bean
public StreamListener<String, String> processListener(StringRedisTemplate template) {
return new StreamListener<String, String>() {
@Override
public void onData(String message) {
System.out.println("Received: " + message);
// 处理业务逻辑
// ...
// 如果需要将消息放入死信队列
template.opsForStream().add("dead_letter_stream", "dead_letter", message);
}
@Override
public void onError(Throwable throwable) {
System.err.println("Error: " + throwable.getMessage());
}
};
}
@Bean
public RedisMessageListenerContainer redisContainer(RedisConnectionFactory connectionFactory,
StreamListenerContainerFactory<String, String> factory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(processListener(new StringRedisTemplate()), "my_stream");
return container;
}
}
这个代码示例展示了如何在Spring Boot应用中使用StringRedisTemplate来监听名为"my\_stream"的Redis流消息,并在接收到消息后执行业务逻辑。如果业务逻辑处理失败,将打印错误信息。如果业务逻辑处理成功,可以选择将消息发送到死信队列"dead\_letter\_stream"中。这个示例简单明了,并且清晰地展示了如何在Spring应用中集成Redis Streams。
评论已关闭