RabbitMQ极快上手延迟队列
安装
参考文章:
https://blog.csdn.net/miaoye520/article/details/123207661
https://blog.csdn.net/lvoelife/article/details/126658695
安装Erlang,并添加环境变量ERLANG_HOME,命令行运行erl
安装rabbitmq,rabbitmq-server-3.12.0.exe
注意Erlang要选择对应的版本
安装RabbitMQ-Plugins插件,rabbitmq-plugins enable rabbitmq_management
账号密码 guest,guest
使用
参考文章:
https://blog.csdn.net/weixin_45698935/article/details/123481137
https://www.liwenzhou.com/posts/Go/rabbitmq-1/
Go实践:
go get github.com/streadway/amqp
基本使用:
生产者
-
package main
-
-
import (
-
"github.com/streadway/amqp"
-
"log"
-
)
-
-
type App struct {
-
Name string
-
Num int
-
}
-
-
type Root struct {
-
Apps []*App
-
}
-
-
func main() {
-
// 1.尝试连接RabbitMQ,建立连接
-
// 该连接抽象了套接字连接,并为我们处理协议版本协商和认证等。
-
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
-
if err != nil {
-
log.Fatalf(err.Error())
-
}
-
defer conn.Close()
-
-
// 2.创建一个通道, 大多数API都是该通道操作的
-
ch, err := conn.Channel()
-
defer ch.Close()
-
-
// 3.声明消息要发送的队列
-
q, err := ch.QueueDeclare("hello", false, false, false, false, nil)
-
if err != nil {
-
log.Fatalf(err.Error())
-
}
-
-
body := "hello world12"
-
err = ch.Publish("", q.Name, false, false, amqp.Publishing{
-
ContentType: "text/plain",
-
Body: []byte(body),
-
})
-
if err != nil {
-
log.Fatalf(err.Error())
-
}
-
-
return
-
}
消费者
-
package main
-
-
import (
-
"github.com/streadway/amqp"
-
"log"
-
)
-
-
func main() {
-
// 1.尝试连接RabbitMQ,建立连接
-
// 该连接抽象了套接字连接,并为我们处理协议版本协商和认证等。
-
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
-
if err != nil {
-
log.Fatalf(err.Error())
-
}
-
defer conn.Close()
-
-
// 2.创建一个通道, 大多数API都是该通道操作的
-
ch, err := conn.Channel()
-
defer ch.Close()
-
-
// 3.声明消息要发送的队列
-
q, err := ch.QueueDeclare("hello", false, false, false, false, nil)
-
if err != nil {
-
log.Fatalf(err.Error())
-
}
-
-
// 获取接收消息的Delivery通道
-
msgs, err := ch.Consume(
-
q.Name, // queue
-
"", // consumer
-
true, // auto-ack
-
false, // exclusive
-
false, // no-local
-
false, // no-wait
-
nil, // args
-
)
-
if err != nil {
-
log.Println(err.Error(), "Failed to register a consumer")
-
}
-
-
forever := make(chan bool)
-
-
go func() {
-
for d := range msgs {
-
log.Printf("Received a message: %s", d.Body)
-
}
-
}()
-
-
log.Printf(" [*] Waiting for messages. To exit press CTRL C")
-
<-forever
-
}
延迟队列
不应该是队列,而应该是堆。将先过期的消息排在前面
需要安装插件:有插件的支持 Community Plugins — RabbitMQ rabbitmq_delayed_message_exchange
首先要引入一个概念:死信队列,当我们的发送的消息被接收端nck或reject,消息在队列的存活时间超过设定的 TTL,消息数量超过最大队列长度,这样的消息会被认为是死信(“dead letter”)通过配置的死信交换机这样的死信可以被投递到对应的死信队列中
发送者:
发送者的实现就很简单了,就和普通的发送实现几乎一致,因为反正就是投递到对应的队列中就可以了,只需要将发送消息的部分,在消息的 header 中加入 x-delay
字段表示当前消息的 TTL 就可以了,也就是设定延迟时间,注意单位为毫秒
-
package main
-
-
import (
-
"log"
-
"os"
-
"strings"
-
-
"github.com/streadway/amqp"
-
)
-
-
func main() {
-
failOnError := func (err error, msg string) {
-
if err != nil {
-
log.Fatalf("%s: %s", msg, err)
-
}
-
}
-
-
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
-
failOnError(err, "Failed to connect to RabbitMQ")
-
defer conn.Close()
-
-
ch, err := conn.Channel()
-
failOnError(err, "Failed to open a channel")
-
defer ch.Close()
-
-
body := bodyFrom(os.Args)
-
// 将消息发送到延时队列上
-
err = ch.Publish(
-
"", // exchange 这里为空则不选择 exchange
-
"test_delay", // routing key
-
false, // mandatory
-
false, // immediate
-
amqp.Publishing{
-
ContentType: "text/plain",
-
Body: []byte(body),
-
Expiration: "10000", // 设置五秒的过期时间
-
})
-
failOnError(err, "Failed to publish a message")
-
-
log.Printf(" [x] Sent %s", body)
-
}
-
-
func bodyFrom(args []string) string {
-
var s string
-
if (len(args) < 2) || os.Args[1] == "" {
-
s = "hello3"
-
} else {
-
s = strings.Join(args[1:], " ")
-
}
-
return s
-
}
接收者
-
package main
-
-
import (
-
"log"
-
-
"github.com/streadway/amqp"
-
)
-
-
func main() {
-
-
failOnError := func (err error, msg string) {
-
if err != nil {
-
log.Fatalf("%s: %s", msg, err)
-
}
-
}
-
-
// 建立链接
-
conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
-
failOnError(err, "Failed to connect to RabbitMQ")
-
defer conn.Close()
-
-
ch, err := conn.Channel()
-
failOnError(err, "Failed to open a channel")
-
defer ch.Close()
-
-
// 声明一个主要使用的 exchange
-
err = ch.ExchangeDeclare(
-
"logs", // name
-
"fanout", // type
-
true, // durable
-
false, // auto-deleted
-
false, // internal
-
false, // no-wait
-
nil, // arguments
-
)
-
failOnError(err, "Failed to declare an exchange")
-
-
// 声明一个常规的队列, 其实这个也没必要声明,因为 exchange 会默认绑定一个队列
-
q, err := ch.QueueDeclare(
-
"test_logs", // name
-
false, // durable
-
false, // delete when unused
-
true, // exclusive
-
false, // no-wait
-
nil, // arguments
-
)
-
failOnError(err, "Failed to declare a queue")
-
-
/**
-
* 注意,这里是重点!!!!!
-
* 声明一个延时队列, ß我们的延时消息就是要发送到这里
-
*/
-
_, errDelay := ch.QueueDeclare(
-
"test_delay", // name
-
false, // durable
-
false, // delete when unused
-
true, // exclusive
-
false, // no-wait
-
amqp.Table{
-
// 当消息过期时把消息发送到 logs 这个 exchange
-
"x-dead-letter-exchange":"logs",
-
}, // arguments
-
)
-
failOnError(errDelay, "Failed to declare a delay_queue")
-
-
err = ch.QueueBind(
-
q.Name, // queue name, 这里指的是 test_logs
-
"", // routing key
-
"logs", // exchange
-
false,
-
nil)
-
failOnError(err, "Failed to bind a queue")
-
-
// 这里监听的是 test_logs
-
msgs, err := ch.Consume(
-
q.Name, // queue name, 这里指的是 test_logs
-
"", // consumer
-
true, // auto-ack
-
false, // exclusive
-
false, // no-local
-
false, // no-wait
-
nil, // args
-
)
-
failOnError(err, "Failed to register a consumer")
-
-
forever := make(chan bool)
-
-
go func() {
-
for d := range msgs {
-
log.Printf(" [x] %s", d.Body)
-
}
-
}()
-
-
log.Printf(" [*] Waiting for logs. To exit press CTRL C")
-
<-forever
-
}
参考:golang 使用 rabbitmq 延迟队列-腾讯云开发者社区-腾讯云
这篇好文章是转载于:学新通技术网
- 版权申明: 本站部分内容来自互联网,仅供学习及演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,请提供相关证据及您的身份证明,我们将在收到邮件后48小时内删除。
- 本站站名: 学新通技术网
- 本文地址: /boutique/detail/tanhggkbfh
系列文章
更多
同类精品
更多
-
photoshop保存的图片太大微信发不了怎么办
PHP中文网 06-15 -
《学习通》视频自动暂停处理方法
HelloWorld317 07-05 -
word里面弄一个表格后上面的标题会跑到下面怎么办
PHP中文网 06-20 -
Android 11 保存文件到外部存储,并分享文件
Luke 10-12 -
photoshop扩展功能面板显示灰色怎么办
PHP中文网 06-14 -
微信公众号没有声音提示怎么办
PHP中文网 03-31 -
excel下划线不显示怎么办
PHP中文网 06-23 -
excel打印预览压线压字怎么办
PHP中文网 06-22 -
TikTok加速器哪个好免费的TK加速器推荐
TK小达人 10-01 -
怎样阻止微信小程序自动打开
PHP中文网 06-13