分布式 - 消息队列Kafka:Kafka 消费者消费位移的提交方式
在Kafka中,消费者通常会定期或在拉取消息后提交消费位移。Kafka消费者可以使用两种方式提交位移:自动提交和手动提交。
自动提交(默认关闭):
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "1000");
开启自动提交后,消费者会定期自动提交消费的位置。
手动提交:
while (true) {
// 拉取消息
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
// 处理消息
for (ConsumerRecord<String, String> record : records) {
// 消费者逻辑
}
// 手动同步提交位移
consumer.commitSync();
}
在手动提交模式下,你需要在适当的时候调用commitSync()
或commitAsync()
方法来提交位移。
手动提交更加灵活,因为你可以在处理完所有消息后再提交位移,从而保证在异常发生时不会丢失任何消息。自动提交通常用于简单的场景,不需要严格控制位移提交的时机。
评论已关闭