Redis实现延迟任务队列可以通过以下两种方式实现:
- 使用Sorted Set(有序集合)
Redis的Sorted Set是一种可以根据score进行排序的集合,我们可以把要执行的任务的ID放入Sorted Set中,并设定执行时间作为score。然后用一个循环来不断地轮询这个Sorted Set,一旦发现有任务到期,就执行之。
# 添加任务
def add_job(job_id, execute_time):
redis_conn.zadd('jobs', {job_id: execute_time})
# 执行任务
def process_jobs():
while True:
# 获取当前时间
now = time.time()
# 获取所有score小于或等于当前时间的任务
jobs = redis_conn.zrangebyscore('jobs', 0, now)
for job in jobs:
try:
# 执行任务
do_job(job)
# 从集合中移除已经执行的任务
redis_conn.zrem('jobs', job)
except Exception as e:
logger.exception(e)
# 休眠一段时间后继续执行循环
time.sleep(1)
- 使用Redis的"发布/订阅"模式
我们可以用Redis的"发布/订阅"模式来实现延迟任务队列。首先,我们定期检查是否有新的任务到期,如果有,我们就发布一个消息。然后我们需要一个订阅者在等待这个消息,一旦接收到这个消息,它就会执行任务。
import redis
import time
import threading
redis_conn = redis.StrictRedis()
def add_job(job_id, execute_time):
# 发布任务
redis_conn.set(job_id, execute_time)
redis_conn.publish('new_job', job_id)
def process_jobs():
pubsub = redis_conn.pubsub()
pubsub.subscribe('new_job')
for message in pubsub.listen():
if message['type'] == 'message':
job_id = message['data']
try:
# 执行任务
do_job(job_id)
except Exception as e:
logger.exception(e)
# 启动订阅者
threading.Thread(target=process_jobs).start()
# 添加任务
add_job('job1', time.time() + 10)
以上两种方式都可以实现Redis延迟任务队列,但是需要注意的是,这两种方式都需要一直运行来处理任务,所以通常会和一些守护进程服务结合起来使用,如supervisor等。