RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue)的开源实现。
RabbitMQ做什么的?
https://www.rabbitmq.com/tutorials/tutorial-one-php.html
这里有形象化的解释:
RabbitMQ is a message broker: it accepts and forwards messages.
You can think about it as a post office: when you put the mail that you want posting in a post box,
you can be sure that Mr. or Ms. Mailperson will eventually deliver the mail to your recipient.
In this analogy, RabbitMQ is a post box, a post office and a postman.
The major difference between RabbitMQ and the post office is that it doesn't deal with paper,
instead it accepts, stores and forwards binary blobs of data ‒ messages.
译文:
RabbitMQ是一种message broker(不知道怎么翻译合适,就用原文吧)。它专注于接受和推送消息。我们可以把它想象成一个邮局:我们把要寄的信件放到邮箱,之后就会有邮递员把我们的信派发到收件人手上。按照这个想象的情景来类比的话,那么RabbitMQ就相当于邮箱、邮局和邮递员这一个整体组合。
RabbitMQ和邮局之间的主要区别:RabbitMQ并不处理消息,它只接收,存储,然后推送消息。
多个消费者,如何处理队列里消息的?
Round-robin(轮询调度)的方式使得每个消费者可以获得平均数量的消息。
Task(任务)的处理,在这里我们要将Task理解成消息,执行Task的一方称作消费者(任务执行者),而生成任务的一方则是生产者。
消息确认(Message Acknowledgement)的用途:
消费者A进程突然中止或者死掉,其就会自动发送消息确认通知给Rabbit MQ,RabbitMQ就会将原先派发给它的消息转而派发给其它消费者,从而保证了消息的稳定性,不会丢失。
实践代码:
//worker.php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
echo " [*] Waiting for messages. To exit press CTRL+Cn";
$callback = function ($msg) {
echo ' [x] Received ', $msg->body, "n";
sleep(substr_count($msg->body, '.'));
echo " [x] Donen";
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);};
};
// $channel->basic_consume('hello', 'bingo_a', false, true, false, false, $callback);
//加上消息确认机制,某消费者突然终止,其负责的消息就可以转让给其它消费者
$channel->basic_consume('hello', 'bingo_a', false, false, false, false, $callback);
while (count($channel->callbacks)) {
$channel->wait();
}
$channel->close();
$connection->close();
//new_task.php
require_once __DIR__ . '/vendor/autoload.php';
use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
$channel->queue_declare('hello', false, false, false, false);
$data = implode(' ', array_slice($argv, 1));
if (empty($data)) {
$data = "Hello World!";
}
$msg = new AMQPMessage($data);
$channel->basic_publish($msg, '', 'hello');
echo " [x] Sent '".$data."'n";
$channel->close();
$connection->close();
避免遗漏丢失消息,用如下命令查看:
rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged
消息持久化(Message durability):
上面消息确认是预防消费者程序崩溃导致消息丢失,但一旦RabbitMQ服务崩溃了呢?消息还能找回么?
这个持久化就是干这个用的:预防RabbitMQ服务崩溃而消息丢失。
$channel->queue_declare('hello', false, false, false, false);
//这里第3个参数,设为true,就是代表持久化。生产者消费者必须都得设置
$channel->queue_declare('hello', false, true, false, false);
$msg = new AMQPMessage( $data, array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) );
$channel->basic_qos(null, 1, null);
direct 直接模式,根据路由关键字是否匹配绑定关键字,从而推到某个或某些队列
fanout 广播模式,推送到所有队列
topic 通配符模式
headers
当绑定关键字为#时,此时交换机相当于广播模式(Fanout)的交换机
当绑定关键字不含有*和#时,此时交换机相当于直接模式(direct)的交换机