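Message Queue Middleware - RabbitMQ's Six Messaging Patterns Explained
RabbitMQ is a popular open-source message broker used for reliable message delivery between systems, asynchronous processing, and scaling out workloads. It supports several messaging patterns, including the following six: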
- Simple mode (Simple)
- Work queue mode (Work Queue)
- Publish/subscribe mode (Publish/Subscribe)
- Routing mode (Routing)
- Topic mode (Topics)
- RPC mode (RPC)
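The publish/subscribe, routing, and topic modes differ mainly in the exchange type that decides which bound queues receive a message (fanout, direct, and topic respectively). The sketch below models that decision in plain Python, without a broker, so the differences can be compared side by side. The function names `topic_matches` and `route`, and the queue names and binding keys in the example, are hypothetical and are not part of pika or RabbitMQ.

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Return True if a topic binding pattern matches a routing key.

    Words are separated by dots; '*' stands for exactly one word and
    '#' stands for zero or more words.
    """
    def match(p, k):
        if not p:
            return not k
        head, rest = p[0], p[1:]
        if head == "#":
            # '#' either consumes nothing, or consumes one word and stays active.
            return match(rest, k) or (bool(k) and match(p, k[1:]))
        if not k:
            return False
        return (head == "*" or head == k[0]) and match(rest, k[1:])

    return match(pattern.split("."), routing_key.split("."))


def route(exchange_type, bindings, routing_key):
    """Return the queues that would receive the message, in binding order.

    bindings is a list of (queue_name, binding_key) pairs.
    """
    queues = []
    for queue, binding_key in bindings:
        if exchange_type == "fanout":
            hit = True                          # binding key is ignored
        elif exchange_type == "direct":
            hit = binding_key == routing_key    # exact match only
        elif exchange_type == "topic":
            hit = topic_matches(binding_key, routing_key)
        else:
            raise ValueError(f"unsupported exchange type: {exchange_type}")
        if hit and queue not in queues:
            queues.append(queue)
    return queues


# Hypothetical bindings used only for illustration.
bindings = [("errors", "log.error"), ("all_logs", "log.#"), ("audit", "*.error")]
print(route("fanout", bindings, "log.info"))
print(route("direct", bindings, "log.error"))
print(route("topic", bindings, "log.info"))
```

With the same bindings, the fanout exchange delivers to every queue, the direct exchange only to the queue whose binding key equals the routing key, and the topic exchange to the queues whose pattern matches.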
The following examples use pika, a Python client library for RabbitMQ:
- Simple mode:
Producer:
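import pika

# Connect to a RabbitMQ broker running on the local machine.
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declaring a queue is idempotent, so this is safe if it already exists.
channel.queue_declare(queue='hello')

# The default exchange ('') routes the message to the queue named by routing_key.
channel.basic_publish(exchange='',
                      routing_key='hello',
                      body='Hello World!')
print(" [x] Sent 'Hello World!'")
connection.close()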
Consumer:
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# Declare the same queue so the consumer can start before the producer.
channel.queue_declare(queue='hello')

def callback(ch, method, properties, body):
    # body arrives as bytes
    print(f" [x] Received {body}")

# auto_ack=True: a message counts as acknowledged as soon as it is delivered.
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
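Both scripts above publish and consume through the default exchange (`exchange=''`), which delivers a message to the queue whose name equals the routing key. As a rough illustration of that rule, here is a minimal in-memory sketch. `TinyBroker` is a hypothetical stand-in written for this example; it is not pika's API, it models only the default exchange, and its boolean return value is an assumption made for demonstration.

```python
from collections import deque


class TinyBroker:
    """Hypothetical in-memory model of the default exchange only."""

    def __init__(self):
        self.queues = {}

    def queue_declare(self, queue):
        # Idempotent: an existing queue and its messages are left untouched.
        self.queues.setdefault(queue, deque())

    def basic_publish(self, exchange, routing_key, body):
        if exchange != "":
            raise NotImplementedError("only the default exchange is modelled")
        target = self.queues.get(routing_key)
        if target is None:
            # No queue with that name: the message is unroutable and dropped.
            return False
        target.append(body)
        return True

    def basic_get(self, queue):
        # Return the oldest message, or None when the queue is empty.
        pending = self.queues[queue]
        return pending.popleft() if pending else None


broker = TinyBroker()
broker.queue_declare("hello")
broker.queue_declare("hello")          # declaring twice is harmless
delivered = broker.basic_publish("", "hello", b"Hello World!")
dropped = broker.basic_publish("", "no_such_queue", b"lost")
print(delivered, dropped, broker.basic_get("hello"))
```

This is also why both the producer and the consumer declare the queue: whichever starts first creates it, and the second declaration changes nothing.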
- Work queue mode:
Multiple consumers compete for messages from the same queue.
Producer:
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

# durable=True: the queue definition survives a broker restart.
channel.queue_declare(queue='task_queue', durable=True)

message = 'Hello World!'
channel.basic_publish(exchange='',
                      routing_key='task_queue',
                      body=message,
                      properties=pika.BasicProperties(
                          delivery_mode=2,  # make message persistent
                      ))
print(f" [x] Sent {message}")
connection.close()
Consumer:
import pika
import time

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.queue_declare(queue='task_queue', durable=True)

def callback(ch, method, properties, body):
    print(f" [x] Received {body}")
    # Simulate work: sleep one second per '.' in the message body.
    time.sleep(body.count(b'.'))
    print(" [x] Done")
    # Acknowledge only after the work is finished.
    ch.basic_ack(delivery_tag=method.delivery_tag)

# Deliver at most one unacknowledged message to this consumer at a time.
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='task_queue', on_message_callback=callback)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()
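To see why the consumer sets `prefetch_count=1`, it can help to compare two dispatch strategies in a small simulation. The sketch below is a simplified model, not RabbitMQ itself: it assumes all messages are already queued, that without a prefetch limit each consumer receives every n-th message (round-robin), and that with `prefetch_count=1` each message goes to whichever consumer becomes free first. The function names and the message bodies are hypothetical.

```python
import heapq


def durations_from_bodies(bodies):
    """One simulated second of work per '.' in the body, as in the consumer."""
    return [body.count(b".") for body in bodies]


def round_robin_makespan(durations, workers):
    """Total time when message i is always handed to worker i % workers."""
    load = [0] * workers
    for i, seconds in enumerate(durations):
        load[i % workers] += seconds
    return max(load)


def fair_dispatch_makespan(durations, workers):
    """Total time when each message goes to the worker that is free first."""
    free_at = [0] * workers          # min-heap of times at which workers are idle
    heapq.heapify(free_at)
    for seconds in durations:
        start = heapq.heappop(free_at)
        heapq.heappush(free_at, start + seconds)
    return max(free_at)


# Hypothetical workload: heavy and light tasks alternate.
bodies = [b"a.....", b"b.", b"c.....", b"d.", b"e.....", b"f."]
durations = durations_from_bodies(bodies)
print(round_robin_makespan(durations, workers=2))
print(fair_dispatch_makespan(durations, workers=2))
```

In this workload round-robin hands every heavy task to the same worker, so the other one sits idle most of the time; dispatching to the first free worker spreads the load and finishes sooner.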