Redis 提供了几种可以实现定时任务的方法,包括使用 Sorted Set 实现延时任务队列,使用 Stream 的消费组功能,以及使用 Redis 的 Lua 脚本。
- 使用 Sorted Set 实现延时任务队列
Redis 的 Sorted Set 是根据分数进行排序的,我们可以把要执行的任务以分数的形式存入 Sorted Set,分数就是任务执行的时间戳。然后用一个循环不断地检查 Sorted Set 的第一个任务是否到期,如果到期就执行并移除。
import time
import redis
client = redis.StrictRedis()
# 添加任务
def add_job(job_id, execute_time):
client.zadd('jobs', {job_id: execute_time})
# 执行任务
def run_jobs():
while True:
# 获取当前时间
now = time.time()
# 获取所有分数小于当前时间的任务
jobs = client.zrangebyscore('jobs', 0, now)
for job in jobs:
# 执行任务
print(f'Execute job: {job}')
# 移除已经执行的任务
client.zrem('jobs', job)
time.sleep(1)
# 示例:添加一个将在10秒后执行的任务
add_job(b'job1', time.time() + 10)
# 启动循环执行任务
run_jobs()
- 使用 Stream 的消费组功能
Redis 的 Stream 是一个消息流,可以用来实现定时任务队列。我们可以把任务以消息的形式放入 Stream,然后使用消费组来处理这些消息。
import time
import redis
client = redis.StrictRedis()
# 添加任务
def add_job(job_id, delay):
client.xadd('jobs', {'job_id': job_id, 'delay': delay})
# 执行任务
def run_jobs():
while True:
# 获取消息
messages = client.xrange('jobs', '-', '+', count=1)
for message in messages:
# 获取消息ID和内容
id, message = message
# 解析消息内容得到延时
delay = int(message[b'delay'])
if delay <= 0:
# 执行任务
print(f'Execute job: {message[b"job_id"]}')
# 移除已处理的消息
client.xdel('jobs', id)
time.sleep(1)
# 示例:添加一个将在10秒后执行的任务
add_job(b'job1', 10)
# 启动循环执行任务
run_jobs()
- 使用 Lua 脚本
我们也可以使用 Redis 的 Lua 脚本来实现定时任务。Lua 脚本可以原子性地执行多条 Redis 命令,非常适合实现定时任务。
import time
import redis
client = redis.StrictRedis()
# Lua脚本
script = """
local tasks = redis.call('zrangebyscore', KEYS[1], 0, ARGV[1], 'withscores')
for i = 1, #tasks, 2 do
local task_id = tasks[i]
local execute_time = tasks[i + 1]
if execute_time < ARGV[1] then
redis.call('zrem', KEYS[1], task_id)
return task_id
end
end
return false
"""