kafka:消息发送以及消费的过程
在Kafka中,消息的生产者将消息发送到Kafka集群,而消费者从集群中读取消息。以下是使用Python的kafka-python库进行消息生产和消费的简单示例。
首先,确保安装了kafka-python库:
pip install kafka-python
生产者代码示例:
from kafka import KafkaProducer
import json
# 创建Kafka生产者
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda m: json.dumps(m).encode('ascii'))
# 发送消息
message = {"id": 1, "msg": "Hello, Kafka!"}
producer.send('test-topic', message)
producer.flush() # 确保所有消息都已发送
消费者代码示例:
from kafka import KafkaConsumer
import json
# 创建Kafka消费者
consumer = KafkaConsumer('test-topic',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group',
value_deserializer=lambda m: json.loads(m.decode('ascii')))
# 消费消息
for message in consumer:
print(message.value)
确保Kafka服务器正在运行并且主题test-topic
已创建。在消费者代码中,auto_offset_reset='earliest'
表示如果没有现有的偏移量,从最早的消息开始消费。enable_auto_commit=True
表示自动提交消费的偏移量。group_id
是消费者群组的标识,确保同一个群组内的消费者不会消费到同一条消息。
评论已关闭