• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

应用SpringBoot 整合 RabbitMQ

武飞扬头像
情绪大瓜皮丶
帮助1

引入依赖

创建 maven 项目 RabbirMQ,引入所使用的依赖

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.2.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <!-- JSON -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>2.0.20</version>
        </dependency>

        <!-- Lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
    </dependencies>
学新通

RabbitMQ 生产者

配置文件以及主启动类

首先在项目中创建子 maven 项目 RabbitProducer,配置 RabbitMQ 服务地址

server:
  port: 8081

spring:
  rabbitmq:
    host: <RabbitMq Server ip>
    port: 5672

创建主启动类

@SpringBootApplication
public class ProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ProducerApplication.class);
    }

}

配置 RabbitMQ 的 Exchange 以及 Queue

此处分别配置两种模式的交换机 Exchange,即 Direct 模式和 Topic 模式,有关交换机模式的内容请阅读RabbitMQ 基础

Direct Exchange

@Configuration
public class RabbitConfigFroDirect {
    /**
     * 创建 Direct 模式 Queue
     * Queue name:My_Direct_Queue01
     */
    @Bean
    public Queue directQueue01() {
        return new Queue("My_Direct_Queue01");
    }
    /**
     * 创建 Direct 模式 Queue
     * Queue name:My_Direct_Queue02
     */
    @Bean
    public Queue directQueue02() {
        return new Queue("My_Direct_Queue02");
    }
    /**
     * 创建 Direct 模式 Exchange
     * Exchange name:My_Direct_Exchange
     */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange("My_Direct_Exchange");
    }
    /**
     * 绑定创建的 Exchange 以及 Queue01
     * Routing Key:Queue01
     */
    @Bean
    public Binding bindingDirect01() {
        return BindingBuilder.bind(directQueue01()).to(directExchange()).with("Queue01");
    }
    /**
     * 绑定创建的 Exchange 以及 Queue02
     * Routing Key:Queue02
     */
    @Bean
    public Binding bindingDirect02() {
        return BindingBuilder.bind(directQueue02()).to(directExchange()).with("Queue02");
    }

}
学新通

Topic Exchange

@Configuration
public class RabbitConfigForTopic {
    /**
     * 创建 Topic 模式 Queue
     * Queue name:My_Topic_Queue01
     */
    @Bean
    public Queue topicQueue01() {
        return new Queue("My_Topic_Queue01");
    }
    /**
     * 创建 Topic 模式 Queue
     * Queue name:My_Topic_Queue02
     */
    @Bean
    public Queue topicQueue02() {
        return new Queue("My_Topic_Queue02");
    }
    /**
     * 创建 Topic 模式 Exchange
     * Exchange name:My_Topic_Exchange
     */
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("My_Topic_Exchange");
    }
    /**
     * 绑定创建的 Exchange 以及 Queue01
     * Routing Key:*.topic
     */
    @Bean
    public Binding bindingTopic01() {
        return BindingBuilder.bind(topicQueue01()).to(topicExchange()).with("*.topic");
    }
    /**
     * 绑定创建的 Exchange 以及 Queue02
     * Routing Key:#.topic
     */
    @Bean
    public Binding bindingTopic02() {
        return BindingBuilder.bind(topicQueue02()).to(topicExchange()).with("#.topic");
    }

}
学新通

编写消息发送的服务类

编写消息发送服务类的接口,定义发送消息的方法,需要指定 Routing Key 以及消息内容

public interface ProducerService {

    /**
     * 发送消息至 RabbitMQ 队列
     * @param routingKey routingKey
     * @param detail 消息内容
     * @return 处理结果
     */
    String sendMessage(String routingKey, String detail);

}

对不同模式的 Exchange 分别实现上述接口,实现消息发送的方法。在调用消息发送的方法时,需要指定 Exchange、Routing Key 以及 消息内容

@Service
public class DirectProducerServiceImpl implements ProducerService {

    @Resource
    private AmqpTemplate amqpTemplate;

    /**
     * 发送消息
     * @param routingKey routingKey
     * @param detail 消息内容
     * @return 结果
     */
    @Override
    public String sendMessage(String routingKey, String detail) {
        amqpTemplate.convertAndSend("My_Direct_Exchange", routingKey, detail);
        return "消息发送成功:{Exchange:"   "My_Direct_Exchange|"   "Routing Key:"   routingKey   "|data:"   detail   "}";
    }
}
学新通
@Service
public class TopicProducerServiceImpl implements ProducerService {

    @Resource
    private AmqpTemplate amqpTemplate;

    /**
     * 发送消息
     * @param routingKey routingKey
     * @param detail 消息内容
     * @return 结果
     */
    @Override
    public String sendMessage(String routingKey, String detail) {
        amqpTemplate.convertAndSend("My_Topic_Exchange", routingKey, detail);
        return "消息发送成功:{Exchange:"   "My_Topic_Exchange|"   "Routing Key:"   routingKey   "|data:"   detail   "}";
    }

}
学新通

编写控制层接收请求并调用消息发送服务

@RestController
@RequestMapping("/producer")
public class ProducerController {

    @Resource(name = "directProducerServiceImpl")
    private ProducerService directProducerService;

    @Resource(name = "topicProducerServiceImpl")
    private ProducerService topicProducerService;

    /**
     * 发送消息
     * @param routingKey routingKey
     * @param detail 消息内容
     * @return 结果
     */
    @GetMapping ("/direct/{routingKey}/{detail}")
    public String sendForDirect(@PathVariable("routingKey") String routingKey,
                                @PathVariable("detail") String detail) {
        return directProducerService.sendMessage(routingKey, detail);
    }

    /**
     * 发送消息
     * @param routingKey routingKey
     * @param detail 消息内容
     * @return 结果
     */
    @GetMapping ("/topic/{routingKey}/{detail}")
    public String sendForTopic(@PathVariable("routingKey") String routingKey,
                                @PathVariable("detail") String detail) {
        return topicProducerService.sendMessage(routingKey, detail);
    }

}
学新通

RabbitMQ 消费者

配置文件以及主启动类

创建子 maven 项目 RabbitConsumer,编写配置文件

server:
 port: 8081

spring:
 rabbitmq:
 host: <RabbitMq Server ip>
 port: 5672

创建主启动类

@SpringBootApplication
public class ConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class);
    }

}

编写消息接收服务

使用@RabbitListener注解监听指定的队列,获取数据并进行处理即可

注:一般消息的传输使用 JSON 字符串的形式实现

@Component
public class RabbitReceiver {

    /**
     * 监听消息队列 My_Direct_Queue01
     */
    @RabbitListener(queues = "My_Direct_Queue01")
    public void directReceiver01(String detail) {
        System.out.println("消息接收成功:{"   "Queue:My_Direct_Queue01|data:"   detail   "}");
    }
    /**
     * 监听消息队列 My_Direct_Queue02
     */
    @RabbitListener(queues = "My_Direct_Queue02")
    public void directReceiver02(String detail) {
        System.out.println("消息接收成功:{"   "Queue:My_Direct_Queue02|data:"   detail   "}");
    }
    /**
     * 监听消息队列 My_Topic_Queue01
     */
    @RabbitListener(queues = "My_Topic_Queue01")
    public void topicReceiver01(String detail) {
        System.out.println("消息接收成功:{"   "Queue:My_Topic_Queue01|data:"   detail   "}");
    }
    /**
     * 监听消息队列 My_Topic_Queue02
     */
    @RabbitListener(queues = "My_Topic_Queue02")
    public void topicReceiver02(String detail) {
        System.out.println("消息接收成功:{"   "Queue:My_Topic_Queue02|data:"   detail   "}");
    }

}
学新通

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhfjjakf
系列文章
更多 icon
同类精品
更多 icon
继续加载