消息丢失的场景
- 生产者发送消息到 MQ 有可能丢失消息
- MQ 收到消息后写入硬盘可能丢失消息
- 消息写入硬盘后,硬盘坏了丢失消息
- 消费者消费 MQ 也可能丢失消息
- 整个 MQ 节点挂了丢失消息
生产者发送消息时如何保证不丢失?
解决发送时消息丢失的问题可以采用 RocketMQ 自带的事物消息机制
事物消息原理:首先生产者会发送一个half 消息(对原始消息的封装),该消息对消费者不可见,MQ 通过 ACK 机制返回消息接受状态, 生产者执行本地事务并且返回给 MQ 一个状态(Commit、RollBack 等),如果是 Commit 的话 MQ 就会把消息给到下游, RollBack 的话就会丢弃该消息,状态如果为 UnKnow 的话会过一段时间回查本地事务状态,默认回查 15 次,一直是 UnKnow 状态的话就会丢弃此消息。
为什么先发一个 half 消息,作用就是先判断下 MQ 有没有问题,服务正不正常。
MQ 收到消息后写入硬盘如何保证不丢失?
数据存盘绕过缓存,改为同步刷盘,这一步需要修改 Broker 的配置文件,将 flushDiskType 改为 SYNC_FLUSH 同步刷盘策略,默认的是 ASYNC_FLUSH 异步刷盘,一旦同步刷盘返回成功,那么就一定保证消息已经持久化到磁盘中了。
消息写入硬盘后,硬盘坏了如何保证不丢失?
为了保证磁盘损坏导致丢失数据,RocketMQ 采用主从机构,集群部署,Leader 中的数据在多个 Follower 中都存有备份,防止单点故障导致数据丢失。
Master 节点挂了怎么办?Master 节点挂了之后 DLedger 登场
- 接管 MQ 的 commitLog
- 选举从节点
- 文件复制 uncommited 状态 多半从节点收到之后改为 commited
消费者消费 MQ 如何保证不丢失?
- 如果是网络问题导致的消费失败可以进行重试机制,默认每条消息重试 16 次
- 多线程异步消费失败,MQ 认为已经消费成功但是实际上对于业务逻辑来说消息是没有落地的,解决方案就是按照 mq 官方推荐的先执行本地事务再返回成功状态。
整个 MQ 节点挂了如何保证不丢失?
这种极端情况可以消息发送失败之后先存入本地,例如放到缓存中,另外启动一个线程扫描缓存的消息去重试发送。
安装
NameServer
docker run -d -p 9876:9876 --name rmqserver apache/rocketmq ./mqnamesrv
Broker
vi /docker/rocketmq/conf/broker.conf
# 所属集群名称,如果节点较多可以配置多个
brokerClusterName = DefaultCluster
# broker名称,master和slave使用相同的名称,表明他们的主从关系
brokerName = broker-a
# 0表示Master,大于0表示不同的
slave brokerId = 0
# 表示几点做消息删除动作,默认是凌晨4点
deleteWhen = 04
# 在磁盘上保留消息的时长,单位是小时
fileReservedTime = 48
# 有三个值:SYNC_MASTER,ASYNC_MASTER,SLAVE;同步和异步表示Master和Slave之间同步数据的机 制;
brokerRole = ASYNC_MASTER
# 刷盘策略,取值为:ASYNC_FLUSH,SYNC_FLUSH表示同步刷盘和异步刷盘;SYNC_FLUSH消息写入磁盘后 才返回成功状态,ASYNC_FLUSH不需要;
flushDiskType = ASYNC_FLUSH
# 设置broker节点所在服务器的ip地址
brokerIP1 = 192.168.2.252
# 剩余磁盘比例
diskMaxUsedSpaceRatio=99
docker run -d -p 10911:10911 -p 10909:10909\
--name rmqbroker --link rmqserver:namesrv\
-e "NAMESRV_ADDR=namesrv:9876" -e "JAVA_OPTS=-Duser.home=/opt"\
-e "JAVA_OPT_EXT=-server -Xms128m -Xmx128m"\
-v /docker/rocketmq/conf/broker.conf:/etc/rocketmq/broker.conf \
apache/rocketmq ./mqbroker -c /etc/rocketmq/broker.conf
控制台
docker run -d --name rmqconsole -p 8180:8080 --link rmqserver:namesrv\
-e "JAVA_OPTS=-Drocketmq.namesrv.addr=namesrv:9876\
-Dcom.rocketmq.sendMessageWithVIPChannel=false"\
-t styletang/rocketmq-console-ng
使用
<!-- rocketmq-->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
</dependency>
发送
rocketmq.name-server=192.168.2.252:9876
rocketmq.producer.group=test-group
@GetMapping("/test01")
public Result test01() {
String str = RandomUtil.randomString(8);
rocketmtTemplate.convertAndSend("test-topic", str);
StaticLog.info("发送消息:" + str);
return Result.ok();
}
接收
rocketmq.name-server=192.168.2.252:9876
@Service
@RocketMQMessageListener(topic = "test-topic", consumerGroup = "test-group")
public class RocketMQReceive implements RocketMQListener<String> {
@Override
public void onMessage(String s) {
System.out.println("接收到消息:" + s);
}
}
消息类型
普通消息
可靠同步发送
同步发送是指消息发送方发出数据后,会在收到接收方发回响应之后才发下一个数据包的通讯方式。 此种方式应用场景非常广泛例如重要通知邮件、报名短信通知、营销短信系统等。
可靠异步发送
异步发送是指发送方发出数据后,不等接收方发回响应,接着发送下个数据包的通讯方式。发送方通过回调接口接收服务器响应,并对响应结果进行处理。
异步发送般用于链路耗时较长,对RT响应时间较为敏感的业务场景,例如用户视频上传后通知启动转码服务,转码完成后通知推送转码结果等。
单向发送
单向发送是指发送方只负责发送消息,不等待服务器回应且没有回调函数触发,即只发送请求不等待应答。
适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集。
顺序消息
顺序消息是消息队列提供的一种严格按照顺序来发布和消费的消息类型。
事务消息
RocketMQ提供了事务消息,通过事务消息就能达到分布式事务的最终一致。