kafka配置多个消费者groupid kafka多个消费者消费同一个partition(java)
    		       		warning:
    		            这篇文章距离上次修改已过451天,其中的内容可能已经有所变动。
    		        
        		                
                在Kafka中,如果多个消费者属于同一个消费者组(同一个group.id),则它们会共享该主题的所有分区。Kafka通过分配分区来实现这一点,以确保每个分区只由一个消费者实例消费。
以下是一个使用Java和kafka-clients库的简单示例,演示如何配置多个消费者来消费同一个主题的不同分区。
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.Arrays;
import java.util.Properties;
 
public class KafkaConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group"); // 设置消费者组ID
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test-topic")); // 订阅主题
 
        final int minBatchSize = 1;
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
 
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Offset: %d, Partition: %d, Value: %s\n", record.offset(), record.partition(), record.value());
            }
        }
    }
}在这个例子中,我们创建了一个Kafka消费者实例,并将其配置为属于test-group消费者组。然后订阅了主题test-topic。poll方法用于从Kafka获取消息,并且这个过程是循环的,因此该消费者会持续地从分配给它的分区中拉取消息。
如果你想要多个消费者实例共同消费同一主题,你需要确保:
- 每个消费者实例都有相同的
group.id。 - 主题的分区数量大于消费者实例的数量。
 
如果你运行多个这样的消费者实例,它们将平均分摊主题的分区,每个实例处理一部分分区。如果新的消费者实例加入到同一个组中,它们会自动获取新的分区来消费。如果消费者实例的数量超过分区数量,则多出的实例将不会接收到任何消息,除非它们属于不同的消费者组。
评论已关闭