在Redis中实现延时消息通常有两种方式:使用SORT
命令结合ZSET
,或者使用Stream
数据类型。
以下是使用SORT
和ZSET
实现的示例代码:
import redis
import time
# 连接Redis
r = redis.Redis(host='localhost', port=6379, db=0)
# 消息添加到zset,分数为当前时间戳加上延时时长
delay_time = 10 # 延时10秒
message_id = "message:123" # 消息ID
r.zadd("delayed_queue", {message_id: time.time() + delay_time})
# 使用SORT命令获取到期的消息
while True:
# 使用SORT命令获取当前时间之前的所有元素
messages = r.zrangebyscore("delayed_queue", 0, time.time())
for msg in messages:
print(f"处理消息: {msg}")
# 处理完消息后,从zset中移除
r.zrem("delayed_queue", msg)
time.sleep(1) # 每隔1秒检查一次
使用Stream
实现延时消息的示例代码:
import redis
import time
# 连接Redis
r = redis.Redis(host='localhost', port=6379, db=0)
# 创建Stream
stream_key = "delayed_stream"
r.xgroup_create(stream_key, group_name="delayed_group", id="$", mkstream=True)
# 消息添加到Stream,并设置延时时长
delay_time = 10 # 延时10秒
message_id = r.xadd(stream_key, {"message": "Hello", "delay_time": delay_time})
# 消费者从Stream中读取消息
while True:
messages = r.xreadgroup(
group="delayed_group", consumer="consumer1", streams={stream_key: ">"}
)
for msg in messages:
stream, data = msg
for id, message in data:
print(f"处理消息: {message['message']}")
# 处理完消息后,Ack
r.xack(stream_key, "delayed_group", id)
# 可以在这里根据消息中的延时时长进行sleep
time.sleep(1) # 每隔1秒检查一次
以上代码示例展示了如何使用Redis的不同特性来实现延时消息。在实际应用中,你可能需要添加额外的逻辑,比如消息的可靠传递和重试机制。