Kafka Principles, Part 5: Spring Boot Integration for Batch Consumption
In Spring Boot, you can use the spring-kafka dependency to integrate Kafka and implement batch consumption. Here is a simple example:
- Add the dependency to your pom.xml:
<dependencies>
    <!-- Spring Kafka -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.8.0</version> <!-- use a version compatible with your Spring Boot release -->
    </dependency>
    <!-- other dependencies -->
</dependencies>
- Configure the Kafka consumer (application.yml or application.properties):
spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: test-group
      auto-offset-reset: earliest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      properties:
        spring:
          json:
            trusted:
              packages: com.example.demo
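With a batch listener, the number of records delivered per call is bounded by the consumer's max.poll.records setting (500 by default in Kafka). As a minimal sketch, assuming Spring Boot 2.x and building on the properties above, the limit could be tightened in a ConsumerFactory bean; the class name BatchTuningConfig and the value 100 are illustrative assumptions, not part of the original setup:

import java.util.Map;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
public class BatchTuningConfig { // hypothetical class name

    @Bean
    public ConsumerFactory<String, String> consumerFactory(KafkaProperties kafkaProperties) {
        // Start from the consumer properties declared in application.yml ...
        Map<String, Object> props = kafkaProperties.buildConsumerProperties();
        // ... and cap how many records a single poll (one batch) may return.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100); // illustrative value
        return new DefaultKafkaConsumerFactory<>(props);
    }
}

Defining this bean replaces the auto-configured ConsumerFactory, but because it starts from the bound Kafka properties, the YAML settings above still apply and only the poll cap changes.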
- Create the consumer class:
import java.util.List;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaConsumer {

    @KafkaListener(topics = "your-topic", groupId = "test-group", containerFactory = "batchFactory")
    public void listen(List<String> messages) {
        // process the batch of messages
        for (String message : messages) {
            // process a single message
        }
    }
}
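If the handler also needs per-record metadata such as the partition and offset, Spring Kafka lets a batch listener receive List<ConsumerRecord<K, V>> instead of plain payloads. The sketch below shows that variant as an alternative signature for the listener above; the class name KafkaRecordBatchConsumer is a hypothetical placeholder:

import java.util.List;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class KafkaRecordBatchConsumer { // hypothetical class name

    @KafkaListener(topics = "your-topic", groupId = "test-group", containerFactory = "batchFactory")
    public void listen(List<ConsumerRecord<String, String>> records) {
        for (ConsumerRecord<String, String> record : records) {
            // partition() and offset() identify where each message came from
            System.out.printf("partition=%d offset=%d value=%s%n",
                    record.partition(), record.offset(), record.value());
        }
    }
}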
- Configure batch consumption (batchFactory):
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

@Configuration
public class KafkaConsumerConfig {

    @Bean
    public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> batchFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.setBatchListener(true); // enable batch consumption
        return factory;
    }
}
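To check that records really arrive in batches, it helps to produce a burst of test messages. Below is a minimal sketch, assuming Spring Boot's auto-configured KafkaTemplate with its default String serializers; the class name TestProducerConfig and the message count are illustrative:

import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaTemplate;

@Configuration
public class TestProducerConfig { // hypothetical class name

    // Sends a burst of messages at startup so batch delivery can be observed in the listener.
    @Bean
    public CommandLineRunner sendTestMessages(KafkaTemplate<String, String> kafkaTemplate) {
        return args -> {
            for (int i = 0; i < 50; i++) {
                kafkaTemplate.send("your-topic", "message-" + i);
            }
        };
    }
}

With messages sent in quick succession like this, the batch listener should typically receive several records per invocation rather than one at a time.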