目录
- 1 简介
- 2 RabbitMQ架构
- 3 安装
- 4 具体使用
- 4.1 一个生产者发送单个消息
- 4.2 多个消费者
- 4.3 代码改进
- 4.3.1 消息确认机制
- 4.3.2 消息持久化
- 4.3.3 取消平均分配
- 4.3.4 代码合并
- 5 交换机
- 5.1 Exchange介绍
- 5.2 fanout模式
- 4.5.1 使用fanout交换器
- 4.5.2 使用默认队列
- 4.5.3 创建交换器和队列的binding
- 4.5.4 完整代码
- 5.3 direct模式
- 5.3.1 建立binding
- 5.3.2使用direct交换器
- 5.3.4 完整代码
- 5.4 topic模式
- 5.4.1 使用topic交换器
- 5.4.2 完整代码
- 6 实现RPC
- 5.1 建立客户端
- 5.2 Callback queue
- 5.3 Correlation id
- 5.4 完整代码
- 7 封装成类使用
- 消费者
- 生产者
RabbitMQ时一个消息中间件,接受并分发消息。你可以把它看作一个邮局:当你想寄邮件,放到邮箱,邮递员就会把信送给收件人。RabbitMQ就相当于邮箱,邮局和送件人。
RabbitMQ和邮局不同的是,RabbitMQ不会处理信件,而是接受,存储,发送二进制类型的数据消息。
RabbitMQ的优点:

1.生产者(producer):负责发送消息的程序
2.队列(queue):相当于在RabbitMQ中的邮箱。尽管消息在RabbitMQ和你的应用中流动,但消息只会存储在队列中。它本质上是一个很大的消息缓冲区。多个生产者可以发送消息到同一个队列,并且多个消费者可以接受同一个队列的消息。
3.消费者(consumer):可以理解为接受者,是一个一直等待接受消息的程序。
docker安装
官网的python安装


这部分我们完成一个小的python程序:一个生产者发送单个消息,一个消费者接受并打印消息。

send发送者(生产者)

编写程序,发送单个消息到队列。
1.建立连接
这里是连接本地的RabbitMQ,如果想连接其他机器,可以修改ip地址。
2.声明队列
接下来,在发送之前,需要确认队列已经存在,如果发送消息到一个不存在队列,RabbitMQ会丢弃这个消息,下面创建名字为hello的队列:
3.发送消息
这样,就可以准备发消息了,现在要发送一个消息内容为hello world的字符串消息到hello队列,但是并不能直接发送到队列,需要经过交换器,将在后面详细介绍。现在就使用一个名称为空字符串的默认交换器。交换器的主要作用是把消息精确发送到该到的队列,队列的名字需要在routing key参数中声明:
4.关闭连接
最后,我们需要手动关闭连接,确认消息已经准确发送到RabbitMQ:
Receive

编写,将从队列接受消息并打印。
1.建立连接
前面发送消息的代码一样。
2.声明队列
下一步也是要确认队列已经存在,仍然需要先创建一个队列,可以多次创建,但是只会创建一个。
为什么我们前面声明过队列了还要在一次声明?这是为了确认队列一定存在。例如,有时不确定哪个程序先启动,所以最好重复声明。
3.回调函数
接受消息要更复杂,通常要给队列指定一个回调函数,当接收到消息,就会调用回调函数。在本例子中,回调函数就简单打印消息:
4.接受消息
接下来,需要告诉RabbitMQ,这个回调函数需要从队列接受消息:
执行到上面的命令时,必须确认hello队列已经存在,auto_ack参数会在后面讲解。
代码运行
运行消费者,会一直阻塞等待接受消息:
运行生产者,立马发送消息到队列,被消费者消费并打印:
消费者消费掉消息打印出来
这个概念在网站应用中非常有用,它允许在一个http请求到来时,处理负责的任务逻辑。
下面我们改造下前面一个示例,把生产者命名为new_task.py,用来生产多个任务。然后我们自定义消息,结尾含有不同个数的"."。然后在消费者中,我们接受消息,根据接受消息中的’.'的个数,我们sleep不同的时间,来模拟不同的耗时的任务。
然后先通过终端启动两个worker:
再在终端创建6个任务
结果发现,两个worker各消费3个,执行3个任务。
RabbitMQ默认会按顺序发送消息给下个消费者,每个消费者会收到相同的消息,这种分配消息的方式称为。
4.3.1 消息确认机制
当一个消费者处理一个任务时,可能要很久,那当消费者在处理一个耗时任务到一半时,突然中断,会发生什么?在我们前面的代码中,当消息发送给消费者后,马上就被删除了。在这种情况下,在消费者处理任务的过程中杀死消费者,消息将会丢失。同时也会丢失这个消费者所有未处理的消息。
如果我们不想丢失任何任务,比如,一个消费者杀死后,把任务发给另一个消费者。
为了确保消息绝不会丢失,RabbitMQ支持 。是指消费者发回给RabbitMQ,并告诉RabbitMQ,特定的消息被接受并处理了,可以被删除了。
如果一个消费者中断(信道关闭,连接关闭,或者TCP连接丢失)而没有发送ack,RabbitMQ就知道了消息没有被完全处理,并且把消息重新放到队列里。如果有其他消费者在线,消息就会马上发送给另一个消费者。这样就可以确保即使消费者意外中断,消息也不会丢失。
消费者发送ack有个默认30min的超时时间,这帮助检测到从未发送ack的消费者,当然也可以增加这个超时时间。
下面修改worker.py如下:
同样,我们启动两个消费者,然后生产一个延时6秒的任务,当其中一个消费者接受消息处理任务时,ctrl+c停止,这时,另一个消费者就会消费之前的消息了。
注意:
1.必须发送回相同的信道,如果使用不通的信道,会报错。
4.3.2 消息持久化
上面我们已经学习如何保证消费者死掉的情况下任务不会丢失。但是如果RabbitMQ服务中断,我们的任务仍然可能丢失。
当RabbitMQ中断或宕机,它会忘记队列和消息,除非你告诉他。我们需要确认两件事保证消息不回丢失:确认队列和消息都已经持久化了。
首先,队列持久化,设置如下:
这个命令本身时正确的,但是按照我们的步骤,是不生效的。因为之前我们已经声明一个叫’hello’的非持久化queue了。RabbitMQ不允许使用不同参数通信定义已经存在的queue,否则会报错。我们可以重新定义一个新的queue,比如task_queue
这个queue需要同时在生产者和消费者中应用。
下面需要对消息进行持久化,通过以下代码:
注意:这种方式并不能保证信息绝不丢失。尽管这样告诉RabbitMQ保存信息到磁盘,但是仍然有短暂时间RabbitMQ接受了消息但是还未来得及保存。要保证持久化比较健壮,可以使用publisher confirms(待研究)。
4.3.3 取消平均分配
有一种场景,两个消费者,但是奇数的消息都很重,处理很慢,偶数的消息都很轻,处理很快,这样就导致一个消费者持续的繁忙,而另一个很闲。但是RabbitMQ并不知道这些,还是继续平均分配。
导致这个的原因是当消息进入队列时RabbitMQ才分配消息,但是并不关系从消费者来的unack的消息数量。
为了避免这种情况,可以使用channel中的basic_qos方法设置prefetch_count=1。这是使用basic.qos协议方法告诉RabbitMQ不要同一时间给一个消费者超过一条消息。换句话说,就是在一个消费者结束或者ack上一个消息之前,不要再分配消息给消费者,而是分配给下一个闲置的消费者。

4.3.4 代码合并
RabbitMQ中的消息模式的核心是生产者从不直接发送消息给队列。事实上,生产者经常不知道消息是否被任何队列接受了。
生产者只能发送消息到exchange(交换器)。交换器非常简单,一端接受生产者发送的消息,另一端把消息推送到队列。交换器必须准确知道它接受到消息后要做什么。是否发送消息到特定队列?是否发送到多个队列?或者是否丢弃。具体的规则就是通过交换器类型定义。
有4中交换器类型:
- direct:默认的模式,消息直接发送到路由键匹配的队列
- topic:匹配订阅模式,通过路由键匹配,但是更灵活,可以根据一定的路由规则灵活匹配
- headers:很少使用,
- fanout:发布/订阅模式的交换器,当你发送一条消息的时候,交换器会把消息广播到所有附加到这个交换器的队列上。
补充:
1.列出交换器
要列出服务器上所有的交换器,可以使用rabbitmqctl
在这个列表中,会有一些amq.*交换器和默认交换器,都是默认创建的,现在可能用不到。
也可以在rabbitmq的页面管理系统上查看。
2.默认交换器
前面文档中我们并没有具体说明交换器,但是仍然能够发送消息到队列,因为我们使用了默认交换器,并命名了空字符串。代码如下:
其中exchange参数就是交换器的名字,空字符串表示默认或匿名交换器:消息会被发路由到名称为routing_key的参数的队列中(如果存在)。
在本部分,将做一些不同的–发布一个消息到多个消费者,这个模式称为"publish/subscribe"(发布/订阅)模式。
为了说明这个模式,我们创建一个简单的日志系统。它由两个程序组成:一是发出log信息,二是接受并打印信息。
在我们的日志系统中,所有正在运行的接受程序都会收到消息。这样,就可以运行一个接受者并把日志导入磁盘,并且同时可以运行另一个接受者在屏幕查看日志。
4.5.1 使用fanout交换器
本质上,发布的日志消息会广播到所有的接受者。所以我们需要选择fanout交换器,如下:
我们前面使用的都是特定名字的队列,能为队列命名是很重要的–我们需要指定消费者到相同的队列。如果你想在消费者和生产者之间共享队列,那么给队列命名是非常重要的。
但是在我们的日志系统并不需要,我们想监听所有的日志消息,而不是他们的子集。并且我们只关心目前正在流动的消息而不是更早的。为此,需要做以下两件事:使用默认队列,创建交换器和队列的连接。
4.5.2 使用默认队列
首先,无论何时我们连接Rabbit,我们需要新的空的队列。为此,我们可以给队列取个随机的名字,或者让服务器选择一个随机的名字给我们,我们可以通过在声明队列时给个空的队列名:
在result.method.queue就包含可一个随机的队列名,比如,像amq.gen-JzTY20BRgKO-HjmUJj0wLg。
第二,一旦消费者连接关闭,队列也要被删除,在exclusive参数定义如下:
4.5.3 创建交换器和队列的binding
我们已经创建了一个扇形交换器和队列,现在就需要告诉交换器发送信息给队列。交换器和队列之间的联系就是binding。

目前为止,交换器就可以发消息给队列了。
补充:可以列出存在的binding:
4.5.4 完整代码

这个程序和以前没什么大的不同,最重要的是我们把消息推送到我们的交换器而不是匿名交换器。当发送的时候我们需要提供一个,但是对于类型交换器会被忽略。
可以看到,当我们要自己定义一个交换器时而不是用默认的交换器时,必须在建立连接后声明一个交换器,因为发布一个不存在的交换器是禁止的。
如果没有队列绑定到交换器,消息就会丢失,但是这对于我们时ok的,因为如果没有消费者在监听,我们可以安全的丢弃消息。
如果想把日志保存到文件,只要在控制台输入命令:
如想在控制台看日志,在另一个终端运行:
使用命令可以看到我们创建的bindings和queues,运行两个程序,你会看到如下:
另外可以发布一个消息,会发现,两个消费者都收到消息了,如果两个消费者有不同的callback,就会根据接受的消息完成自己功能。
上一节我们创建了一个简单的日志系统,可以把消息广播给所有消费者。
本章我们要新添加一个特性–让它只可以订阅一部分消息。比如,我们仅能导入严重的错误信息到日志文件,但是仍然可以在控制台打印所有的日志信息。
5.3.1 建立binding
之前我们已经创建了bindings,他是交换器和队列之间的联系。
bindings有另外一个额外的参数,为了和交换器里的routing_key分开,我们这里叫它,如下:
binding key的作用取决于交换去的类型,对于fanout交换器,或忽略他的值。
5.3.2使用direct交换器
我们之前的日志系统会把消息广播给所有的消费者,现在,我们想根据严重程度过滤消息。例如,我们只想把很严重的日志写入磁盘,不存入警告和信息日志。
我们将使用类型的交换器,路由算法也很简单 – 信息发到binding key和消息的routing key可以完全匹配的队列。

如上图,我们可以看到,一个交换器x有两个队列绑定它,第一个队列通过叫orange的binding key绑定,第二个通过队列有两个bindings,一个叫black,另一个叫green。
这样,routing key 为orange的消息就发送到Q1队列,routing key 为black和green的消息就发送到Q2队列,其他所有的消息都会被丢弃。

5.3.4 完整代码

如果你只想保存’warning’和’error’的日志信息:
如果只在控制台看日志:
如下,写入一个错误日志:
前一节中我们改进了我们的日志系统:使用可以提供选择性接受消息的direct类型交换器代替只能简单广播的fanout交换器。
尽管direct交换器改进了系统,但是它仍然局限性 - 不能根据多标准路由消息。在日志系统中,我们可能不仅想根据严重行订阅消息,也想根据发出消息的来源。这就需要我们下面要讲的topic交换器。
5.4.1 使用topic交换器
发送到topic交换器的消息不能有随意的routing key ,必须是一个以逗号分割的词列表。单词可以是任何词,但是要能说明连接的消息的特征。一个有效的routing key 的例子:“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”。每个单词的最大长度为255字节。
一般binding keys(交换器和队列的连接)也要相同的形式。topic交换器背后的逻辑和direct类似 - 发送一个具有特定routing key的消息会被传递到所有绑定了匹配的binding key的所有队列。binding key有两个非常重要点:
- (star)代表一个单词
- (hash)代表0个或多个单词

在上面图中的例子中,我们打算发送描述动物的消息。消息有三个单词(两个逗号)组成的routing key。在routing key的第一个单词描述敏捷性,第二个描述颜色,第三个描述物种:".."。
我们将创建三个binding:Q1绑定的binding key,Q2绑定和.
总结以上binding如下:
- Q1接受所有颜色为orange的动物
- Q2接受所有的rabbit和lazy类型的动物
加入一个消息的routing key为"quick.orange.rabbit",那么这个消息会发送到两个队列。一个消息的routing key为"lazy.orange.elephant",则也会发送到两个队列。 "quick.orange.fox"的消息只会发送到第一个队列。"lazy.brown.fox"只会发送到第二个。"lazy.brown.fox"只会发送到第二个队列一次,尽管他匹配两个bindings。“quick.brown.fox” 的消息不满足任何binding,会被丢弃。
如果我们打破协议,发送一个routing key有四个单词的消息会怎么样,比如"quick.orange.male.rabbit"?其实,这种消息不匹配任何bindings而被丢弃。
但是,另一方面,“lazy.orange.male.rabbit”,尽管有4个单词,却匹配最后一个binding,所以会发送到第二个队列。
补充:
topic交换器非常强大,可以表现为其他的交换器。
当队列和"#"binding key绑定 - 它就会接受所有消息,忽略routing key - 就像fanout交换器;
当在binding中没有使用"*“和”#"两个字符时,就会表现为direct交换器类型。
5.4.2 完整代码
在新的日志系统里,我们准备使用具有两个单词的routing key,像".",分别表示来源和严重程度。
下面我们来试一下效果
如果想接受所有消息:
接受所有来自"kern"的消息:
接受严重程度为"critical"的消息:
也可以创建多个binding:
来发送一条routing key 为"kern.critical"的消息:
前面,学习了使用去分配耗时任务给多个消费者,但是如果需要运行一个函数或者远程的主机,并等待结果呢?这种被称为远程过程调用(RPC)。
在本文中,通过RabbitMQ创建一个RPC系统:一个客户端和一个可扩展的RPC服务。模拟一个耗时任务,可以创建一个返回斐波那契数列的RPC服务。
为了说明RPC服务如何使用,我们创建一个简单的客户类,定义一个函数call,用来发送RPC请求并阻塞等待响应。
完整代码见后面。
客户端发送请求消息,服务器回复一个消息。为了接受服务器发回的消息,客户端需要在请求中发送一个’callback’队列地址,如下:
补充:消息属性
AMQP 0-9-1协议定义了消息的14个性质,大部分很少使用,以下是主要的一些:
- delivery_mode:标记消息是持久的(值是2)还是暂时的(除了2的其他值)
- content_type:描述消息的编码类型。例如使用来表示使用json编码
- reply_to:通常用来命名一个回复队列
- correlation_id:可以用来关联RPC响应和请求
上面的方法中我们建议给每个RPC请求创建一个callback队列,我们需要给每个队列创建一个单独的callback队列。
这样就引发一个问题,当队列接收到响应后,并不知道这个响应属于哪个请求。这就需要,给每个请求一个唯一的值。然后,当在callback队列接受到消息,就会检查这个属性,基于属性值,就可以把响应和请求匹配。如果是一个未知的值,就会安全删除消息。
你可能会问,为什么在callback队列中我们要丢弃不知道的消息,而不是抛一个错误?因为在服务端有条件竞争的可能性。尽管很少发生,RPC服务还是可能在发送响应之后,但是在发送ack之前,挂掉了。如果发生,重启TPC服务就会再次发送请求。这就是为什么客户端需要处理重复消息,并且RPC服务需要幂等的。

我们的RPC的工作原理如下:
- 当客户端启动,会创建一个匿名的唯一队列
- 对于一个RPC请求,客户端发送的消息有两个属性:用来表示请求回复时的回调队列,和,每个请求的唯一值
- 请求会发送到一个队列
- RPC消费者则等待这个队列的请求,当接受到请求就会处理请求并发送带着结果的消息到reply_to字段的队列
- 客户端等待回调队列的数据,当消息出现,会检查属性,如果个请求的值匹配,则会给应该返回响应。
服务端的原理如下:
- 建立连接并声明队列
- 声明fibonacci函数
- 给声明回调函数,这是rpc服务的核心。当接受请求时会执行这个函数并发回响应
- 我们可能想运行多个服务,为了公平分配任务,我们需要设置
客户端的运行原理如下:
- 建立连接,信道,并给每个请求的回复声明一个专有的队列
- 订阅(basic_consume)该队列,则就能接受RPC服务的回复
- 回调函数会在接受到回复后执行。每个RPC返回的消息会检查,如果检查通过,会保存在self.response,并且结束消费循环
- 接下来定义主要的函数call函数,它来发送RPC请求
- 在call函数中,我们生成一个唯一的数字并保存,在on_response函数会使用它与RPC服务发回来的值比较
- 在call方法中,我们发的消息有两个属性:reply_to和correlation_id
- 最后我们有一个循环,直到RPC返回响应。
版权声明:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若内容造成侵权、违法违规、事实不符,请将相关资料发送至xkadmin@xkablog.com进行投诉反馈,一经查实,立即处理!
转载请注明出处,原文链接:https://www.xkablog.com/rfx/17090.html