Redis与RabbitMQ配合使用多线程(多消费者)处理消息
import pika
import time
from threading import Thread
def process_message(channel, method, properties, body):
# 这里处理接收到的消息
print(f"Received message: {body}")
def consume():
# 建立到RabbitMQ的连接
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
# 声明队列
channel.queue_declare(queue='hello')
# 定义回调函数处理消息
channel.basic_consume(queue='hello', on_message_callback=process_message)
# 开始循环监听消息
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
# 启动多线程消费者
for i in range(5): # 假设我们启动5个线程
t = Thread(target=consume)
t.start()
这段代码定义了一个consume
函数,它建立到RabbitMQ的连接,声明一个队列,并使用多线程来并行处理接收到的消息。每个线程都会进入一个无限循环,等待并处理消息。这种模式可以有效提高消息处理的吞吐量,特别是在处理时间密集型任务时。
评论已关闭