kafka,MQ消息队列
Kafka是一个分布式流处理平台,可以用于消息队列,但不仅限于消息队列。以下是一个使用Python和kafka-python库来发送和接收Kafka消息的基本示例。
首先,确保安装了kafka-python库:
pip install kafka-python
生产者(发送消息):
from kafka import KafkaProducer
import json
# 创建Kafka生产者
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
value_serializer=lambda m: json.dumps(m).encode('ascii'))
# 发送消息
message = {"id": 1, "msg": "Hello, Kafka!"}
producer.send('test-topic', message)
producer.flush() # 确保所有消息都已发送
消费者(接收消息):
from kafka import KafkaConsumer
import json
# 创建Kafka消费者
consumer = KafkaConsumer('test-topic',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group',
value_deserializer=lambda m: json.loads(m.decode('ascii')))
# 监听消息
for message in consumer:
print(message.value)
确保Kafka服务器正在运行并且配置正确(例如,正确的bootstrap.servers
)。以上代码片段是基本的生产者和消费者示例,实际应用中可能需要更复杂的配置和错误处理。
评论已关闭