• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

Rabbit MQ的基本使用

武飞扬头像
Naraseny
帮助1

目录

1. MQ是什么,有哪些作用?

2. 主要的MQ框架有哪些?

3. RabbitMQ安装

4. RabbitMQ中的主要概念

5. 消息队列的核心概念

6. 一个简单的生产者和消费者示例。


1. MQ是什么,有哪些作用?



消息队列(Message Queue,简称MQ),从字面意思上看,本质是个队列,FIFO先入先出,只不过队列中存放的内容是message而已。是程序与程序之间传递信息的一种方式。

作用:

异步: 一般适用于耗时但非核心业务的处理,即:程序无需等待非核心业务处理完成,即可进行下一步操作,有点像电路中的并行电路
解耦:通过引入消息总线,将模块之间的直接的调用,转化为消息的传递。
消峰:对于来不及及时处理的消息,可以暂存于消息队列里排队。
缺点:

可用性降低:系统加了对消费队列服务的依赖,依赖越多的系统越脆弱。
复杂度提高:加入消息队列,如果消息有重复如何处理? 消息丢失如何处理?
一致性问题:方法的调用者不需等待执行者的返回,如果执行者失败,则容易造成状态不一致。

2. 主要的MQ框架有哪些?



MQ框架非常之多,比较流行的有RabbitMq、ActiveMq、ZeroMq、kafka,以及阿里开源的RocketMQ。本文主要介绍RabbitMq。

RabbitMQ是以AMQP协议实现的一种消息中间件产品。AMQP是Advanced Message Queuing Protocol的简称,是一个面向消息中间件的开放式标准应用层协议。

常用消息队列对照表:
学新通

3. RabbitMQ安装


1)下载rabbitmq镜像

docker pull rabbitmq:management

使用management标签是下载包含管理控制台的版本,在下载镜像时没有指定版本号,则会到docker仓库中下载最新的镜像版本。

2)运行镜像

##方式一:默认guest用户,密码也是guest

$ docker run -d -p 5672:5672 -p 15672:15672 --name rabbitmq rabbitmq:management

方式二:

  1.  
    $ docker run -d \
  2.  
    --name my-rabbitmq \
  3.  
    -p 5672:5672 -p 15672:15672 \
  4.  
    -v /data:/var/lib/rabbitmq \
  5.  
    --hostname my-rabbitmq-host \
  6.  
    -e RABBITMQ_DEFAULT_VHOST=my_vhost \
  7.  
    -e RABBITMQ_DEFAULT_USER=admin \
  8.  
    -e RABBITMQ_DEFAULT_PASS=admin \
  9.  
    --restart=always \
  10.  
    rabbitmq:management

参数说明:

-d:后台运行容器

-name:指定容器名

-p:指定服务运行的端口(5672:应用访问端口;15672:控制台Web端口号)

-v:映射目录或文件,启动了一个数据卷容器,数据卷路径为:/var/lib/rabbitmq,再将此数据卷映射到住宿主机的/data目录

--hostname:主机名(RabbitMQ的一个重要注意事项是它根据所谓的 “节点名称” 存储数据,默认为主机名)

-e:指定环境变量;(

RABBITMQ_DEFAULT_VHOST:默认虚拟机名;

RABBITMQ_DEFAULT_USER:默认的用户名;

RABBITMQ_DEFAULT_PASS:默认用户名的密码)

--restart=always:当Docker重启时,容器能自动启动

rabbitmq:management:镜像名

注1:RABBITMQ_DEFAULT_VHOST=my_vhost,my_vhost名字请记好,在之后的编程中要用到,

如果启动时没指定,默认值为/

#4.进入RabbitMQ管理平台进行相关操作

注1:容器启动后,可以通过docker logs 窗口ID/容器名字 查看日志

docker logs my-rabbitmq

注2:停止并删除所有容器

docker stop $(docker ps -aq) && docker rm $(docker ps -aq)
 

docker ps

4)开放端口

  1.  
    firewall-cmd --zone=public --add-port=15672/tcp --permanent # 开放15672端口
  2.  
    firewall-cmd --zone=public --add-port=5672/tcp --permanent
  3.  
    firewall-cmd --reload # 配置立即生效

5)运行测试

http://192.168.164.128:15672/#/

4. RabbitMQ中的主要概念



RabbitMQ是实现了高级消息服务队列协议(AMQP)的开源消息代理软件,也称为面向消息的中间件。使用erlang语言开发。AMQP是Advanced Message Queuing Protocol的简称,是面向消息中间件的开放式标准应用协议,这样意味着RabbitMQ可以更容易的构建异构的系统。

1)Server(Broker) 服务器(或叫经纪人)

可以理解为消息队列服务器的主体,接收客户端连接,实现AMQP协议的消息队列和路由功能的进程

2) Virtual Host 虚拟机

虚拟主机的概念,它是对Broker的虚拟划分,将消息生产者,消费者,和它们依赖的AMQP相关的结构进行隔离,一般是从安全方面的考虑。类似权限控制组,一个Virtual Host里可以有多个Exchange和Queue。类似于一个数据库实例中可以创建多个数据库的概念。每个Virtual Host都是个独立的。

3)Exchange 交换机

交换机,接收生产者发送的消息,并根据Routing Key将消息路由到服务器中的队列Queue。交换机分为:fanout、direct、topic三种类型

4) Queue 队列

消息队列,用于存储还未被消费者消费的消息,用于存放业务数据

5)Message 消息

由Header和body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、优先级是多少、由哪个Message Queue接收等;body是真正需要发送的数据内容;

在业务系统中代码真正的业务数据

6)Binding Key 绑定关键字

绑定关键字,将一个特定的Exchange和一个特定的Queue绑定起来

7)Routing key:路由关键字,Exchange根据这个关键字进行消息投递

8)connection:连接,代理生产者,消费者、Broker之间通信的物理网络。

9)Channel:消息通道,用于连接生产者和消费者的逻辑结构。在客户端的每个连接里可以建立多个Channel,每个Channel代表一个会话,通过Channel可以隔离同一个连接中的不同交互内容。

10)Producer: 消息的生产者,制造消息并发送消息的程序

11)Consumer:消息消费者,接收并处理消息的程序。

5. 消息队列的核心概念



消息队列中的核心概念包括以下几个:生产者、队列、消费者、消息 。

生产者生产消息并投递到队列中,

消费者可以从队列中获取消息并消费,

消息指的是各个服务之间要传递的数据。

6. 一个简单的生产者和消费者示例。



本示例包含一个消息生产者,队列(RabbitMQ),消息消费者。消息生产者生产消息,并将消费发送到队列服务器,消息消费者监听并消费队列中的消息。

备注:本示例通过在原有的springcloud示例项目中加入新模块来演示RabbitMQ的基本使用。

6.1 消息发送者模块
1)引入必要的依赖

  1.  
    <dependencies>
  2.  
    <dependency>
  3.  
    <groupId>org.springframework.boot</groupId>
  4.  
    <artifactId>spring-boot-starter-web</artifactId>
  5.  
    </dependency>
  6.  
    <dependency>
  7.  
    <groupId>org.springframework.boot</groupId>
  8.  
    <artifactId>spring-boot-starter-websocket</artifactId>
  9.  
    </dependency>
  10.  
     
  11.  
    <dependency>
  12.  
    <groupId>org.projectlombok</groupId>
  13.  
    <artifactId>lombok</artifactId>
  14.  
    <optional>true</optional>
  15.  
    </dependency>
  16.  
    <dependency>
  17.  
    <groupId>org.springframework.boot</groupId>
  18.  
    <artifactId>spring-boot-starter-test</artifactId>
  19.  
    <scope>test</scope>
  20.  
    </dependency>
  21.  
    <dependency>
  22.  
    <groupId>org.springframework.boot</groupId>
  23.  
    <artifactId>spring-boot-starter-amqp</artifactId>
  24.  
    </dependency>
学新通

2)项目配置文件

  1.  
    server.port=8080
  2.  
    ## rabbitmq config
  3.  
    spring.rabbitmq.host=192.168.199.144
  4.  
    spring.rabbitmq.port=5672
  5.  
    spring.rabbitmq.username=admin
  6.  
    spring.rabbitmq.password=admin
  7.  
    spring.rabbitmq.virtual-host=my_vhost

3)启动类

  1.  
    /**
  2.  
    * @author Administrator
  3.  
    * @create 2020-02-1518:53
  4.  
    */
  5.  
    @SpringBootApplication
  6.  
    public class SenderApp {
  7.  
     
  8.  
    public static void main(String[] args) {
  9.  
    SpringApplication.run(SenderApp.class, args);
  10.  
    }
  11.  
     
  12.  
    }

4) RabbitMQ配置类

  1.  
    /**
  2.  
    * @author Administrator
  3.  
    * @create 2020-02-1519:04
  4.  
    */
  5.  
    @Configuration
  6.  
    @Slf4j
  7.  
    public class RabbitMQConfig {
  8.  
     
  9.  
    @Bean
  10.  
    public Queue helloQueue() {
  11.  
    return new Queue("hello");
  12.  
    }
  13.  
     
  14.  
    }

5)编写服务,用于向队列中存放 消息

  1.  
    package com.zking.rabbit.rabbitmqdemo.controller;
  2.  
     
  3.  
    import org.springframework.amqp.core.AmqpTemplate;
  4.  
    import org.springframework.beans.factory.annotation.Autowired;
  5.  
    import org.springframework.web.bind.annotation.RequestMapping;
  6.  
    import org.springframework.web.bind.annotation.RestController;
  7.  
     
  8.  
    import java.time.LocalDateTime;
  9.  
    import java.time.format.DateTimeFormatter;
  10.  
     
  11.  
    @RestController
  12.  
    public class ProviderController {
  13.  
    @Autowired
  14.  
    private AmqpTemplate amqpTemplate;
  15.  
    @RequestMapping("/send")
  16.  
    public String send(){
  17.  
    amqpTemplate.convertAndSend("hello","这是 生产向队列hello投递消息 时间:" LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
  18.  
    return "success";
  19.  
    }
  20.  
    }
学新通

运行项目测试

学新通

刷星一次页面 数量随着而变化 

学新通

6)编写消费者,消费消息 

  1.  
    package com.zking.rabbit.rabbitmqdemo.component;
  2.  
     
  3.  
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
  4.  
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
  5.  
    import org.springframework.stereotype.Component;
  6.  
     
  7.  
    @Component
  8.  
    @RabbitListener(queues = {"hello"})
  9.  
    public class Consumer {
  10.  
        @RabbitHandler
  11.  
        public void handler(String msg){
  12.  
            System.out.println("消费者接收 -- " msg);
  13.  
        }
  14.  
    }

学新通

 重启查看效果

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhgbgkgf
系列文章
更多 icon
同类精品
更多 icon
继续加载