上一篇:docker部署redis主从(2019-05-01 19:20:35)
文章大纲

rabbitmq学习笔记

2019-05-06 13:52:38

RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue)的开源实现。


RabbitMQ做什么的?

https://www.rabbitmq.com/tutorials/tutorial-one-php.html

这里有形象化的解释:

RabbitMQ is a message broker: it accepts and forwards messages. 
You can think about it as a post office: when you put the mail that you want posting in a post box,
you can be sure that Mr. or Ms. Mailperson will eventually deliver the mail to your recipient.
In this analogy, RabbitMQ is a post box, a post office and a postman.

The major difference between RabbitMQ and the post office is that it doesn't deal with paper,
instead it accepts, stores and forwards binary blobs of data ‒ messages.


译文:

RabbitMQ是一种message broker(不知道怎么翻译合适,就用原文吧)。它专注于接受和推送消息。我们可以把它想象成一个邮局:我们把要寄的信件放到邮箱,之后就会有邮递员把我们的信派发到收件人手上。按照这个想象的情景来类比的话,那么RabbitMQ就相当于邮箱、邮局和邮递员这一个整体组合。


RabbitMQ和邮局之间的主要区别:RabbitMQ并不处理消息,它只接收,存储,然后推送消息。



多个消费者,如何处理队列里消息的?

Round-robin(轮询调度)的方式使得每个消费者可以获得平均数量的消息。

Task(任务)的处理,在这里我们要将Task理解成消息,执行Task的一方称作消费者(任务执行者),而生成任务的一方则是生产者。


消息确认(Message Acknowledgement)的用途:

消费者A进程突然中止或者死掉,其就会自动发送消息确认通知给Rabbit MQ,RabbitMQ就会将原先派发给它的消息转而派发给其它消费者,从而保证了消息的稳定性,不会丢失。


实践代码:

//worker.php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLibConnectionAMQPStreamConnection;

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('hello', false, false, false, false);

echo " [*] Waiting for messages. To exit press CTRL+Cn";

$callback = function ($msg) {
echo ' [x] Received ', $msg->body, "n";
sleep(substr_count($msg->body, '.'));
echo " [x] Donen";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);};
};

// $channel->basic_consume('hello', 'bingo_a', false, true, false, false, $callback);
//加上消息确认机制,某消费者突然终止,其负责的消息就可以转让给其它消费者
$channel->basic_consume('hello', 'bingo_a', false, false, false, false, $callback);

while (count($channel->callbacks)) {
$channel->wait();
}

$channel->close();
$connection->close();
//new_task.php

require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;


$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

$channel->queue_declare('hello', false, false, false, false);

$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
$data = "Hello World!";
}

$msg = new AMQPMessage($data);
$channel->basic_publish($msg, '', 'hello');

echo " [x] Sent '".$data."'n";
$channel->close();
$connection->close();


避免遗漏丢失消息,用如下命令查看:

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged


消息持久化(Message durability):

上面消息确认是预防消费者程序崩溃导致消息丢失,但一旦RabbitMQ服务崩溃了呢?消息还能找回么?

这个持久化就是干这个用的:预防RabbitMQ服务崩溃而消息丢失。

$channel->queue_declare('hello', false, false, false, false);
//这里第3个参数,设为true,就是代表持久化。生产者消费者必须都得设置
$channel->queue_declare('hello', false, true, false, false);


$msg = new AMQPMessage( $data, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) );
这里将消息设为持久,RabbitMQ会将消息写进磁盘,但会存在RabbitMQ已挂掉,还没来得及写入的情况,所以这种持久化不是高稳定可靠的。


公平分发(Fair Dispatch):
上面轮询分发有个问题,就是可能耗时的任务都跑到一个消费者去了,从而导致有的消费者太忙,有的消费者太闲。忙的可能造成任务处理等待过长。
公平分发,就是确保每个消费者每次只能接收一个任务。如果某个消费者任务还没处理完或者还没发送消息确认(就是消息要转让),就派发到下一个比较闲的消费者身上去。

代码解决:
$channel->basic_qos(null, 1, null);

发布与订阅(Publish / Subscribe)
与前面不同的是,这里专注于把一个消息同时发送到多个消费者那里去。

交换机(Exchange)
前面是生产者直接将消息推送到队列,消费者直接从队列里取出消息。
而交换机是相当于生产者与队列之间的媒介: 生产者先把生成的消息放到交换机,然后由交换机决定消息如何处理,可能是推到某个队列,也可能是推到多个或所有队列,也或者可能是直接丢弃。

交换机如何处理消息,是由交换机类型来决定的:
direct 直接模式,根据路由关键字是否匹配绑定关键字,从而推到某个或某些队列
fanout 广播模式,推送到所有队列
topic 通配符模式
headers

绑定(Binding)
交换机与一个队列之间的关系,我们就称之为绑定。

路由(Routing)
当交换机需要将消息推送到指定的队列,就需要用到路由关键字。
当交换机为广播模式时,会直接忽略路由关键字。

通配符交换机(Topic Exchange)
目的只是为了让交换机推送消息时关于队列的选择性变得更加灵活。
当绑定关键字为#时,此时交换机相当于广播模式(Fanout)的交换机
当绑定关键字不含有*和#时,此时交换机相当于直接模式(direct)的交换机


远程调用(RPC, Remote Procedure Call)


上一篇:docker部署redis主从(2019-05-01 19:20:35)
我要评论
评论列表