Keen的博客

记录所思、所想、所遇

欢迎来到我的个人站~


RabbitMQ原理

1. 安装、配置及常规命令

1.1. Centos下的安装方法

# 有两种方式可选:

# 方式1:直接使用yum现有的库:
yum install erlang
yum install rabbitmq

# 方式2:官网使用最新的版本安装:
# 安装erlang的repo
curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash
# 安装rabbitmq的repo
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash
# 安装
yum install erlang
yum install rabbitmq

1.2. 基础配置

# 允许management,启动web服务,可以从15672端口看到rabbitmq的队列数据
rabbitmq-plugins enable rabbitmq_management

# 设置用户名和密码
rabbitmqctl add_user keen-rabbit keen123
rabbitmqctl set_user_tags keen-rabbit administrator
rabbitmqctl set_permissions -p / keen-rabbit  ".*" ".*" ".*"

1.3. 常规命令

# 后台启动rabbitmq的几种方式
方式1:service rabbitmq start   / service rabbitmq restart
方式2:systemctl start rabbitmq / systemctl restart rabbitmq
方式3:rabbitmq-server --detached

# 停止和启动rabbitmq的结点
rabbitmqctl stop_app
rabbitmqctl start_app

# 重置rabbitmq(比如删除队列、清除用户密码。注意,这个操作会清除用户密码,记得运行用户名密码重置的3条命令)。运行前,需要先停止节点服务,rabbitmqctl stop_app
rabbitmqctl reset

# 查看队列以及交换器
rabbitmqctl list_queues
rabbitmqctl list_exchanges

1.4. Docker下rabbitmq安装

参考《Docker笔记》

2. 两种模型

rabbitmq的消息模型的核心是:消息的投递,实际上,不是投递给queue,而是投递给exchange,再由exchange决定把消息投递到哪个queue。
而exchange如何决定把消息投递到哪个queue呢?是根据消息的参数决定,以及根据bind的queue决定。
exchange只能把消息投递给bind到该exchange上的queue;对这些已bind的队列,再通过消息的参数route key来决定投递到哪个queue。如果route key为“”,则表示广播消息到所有bind的queue。

以上,就是rabbitmq的各种模型的根本原理。

2.1. 工作队列模型

一个生产者,多个消费者竞争消费队列内的消息(队列内的同一个消息,不会被多个消费者拿到)

jpg

2.1.1. producer

package main

import (
	"log"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// 与rabbitmq-server建立连接
	conn, err := amqp.Dial("amqp://keen-rabbit:keen123@192.168.0.100:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	// channel是一条连接的任务抽象层,后续所有的操作,都是基于此连接的api的操作
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	// 定义一个队列,用于发送消息。注意,只有队列不存在的时候,才会创建队列。如果队列存在,则获取该队列
	q, err := ch.QueueDeclare(
		"hello", // name
		false,   // durable
		false,   // delete when unused
		false,   // exclusive
		false,   // no-wait
		nil,     // arguments
	)
	failOnError(err, "Failed to declare a queue")

	// 发布消息
	body := "Hello World!"
	err = ch.Publish(
		"",     // exchange
		q.Name, // routing key
		false,  // mandatory
		false,  // immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
		})
	failOnError(err, "Failed to publish a message")
}

2.1.2. consumer

package main

import (
	"log"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// 与rabbitmq-server建立连接
	conn, err := amqp.Dial("amqp://keen-rabbit:keen123@192.168.0.100:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	// 声明连接通道
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	// 查找hello队列
	q, err := ch.QueueDeclare(
		"hello", // name
		false,   // durable
		false,   // delete when usused
		false,   // exclusive
		false,   // no-wait
		nil,     // arguments
	)
	failOnError(err, "Failed to declare a queue")

	// 告诉rabbitmq-server给我们发消息,声明一下我们是消费者,向rabbitmq注册一下。不注册的话,rabbitmq是不会返回数据的。
	// 声明完了,会返回一个channel通道msgs,相当于建立了管道连接,后续会通过此管道给我们发数据
	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		true,   // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	failOnError(err, "Failed to register a consumer")

	// 创建一个无缓存的channel,用来不死锁的卡住当前主线程的goroutine,防止主线程退出
	// 任何goroutine,都可以向此channel发送数据,用以退出当前主线程
	forever := make(chan bool)

	go func() {
		// 开始循环读取管道数据
		for d := range msgs {
			log.Printf("Received a message: %s", d.Body)
		}
	}()

	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	// 无缓存,所以读会卡死,直到有写入数据
	<-forever
}

consumer可以有多个实例,这样同时消费队列中的数据,并互斥进行

2.1.3. 说明及注意点

(1)消费者挂掉了,如何确保消息不丢失

// 以上示例,设置了auto-ack为true。这样,当rabbitmq分发一条消息到一个消费者,消费者就会立刻自动回复说表示它已收到该消息。这样rabbitmq就会立刻把此消息标记为已删除。如果此时杀掉该消费者,这条消息也就丢弃了。而且,之前分发给这个消费者的还未处理的消息,也都丢弃了
// 如果希望消息不丢失,也即一个消费者挂了,这条消息能被重新投递给其他活着的消费者,auto-ack就需要设置为false。这样,如果消费者不在了(channel被关闭了、连接关闭了、TCP连接丢失了等都称为不在)而导致未发送ack,那么rabbitmq就会认为这个消息还没有被消费过,就会重新投递到队列中去。这样,如果其他消费者在线,这条消息就会优先被投递给这个消费者。
msgs, err := ch.Consume(
  q.Name, // queue
  "",     // consumer
  false,  // auto-ack <=============== 看这里
  false,  // exclusive
  false,  // no-local
  false,  // no-wait
  nil,    // args
)
failOnError(err, "Failed to register a consumer")

forever := make(chan bool)

go func() {
  for d := range msgs {
    log.Printf("Received a message: %s", d.Body)
    dot_count := bytes.Count(d.Body, []byte("."))
    t := time.Duration(dot_count)
    time.Sleep(t * time.Second)
    log.Printf("Done")
    d.Ack(false) // <=============== 看这里
  }
}()

log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
<-forever

(2)rabbitmq-server挂掉了,如何确保消息不丢失

// 为了在rabbitmq-server挂掉的时候,消息也不要丢弃掉,需要把queue和message都声明为durable(可持久化的),也就是队列不要丢,消息也不要丢。
// 需要注意的是,由于之前已经创建过该队列的话,这里修改属性,需要把队列删除掉。

// 队列持久化(producer和consumer都需要修改)
q, err := ch.QueueDeclare(
  "hello",      // name
  true,         // durable <=============== 看这里
  false,        // delete when unused
  false,        // exclusive
  false,        // no-wait
  nil,          // arguments
)
failOnError(err, "Failed to declare a queue")

// 消息持久化(rabbitmq并不会针对每一条消息,都写磁盘,而是写入到cache,所以这里不是严格的持久化)
err = ch.Publish(
  "",           // exchange
  q.Name,       // routing key
  false,        // mandatory
  false,
  amqp.Publishing {
    DeliveryMode: amqp.Persistent,  // <=============== 看这里
    ContentType:  "text/plain",
    Body:         []byte(body),
})

(3)如何确保消息被公平投递

// 默认情况下,消息的调度不会去检查unack,也就是说,这里可能存在大量unack的consumer不断获取到新消息。这里还有一点没有说明的是,我们分布式的机器上,每个consumer都有一个自己的队列,rabbitmq-server有一个总队列,每次consumer队列会去向总队列拿数据,也就是会预先分配好任务给各个consumer
// 为了阻止这种情况发生,需要将consumer预取的消息数,设置为1个
err = ch.Qos(
  1,     // prefetch count
  0,     // prefetch size
  false, // global
)
failOnError(err, "Failed to set QoS")

2.2. 发布订阅模型

一个生产者广播消息,多个消费者消费同样的消息

jpg

2.2.1. publisher

package main

import (
	"log"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// 与rabbitmq-server建立连接
	conn, err := amqp.Dial("amqp://keen-rabbit:keen123@192.168.0.100:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	// channel是一条连接的任务抽象层,后续所有的操作,都是基于此连接的api的操作
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	// 定义一个交换机,类型是fanout(广播类型),当publish的时候,可以广播消息到它所知道的队列(已bind队列)
	err = ch.ExchangeDeclare(
		"logs",   // name
		"fanout", // type
		true,     // durable	设置不设置无所谓,exchange可以不需要持久化
		false,    // auto-deleted
		false,    // internal
		false,    // no-wait
		nil,      // arguments
	)
	failOnError(err, "Failed to declare an exchange")

	// 发布消息
	body := "Hello World!"
	err = ch.Publish(
		"logs", // exchange
		"",     // routing key
		false,  // mandatory
		false,  // immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
		})
	failOnError(err, "Failed to publish a message")
}

2.2.2. subscriber

package main

import (
	"log"

	"github.com/streadway/amqp"
)

func failOnError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// 与rabbitmq-server建立连接
	conn, err := amqp.Dial("amqp://keen-rabbit:keen123@192.168.0.100:5672/")
	failOnError(err, "Failed to connect to RabbitMQ")
	defer conn.Close()

	// 声明连接通道
	ch, err := conn.Channel()
	failOnError(err, "Failed to open a channel")
	defer ch.Close()

	err = ch.ExchangeDeclare(
		"logs",   // name
		"fanout", // type
		true,     // durable
		false,    // auto-deleted
		false,    // internal
		false,    // no-wait
		nil,      // arguments
	)

	// 定义一个随机名队列
	q, err := ch.QueueDeclare(
		"",    // name,随机队列名
		false, // durable
		false, // delete when usused
		true,  // exclusive,独占模式,连接断开的时候,队列自动清除
		false, // no-wait
		nil,   // arguments
	)
	failOnError(err, "Failed to declare a queue")

	// 将随机名队列,绑定到logs exchange上去
	err = ch.QueueBind(
		q.Name, // queue name
		"",     // routing key
		"logs", // exchange
		false,
		nil,
	)
	failOnError(err, "Failed to bind a queue")

	// 告诉rabbitmq-server给我们发消息,声明一下我们是消费者,向rabbitmq注册一下。不注册的话,rabbitmq是不会返回数据的。
	// 声明完了,会返回一个channel通道msgs,相当于建立了管道连接,后续会通过此管道给我们发数据
	msgs, err := ch.Consume(
		q.Name, // queue
		"",     // consumer
		true,   // auto-ack
		false,  // exclusive
		false,  // no-local
		false,  // no-wait
		nil,    // args
	)
	failOnError(err, "Failed to register a consumer")

	// 创建一个无缓存的channel,用来不死锁的卡住当前主线程的goroutine,防止主线程退出
	// 任何goroutine,都可以向此channel发送数据,用以退出当前主线程
	forever := make(chan bool)

	go func() {
		// 开始循环读取管道数据
		for d := range msgs {
			log.Printf("Received a message: %s", d.Body)
		}
	}()

	log.Printf(" [*] Waiting for messages. To exit press CTRL+C")
	// 无缓存,所以读会卡死,直到有写入数据
	<-forever
}

2.2.3. 注意事项

// 在worker模型中,我们使用的exchange是默认的(publish的exchange参数为“”),而route key都是特定队列。因此,这样就可以确保消息发往的地方,都是某个rabbitmq-server上的特定queue,大家也都往这个queue上去取消息,也就实现了消息的竞争消费

// 而在publish/subscribe模型中,并未指定route key,因此是把消息都投递到bind到exchange的queue上去。因此,要实现广播模型,每个consumer的队列名必须不同,所以这里通常使用随机队列名。
// 大多数场景中,广播消息都是实时有效的,因此用随机队列名的好处,还有一个,那就是每次程序启动,队列都是空的。从这点而言,随机队列名的,不适合durable可持久化消息
// 如果不想每次队列起来,都是空队列,需要保留原始未消费的消息,还必须针对队列分别取不同名字。

// !!!!!!这里就需要一个先后顺序关系。如果消费者先起来,bind,然后producer再发送消息,这样就可以保证不丢消息。而如果producer先起来,发现没有任何bind的queue,则发送消息会丢失!!!!!!

3. 各种exchange

3.1. Fanout Exchange

前面设置了fanout广播类型的exchange,我们看到,在bind的时候,传递的route key为空。这是因为,fanout类型,会忽略route key字段

// 声明的exchange类型
err = ch.ExchangeDeclare(
		"logs",   // name
		"fanout", // type
		true,     // durable
		false,    // auto-deleted
		false,    // internal
		false,    // no-wait
		nil,      // arguments
    )
    
// 队列绑定的时候,route key的设置
err = ch.QueueBind(
		q.Name, // queue name
		"",     // routing key
		"logs", // exchange
		false,
		nil,
    )

// 消息发送的时候,route key的设置
err = ch.Publish(
		"logs", // exchange
		"",     // routing key
		false,  // mandatory
		false,  // immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
		})

3.2. Direct Exchange

使用Direct类型的Exchange,可以做到精细化的消息分发

// producer:声明的exchange类型
err = ch.ExchangeDeclare(
		"logs",   // name
		"direct", // type
		true,     // durable
		false,    // auto-deleted
		false,    // internal
		false,    // no-wait
		nil,      // arguments
    )
    
// consumer1:队列绑定的时候,route key的设置
err = ch.QueueBind(
		q.Name, // queue name
		"black",     // routing key
		"logs", // exchange
		false,
		nil,
    )

// consumer2:队列绑定的时候,route key的设置
err = ch.QueueBind(
		q.Name, // queue name
		"white",     // routing key
		"logs", // exchange
		false,
		nil,
    )

// producer:消息发送的时候,route key的设置
err = ch.Publish(
		"logs", // exchange
		"black",     // routing key
		false,  // mandatory
		false,  // immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
		})

3.3. Topic Exchange

direct的exchange设置的route key是某个固定的。如果想要多个不同key都进入到同一个队列,需要使用topic exchange。
但是,route key并不是无上限的设置,最多允许的route key的字节数是256
如果多个都绑定同一个关键字,则可以把消息发送到多个队列上去

// producer:声明的exchange类型
err = ch.ExchangeDeclare(
		"logs-topic",   // name
		"topic", // type
		true,     // durable
		false,    // auto-deleted
		false,    // internal
		false,    // no-wait
		nil,      // arguments
    )
    
// consumer1:队列绑定的时候,route key的设置
err = ch.QueueBind(
		q.Name, // queue name
		"lazy.*",     // routing key
		"logs-topic", // exchange
		false,
		nil,
    )

// consumer2:队列绑定的时候,route key的设置
err = ch.QueueBind(
		q.Name, // queue name
		"*.elephan",     // routing key
		"logs-topic", // exchange
		false,
		nil,
    )

// producer:消息发送的时候,route key的设置
err = ch.Publish(
		"logs-topic", // exchange
		"lazy.fox",     // routing key  发往consumer1
		false,  // mandatory
		false,  // immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
        })
        
// producer:消息发送的时候,route key的设置
err = ch.Publish(
		"logs-topic", // exchange
		"orange.elephan",     // routing key  发往consumer2
		false,  // mandatory
		false,  // immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
        })

// producer:消息发送的时候,route key的设置
err = ch.Publish(
		"logs-topic", // exchange
		"*.elephan",     // routing key
		false,  // mandatory
		false,  // immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Body:        []byte(body),
		})

需要注意的是,使用*和#
*表示匹配字符,也即如果producer注册的route key为lazy.orange,那么可用的bind的route key也就lazy.*或者*.orange,特别要注意顺序问题
而#表示匹配所有key,也就是如果producer注册的route key为lazy.orange,bind为#的话,直接匹配所有的key,或者lazy.#,匹配lazy.开头的所有key

3.4. Headers exchange

这里是用到参数,作为匹配的key,而非route key

// producer:声明的exchange类型
err = ch.ExchangeDeclare(
		"logs-headers",   // name
		"headers", // type
		true,     // durable
		false,    // auto-deleted
		false,    // internal
		false,    // no-wait
		nil,      // arguments
    )
    
// consumer1:队列绑定的时候,header匹配项的设置
table := amqp.Table{}
table["user"] = "keen"
table["name"] = "xxxx"
tabel["x-match"] = "any"
err = ch.QueueBind(
		q.Name, // queue name
		"",     // routing key
		"logs-headers", // exchange
		false,
		tabel,
    )

// consumer2:队列绑定的时候,header的设置
table := amqp.Table{}
table["user"] = "keen"
table["psw"] = "123456"
tabel["x-match"] = "all"
err = ch.QueueBind(
		q.Name, // queue name
		"",     // routing key
		"logs-headers", // exchange
		false,
		nil,
    )

// producer:消息发送的时候,header的设置
header := amqp.Table{}
header["user"] = "keen"
header["psw"] = "123456"
err = ch.Publish(
		"logs-headers", // exchange
		"",     // routing key
		false,  // mandatory
		false,  // immediate
		amqp.Publishing{
			ContentType: "text/plain",
			Headers:	 header,
			Body:        []byte(body),
		})

打赏一个呗

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码支持
扫码打赏,你说多少就多少