Spring Kafka常用配置详解
@Configuration
@EnableKafka
public class KafkaProducerConfig {
@Value("${kafka.bootstrap-servers}")
private String bootstrapServers;
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
// 设置连接的Bootstrap服务器
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
// 设置值序列化器
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
// 其他可选配置
props.put(ProducerConfig.ACKS_CONFIG, "all"); // 确保所有副本都接收到消息
props.put(ProducerConfig.RETRIES_CONFIG, 3); // 重试次数
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); // 批处理大小
props.put(ProducerConfig.LINGER_MS_CONFIG, 1); // 批处理延迟发送时间
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432); // 记录缓冲区大小
return props;
}
@Bean
public ProducerFactory<String, Object> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
这个配置类定义了Spring Kafka的生产者设置,包括Bootstrap服务器地址、序列化器设置、确认级别、重试策略和批处理大小等。通过@Value
注解,可以将配置文件中的值注入到配置中,实现了配置和代码的解耦。producerConfigs
方法创建了配置的Map,producerFactory
方法使用这些配置创建了ProducerFactory
,最后kafkaTemplate
方法提供了一个KafkaTemplate
的Bean,用于发送消息。
评论已关闭