分布式 - 消息队列Kafka:Kafka 消费者的消费位移
warning:
这篇文章距离上次修改已过192天,其中的内容可能已经有所变动。
在Kafka中,消费者的消费位移(consumer offset)是指消费者消费的特定分区中的消息的位置。Kafka为每个消费者维护消费位移,以便在消费者失败时能够恢复消费状态。
消费位移可以通过以下方式提交:
- 自动提交:消费者定期自动提交消费位移到Kafka。
- 手动提交:消费者在适当的时候手动调用API提交消费位移。
以下是一个简单的示例,展示了如何在手动提交模式下使用Java Kafka消费者API提交消费位移:
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import java.util.Arrays;
import java.util.Properties;
public class ManualOffsetCommitExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // 关闭自动提交
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("topic"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
// 处理消息
System.out.println(record.value());
}
// 在处理完所有消息之后手动提交位移
consumer.commitSync();
}
} finally {
consumer.close();
}
}
}
在这个例子中,我们关闭了消费者的自动位移提交,并在处理完所有消息后通过调用commitSync()
方法手动同步提交位移。这确保了在消费者崩溃的情况下,我们不会丢失任何消息。
评论已关闭