springboot的kafka动态工具类(动态创建topic、监听和监听方法)
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.TopicBuilder;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.stereotype.Component;
@Component
public class KafkaDynamicUtil {
@Autowired
private KafkaAdmin kafkaAdmin;
/**
* 动态创建Topic
*
* @param topicName 主题名称
* @param numPartitions 分区数
* @param replicationFactor 副本因子
*/
public void createTopic(String topicName, int numPartitions, short replicationFactor) {
NewTopic topic = new NewTopic(topicName, numPartitions, replicationFactor);
AdminClient adminClient = kafkaAdmin.getAdminClient();
adminClient.createTopics(List.of(topic));
adminClient.close();
}
/**
* 监听Kafka消息
*
* @param messageListener 消息监听器
* @param topic 主题名称
*/
public void listen(MessageListener<String, String> messageListener, String topic) {
ContainerProperties containerProps = new ContainerProperties(topic);
containerProps.setMessageListener(messageListener);
KafkaMessageListenerContainer<String, String> container =
new KafkaMessageListenerContainer<>(kafkaAdmin.getConsumerFactory(), containerProps);
container.start();
}
/**
* 监听并处理Kafka消息的方法
*
* @param message 消息内容
*/
public void processMessage(String message) {
// 处理消息的逻辑
System.out.println("Received message in processMessage: " + message);
}
}
这个代码实例提供了一个KafkaDynamicUtil工具类,其中包含了创建Topic、启动监听器以及处理消息的方法。这个工具类可以用于动态管理Kafka主题和消息的接收。在实际使用时,你需要根据自己的项目配置和消息处理逻辑进行调整。
评论已关闭