Kafka入门
在Kafka中,我们可以使用KafkaConsumer API来读取Kafka中的数据。以下是一些基本的使用方法:
- 创建KafkaConsumer实例
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
- 订阅主题
consumer.subscribe(Collections.singletonList("topic"));
- 使用poll方法获取数据
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
}
- 关闭KafkaConsumer实例
consumer.close();
注意:上述代码只是一个简单的示例,实际使用时可能需要处理更多的异常和配置参数。例如,可以通过设置props.put("auto.offset.reset", "earliest");
来指定如果当前offset无效时,从最早的消息开始读取。
以上就是使用KafkaConsumer API的基本方法,实际应用中可能需要根据具体情况进行调整和优化。
评论已关闭