Spring Kafka——基于 Spring Kafka 实现动态管理 Kafka 连接和 topic 的监听
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaAdmin;
import org.springframework.stereotype.Service;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
@Service
public class KafkaTopicManager {
@Autowired
private KafkaAdmin kafkaAdmin;
public void createTopic(String topicName, int numPartitions, short replicationFactor) throws ExecutionException, InterruptedException {
AdminClient adminClient = kafkaAdmin.getAdminClient();
NewTopic topic = new NewTopic(topicName, numPartitions, replicationFactor);
CreateTopicsResult result = adminClient.createTopics(Collections.singleton(topic));
result.all().get(); // 等待操作完成
}
public void deleteTopic(String topicName) throws ExecutionException, InterruptedException {
AdminClient adminClient = kafkaAdmin.getAdminClient();
adminClient.deleteTopics(Collections.singleton(topicName)).all().get();
}
public Map<String, Boolean> checkTopics(String... topics) throws ExecutionException, InterruptedException {
AdminClient adminClient = kafkaAdmin.getAdminClient();
Map<String, Boolean> topicsStatus = new HashMap<>();
// 检查 topic 是否存在的逻辑
// ...
return topicsStatus;
}
}
这个代码实例展示了如何使用Spring Kafka的KafkaAdmin
类来创建和删除Kafka主题。createTopic
方法接受主题名称、分区数和副本因子,并使用KafkaAdmin
客户端创建新主题。deleteTopic
方法则用于删除指定名称的主题。checkTopics
方法用于检查一系列主题是否存在,并返回一个包含每个主题状态的映射。注意,这些方法中的createTopics
和deleteTopics
调用是异步的,因此使用get()
方法等待操作完成。
评论已关闭