Redis实现消息队列及延迟队列
import time
import redis
# 连接Redis
r = redis.Redis(host='localhost', port=6379, db=0)
# 定义一个生产者函数,用于将任务推送到Redis队列中
def produce(queue_name, data, delay_seconds=0):
if delay_seconds > 0:
# 使用时间戳加上延迟时长作为score,将任务存储在延迟队列中
r.zadd('delays', {data: time.time() + delay_seconds}, queue_name)
else:
# 直接将任务推送到普通队列
r.rpush(queue_name, data)
# 定义一个消费者函数,用于从Redis队列中取出任务并处理
def consume(queue_name):
while True:
# 检查延迟队列是否有待处理的任务
tasks_with_delay = r.zrangebyscore('delays', 0, time.time(), withscores=True)
for task, score in tasks_with_delay:
# 移除已经到期的任务
r.zrem('delays', task)
# 将任务移动到普通队列
r.rpush(queue_name, task)
# 处理普通队列的任务
while True:
task = r.lpop(queue_name)
if task:
print(f"处理任务: {task}")
# 这里可以添加处理任务的代码
else:
break
time.sleep(1) # 休眠一段时间,然后重新检查
# 使用示例
produce('myqueue', 'task1', delay_seconds=5) # 延迟5秒执行的任务
produce('myqueue', 'task2', delay_seconds=0) # 立即执行的任务
# 启动消费者
consume('myqueue')
这段代码展示了如何使用Redis实现一个带有延迟功能的消息队列。它使用了Redis的有序集合(sorted set)来存储带有延迟的任务,并通过定时任务检查和处理这些任务。这个例子简单明了,并且可以作为实现更复杂消息队列逻辑的基础。
评论已关闭