RabbitMQ的核心组成部分

image-20220224114922858

**核心概念:

Server:又称Broker ,接受客户端的连接,实现AMQP实体服务。 安装rabbitmq-server

Connection:连接,应用程序与Broker的网络连接 TCP/IP/ 三次握手和四次挥手

Channel:网络信道,几乎所有的操作都在Channel中进行,Channel是进行消息读写的通道,客户端可以建立对各Channel,每个Channel代表一个会话任务。

Message :消息:服务与应用程序之间传送的数据,由Properties和body组成,Properties可是对消息进行修饰,比如消息的优先级,延迟等高级特性,Body则就是消息体的内容。

Virtual Host 虚拟地址,用于进行逻辑隔离,最上层的消息路由,一个虚拟主机理由可以有若干个Exhange和Queueu,同一个虚拟主机里面不能有相同名字的Exchange

Exchange:交换机,接受消息,根据路由键发送消息到绑定的队列。(==不具备消息存储的能力==)

Bindings:Exchange和Queue之间的虚拟连接,binding中可以保护多个routing key.

Routing key:是一个路由规则,虚拟机可以用它来确定如何路由一个特定消息。

Queue:队列:也成为Message Queue,消息队列,保存消息并将它们转发给消费者。

安装

# docker 安装
docker run -d --rm --name rabbitmq -p 5672:5672 -p 15672:15672 -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=ybd0612 rabbitmq:3.9-management

高可用

RabbitMQ 是基于主从(非分布式)做高可用性的。

RabbitMQ 有三种模式:单机模式、普通集群模式、镜像集群模式。

单机模式

单机开发测试使用

普通集群模式

没有做到分布式:要么消费者每次随机连接一个实例拉取数据,有数据拉取开销,要么固定连接实例消费数据,导致单实例性能瓶颈。(就是个普通集群)

该方案主要是提高吞吐量的,就是说让集群中多个节点来服务某个 queue 的读写操作。

缺点:1. 集群内部可能产生大量的数据传输

​ 2. 可用性无保障,queue所在节点宕机,数据就丢了

镜像集群模式

无论是元数据还是 queue 里的消息都会存在于多个实例上,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,包含 queue 的全部数据。每次写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上。

好处在于任何一个机器宕机了,其他机器都有包含这个queue的完整数据。坏处在于性能开销大,每次都要同步到所有机器上。第二,不是分布式的,没有扩展性。如果某个queue负载很重,加机器,新增的机器也包含这个queue的所有数据,没有办法线性扩展queue。

消息可靠性

生产者

可以选择用 RabbitMQ 提供的事务功能,就是生产者发送数据之前开启 RabbitMQ 事务 channel.txSelect() ,然后发送消息,如果消息没有成功被 RabbitMQ 接收到,那么生产者会收到异常报错,此时就可以回滚事务 channel.txRollback() ,然后重试发送消息;如果收到了消息,那么可以提交事务 channel.txCommit()

RabbitMQ 事务机制是同步的,吞吐量会下来,因为太耗性能

一般来说,如果要确保说写 RabbitMQ 的消息别丢,可以开启 confirm 模式,在生产者那里设置开启 confirm 模式之后,每次写的消息都会分配一个唯一的 id,然后如果写入了 RabbitMQ 中,RabbitMQ 会给回传一个 ack 消息,告诉你这个消息 ok 了。如果 RabbitMQ 没能处理这个消息,会回调 nack 接口,告诉你这个消息接收失败,可以重试。而且可以结合这个机制自己在内存里维护每个消息 id 的状态,如果超过一定时间还没接收到这个消息的回调,可以重发。

事务机制和 confirm 机制最大的不同在于,事务机制是同步的,提交一个事务之后会阻塞在那儿,但是 confirm 机制是异步的,发送个消息之后就可以发送下一个消息,然后那个消息 RabbitMQ 接收了之后会异步回调接口通知这个消息接收到了。

所以一般在生产者这块避免数据丢失,都是用 confirm 机制的。

已经在 transaction 事务模式的 channel 是不能再设置成 confirm 模式的,即这两种模式是不能共存的。

RabbitMQ

开启 RabbitMQ 的持久化,消息写入之后会持久化到磁盘,哪怕是 RabbitMQ 自己挂了,恢复之后会自动读取之前存储的数据,一般数据不会丢。除非极其罕见的是,RabbitMQ 还没持久化,自己就挂了,可能导致少量数据丢失,但是这个概率较小。

  • 创建 queue 的时候将其设置为持久化。这样就可以保证 RabbitMQ 持久化 queue 的元数据,但是它是不会持久化 queue 里的数据的。
  • 第二个是发送消息的时候将消息的 deliveryMode 设置为 2。就是将消息设置为持久化的,此时 RabbitMQ 就会将消息持久化到磁盘上去。

消费端

为了保证消息从队列中可靠地到达消费者,RabbitMQ 提供了消息确认机制。消费者在声明队列时,可以指定 noAck 参数,当 noAck=false,RabbitMQ 会等待消费者显式发回 ack 信号后,才从内存(和磁盘,如果是持久化消息)中移去消息。否则,一旦消息被消费者消费,RabbitMQ 会在队列中立即删除它。

RabbitMQ的角色分类

1:none:

  • 不能访问management plugin

2:management:查看自己相关节点信息

  • 列出自己可以通过AMQP登入的虚拟机
  • 查看自己的虚拟机节点 virtual hosts的queues,exchanges和bindings信息
  • 查看和关闭自己的channels和connections
  • 查看有关自己的虚拟机节点virtual hosts的统计信息。包括其他用户在这个节点virtual hosts中的活动信息。

3:Policymaker

  • 包含management所有权限
  • 查看和创建和删除自己的virtual hosts所属的policies和parameters信息。

4:Monitoring

  • 包含management所有权限
  • 罗列出所有的virtual hosts,包括不能登录的virtual hosts。
  • 查看其他用户的connections和channels信息
  • 查看节点级别的数据如clustering和memory使用情况
  • 查看所有的virtual hosts的全局统计信息。

5:Administrator

  • 最高权限
  • 可以创建和删除virtual hosts
  • 可以查看,创建和删除users
  • 查看创建permisssions
  • 关闭所有用户的connections

消息模式

简单模式

image-20220224103207787

一个生产者对应一个消费者

生产者将消息发送到“hello”队列。消费者从该队列接收消息。

work模式

image-20220224110636750

一个生产者对应多个消费者,但是只能有一个消费者获得消息

有两种消费模式

  1. 轮询模式的分发:一个消费者一条,按均分配;
  2. 公平分发:根据消费者的消费能力进行公平分发,处理快的处理的多,处理慢的处理的少;按劳分配;

公平分发需要设置应答模式为手动应答 autoAck=false

设置通道qos channel.basicQos(1);

手动应答 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);

发布/订阅模式

image-20220224112205388

一个消费者将消息首先发送到交换器,交换器绑定到多个队列,然后被监听该队列的消费者所接收并消费

X表示交换器,在RabbitMQ中,交换器主要有四种类型:direct、fanout、topic、headers

fanout模式

该类型的交换器会把所有的消息路由到与其绑定的队列中。无论路由键与绑定键是否比配。

direct模式

direct类型则是当路由键和绑定键完全匹配时才会存消息到队列

topic模式

direct增强模糊匹配

  • 路由键与绑定键单词之间用"."分割比如(aaa.bbb.ccc)
  • 绑定键的使用"#"匹配多个单词(可以是0个)比如(aaa.#可以匹配路由键是aaa.bbb.ccc和aaa的)
  • 绑定键的使用"*"匹配一个单词比如(aaa.*匹配路由键是aaa.bbb、aaa.ccc或aaa但是不匹配aaa.bbb.ccc)

headers模式

不处理路由键。而是根据发送的消息内容中的headers属性进行匹配。在绑定Queue与Exchange时指定一组键值对;当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers属性是一个键值对,可以是Hashtable,键值对的值可以是任何类型。而fanout,direct,topic 的路由键都需要要字符串形式的。

匹配规则x-match有下列两种类型:

  • x-match = all :表示所有的键值对都匹配才能接受到消息
  • x-match = any :表示只要有键值对匹配就能接受到消息

整合springboot配置

starter

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置

 rabbitmq:
    addresses: 127.0.0.1:6605,127.0.0.1:6606,127.0.0.1:6705 #指定client连接到的server的地址,多个以逗号分隔(优先取addresses,然后再取host)
#    port:
    ##集群配置 addresses之间用逗号隔开
    # addresses: ip:port,ip:port
    password: admin
    username: 123456
    virtual-host: / # 连接到rabbitMQ的vhost
    requested-heartbeat: #指定心跳超时,单位秒,0为不指定;默认60s
    publisher-confirms: #是否启用 发布确认
    publisher-reurns: # 是否启用发布返回
    connection-timeout: #连接超时,单位毫秒,0表示无穷大,不超时
    cache:
      channel.size: # 缓存中保持的channel数量
      channel.checkout-timeout: # 当缓存数量被设置时,从缓存中获取一个channel的超时时间,单位毫秒;如果为0,则总是创建一个新channel
      connection.size: # 缓存的连接数,只有是CONNECTION模式时生效
      connection.mode: # 连接工厂缓存模式:CHANNEL 和 CONNECTION
    listener:
      simple.auto-startup: # 是否启动时自动启动容器
      simple.acknowledge-mode: # 表示消息确认方式,其有三种配置方式,分别是none、manual和auto;默认auto
      simple.concurrency: # 最小的消费者数量
      simple.max-concurrency: # 最大的消费者数量
      simple.prefetch: # 指定一个请求能处理多少个消息,如果有事务的话,必须大于等于transaction数量.
      simple.transaction-size: # 指定一个事务处理的消息数量,最好是小于等于prefetch的数量.
      simple.default-requeue-rejected: # 决定被拒绝的消息是否重新入队;默认是true(与参数acknowledge-mode有关系)
      simple.idle-event-interval: # 多少长时间发布空闲容器时间,单位毫秒
      simple.retry.enabled: # 监听重试是否可用
      simple.retry.max-attempts: # 最大重试次数
      simple.retry.initial-interval: # 第一次和第二次尝试发布或传递消息之间的间隔
      simple.retry.multiplier: # 应用于上一重试间隔的乘数
      simple.retry.max-interval: # 最大重试时间间隔
      simple.retry.stateless: # 重试是有状态or无状态
    template:
      mandatory: # 启用强制信息;默认false
      receive-timeout: # receive() 操作的超时时间
      reply-timeout: # sendAndReceive() 操作的超时时间
      retry.enabled: # 发送重试是否可用
      retry.max-attempts: # 最大重试次数
      retry.initial-interval: # 第一次和第二次尝试发布或传递消息之间的间隔
      retry.multiplier: # 应用于上一重试间隔的乘数
      retry.max-interval: #最大重试时间间隔

创建队列和交换机并绑定

/**
 * @author Ybond
 */
@Component
public class QueueCofig {

    /**
     * 创建队列
     */
    @Bean
    public Queue addWatermarkQueue() {
        return new Queue("AddWatermark");
    }

    /**
     * 创建交换机
     */
    @Bean
    DirectExchange addWatermarkExchange() {
        return new DirectExchange("AddWatermark");
    }

    /**
     * 绑定
     */
    @Bean
    Binding bindingAddWatermarkExchange(Queue queue, DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("mq.addwatermark");
    }

}

生产

/**
 * @author Ybond
 */
@Component
public class AddWatermarkServiceImpl {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void send(WatermarkEntity watermarkEntity) {
        rabbitTemplate.convertAndSend("AddWatermark", "", watermarkEntity);
    }

}

消费

/**
 * @author Ybond
 */
@RabbitListener(queues = "AddWatermark")
@Component
public class AddWatermarkProcessor {

    @RabbitHandler()
    public void onMessage(WatermarkEntity watermarkEntity) {
        System.out.println("消费消息 " + watermarkEntity);
    }
}