实战解析耳听八方的Redis拓展应用——Stream
# 导入必要的模块
from redis import Redis
# 连接Redis实例
redis_client = Redis(host='localhost', port=6379, db=0)
# 定义一个处理消息的函数
def handle_message(message):
print(f"接收到消息: {message}")
# 使用XREADGROUP读取消费组的消息,并处理
group_name = "my-group"
consumer_name = "my-consumer"
stream_name = "mystream"
# 创建消费组,如果已存在则忽略
redis_client.xgroup_create(stream_name, group_name, mkstream=True)
# 持续从消费组读取消息
while True:
messages = redis_client.xreadgroup(group_name, consumer_name, {stream_name: ">"}, count=1)
for stream_name, message in messages:
# 获取消息ID和内容
message_id, message_data = message
# 处理消息
handle_message(message_data)
这段代码演示了如何使用xreadgroup
命令从Redis的Stream中读取消费组的消息,并处理这些消息。代码中使用了xgroup_create
来创建一个消费组,如果该消费组已经存在,则mkstream
参数会被忽略。然后,代码进入一个循环中,不断地从消费组中读取消息,并调用handle_message
函数来处理这些消息。
评论已关闭