当前位置:网站首页 > R语言数据分析 > 正文

enoent解决办法rabbitmq(rabbitmq queuedeclare)



目录

  • 1 简介
  • 2 RabbitMQ架构
  • 3 安装
  • 4 具体使用
  • 4.1 一个生产者发送单个消息
  • 4.2 多个消费者
  • 4.3 代码改进
  • 4.3.1 消息确认机制
  • 4.3.2 消息持久化
  • 4.3.3 取消平均分配
  • 4.3.4 代码合并
  • 5 交换机
  • 5.1 Exchange介绍
  • 5.2 fanout模式
  • 4.5.1 使用fanout交换器
  • 4.5.2 使用默认队列
  • 4.5.3 创建交换器和队列的binding
  • 4.5.4 完整代码
  • 5.3 direct模式
  • 5.3.1 建立binding
  • 5.3.2使用direct交换器
  • 5.3.4 完整代码
  • 5.4 topic模式
  • 5.4.1 使用topic交换器
  • 5.4.2 完整代码
  • 6 实现RPC
  • 5.1 建立客户端
  • 5.2 Callback queue
  • 5.3 Correlation id
  • 5.4 完整代码
  • 7 封装成类使用
  • 消费者
  • 生产者

RabbitMQ时一个消息中间件,接受并分发消息。你可以把它看作一个邮局:当你想寄邮件,放到邮箱,邮递员就会把信送给收件人。RabbitMQ就相当于邮箱,邮局和送件人。

RabbitMQ和邮局不同的是,RabbitMQ不会处理信件,而是接受,存储,发送二进制类型的数据消息。

RabbitMQ的优点:

python中rabbitmq中heartbeat的最优配置_发送消息

1.生产者(producer):负责发送消息的程序
2.队列(queue):相当于在RabbitMQ中的邮箱。尽管消息在RabbitMQ和你的应用中流动,但消息只会存储在队列中。它本质上是一个很大的消息缓冲区。多个生产者可以发送消息到同一个队列,并且多个消费者可以接受同一个队列的消息。
3.消费者(consumer):可以理解为接受者,是一个一直等待接受消息的程序。




docker安装

官网的python安装

python中rabbitmq中heartbeat的最优配置_python_02

python中rabbitmq中heartbeat的最优配置_rabbitmq_03

这部分我们完成一个小的python程序:一个生产者发送单个消息,一个消费者接受并打印消息。

python中rabbitmq中heartbeat的最优配置_分布式_04

send发送者(生产者)

python中rabbitmq中heartbeat的最优配置_python_05

编写程序,发送单个消息到队列。

1.建立连接

这里是连接本地的RabbitMQ,如果想连接其他机器,可以修改ip地址。

2.声明队列

接下来,在发送之前,需要确认队列已经存在,如果发送消息到一个不存在队列,RabbitMQ会丢弃这个消息,下面创建名字为hello的队列:

3.发送消息

这样,就可以准备发消息了,现在要发送一个消息内容为hello world字符串消息到hello队列,但是并不能直接发送到队列,需要经过交换器,将在后面详细介绍。现在就使用一个名称为空字符串的默认交换器。交换器的主要作用是把消息精确发送到该到的队列,队列的名字需要在routing key参数中声明:

4.关闭连接

最后,我们需要手动关闭连接,确认消息已经准确发送到RabbitMQ:

Receive

python中rabbitmq中heartbeat的最优配置_分布式_06

编写,将从队列接受消息并打印。

1.建立连接

前面发送消息的代码一样。

2.声明队列

下一步也是要确认队列已经存在,仍然需要先创建一个队列,可以多次创建,但是只会创建一个。

为什么我们前面声明过队列了还要在一次声明?这是为了确认队列一定存在。例如,有时不确定哪个程序先启动,所以最好重复声明。

3.回调函数

接受消息要更复杂,通常要给队列指定一个回调函数,当接收到消息,就会调用回调函数。在本例子中,回调函数就简单打印消息:

4.接受消息

接下来,需要告诉RabbitMQ,这个回调函数需要从队列接受消息:

执行到上面的命令时,必须确认hello队列已经存在,auto_ack参数会在后面讲解。

代码运行

运行消费者,会一直阻塞等待接受消息:

运行生产者,立马发送消息到队列,被消费者消费并打印:

消费者消费掉消息打印出来

这个概念在网站应用中非常有用,它允许在一个http请求到来时,处理负责的任务逻辑。

下面我们改造下前面一个示例,把生产者命名为new_task.py,用来生产多个任务。然后我们自定义消息,结尾含有不同个数的"."。然后在消费者中,我们接受消息,根据接受消息中的’.'的个数,我们sleep不同的时间,来模拟不同的耗时的任务。

然后先通过终端启动两个worker:

再在终端创建6个任务

结果发现,两个worker各消费3个,执行3个任务。

RabbitMQ默认会按顺序发送消息给下个消费者,每个消费者会收到相同的消息,这种分配消息的方式称为。

4.3.1 消息确认机制

当一个消费者处理一个任务时,可能要很久,那当消费者在处理一个耗时任务到一半时,突然中断,会发生什么?在我们前面的代码中,当消息发送给消费者后,马上就被删除了。在这种情况下,在消费者处理任务的过程中杀死消费者,消息将会丢失。同时也会丢失这个消费者所有未处理的消息。

如果我们不想丢失任何任务,比如,一个消费者杀死后,把任务发给另一个消费者。

为了确保消息绝不会丢失,RabbitMQ支持 。是指消费者发回给RabbitMQ,并告诉RabbitMQ,特定的消息被接受并处理了,可以被删除了。

如果一个消费者中断(信道关闭,连接关闭,或者TCP连接丢失)而没有发送ack,RabbitMQ就知道了消息没有被完全处理,并且把消息重新放到队列里。如果有其他消费者在线,消息就会马上发送给另一个消费者。这样就可以确保即使消费者意外中断,消息也不会丢失。

消费者发送ack有个默认30min的超时时间,这帮助检测到从未发送ack的消费者,当然也可以增加这个超时时间。

下面修改worker.py如下:

同样,我们启动两个消费者,然后生产一个延时6秒的任务,当其中一个消费者接受消息处理任务时,ctrl+c停止,这时,另一个消费者就会消费之前的消息了。

注意

1.必须发送回相同的信道,如果使用不通的信道,会报错。

4.3.2 消息持久化

上面我们已经学习如何保证消费者死掉的情况下任务不会丢失。但是如果RabbitMQ服务中断,我们的任务仍然可能丢失。

当RabbitMQ中断或宕机,它会忘记队列和消息,除非你告诉他。我们需要确认两件事保证消息不回丢失:确认队列和消息都已经持久化了。

首先,队列持久化,设置如下:

这个命令本身时正确的,但是按照我们的步骤,是不生效的。因为之前我们已经声明一个叫’hello’的非持久化queue了。RabbitMQ不允许使用不同参数通信定义已经存在的queue,否则会报错。我们可以重新定义一个新的queue,比如task_queue

这个queue需要同时在生产者和消费者中应用。

下面需要对消息进行持久化,通过以下代码:

注意:这种方式并不能保证信息绝不丢失。尽管这样告诉RabbitMQ保存信息到磁盘,但是仍然有短暂时间RabbitMQ接受了消息但是还未来得及保存。要保证持久化比较健壮,可以使用publisher confirms(待研究)。

4.3.3 取消平均分配

有一种场景,两个消费者,但是奇数的消息都很重,处理很慢,偶数的消息都很轻,处理很快,这样就导致一个消费者持续的繁忙,而另一个很闲。但是RabbitMQ并不知道这些,还是继续平均分配。

导致这个的原因是当消息进入队列时RabbitMQ才分配消息,但是并不关系从消费者来的unack的消息数量。

为了避免这种情况,可以使用channel中的basic_qos方法设置prefetch_count=1。这是使用basic.qos协议方法告诉RabbitMQ不要同一时间给一个消费者超过一条消息。换句话说,就是在一个消费者结束或者ack上一个消息之前,不要再分配消息给消费者,而是分配给下一个闲置的消费者。

python中rabbitmq中heartbeat的最优配置_分布式_07

4.3.4 代码合并

RabbitMQ中的消息模式的核心是生产者从不直接发送消息给队列。事实上,生产者经常不知道消息是否被任何队列接受了。

生产者只能发送消息到exchange(交换器)。交换器非常简单,一端接受生产者发送的消息,另一端把消息推送到队列。交换器必须准确知道它接受到消息后要做什么。是否发送消息到特定队列?是否发送到多个队列?或者是否丢弃。具体的规则就是通过交换器类型定义。

有4中交换器类型:

  • direct:默认的模式,消息直接发送到路由键匹配的队列
  • topic:匹配订阅模式,通过路由键匹配,但是更灵活,可以根据一定的路由规则灵活匹配
  • headers:很少使用,
  • fanout:发布/订阅模式的交换器,当你发送一条消息的时候,交换器会把消息广播到所有附加到这个交换器的队列上。

补充:

1.列出交换器

要列出服务器上所有的交换器,可以使用rabbitmqctl

在这个列表中,会有一些amq.*交换器和默认交换器,都是默认创建的,现在可能用不到。

也可以在rabbitmq的页面管理系统上查看。

2.默认交换器

前面文档中我们并没有具体说明交换器,但是仍然能够发送消息到队列,因为我们使用了默认交换器,并命名了空字符串。代码如下:

其中exchange参数就是交换器的名字,空字符串表示默认或匿名交换器:消息会被发路由到名称为routing_key的参数的队列中(如果存在)。

在本部分,将做一些不同的–发布一个消息到多个消费者,这个模式称为"publish/subscribe"(发布/订阅)模式。

为了说明这个模式,我们创建一个简单的日志系统。它由两个程序组成:一是发出log信息,二是接受并打印信息。

在我们的日志系统中,所有正在运行的接受程序都会收到消息。这样,就可以运行一个接受者并把日志导入磁盘,并且同时可以运行另一个接受者在屏幕查看日志。

4.5.1 使用fanout交换器

本质上,发布的日志消息会广播到所有的接受者。所以我们需要选择fanout交换器,如下:

我们前面使用的都是特定名字的队列,能为队列命名是很重要的–我们需要指定消费者到相同的队列。如果你想在消费者和生产者之间共享队列,那么给队列命名是非常重要的。

但是在我们的日志系统并不需要,我们想监听所有的日志消息,而不是他们的子集。并且我们只关心目前正在流动的消息而不是更早的。为此,需要做以下两件事:使用默认队列,创建交换器和队列的连接。

4.5.2 使用默认队列

首先,无论何时我们连接Rabbit,我们需要新的空的队列。为此,我们可以给队列取个随机的名字,或者让服务器选择一个随机的名字给我们,我们可以通过在声明队列时给个空的队列名:

在result.method.queue就包含可一个随机的队列名,比如,像amq.gen-JzTY20BRgKO-HjmUJj0wLg。

第二,一旦消费者连接关闭,队列也要被删除,在exclusive参数定义如下:

4.5.3 创建交换器和队列的binding

我们已经创建了一个扇形交换器和队列,现在就需要告诉交换器发送信息给队列。交换器和队列之间的联系就是binding。

python中rabbitmq中heartbeat的最优配置_发送消息_08

目前为止,交换器就可以发消息给队列了。

补充:可以列出存在的binding:

4.5.4 完整代码

python中rabbitmq中heartbeat的最优配置_回调函数_09

这个程序和以前没什么大的不同,最重要的是我们把消息推送到我们的交换器而不是匿名交换器。当发送的时候我们需要提供一个,但是对于类型交换器会被忽略。

可以看到,当我们要自己定义一个交换器时而不是用默认的交换器时,必须在建立连接后声明一个交换器,因为发布一个不存在的交换器是禁止的。

如果没有队列绑定到交换器,消息就会丢失,但是这对于我们时ok的,因为如果没有消费者在监听,我们可以安全的丢弃消息。

如果想把日志保存到文件,只要在控制台输入命令:

如想在控制台看日志,在另一个终端运行:

使用命令可以看到我们创建的bindings和queues,运行两个程序,你会看到如下:

另外可以发布一个消息,会发现,两个消费者都收到消息了,如果两个消费者有不同的callback,就会根据接受的消息完成自己功能。

上一节我们创建了一个简单的日志系统,可以把消息广播给所有消费者。

本章我们要新添加一个特性–让它只可以订阅一部分消息。比如,我们仅能导入严重的错误信息到日志文件,但是仍然可以在控制台打印所有的日志信息。

5.3.1 建立binding

之前我们已经创建了bindings,他是交换器和队列之间的联系。

bindings有另外一个额外的参数,为了和交换器里的routing_key分开,我们这里叫它,如下:

binding key的作用取决于交换去的类型,对于fanout交换器,或忽略他的值。

5.3.2使用direct交换器

我们之前的日志系统会把消息广播给所有的消费者,现在,我们想根据严重程度过滤消息。例如,我们只想把很严重的日志写入磁盘,不存入警告和信息日志。

我们将使用类型的交换器,路由算法也很简单 – 信息发到binding key和消息的routing key可以完全匹配的队列。

python中rabbitmq中heartbeat的最优配置_分布式_10

如上图,我们可以看到,一个交换器x有两个队列绑定它,第一个队列通过叫orange的binding key绑定,第二个通过队列有两个bindings,一个叫black,另一个叫green。

这样,routing key 为orange的消息就发送到Q1队列,routing key 为black和green的消息就发送到Q2队列,其他所有的消息都会被丢弃。

python中rabbitmq中heartbeat的最优配置_python_11

5.3.4 完整代码

python中rabbitmq中heartbeat的最优配置_python_12

如果你只想保存’warning’和’error’的日志信息:

如果只在控制台看日志:

如下,写入一个错误日志:

前一节中我们改进了我们的日志系统:使用可以提供选择性接受消息的direct类型交换器代替只能简单广播的fanout交换器。

尽管direct交换器改进了系统,但是它仍然局限性 - 不能根据多标准路由消息。在日志系统中,我们可能不仅想根据严重行订阅消息,也想根据发出消息的来源。这就需要我们下面要讲的topic交换器。

5.4.1 使用topic交换器

发送到topic交换器的消息不能有随意的routing key ,必须是一个以逗号分割的词列表。单词可以是任何词,但是要能说明连接的消息的特征。一个有效的routing key 的例子:“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”。每个单词的最大长度为255字节。

一般binding keys(交换器和队列的连接)也要相同的形式。topic交换器背后的逻辑和direct类似 - 发送一个具有特定routing key的消息会被传递到所有绑定了匹配的binding key的所有队列。binding key有两个非常重要点:

  • (star)代表一个单词
  • (hash)代表0个或多个单词

python中rabbitmq中heartbeat的最优配置_python_13

在上面图中的例子中,我们打算发送描述动物的消息。消息有三个单词(两个逗号)组成的routing key。在routing key的第一个单词描述敏捷性,第二个描述颜色,第三个描述物种:".."。

我们将创建三个binding:Q1绑定的binding key,Q2绑定和.

总结以上binding如下:

  • Q1接受所有颜色为orange的动物
  • Q2接受所有的rabbit和lazy类型的动物

加入一个消息的routing key为"quick.orange.rabbit",那么这个消息会发送到两个队列。一个消息的routing key为"lazy.orange.elephant",则也会发送到两个队列。 "quick.orange.fox"的消息只会发送到第一个队列。"lazy.brown.fox"只会发送到第二个。"lazy.brown.fox"只会发送到第二个队列一次,尽管他匹配两个bindings。“quick.brown.fox” 的消息不满足任何binding,会被丢弃。

如果我们打破协议,发送一个routing key有四个单词的消息会怎么样,比如"quick.orange.male.rabbit"?其实,这种消息不匹配任何bindings而被丢弃。

但是,另一方面,“lazy.orange.male.rabbit”,尽管有4个单词,却匹配最后一个binding,所以会发送到第二个队列。

补充:

topic交换器非常强大,可以表现为其他的交换器。

当队列和"#"binding key绑定 - 它就会接受所有消息,忽略routing key - 就像fanout交换器;

当在binding中没有使用"*“和”#"两个字符时,就会表现为direct交换器类型。

5.4.2 完整代码

在新的日志系统里,我们准备使用具有两个单词的routing key,像".",分别表示来源和严重程度。

下面我们来试一下效果

如果想接受所有消息:

接受所有来自"kern"的消息:

接受严重程度为"critical"的消息:

也可以创建多个binding:

来发送一条routing key 为"kern.critical"的消息:

前面,学习了使用去分配耗时任务给多个消费者,但是如果需要运行一个函数或者远程的主机,并等待结果呢?这种被称为远程过程调用(RPC)。

在本文中,通过RabbitMQ创建一个RPC系统:一个客户端和一个可扩展的RPC服务。模拟一个耗时任务,可以创建一个返回斐波那契数列的RPC服务。

为了说明RPC服务如何使用,我们创建一个简单的客户类,定义一个函数call,用来发送RPC请求并阻塞等待响应。

完整代码见后面。

客户端发送请求消息,服务器回复一个消息。为了接受服务器发回的消息,客户端需要在请求中发送一个’callback’队列地址,如下:

补充:消息属性

AMQP 0-9-1协议定义了消息的14个性质,大部分很少使用,以下是主要的一些:

  • delivery_mode:标记消息是持久的(值是2)还是暂时的(除了2的其他值)
  • content_type:描述消息的编码类型。例如使用来表示使用json编码
  • reply_to:通常用来命名一个回复队列
  • correlation_id:可以用来关联RPC响应和请求

上面的方法中我们建议给每个RPC请求创建一个callback队列,我们需要给每个队列创建一个单独的callback队列。

这样就引发一个问题,当队列接收到响应后,并不知道这个响应属于哪个请求。这就需要,给每个请求一个唯一的值。然后,当在callback队列接受到消息,就会检查这个属性,基于属性值,就可以把响应和请求匹配。如果是一个未知的值,就会安全删除消息。

你可能会问,为什么在callback队列中我们要丢弃不知道的消息,而不是抛一个错误?因为在服务端有条件竞争的可能性。尽管很少发生,RPC服务还是可能在发送响应之后,但是在发送ack之前,挂掉了。如果发生,重启TPC服务就会再次发送请求。这就是为什么客户端需要处理重复消息,并且RPC服务需要幂等的。

python中rabbitmq中heartbeat的最优配置_rabbitmq_14

我们的RPC的工作原理如下:

  • 当客户端启动,会创建一个匿名的唯一队列
  • 对于一个RPC请求,客户端发送的消息有两个属性:用来表示请求回复时的回调队列,和,每个请求的唯一值
  • 请求会发送到一个队列
  • RPC消费者则等待这个队列的请求,当接受到请求就会处理请求并发送带着结果的消息到reply_to字段的队列
  • 客户端等待回调队列的数据,当消息出现,会检查属性,如果个请求的值匹配,则会给应该返回响应。

服务端的原理如下:

  • 建立连接并声明队列
  • 声明fibonacci函数
  • 给声明回调函数,这是rpc服务的核心。当接受请求时会执行这个函数并发回响应
  • 我们可能想运行多个服务,为了公平分配任务,我们需要设置

客户端的运行原理如下:

  • 建立连接,信道,并给每个请求的回复声明一个专有的队列
  • 订阅(basic_consume)该队列,则就能接受RPC服务的回复
  • 回调函数会在接受到回复后执行。每个RPC返回的消息会检查,如果检查通过,会保存在self.response,并且结束消费循环
  • 接下来定义主要的函数call函数,它来发送RPC请求
  • 在call函数中,我们生成一个唯一的数字并保存,在on_response函数会使用它与RPC服务发回来的值比较
  • 在call方法中,我们发的消息有两个属性:reply_to和correlation_id
  • 最后我们有一个循环,直到RPC返回响应。

到此这篇enoent解决办法rabbitmq(rabbitmq queuedeclare)的文章就介绍到这了,更多相关内容请继续浏览下面的相关推荐文章,希望大家都能在编程的领域有一番成就!

版权声明


相关文章:

  • spark面试题(spark面试题shuffle)2025-11-05 08:27:05
  • deepsort复现(deeplabv3+复现)2025-11-05 08:27:05
  • 连接redis不需要用户名么为什么(连接redis不需要用户名么为什么不能用)2025-11-05 08:27:05
  • strace工具(stan工具)2025-11-05 08:27:05
  • spring教程电子书(spring教程 csdn)2025-11-05 08:27:05
  • prgrm怎么读(prigrammer怎么读)2025-11-05 08:27:05
  • treeswap官网(treesoft官网)2025-11-05 08:27:05
  • traceroute(traceroute命令不存在)2025-11-05 08:27:05
  • score是什么意思(华为hmscore是什么意思)2025-11-05 08:27:05
  • Raise a suilen演唱会(illenium演唱会2021)2025-11-05 08:27:05
  • 全屏图片