import redis
import time
# 连接Redis
r = redis.Redis(host='localhost', port=6379, db=0)
# 使用zset实现延时任务队列
def delay_queue(queue_name, delay_time):
while True:
# 获取当前时间戳
now = time.time()
# 获取zset中所有score小于当前时间戳的元素
items = r.zrangebyscore(queue_name, 0, now)
for item in items:
# 移除已经处理的元素
r.zrem(queue_name, item)
# 处理业务逻辑
print(f"处理任务: {item}")
# 休眠一段时间后继续循环
time.sleep(delay_time)
# 示例:使用延时队列
queue_name = "delay_queue"
delay_time = 1 # 单位为秒
# 添加任务到延时队列
task1 = f"task1_{time.time()}"
r.zadd(queue_name, {task1: time.time() + 10}) # 10秒后处理
task2 = f"task2_{time.time()}"
r.zadd(queue_name, {task2: time.time() + 20}) # 20秒后处理
# 运行延时队列的处理循环
delay_queue(queue_name, delay_time)
这段代码首先连接到Redis,然后定义了一个delay_queue
函数,该函数使用一个无限循环来检查是否有需要立即处理的任务。如果有,它会处理这些任务,并从zset中移除它们。这个例子展示了如何使用Redis的zset数据结构来实现一个延时任务队列。