上一篇:20分钟的工作被折腾一天的心得(2019-04-03 00:03:39)

rabbitmq学习笔记

2019年05月06日 13:52
<p>RabbitMQ是一个由erlang开发的AMQP(Advanved Message Queue)的开源实现。</p><p><br></p><p>RabbitMQ做什么的?</p><p><a href="https://www.rabbitmq.com/tutorials/tutorial-one-php.html">https://www.rabbitmq.com/tutorials/tutorial-one-php.html</a><a href="https://www.rabbitmq.com/tutorials/tutorial-one-php.html" target="_blank"></a></p><p>这里有形象化的解释:</p><pre>RabbitMQ is a message broker: it accepts and forwards messages. <br>You can think about it as a post office: when you put the mail that you want posting in a post box, <br>you can be sure that Mr. or Ms. Mailperson will eventually deliver the mail to your recipient. <br>In this analogy, RabbitMQ is a post box, a post office and a postman.<br><br>The major difference between RabbitMQ and the post office is that it doesn't deal with paper, <br>instead it accepts, stores and forwards binary blobs of data ‒ messages. <br></pre><p><br></p><p>译文:</p><p>RabbitMQ是一种message broker(不知道怎么翻译合适,就用原文吧)。它专注于接受和推送消息。我们可以把它想象成一个邮局:我们把要寄的信件放到邮箱,之后就会有邮递员把我们的信派发到收件人手上。按照这个想象的情景来类比的话,那么RabbitMQ就相当于邮箱、邮局和邮递员这一个整体组合。</p><p><br></p><p>RabbitMQ和邮局之间的主要区别:RabbitMQ并不处理消息,它只接收,存储,然后推送消息。</p><p><br></p><p><br></p><p>多个消费者,如何处理队列里消息的?</p><p>Round-robin(轮询调度)的方式使得每个消费者可以获得平均数量的消息。</p><p>Task(任务)的处理,在这里我们要将Task理解成消息,执行Task的一方称作消费者(任务执行者),而生成任务的一方则是生产者。</p><p><br></p><p>消息确认(Message Acknowledgement)的用途:</p><p>消费者A进程突然中止或者死掉,其就会自动发送消息确认通知给Rabbit MQ,RabbitMQ就会将原先派发给它的消息转而派发给其它消费者,从而保证了消息的稳定性,不会丢失。</p><p><br></p><p>实践代码:</p><pre>//worker.php<br><br>require_once __DIR__ . '/vendor/autoload.php';<br><br>use PhpAmqpLibConnectionAMQPStreamConnection;<br><br>$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');<br>$channel = $connection-&gt;channel();<br><br>$channel-&gt;queue_declare('hello', false, false, false, false);<br><br>echo " [*] Waiting for messages. To exit press CTRL+Cn";<br><br>$callback = function ($msg) {<br> echo ' [x] Received ', $msg-&gt;body, "n";<br> sleep(substr_count($msg-&gt;body, '.'));<br> echo " [x] Donen";<br> $msg-&gt;delivery_info['channel']-&gt;basic_ack($msg-&gt;delivery_info['delivery_tag']);};<br>};<br><br>// $channel-&gt;basic_consume('hello', 'bingo_a', false, true, false, false, $callback);<br>//加上消息确认机制,某消费者突然终止,其负责的消息就可以转让给其它消费者<br>$channel-&gt;basic_consume('hello', 'bingo_a', false, false, false, false, $callback);<br><br>while (count($channel-&gt;callbacks)) {<br> $channel-&gt;wait();<br>}<br><br>$channel-&gt;close();<br>$connection-&gt;close();<br></pre><pre>//new_task.php<br><br>require_once __DIR__ . '/vendor/autoload.php';<br>use PhpAmqpLibConnectionAMQPStreamConnection;<br>use PhpAmqpLibMessageAMQPMessage;<br><br><br>$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');<br>$channel = $connection-&gt;channel();<br><br>$channel-&gt;queue_declare('hello', false, false, false, false);<br><br>$data = implode(' ', array_slice($argv, 1));<br>if (empty($data)) {<br> $data = "Hello World!";<br>}<br><br>$msg = new AMQPMessage($data);<br>$channel-&gt;basic_publish($msg, '', 'hello');<br><br>echo " [x] Sent '".$data."'n";<br>$channel-&gt;close();<br>$connection-&gt;close();<br></pre><p><br></p><p>避免遗漏丢失消息,用如下命令查看:</p><pre>rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged<br></pre><p><br></p><p>消息持久化(Message durability):</p><p>上面消息确认是预防消费者程序崩溃导致消息丢失,但一旦RabbitMQ服务崩溃了呢?消息还能找回么?</p><p>这个持久化就是干这个用的:预防RabbitMQ服务崩溃而消息丢失。</p><pre>$channel-&gt;queue_declare('hello', false, false, false, false);<br>//这里第3个参数,设为true,就是代表持久化。生产者消费者必须都得设置<br>$channel-&gt;queue_declare('hello', false, true, false, false);<br></pre><p><br></p><pre>$msg = <span class="hljs-keyword">new</span> AMQPMessage( $data, <span class="hljs-keyword">array</span>(<span class="hljs-string">'delivery_mode'</span> =&gt; AMQPMessage::DELIVERY_MODE_PERSISTENT) );</pre><div>这里将消息设为持久,RabbitMQ会将消息写进磁盘,但会存在RabbitMQ已挂掉,还没来得及写入的情况,所以这种持久化不是高稳定可靠的。</div><div><br></div><div><br></div><div>公平分发(Fair Dispatch):</div><div>上面轮询分发有个问题,就是可能耗时的任务都跑到一个消费者去了,从而导致有的消费者太忙,有的消费者太闲。忙的可能造成任务处理等待过长。</div><div>公平分发,就是确保每个消费者每次只能接收一个任务。如果某个消费者任务还没处理完或者还没发送消息确认(就是消息要转让),就派发到下一个比较闲的消费者身上去。</div><div><br></div><div>代码解决:</div><div><pre>$channel-&gt;basic_qos(null, 1, null);<br></pre></div><div><br></div><div>发布与订阅(Publish / Subscribe)</div><div>与前面不同的是,这里专注于把一个消息同时发送到多个消费者那里去。</div><div><br></div><div>交换机(Exchange)</div><div>前面是生产者直接将消息推送到队列,消费者直接从队列里取出消息。</div><div>而交换机是相当于生产者与队列之间的媒介: 生产者先把生成的消息放到交换机,然后由交换机决定消息如何处理,可能是推到某个队列,也可能是推到多个或所有队列,也或者可能是直接丢弃。</div><div><br></div><div>交换机如何处理消息,是由交换机类型来决定的:</div><div><pre>direct 直接模式,根据路由关键字是否匹配绑定关键字,从而推到某个或某些队列<br>fanout 广播模式,推送到所有队列<br>topic 通配符模式<br>headers<br></pre></div><div><br></div><div>绑定(Binding)</div><div>交换机与一个队列之间的关系,我们就称之为绑定。</div><div><br></div><div>路由(Routing)</div><div>当交换机需要将消息推送到指定的队列,就需要用到路由关键字。</div><div>当交换机为广播模式时,会直接忽略路由关键字。</div><div><br></div><div>通配符交换机(Topic Exchange)</div><div>目的只是为了让交换机推送消息时关于队列的选择性变得更加灵活。</div><div><pre>当绑定关键字为#时,此时交换机相当于广播模式(Fanout)的交换机<br>当绑定关键字不含有*和#时,此时交换机相当于直接模式(direct)的交换机<br></pre></div><div><br></div><div><br></div><div>远程调用(RPC, Remote Procedure Call)</div><div><br></div><div><br></div>
  • 2019年05月05日 21:06文章创建
  • 2019年05月06日 13:52文章发布
上一篇:20分钟的工作被折腾一天的心得(2019-04-03 00:03:39)
我要评论
«-必填,限2-20个字符,中文/字母/字母数字组合
«-评论后,邮箱会收到激活链接,未激活邮箱的留言,将无法显示
评论列表
暂无评论,期待你的评论哦!
回到顶部