kafka配置多个消费者groupid kafka多个消费者消费同一个partition(java)
warning:
这篇文章距离上次修改已过253天,其中的内容可能已经有所变动。
在Kafka中,如果多个消费者属于同一个消费者组(同一个group.id
),则它们会共享该主题的所有分区。Kafka通过分配分区来实现这一点,以确保每个分区只由一个消费者实例消费。
以下是一个使用Java和kafka-clients库的简单示例,演示如何配置多个消费者来消费同一个主题的不同分区。
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.Arrays;
import java.util.Properties;
public class KafkaConsumerExample {
public static void main(String[] args) {
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group"); // 设置消费者组ID
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test-topic")); // 订阅主题
final int minBatchSize = 1;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("Offset: %d, Partition: %d, Value: %s\n", record.offset(), record.partition(), record.value());
}
}
}
}
在这个例子中,我们创建了一个Kafka消费者实例,并将其配置为属于test-group
消费者组。然后订阅了主题test-topic
。poll
方法用于从Kafka获取消息,并且这个过程是循环的,因此该消费者会持续地从分配给它的分区中拉取消息。
如果你想要多个消费者实例共同消费同一主题,你需要确保:
- 每个消费者实例都有相同的
group.id
。 - 主题的分区数量大于消费者实例的数量。
如果你运行多个这样的消费者实例,它们将平均分摊主题的分区,每个实例处理一部分分区。如果新的消费者实例加入到同一个组中,它们会自动获取新的分区来消费。如果消费者实例的数量超过分区数量,则多出的实例将不会接收到任何消息,除非它们属于不同的消费者组。
评论已关闭