• 首页 首页 icon
  • 工具库 工具库 icon
    • IP查询 IP查询 icon
  • 内容库 内容库 icon
    • 快讯库 快讯库 icon
    • 精品库 精品库 icon
    • 问答库 问答库 icon
  • 更多 更多 icon
    • 服务条款 服务条款 icon

RabbitMQ极快上手延迟队列

武飞扬头像
end for time
帮助1

安装

官网

参考文章:

https://blog.csdn.net/miaoye520/article/details/123207661

https://blog.csdn.net/lvoelife/article/details/126658695

安装Erlang,并添加环境变量ERLANG_HOME,命令行运行erl

安装rabbitmqrabbitmq-server-3.12.0.exe

注意Erlang要选择对应的版本

安装RabbitMQ-Plugins插件,rabbitmq-plugins enable rabbitmq_management

访问 http://localhost:15672/

账号密码 guest,guest

使用

中文文档

参考文章:
https://blog.csdn.net/weixin_45698935/article/details/123481137
https://www.liwenzhou.com/posts/Go/rabbitmq-1/

Go实践:

go get github.com/streadway/amqp 

基本使用:

生产者

  1.  
    package main
  2.  
  3.  
    import (
  4.  
    "github.com/streadway/amqp"
  5.  
    "log"
  6.  
    )
  7.  
  8.  
    type App struct {
  9.  
    Name string
  10.  
    Num int
  11.  
    }
  12.  
  13.  
    type Root struct {
  14.  
    Apps []*App
  15.  
    }
  16.  
  17.  
    func main() {
  18.  
    // 1.尝试连接RabbitMQ,建立连接
  19.  
    // 该连接抽象了套接字连接,并为我们处理协议版本协商和认证等。
  20.  
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  21.  
    if err != nil {
  22.  
    log.Fatalf(err.Error())
  23.  
    }
  24.  
    defer conn.Close()
  25.  
  26.  
    // 2.创建一个通道, 大多数API都是该通道操作的
  27.  
    ch, err := conn.Channel()
  28.  
    defer ch.Close()
  29.  
  30.  
    // 3.声明消息要发送的队列
  31.  
    q, err := ch.QueueDeclare("hello", false, false, false, false, nil)
  32.  
    if err != nil {
  33.  
    log.Fatalf(err.Error())
  34.  
    }
  35.  
  36.  
    body := "hello world12"
  37.  
    err = ch.Publish("", q.Name, false, false, amqp.Publishing{
  38.  
    ContentType: "text/plain",
  39.  
    Body: []byte(body),
  40.  
    })
  41.  
    if err != nil {
  42.  
    log.Fatalf(err.Error())
  43.  
    }
  44.  
  45.  
    return
  46.  
    }
学新通

消费者

  1.  
    package main
  2.  
  3.  
    import (
  4.  
    "github.com/streadway/amqp"
  5.  
    "log"
  6.  
    )
  7.  
  8.  
    func main() {
  9.  
    // 1.尝试连接RabbitMQ,建立连接
  10.  
    // 该连接抽象了套接字连接,并为我们处理协议版本协商和认证等。
  11.  
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  12.  
    if err != nil {
  13.  
    log.Fatalf(err.Error())
  14.  
    }
  15.  
    defer conn.Close()
  16.  
  17.  
    // 2.创建一个通道, 大多数API都是该通道操作的
  18.  
    ch, err := conn.Channel()
  19.  
    defer ch.Close()
  20.  
  21.  
    // 3.声明消息要发送的队列
  22.  
    q, err := ch.QueueDeclare("hello", false, false, false, false, nil)
  23.  
    if err != nil {
  24.  
    log.Fatalf(err.Error())
  25.  
    }
  26.  
  27.  
    // 获取接收消息的Delivery通道
  28.  
    msgs, err := ch.Consume(
  29.  
    q.Name, // queue
  30.  
    "",     // consumer
  31.  
    true,   // auto-ack
  32.  
    false,  // exclusive
  33.  
    false,  // no-local
  34.  
    false,  // no-wait
  35.  
    nil,    // args
  36.  
    )
  37.  
    if err != nil {
  38.  
    log.Println(err.Error(), "Failed to register a consumer")
  39.  
    }
  40.  
  41.  
    forever := make(chan bool)
  42.  
  43.  
    go func() {
  44.  
    for d := range msgs {
  45.  
    log.Printf("Received a message: %s", d.Body)
  46.  
    }
  47.  
    }()
  48.  
  49.  
    log.Printf(" [*] Waiting for messages. To exit press CTRL C")
  50.  
    <-forever
  51.  
    }
学新通

延迟队列

不应该是队列,而应该是堆。将先过期的消息排在前面

需要安装插件:有插件的支持 Community Plugins — RabbitMQ rabbitmq_delayed_message_exchange

首先要引入一个概念:死信队列,当我们的发送的消息被接收端nck或reject,消息在队列的存活时间超过设定的 TTL,消息数量超过最大队列长度,这样的消息会被认为是死信(“dead letter”)通过配置的死信交换机这样的死信可以被投递到对应的死信队列中

学新通

发送者:

发送者的实现就很简单了,就和普通的发送实现几乎一致,因为反正就是投递到对应的队列中就可以了,只需要将发送消息的部分,在消息的 header 中加入 x-delay 字段表示当前消息的 TTL 就可以了,也就是设定延迟时间,注意单位为毫秒

  1.  
    package main
  2.  
  3.  
    import (
  4.  
    "log"
  5.  
    "os"
  6.  
    "strings"
  7.  
  8.  
    "github.com/streadway/amqp"
  9.  
    )
  10.  
  11.  
    func main() {
  12.  
    failOnError := func (err error, msg string) {
  13.  
    if err != nil {
  14.  
    log.Fatalf("%s: %s", msg, err)
  15.  
    }
  16.  
    }
  17.  
  18.  
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  19.  
    failOnError(err, "Failed to connect to RabbitMQ")
  20.  
    defer conn.Close()
  21.  
  22.  
    ch, err := conn.Channel()
  23.  
    failOnError(err, "Failed to open a channel")
  24.  
    defer ch.Close()
  25.  
  26.  
    body := bodyFrom(os.Args)
  27.  
    // 将消息发送到延时队列上
  28.  
    err = ch.Publish(
  29.  
    "", // exchange 这里为空则不选择 exchange
  30.  
    "test_delay",     // routing key
  31.  
    false, // mandatory
  32.  
    false, // immediate
  33.  
    amqp.Publishing{
  34.  
    ContentType: "text/plain",
  35.  
    Body:       []byte(body),
  36.  
    Expiration: "10000", // 设置五秒的过期时间
  37.  
    })
  38.  
    failOnError(err, "Failed to publish a message")
  39.  
  40.  
    log.Printf(" [x] Sent %s", body)
  41.  
    }
  42.  
  43.  
    func bodyFrom(args []string) string {
  44.  
    var s string
  45.  
    if (len(args) < 2) || os.Args[1] == "" {
  46.  
    s = "hello3"
  47.  
    } else {
  48.  
    s = strings.Join(args[1:], " ")
  49.  
    }
  50.  
    return s
  51.  
    }
学新通

接收者

  1.  
    package main
  2.  
  3.  
    import (
  4.  
    "log"
  5.  
  6.  
    "github.com/streadway/amqp"
  7.  
    )
  8.  
  9.  
    func main() {
  10.  
  11.  
    failOnError := func (err error, msg string) {
  12.  
    if err != nil {
  13.  
    log.Fatalf("%s: %s", msg, err)
  14.  
    }
  15.  
    }
  16.  
  17.  
    // 建立链接
  18.  
    conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
  19.  
    failOnError(err, "Failed to connect to RabbitMQ")
  20.  
    defer conn.Close()
  21.  
  22.  
    ch, err := conn.Channel()
  23.  
    failOnError(err, "Failed to open a channel")
  24.  
    defer ch.Close()
  25.  
  26.  
    // 声明一个主要使用的 exchange
  27.  
    err = ch.ExchangeDeclare(
  28.  
    "logs",   // name
  29.  
    "fanout", // type
  30.  
    true,     // durable
  31.  
    false,    // auto-deleted
  32.  
    false,    // internal
  33.  
    false,    // no-wait
  34.  
    nil,      // arguments
  35.  
    )
  36.  
    failOnError(err, "Failed to declare an exchange")
  37.  
  38.  
    // 声明一个常规的队列, 其实这个也没必要声明,因为 exchange 会默认绑定一个队列
  39.  
    q, err := ch.QueueDeclare(
  40.  
    "test_logs",    // name
  41.  
    false, // durable
  42.  
    false, // delete when unused
  43.  
    true,  // exclusive
  44.  
    false, // no-wait
  45.  
    nil,   // arguments
  46.  
    )
  47.  
    failOnError(err, "Failed to declare a queue")
  48.  
  49.  
    /**
  50.  
    * 注意,这里是重点!!!!!
  51.  
    * 声明一个延时队列, ß我们的延时消息就是要发送到这里
  52.  
    */
  53.  
    _, errDelay := ch.QueueDeclare(
  54.  
    "test_delay",    // name
  55.  
    false, // durable
  56.  
    false, // delete when unused
  57.  
    true,  // exclusive
  58.  
    false, // no-wait
  59.  
    amqp.Table{
  60.  
    // 当消息过期时把消息发送到 logs 这个 exchange
  61.  
    "x-dead-letter-exchange":"logs",
  62.  
    },   // arguments
  63.  
    )
  64.  
    failOnError(errDelay, "Failed to declare a delay_queue")
  65.  
  66.  
    err = ch.QueueBind(
  67.  
    q.Name, // queue name, 这里指的是 test_logs
  68.  
    "",     // routing key
  69.  
    "logs", // exchange
  70.  
    false,
  71.  
    nil)
  72.  
    failOnError(err, "Failed to bind a queue")
  73.  
  74.  
    // 这里监听的是 test_logs
  75.  
    msgs, err := ch.Consume(
  76.  
    q.Name, // queue name, 这里指的是 test_logs
  77.  
    "",     // consumer
  78.  
    true,   // auto-ack
  79.  
    false,  // exclusive
  80.  
    false,  // no-local
  81.  
    false,  // no-wait
  82.  
    nil,    // args
  83.  
    )
  84.  
    failOnError(err, "Failed to register a consumer")
  85.  
  86.  
    forever := make(chan bool)
  87.  
  88.  
    go func() {
  89.  
    for d := range msgs {
  90.  
    log.Printf(" [x] %s", d.Body)
  91.  
    }
  92.  
    }()
  93.  
  94.  
    log.Printf(" [*] Waiting for logs. To exit press CTRL C")
  95.  
    <-forever
  96.  
    }
学新通

参考:golang 使用 rabbitmq 延迟队列-腾讯云开发者社区-腾讯云

这篇好文章是转载于:学新通技术网

  • 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
  • 本站站名: 学新通技术网
  • 本文地址: /boutique/detail/tanhggkbfh
系列文章
更多 icon
同类精品
更多 icon
继续加载