• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

RabbitMQ的5大核心概念

武飞扬头像
Michael_lcf
帮助1

RabbitMQ基本操作: https://blog.csdn.net/Michael_lcf/article/details/124677268
RabbitMQ的5大核心概念: https://blog.csdn.net/Michael_lcf/article/details/126435452
RabbitMQ实现分布式WebSocket通信: https://blog.csdn.net/Michael_lcf/article/details/126403772

在管理界面中,可以看到有很多选项:
学新通

RabbitMQ的5大核心概念:Connection(连接)、Channel(信道)、Exchange(交换机)、Queue(队列)、Virtual host(虚拟主机)。

1、RabbitMQ的工作模型

学新通 学新通

Broker表示RabbitMQ服务,每个Broker里面至少有一个Virtual host虚拟主机;每个虚拟主机中有自己的Exchange交换机、Queue队列;Exchange与Queue之间通过RoutingKey形成绑定关系Binding。producer(生产者)和consumer(消费者)通过与Broker建立Connection来保持连接,然后在Connection的基础上建立若干Channel信道,用来发送与接收消息。

2、核心概念之Connection(连接)

每个producer(生产者)或者consumer(消费者)要通过RabbitMQ发送与消费消息,首先就要与RabbitMQ建立连接,这个连接就是Connection。
Connection是一个TCP长连接。

3、核心概念之Channel(信道)

Channel是在Connection的基础上建立的虚拟连接,RabbitMQ中大部分的操作都是使用Channel完成的。例如:声明Queue、声明Exchange、发布消息、消费消息等。

有了Connection,完全可以使用Connection完成Channel的工作,为什么还要引入Channel虚拟连接的概念呢

因为现在的程序都是支持多线程的,如果没有Channel,那么每个线程在访问RabbitMQ时都要建立一个Connection这样的TCP连接,对于操作系统来说,建立和销毁TCP连接是非常大的开销,在系统访问流量高峰时,会严重影响系统性能。
Channel就是为了解决这种问题,通常情况下,每个线程创建单独的Channel进行通讯,每个Channel都有自己的channel id帮助Broker和客户端识别Channel,所以Channel之间是完全隔离的。

ConnectionChannel之间的关系可以比作光纤电缆,如果把Connection比作一条光纤电缆,那么Channel就相当于是电缆中的一束光纤。

4、核心概念之Virtual host(虚拟主机)

Virtual host是一个虚拟主机的概念,一个Broker中可以有多个Virtual host,每个Virtual host都有一套自己的Exchange和Queue,同一个Virtual host中的Exchange和Queue不能重名,不同的Virtual host中的Exchange和Queue名字可以一样。这样,不同的用户在访问同一个RabbitMQ Broker时,可以创建自己单独的Virtual host,然后在自己的Virtual host中创建Exchange和Queue,很好地做到了不同用户之间相互隔离的效果。
学新通

5、核心概念之Queue(队列)

Queue是一个用来存放消息的队列,生产者发送的消息会被放到Queue中,消费者消费消息时也是从Queue中取走消息。

6、核心概念之Exchange(交换机)

Exchange是一个比较重要的概念,它是消息到达RabbitMQ的第一站,主要负责根据不同的分发规则将消息分发到不同的Queue,供订阅了相关Queue的消费者消费到指定的消息。那Exchange有哪些分发消息的规则呢?这就要说到Exchange的4种类型了:directfanouttopicheaders

在介绍这4种类型的Exchange之前,我们先来了解一下另外一个比较重要的概念:Routing key,翻译成中文就是路由键。当我们创建好ExchangeQueue之后,需要使用Routing key(通常叫作Binding key)将它们绑定起来,producer在向Exchange发送一条消息的时候,必须指定一个Routing key,然后Exchange接收到这条消息之后,会解析Routing key,然后根据Exchange和Queue的绑定规则,将消息分发到符合规则的Queue中。
学新通

6.1、direct

direct的意思是直接的,direct类型的Exchange会将消息转发到指定Routing key的Queue上,Routing key的解析规则为精确匹配。也就是只有当producer发送的消息的Routing key与某个Binding key相等时,消息才会被分发到对应的Queue上。
学新通

比如我们现在有一个direct类型的Exchange,它下面绑定了三个Queue,Binding key分别是ORDER/GOODS/STOCK:
学新通

然后我们向该Exchange中发送一条消息,消息的Routing key是goods
学新通

按照规则分析,这条消息应该被路由到MY_EXCHANGE_GOODS_QUEUE这个Queue。消息发送成功之后,我们去Queues中查看,发现确实只有MY_EXCHANGE_GOODS_QUEUE这个QUEUE接收到了一条消息。
学新通

进入这个队列,通过getMessage取出消息查看,确实是我们刚才手动发送的那条消息。
学新通

注:direct类型的Exchange在分发消息时,必须保证producer发送消息的Routing key与Exchange和Queue绑定的Binding key相等才可以。

6.2、fanout

fanout是扇形的意思,该类型通常叫作广播类型。fanout类型的Exchange不处理Routing key,而是会将发送给它的消息路由到所有与它绑定的Queue上。
学新通

比如我们现在有一个fanout类型的Exchange,它下面绑定了三个Queue,Binding key分别是ORDER/GOODS/STOCK:
学新通

然后我们向该Exchange中发送12条消息,消息的Routing key随便填一个值abc:
学新通

按照规则分析,这条消息应该被路由到所有与该Exchange绑定的Queue,即三个Queue都应该会受到消息。消息发送成功之后,我们去Queues中查看,发现确实每个QUEUE都接收到了12条消息。
学新通

进入这三个QUEUE,通过getMessage取出消息查看,确实是我们刚才手动发送的那条消息。
学新通

注:fanout类型的Exchange不管Routing key是什么,它都会将接收到的消息分发给所有与自己绑定了的Queue上。

6.3、topic

topic的意思是主题,topic类型的Exchange会根据通配符对Routing key进行匹配,只要Routing key满足某个通配符的条件,就会被路由到对应的Queue上。通配符的匹配规则如下:

● Routing key必须是一串字符串,每个单词用 “.” 分隔;
● 符号 “#” 表示匹配一个或多个单词;
● 符号 “*” 表示匹配一个单词。

例如:
*.123” 能够匹配到 “abc.123”,但匹配不到 “abc.def.123”;
#.123” 既能够匹配到 “abc.123”,也能匹配到 “abc.def.123”。

比如我们现在有一个topic类型的Exchange,它下面绑定了4个Queue,Binding key分别是 *.ordergoods.*#.stockuser.#
学新通

然后我们向该Exchange中发送一条消息,消息的Routing key为:user.abc.order

学新通
按照规则分析,USER.ABC.ORDER这个Routing key只可以匹配到 “USER.#” ,所以,这条消息应该被路由到MY_TOPIC_USER_QUEUE这个Queue中。消息发送成功之后,我们去Queues中查看,发现结果符合我们的预期。
学新通

进入这个QUEUE,通过getMessage取出消息查看,确实是我们刚才手动发送的那条消息。
学新通

6.4、headers

日常工作中,以上三种类型的Exchange已经能够满足我们基本上所有的需求了,headers模式并不经常使用,我们只需要对headers Exchange有一个基本的了解就可以了。

headers Exchange中,Exchange与Queue之间的绑定不再通过Binding key绑定,而是通过Arguments绑定。比如我们现在有一个headers类型的Exchange,下面通过不同的Arguments绑定了三个Queue:

学新通

producer在发送消息时可以添加headers属性,Exchange接收到消息后,会解析headers属性,只要我们上面配置的Arguments中的所有属性全部被包含在Headers中并且值相等,那么这条消息就会被路由到对应的Queue中。

比如我们向上面的Exchange中发送一条消息,消息的Headers中添加“x=1”:

学新通

根据规则,只有queue1这个队列满足x=1的条件,queue2中的y=2条件不满足,所以,消息应该只被路由到queue1队列中。消息发送成功后,我们可以看到queue1确实收到了消息:
学新通

并且这条消息就是我们刚才手动发送的消息:

学新通

然后我们再发送一条消息,消息的headers中有两个属性:x=1,y=2:

学新通

根据规则,my_headers_x1_queuex=1 的条件满足,my_headers_x1y2_queuex=1y=2的条件满足,my_headers_y2_queuey=2的条件满足,所以,这三个Queue应该都能够收到这条消息。消息发送成功后,结果符合预期:
学新通

这条消息就是我们刚才手动发送的消息:
学新通

7. 延迟队列

7.1. 延迟队列概念

延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。

7.2. 延迟队列使用场景

1.订单在十分钟之内未支付则自动取消
2.新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒。
3.用户注册成功后,如果三天内没有登陆则进行短信提醒。
4.用户发起退款,如果三天内没有得到处理则通知相关运营人员。
5.预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议

这些场景都有一个特点,需要在某个事件发生之后或者之前的指定时间点完成某一项任务,如:发生订单生成事件,在十分钟之后检查该订单支付状态,然后将未支付的订单进行关闭;看起来似乎使用定时任务,一直轮询数据,每秒查一次,取出需要被处理的数据,然后处理不就完事了吗?如果数据量比较少,确实可以这样做,比如:对于“如果账单一周内未支付则进行自动结算”这样的需求,如果对于时间不是严格限制,而是宽松意义上的一周,那么每天晚上跑个定时任务检查一下所有未支付的账单,确实也是一个可行的方案。但对于数据量比较大,并且时效性较强的场景,如:“订单十分钟内未支付则关闭“,短期内未支付的订单数据可能会有很多,活动期间甚至会达到百万甚至千万级别,对这么庞大的数据量仍旧使用轮询的方式显然是不可取的,很可能在一秒内无法完成所有订单的检查,同时会给数据库带来很大压力,无法满足业务要求而且性能低下。

学新通

7.3. RabbitMQ中的TTL

TTL是什么呢?TTL是RabbitMQ中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。换句话说,如果一条消息设置了TTL属性或者进入了设置TTL属性的队列,那么这条消息如果在TTL设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的TTL和消息的TTL,那么较小的那个值将会被使用,有两种方式设置TTL。

7.3.1. 消息设置TTL

针对每条消息设置TTL。
学新通

7.3.2. 队列设置TTL

创建队列的时候设置队列的 “x-message-ttl” 属性。
学新通

7.3.3. 队列TTL和消息TTL两者的区别

如果设置了队列的TTL属性,则一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队列中);
如果设置了消息的TTL属性,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;
如果不设置TTL,表示消息永远不会过期;
如果将TTL设置为0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。

TTL 死信队列 可以实现延时队列的功能,想想看延时队列不就是想要消息延迟多久被处理吗,TTL则刚好能让消息在延迟多久之后成为死信,另一方面,成为死信的消息都会被投递到死信队列里,这样只需要消费者一直消费死信队列里的消息就完事了,因为里面的消息都是希望被立即处理的消息。

7.4 Rabbitmq插件实现延迟队列

#用rabbitmq-diagnostics查看 RabbitMQ 版本
rabbitmq-diagnostics server_version
#用rabbitmq-diagnostics查看 Erlang 版本
rabbitmq-diagnostics erlang_version

在官网上下载https://www.rabbitmq.com/community-plugins.html,下载 rabbitmq_delayed_message_exchange插件,放置到/usr/lib/rabbitmq/lib/rabbitmq_server-3.8.8/plugins目录

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

systemctl start rabbitmq-server.service
systemctl status rabbitmq-server.service
systemctl restart rabbitmq-server.service

8 死信队列

死信的来源:
消息TTL过期。
队列达到最大长度(队列满了,无法再添加数据到mq中)。
消息被拒绝(basic.rejectbasic.nack)并且 requeue=false。

学新通

#、(0_0) 分割线 啊哈

哇咔咔、到此你看到了 希望的小尾巴!!!!!!

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhggkhca
系列文章
更多 icon
同类精品
更多 icon
继续加载