kafka配置多个消费者groupid kafka多个消费者消费同一个partition(java)

warning: 这篇文章距离上次修改已过253天,其中的内容可能已经有所变动。

在Kafka中,如果多个消费者属于同一个消费者组(同一个group.id),则它们会共享该主题的所有分区。Kafka通过分配分区来实现这一点,以确保每个分区只由一个消费者实例消费。

以下是一个使用Java和kafka-clients库的简单示例,演示如何配置多个消费者来消费同一个主题的不同分区。




import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import java.util.Arrays;
import java.util.Properties;
 
public class KafkaConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("group.id", "test-group"); // 设置消费者组ID
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Arrays.asList("test-topic")); // 订阅主题
 
        final int minBatchSize = 1;
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
 
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("Offset: %d, Partition: %d, Value: %s\n", record.offset(), record.partition(), record.value());
            }
        }
    }
}

在这个例子中,我们创建了一个Kafka消费者实例,并将其配置为属于test-group消费者组。然后订阅了主题test-topicpoll方法用于从Kafka获取消息,并且这个过程是循环的,因此该消费者会持续地从分配给它的分区中拉取消息。

如果你想要多个消费者实例共同消费同一主题,你需要确保:

  1. 每个消费者实例都有相同的group.id
  2. 主题的分区数量大于消费者实例的数量。

如果你运行多个这样的消费者实例,它们将平均分摊主题的分区,每个实例处理一部分分区。如果新的消费者实例加入到同一个组中,它们会自动获取新的分区来消费。如果消费者实例的数量超过分区数量,则多出的实例将不会接收到任何消息,除非它们属于不同的消费者组。

最后修改于:2024年08月09日 12:58

评论已关闭

推荐阅读

DDPG 模型解析,附Pytorch完整代码
2024年11月24日
DQN 模型解析,附Pytorch完整代码
2024年11月24日
AIGC实战——Transformer模型
2024年12月01日
Socket TCP 和 UDP 编程基础(Python)
2024年11月30日
python , tcp , udp
如何使用 ChatGPT 进行学术润色?你需要这些指令
2024年12月01日
AI
最新 Python 调用 OpenAi 详细教程实现问答、图像合成、图像理解、语音合成、语音识别(详细教程)
2024年11月24日
ChatGPT 和 DALL·E 2 配合生成故事绘本
2024年12月01日
omegaconf,一个超强的 Python 库!
2024年11月24日
【视觉AIGC识别】误差特征、人脸伪造检测、其他类型假图检测
2024年12月01日
[超级详细]如何在深度学习训练模型过程中使用 GPU 加速
2024年11月29日
Python 物理引擎pymunk最完整教程
2024年11月27日
MediaPipe 人体姿态与手指关键点检测教程
2024年11月27日
深入了解 Taipy:Python 打造 Web 应用的全面教程
2024年11月26日
基于Transformer的时间序列预测模型
2024年11月25日
Python在金融大数据分析中的AI应用(股价分析、量化交易)实战
2024年11月25日
AIGC Gradio系列学习教程之Components
2024年12月01日
Python3 `asyncio` — 异步 I/O,事件循环和并发工具
2024年11月30日
llama-factory SFT系列教程:大模型在自定义数据集 LoRA 训练与部署
2024年12月01日
Python 多线程和多进程用法
2024年11月24日
Python socket详解,全网最全教程
2024年11月27日