以下是使用Spring Boot整合Kafka发送和接收消息的示例代码:
1. 添加依赖到你的 pom.xml
文件中
<dependencies>
<!-- Kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- 用于编写消息监听器 -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
2. 配置 application.properties
或 application.yml
文件
# Kafka 基础配置
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=myGroup
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
3. 发送消息
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaProducer {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
public void sendMessage(String topic, String message) {
kafkaTemplate.send(topic, message);
}
}
4. 接收消息
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
@Component
public class KafkaConsumer {
@KafkaListener(topics = "yourTopic", groupId = "myGroup")
public void listen(String message) {
System.out.println("Received message in group myGroup: " + message);
}
}
5. 使用 KafkaProducer
发送消息
@Autowired
private KafkaProducer kafkaProducer;
public void sendMessageExample() {
kafkaProducer.sendMessage("yourTopic", "Hello, Kafka!");
}
以上代码演示了如何在Spring Boot应用程序中使用KafkaTemplate
发送消息,并使用@KafkaListener
注解接收消息。记得替换配置文件中的localhost:9092
为你的Kafka服务器地址,以及将yourTopic
替换为你要监听的主题。