Redis实现消息队列与延时消息队列
import redis
import time
# 连接Redis
r = redis.Redis(host='localhost', port=6379, db=0)
# 生产者:发送延时消息
def send_delay_message(queue_name, message, delay_seconds):
timestamp = int(time.time()) + delay_seconds
queue_name_key = f'{queue_name}:{timestamp}'
r.set(queue_name_key, message)
r.zadd('delayqueue', {queue_name_key: timestamp})
# 消费者:消费延时消息
def consume_delay_message(queue_name):
while True:
# 获取当前时间的前一秒,确保可以获取到已到期的消息
now = int(time.time()) - 1
# 获取到期的消息
messages = r.zrangebyscore('delayqueue', 0, now)
for message in messages:
# 移除延时队列中已经处理过的消息
r.zrem('delayqueue', message)
# 处理消息
print(f'Consuming message: {r.get(message).decode()}')
r.delete(message)
time.sleep(5) # 每5秒检查一次延时消息
# 使用示例
send_delay_message('myqueue', 'Hello, Redis!', 10) # 10秒后消费
consume_delay_message('myqueue') # 消费延时消息
这段代码展示了如何使用Redis实现延时消息队列。首先,我们定义了连接Redis的函数和发送延时消息的函数。发送延时消息时,我们计算了消息需要被处理的时间戳,并将消息存储在Redis的一个哈希表中,同时将它的键添加到有序集合(sorted set)中,以便可以按时间戳排序。然后,我们定义了一个消费延时消息的函数,它会循环检查是否有消息已经到期,如果有,就处理这些消息。这个示例简单地打印了消费的消息,实际应用中可以替换为相应的业务逻辑。
评论已关闭