Springboot集成kafka高级应用实战
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableKafka
public class KafkaProducerConfig {
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// 可以在这里添加更多的配置属性
return props;
}
@Bean
public ProducerFactory<String, String> producerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
这段代码定义了一个配置类,其中包含了Kafka生产者的配置和KafkaTemplate的定义。这个配置类使用@Configuration
注解标注,表明这是一个配置类。@EnableKafka
注解启用了Spring Kafka的功能。producerConfigs
方法定义了生产者的配置属性,包括Kafka集群地址和序列化器。producerFactory
方法创建了一个生产者工厂,它使用了前面定义的配置。最后,kafkaTemplate
方法创建了一个KafkaTemplate实例,它使用了定义好的生产者工厂。这个KafkaTemplate可以在其他Spring beans中注入并使用,以发送消息到Kafka。
评论已关闭