利用Redis Stream实现一个可靠的消息队列
import redis
# 连接到Redis
r = redis.Redis(host='localhost', port=6379, db=0)
# 生产者:将消息推送到Redis Stream
def produce_message(stream_key, message):
# XADD命令将一个消息添加到stream的末尾
r.xadd(stream_key, {'message': message})
# 消费者:从Redis Stream拉取并处理消息
def consume_message(stream_key):
# XREADBLOCK命令会阻塞,直到有新消息可读或超时
messages = r.xread({stream_key: '$'}, block=5)
for message in messages:
# 处理消息
stream, message_id, data = message
print(f"Received: {data['message']}")
# 示例使用
stream_key = 'my-stream'
produce_message(stream_key, 'Hello, Redis Stream!')
consume_message(stream_key)
这段代码展示了如何使用Redis Stream进行消息的生产和消费。produce_message
函数负责将消息推送到指定的stream中,而consume_message
函数则会从stream中拉取并处理消息。这个例子简单地打印出接收到的消息,实际应用中你可能需要替换处理逻辑以符合实际需求。
评论已关闭