1. 安装、配置及常规命令
1.1. Centos下的安装方法
# 有两种方式可选:
# 方式1:直接使用yum现有的库:
yum install erlang
yum install rabbitmq
# 方式2:官网使用最新的版本安装:
# 安装erlang的repo
curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
# 安装rabbitmq的repo
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash
# 安装
yum install erlang
yum install rabbitmq
1.2. 基础配置
# 允许management,启动web服务,可以从15672端口看到rabbitmq的队列数据
rabbitmq-plugins enable rabbitmq_management
# 设置用户名和密码
rabbitmqctl add_user keen-rabbit keen123
rabbitmqctl set_user_tags keen-rabbit administrator
rabbitmqctl set_permissions -p / keen-rabbit ".*" ".*" ".*"
1.3. 常规命令
# 后台启动rabbitmq的几种方式
方式1:service rabbitmq start / service rabbitmq restart
方式2:systemctl start rabbitmq / systemctl restart rabbitmq
方式3:rabbitmq-server --detached
# 停止和启动rabbitmq的结点
rabbitmqctl stop_app
rabbitmqctl start_app
# 重置rabbitmq(比如删除队列、清除用户密码。注意,这个操作会清除用户密码,记得运行用户名密码重置的3条命令)。运行前,需要先停止节点服务,rabbitmqctl stop_app
rabbitmqctl reset
# 查看队列以及交换器
rabbitmqctl list_queues
rabbitmqctl list_exchanges
1.4. Docker下rabbitmq安装
参考《Docker笔记》
2. 两种模型
rabbitmq的消息模型的核心是:消息的投递,实际上,不是投递给queue,而是投递给exchange,再由exchange决定把消息投递到哪个queue。
而exchange如何决定把消息投递到哪个queue呢?是根据消息的参数决定,以及根据bind的queue决定。
exchange只能把消息投递给bind到该exchange上的queue;对这些已bind的队列,再通过消息的参数route key来决定投递到哪个queue。如果route key为“”,则表示广播消息到所有bind的queue。
以上,就是rabbitmq的各种模型的根本原理。
2.1. 工作队列模型
一个生产者,多个消费者竞争消费队列内的消息(队列内的同一个消息,不会被多个消费者拿到)
2.1.1. producer
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// 与rabbitmq-server建立连接
conn, err := amqp.Dial("amqp://keen-rabbit:keen123@192.168.0.100:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// channel是一条连接的任务抽象层,后续所有的操作,都是基于此连接的api的操作
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 定义一个队列,用于发送消息。注意,只有队列不存在的时候,才会创建队列。如果队列存在,则获取该队列
q, err := ch.QueueDeclare(
"hello", // name
false, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
// 发布消息
body := "Hello World!"
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
}
2.1.2. consumer
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// 与rabbitmq-server建立连接
conn, err := amqp.Dial("amqp://keen-rabbit:keen123@192.168.0.100:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 声明连接通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 查找hello队列
q, err := ch.QueueDeclare(
"hello", // name
false, // durable
false, // delete when usused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
// 告诉rabbitmq-server给我们发消息,声明一下我们是消费者,向rabbitmq注册一下。不注册的话,rabbitmq是不会返回数据的。
// 声明完了,会返回一个channel通道msgs,相当于建立了管道连接,后续会通过此管道给我们发数据
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
// 创建一个无缓存的channel,用来不死锁的卡住当前主线程的goroutine,防止主线程退出
// 任何goroutine,都可以向此channel发送数据,用以退出当前主线程
forever := make(chan bool)
go func() {
// 开始循环读取管道数据
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
// 无缓存,所以读会卡死,直到有写入数据
<-forever
}
consumer可以有多个实例,这样同时消费队列中的数据,并互斥进行
2.1.3. 说明及注意点
(1)消费者挂掉了,如何确保消息不丢失
// 以上示例,设置了auto-ack为true。这样,当rabbitmq分发一条消息到一个消费者,消费者就会立刻自动回复说表示它已收到该消息。这样rabbitmq就会立刻把此消息标记为已删除。如果此时杀掉该消费者,这条消息也就丢弃了。而且,之前分发给这个消费者的还未处理的消息,也都丢弃了
// 如果希望消息不丢失,也即一个消费者挂了,这条消息能被重新投递给其他活着的消费者,auto-ack就需要设置为false。这样,如果消费者不在了(channel被关闭了、连接关闭了、TCP连接丢失了等都称为不在)而导致未发送ack,那么rabbitmq就会认为这个消息还没有被消费过,就会重新投递到队列中去。这样,如果其他消费者在线,这条消息就会优先被投递给这个消费者。
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack <=============== 看这里
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
forever := make(chan bool)
go func() {
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
dot_count := bytes.Count(d.Body, []byte("."))
t := time.Duration(dot_count)
time.Sleep(t * time.Second)
log.Printf("Done")
d.Ack(false) // <=============== 看这里
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever
(2)rabbitmq-server挂掉了,如何确保消息不丢失
// 为了在rabbitmq-server挂掉的时候,消息也不要丢弃掉,需要把queue和message都声明为durable(可持久化的),也就是队列不要丢,消息也不要丢。
// 需要注意的是,由于之前已经创建过该队列的话,这里修改属性,需要把队列删除掉。
// 队列持久化(producer和consumer都需要修改)
q, err := ch.QueueDeclare(
"hello", // name
true, // durable <=============== 看这里
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
// 消息持久化(rabbitmq并不会针对每一条消息,都写磁盘,而是写入到cache,所以这里不是严格的持久化)
err = ch.Publish(
"", // exchange
q.Name, // routing key
false, // mandatory
false,
amqp.Publishing {
DeliveryMode: amqp.Persistent, // <=============== 看这里
ContentType: "text/plain",
Body: []byte(body),
})
(3)如何确保消息被公平投递
// 默认情况下,消息的调度不会去检查unack,也就是说,这里可能存在大量unack的consumer不断获取到新消息。这里还有一点没有说明的是,我们分布式的机器上,每个consumer都有一个自己的队列,rabbitmq-server有一个总队列,每次consumer队列会去向总队列拿数据,也就是会预先分配好任务给各个consumer
// 为了阻止这种情况发生,需要将consumer预取的消息数,设置为1个
err = ch.Qos(
1, // prefetch count
0, // prefetch size
false, // global
)
failOnError(err, "Failed to set QoS")
2.2. 发布订阅模型
一个生产者广播消息,多个消费者消费同样的消息
2.2.1. publisher
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// 与rabbitmq-server建立连接
conn, err := amqp.Dial("amqp://keen-rabbit:keen123@192.168.0.100:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// channel是一条连接的任务抽象层,后续所有的操作,都是基于此连接的api的操作
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
// 定义一个交换机,类型是fanout(广播类型),当publish的时候,可以广播消息到它所知道的队列(已bind队列)
err = ch.ExchangeDeclare(
"logs", // name
"fanout", // type
true, // durable 设置不设置无所谓,exchange可以不需要持久化
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare an exchange")
// 发布消息
body := "Hello World!"
err = ch.Publish(
"logs", // exchange
"", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
failOnError(err, "Failed to publish a message")
}
2.2.2. subscriber
package main
import (
"log"
"github.com/streadway/amqp"
)
func failOnError(err error, msg string) {
if err != nil {
log.Fatalf("%s: %s", msg, err)
}
}
func main() {
// 与rabbitmq-server建立连接
conn, err := amqp.Dial("amqp://keen-rabbit:keen123@192.168.0.100:5672/")
failOnError(err, "Failed to connect to RabbitMQ")
defer conn.Close()
// 声明连接通道
ch, err := conn.Channel()
failOnError(err, "Failed to open a channel")
defer ch.Close()
err = ch.ExchangeDeclare(
"logs", // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
// 定义一个随机名队列
q, err := ch.QueueDeclare(
"", // name,随机队列名
false, // durable
false, // delete when usused
true, // exclusive,独占模式,连接断开的时候,队列自动清除
false, // no-wait
nil, // arguments
)
failOnError(err, "Failed to declare a queue")
// 将随机名队列,绑定到logs exchange上去
err = ch.QueueBind(
q.Name, // queue name
"", // routing key
"logs", // exchange
false,
nil,
)
failOnError(err, "Failed to bind a queue")
// 告诉rabbitmq-server给我们发消息,声明一下我们是消费者,向rabbitmq注册一下。不注册的话,rabbitmq是不会返回数据的。
// 声明完了,会返回一个channel通道msgs,相当于建立了管道连接,后续会通过此管道给我们发数据
msgs, err := ch.Consume(
q.Name, // queue
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
failOnError(err, "Failed to register a consumer")
// 创建一个无缓存的channel,用来不死锁的卡住当前主线程的goroutine,防止主线程退出
// 任何goroutine,都可以向此channel发送数据,用以退出当前主线程
forever := make(chan bool)
go func() {
// 开始循环读取管道数据
for d := range msgs {
log.Printf("Received a message: %s", d.Body)
}
}()
log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
// 无缓存,所以读会卡死,直到有写入数据
<-forever
}
2.2.3. 注意事项
// 在worker模型中,我们使用的exchange是默认的(publish的exchange参数为“”),而route key都是特定队列。因此,这样就可以确保消息发往的地方,都是某个rabbitmq-server上的特定queue,大家也都往这个queue上去取消息,也就实现了消息的竞争消费
// 而在publish/subscribe模型中,并未指定route key,因此是把消息都投递到bind到exchange的queue上去。因此,要实现广播模型,每个consumer的队列名必须不同,所以这里通常使用随机队列名。
// 大多数场景中,广播消息都是实时有效的,因此用随机队列名的好处,还有一个,那就是每次程序启动,队列都是空的。从这点而言,随机队列名的,不适合durable可持久化消息
// 如果不想每次队列起来,都是空队列,需要保留原始未消费的消息,还必须针对队列分别取不同名字。
// !!!!!!这里就需要一个先后顺序关系。如果消费者先起来,bind,然后producer再发送消息,这样就可以保证不丢消息。而如果producer先起来,发现没有任何bind的queue,则发送消息会丢失!!!!!!
3. 各种exchange
3.1. Fanout Exchange
前面设置了fanout广播类型的exchange,我们看到,在bind的时候,传递的route key为空。这是因为,fanout类型,会忽略route key字段
// 声明的exchange类型
err = ch.ExchangeDeclare(
"logs", // name
"fanout", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
// 队列绑定的时候,route key的设置
err = ch.QueueBind(
q.Name, // queue name
"", // routing key
"logs", // exchange
false,
nil,
)
// 消息发送的时候,route key的设置
err = ch.Publish(
"logs", // exchange
"", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
3.2. Direct Exchange
使用Direct类型的Exchange,可以做到精细化的消息分发
// producer:声明的exchange类型
err = ch.ExchangeDeclare(
"logs", // name
"direct", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
// consumer1:队列绑定的时候,route key的设置
err = ch.QueueBind(
q.Name, // queue name
"black", // routing key
"logs", // exchange
false,
nil,
)
// consumer2:队列绑定的时候,route key的设置
err = ch.QueueBind(
q.Name, // queue name
"white", // routing key
"logs", // exchange
false,
nil,
)
// producer:消息发送的时候,route key的设置
err = ch.Publish(
"logs", // exchange
"black", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
3.3. Topic Exchange
direct的exchange设置的route key是某个固定的。如果想要多个不同key都进入到同一个队列,需要使用topic exchange。
但是,route key并不是无上限的设置,最多允许的route key的字节数是256
如果多个都绑定同一个关键字,则可以把消息发送到多个队列上去
// producer:声明的exchange类型
err = ch.ExchangeDeclare(
"logs-topic", // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
// consumer1:队列绑定的时候,route key的设置
err = ch.QueueBind(
q.Name, // queue name
"lazy.*", // routing key
"logs-topic", // exchange
false,
nil,
)
// consumer2:队列绑定的时候,route key的设置
err = ch.QueueBind(
q.Name, // queue name
"*.elephan", // routing key
"logs-topic", // exchange
false,
nil,
)
// producer:消息发送的时候,route key的设置
err = ch.Publish(
"logs-topic", // exchange
"lazy.fox", // routing key 发往consumer1
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
// producer:消息发送的时候,route key的设置
err = ch.Publish(
"logs-topic", // exchange
"orange.elephan", // routing key 发往consumer2
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
// producer:消息发送的时候,route key的设置
err = ch.Publish(
"logs-topic", // exchange
"*.elephan", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Body: []byte(body),
})
需要注意的是,使用*和#
*表示匹配字符,也即如果producer注册的route key为lazy.orange,那么可用的bind的route key也就lazy.*或者*.orange,特别要注意顺序问题
而#表示匹配所有key,也就是如果producer注册的route key为lazy.orange,bind为#的话,直接匹配所有的key,或者lazy.#,匹配lazy.开头的所有key
3.4. Headers exchange
这里是用到参数,作为匹配的key,而非route key
// producer:声明的exchange类型
err = ch.ExchangeDeclare(
"logs-headers", // name
"headers", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
// consumer1:队列绑定的时候,header匹配项的设置
table := amqp.Table{}
table["user"] = "keen"
table["name"] = "xxxx"
tabel["x-match"] = "any"
err = ch.QueueBind(
q.Name, // queue name
"", // routing key
"logs-headers", // exchange
false,
tabel,
)
// consumer2:队列绑定的时候,header的设置
table := amqp.Table{}
table["user"] = "keen"
table["psw"] = "123456"
tabel["x-match"] = "all"
err = ch.QueueBind(
q.Name, // queue name
"", // routing key
"logs-headers", // exchange
false,
nil,
)
// producer:消息发送的时候,header的设置
header := amqp.Table{}
header["user"] = "keen"
header["psw"] = "123456"
err = ch.Publish(
"logs-headers", // exchange
"", // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "text/plain",
Headers: header,
Body: []byte(body),
})