RabbitMQ的消息模式

RabbitMQ在应用中一定包含下面三种角色

生产者 - 消费者模式

一个消息只能被一个消费者取走。 client端充当生产者或者消费者,消息都存储在server端的消息队列中。

客户端,不管是生产者还是消费者,都要连接到server先。

connection = pika.BlockingConnection(pika.ConnectionParameters(host='10.117.175.50'))  //连接到server端

channel = connection.channel() // 

channel.queue_declare(queue='hello')   //声明server端消息队列的名称

对于生产者,向队列publish

channel.basic_publish(exchange='', routing_key='hello',  body='Hello World!') //routing_key设置为queue的名称

对于消费者,从队列中consume

def callback(ch, method, properties, body):
     ...
channel.basic_consume(callback, queue='hello', no_ack=True)   // 设计一个回调函数来处理消息。

参数delivery_mode的作用

delivery_mode = 2 意味着要publish的这条消息在server重启之后依然要不丢失。

channel.basic_qos(prefetch_count=X)的作用

在有多个消费者的情况下,默认的如果不设置prefetch_count,server会把消息轮流分给各个消费者去处理。

消费者如果设置了prefetch_count=1,那么它在处理完消息之后需要发出确认ack, ch.basic_ack(delivery_tag = method.delivery_tag) server只会在收到它的ack之后才会再分给它新消息。 server端挑选消费者的一个依据就是看消费者对应的channel上未ack的消息数是否达到其设置的prefetch_count个数。

发布 - 订阅模式

多播,一个消息被所有订阅者接收。 发布者可以有多个,订阅者可以有多个。

需要设置exchange channel.exchange_declare(exchange=’logs’, type=’fanout’) // 设置exchange的类型

对于消息发布者

channel.basic_publish(exchange='logs', routing_key='', body=message)  // routing_key设置为空

对于消息订阅者

result = channel.queue_declare(exclusive=True)

channel.queue_bind(exchange='logs', queue=result.method.queue)  // 绑定到特定的queue

def callback(ch, method, properties, body):
  print(" [x] %r" % body)

channel.basic_consume(callback, queue=result.method.queue, no_ack=True)

channel.start_consuming()

消息分类的发布-订阅模式

消息是有分类的,比如log条目的级别error/warn/info。 消息根据routing_key的设定进行Routing。 发布者按照分类进行消息发布,订阅者按需进行消息订阅。

对于发布者

channel.exchange_declare(exchange='direct_logs', type='direct')
channel.basic_publish(exchange='direct_logs', routing_key=severity, body=message)  // 需要设定当前消息的routing_key

对于订阅者

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

for severity in severities:
 channel.queue_bind(exchange='direct_logs', queue=queue_name, routing_key=severity)  // 需要按需绑定routing_key

消息多重分类的发布-订阅模式

消息的分类更多,还支持对分类进行正则匹配。注意通配符和别的地方不太一样。

对于发布者

channel.exchange_declare(exchange='topic_logs', type='topic')

channel.basic_publish(exchange='topic_logs', routing_key=routing_key, body=message) // 需要设定当前消息的

对于订阅者

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue

for binding_key in binding_keys:
  channel.queue_bind(exchange='topic_logs', queue=queue_name, routing_key=binding_key)  // 需要按需绑定routing_key

RPC请求 - 响应模式

将rabbitmq设计成一个RPC服务框架。这个设计很有意思。 RPC请求者是主动方。PRC服务者是被动方。

RPC请求者,首先订阅消息,回调函数是处理PRC响应值。 客户端需要主动发布消息来作为RPC请求。

RPC服务者,首先订阅消息,回调函数是处理PRC的请求,计算,并发布消息来作为PRC的响应。

服务者取出一条消息(作为RPC请求,来自于某个请求者),之后会再发一条消息(作为RPC响应)给那个请求者。 所以需要设置 properties=pika.BasicProperties(correlation_id = props.correlation_id) 以记住消息请求者的身份。

*****
Written by Lu.dev on 30 March 2016