To integrate Kafka with SASL/PLAIN authentication in Spring Boot, configure the Kafka properties in application.properties or application.yml. Both formats are shown here.
application.yml configuration:
spring:
  kafka:
    bootstrap-servers: your-kafka-broker:9092
    # Client properties shared by the consumer and the producer
    properties:
      security:
        protocol: SASL_PLAINTEXT
      sasl:
        mechanism: PLAIN
        jaas:
          config: org.apache.kafka.common.security.plain.PlainLoginModule required username="your-username" password="your-password";
    consumer:
      group-id: test-group
      auto-offset-reset: earliest
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    listener:
      concurrency: 3
application.properties configuration:
spring.kafka.bootstrap-servers=your-kafka-broker:9092
# Client properties shared by the consumer and the producer
spring.kafka.properties.security.protocol=SASL_PLAINTEXT
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="your-username" password="your-password";
spring.kafka.consumer.group-id=test-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.listener.concurrency=3
Be sure to replace your-kafka-broker, your-username, and your-password with the actual address and credentials of your Kafka cluster.
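The security settings in the configuration ultimately reach the Kafka client as plain key/value properties. As a rough illustration only, the sketch below builds those properties with nothing but the JDK, so you can see what the sasl.jaas.config value looks like once the placeholders are filled in. The class name SaslPlainProps and its methods are hypothetical helpers, not part of Spring or Kafka, and the backslash escaping of quotes inside credentials is an assumption about how the JAAS value is parsed.

```java
import java.util.Properties;

/** Hypothetical helper: assembles the SASL/PLAIN client properties described above. */
public final class SaslPlainProps {

    private SaslPlainProps() {}

    /** Escapes backslashes and double quotes (assumed to be honored inside quoted JAAS options). */
    public static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    /** Formats the sasl.jaas.config value for the PLAIN login module. */
    public static String jaasConfig(String username, String password) {
        return "org.apache.kafka.common.security.plain.PlainLoginModule required"
                + " username=\"" + escape(username) + "\""
                + " password=\"" + escape(password) + "\";";
    }

    /** Collects the standard Kafka client keys for a SASL/PLAIN connection. */
    public static Properties build(String bootstrapServers, String username, String password) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        props.setProperty("security.protocol", "SASL_PLAINTEXT");
        props.setProperty("sasl.mechanism", "PLAIN");
        props.setProperty("sasl.jaas.config", jaasConfig(username, password));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("your-kafka-broker:9092", "your-username", "your-password");
        // Prints the JAAS line with the placeholder credentials filled in.
        System.out.println(props.getProperty("sasl.jaas.config"));
    }
}
```

In a Spring Boot application you would normally not write this by hand, since the framework passes the configured properties to the client for you. The sketch is only meant to make the shape of the values visible.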
You can then create consumers with the @KafkaListener annotation and send messages with KafkaTemplate.
For example, to create a consumer:
// Declare this method in a Spring-managed bean, for example a @Component class.
@KafkaListener(topics = "your-topic", groupId = "test-group")
public void listen(String message) {
    System.out.println("Received message in group test-group: " + message);
}
To send a message:
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;

public void sendMessage(String topic, String message) {
    kafkaTemplate.send(topic, message);
}
Make sure your project includes the Spring Kafka dependency:
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <!-- Can be omitted when Spring Boot's dependency management supplies the version -->
    <version>your Spring Kafka version</version>
</dependency>
The code above covers the basic configuration and usage for integrating Kafka with Spring Boot, and shows how to create a consumer and a producer.