应用场景

目前常见的应用软件都有消息的延迟推送的影子,应用也极为广泛,例如:

  • 淘宝七天自动确认收货。在签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付系统将款打给商家,这个过程持续七天,就是使用了消息中间件的延迟推送功能。
  • 12306 购票支付确认页面。在选好票点击确定跳转的页面中往往都会有倒计时,代表着 30分钟内订单不确认的话将会自动取消订单。其实在下订单那一刻开始购票业务系统就会发送一个延时消息给订单系统,延时30分钟,告诉订单系统订单未完成,如果我们在30分钟内完成了订单,则可以通过逻辑代码判断来忽略掉收到的消息。

在上面两种场景中,如果使用下面两种传统解决方案无疑大大降低了系统的整体性能和吞吐量:

  • 使用 redis 给订单设置过期时间,最后通过判断 redis中是否还有该订单来决定订单是否已经完成。这种解决方案相较于消息的延迟推送性能较低,因为我们知道redis都是存储于内存中,我们遇到恶意下单或者刷单的将会给内存带来巨大压力。
  • 使用传统的数据库轮询来判断数据库表中订单的状态,这无疑增加了IO次数,性能极低。
  • 使用 jvm 原生的 DelayQueue,也是大量占用内存,而且没有持久化策略,系统宕机或者重启都会丢失订单信息。

问题分析

消息从生产端到消费端消费要经过3个步骤:

  1. 生产端发送消息到RabbitMQ;
  2. RabbitMQ发送消息到消费端;
  3. 消费端消费这条消息;

这3个步骤中的每一步都有可能导致消息丢失,所以要有一些措施来保证系统的可靠性。磁盘损坏,机房爆炸等等都能导致数据丢失,当然这种都是极小概率发生,能做到99.999999%消息不丢失,就是可靠的了。

生产端可靠性投递

比如消息在网络传输的过程中发生网络故障消息丢失,或者消息投递到RabbitMQ时RabbitMQ挂了,那消息也可能丢失。针对以上情况,RabbitMQ本身提供了一些机制。

事务消息机制

事务消息机制由于会严重降低性能,所以一般不采用这种方法,在此不扩展了,而采用另一种轻量级的解决方案——confirm消息确认机制

confirm消息确认机制

什么是confirm消息确认机制?

就是生产端投递的消息一旦投递到RabbitMQ后,RabbitMQ就会发送一个确认消息给生产端,让生产端知道已经收到消息了,否则这条消息就可能已经丢失了,需要生产端重新发送消息了。

RabbitMQ消息处理

消息持久化

什么是消息持久化呢?

RabbitMQ收到消息后将这个消息暂时存在了内存中,如果RabbitMQ挂了,那重启后数据就丢失了,所以相关的数据应该持久化到硬盘中,这样就算RabbitMQ重启后也可以到硬盘中取数据恢复。

如何持久化呢?

message消息到达RabbitMQ后先是到exchange交换机中,然后路由给queue队列,最后发送给消费端。所有需要给exchange、queue和message都进行持久化,这样,如果RabbitMQ收到消息后挂了,重启后会自行恢复消息。

消息入库

前面提到了会有极端情况,比如RabbitMQ收到消息还没来得及将消息持久化到硬盘时,RabbitMQ挂了,这样消息还是丢失了,或者RabbitMQ在发送确认消息给生产端的过程中,由于网络故障而导致生产端没有收到确认消息,这样生产端就不知道RabbitMQ到底有没有收到消息,就不好做接下来的处理。

所以除了RabbitMQ提供的一些机制外,也要做一些消息补偿机制,以应对一些极端情况。

消息入库,就是将要发送的消息保存到数据库中

首先发送消息前先将消息保存到数据库中,有一个状态字段status=0,表示生产端将消息发送给了RabbitMQ但还没收到确认;在生产端收到确认后将status设为1,表示RabbitMQ已收到消息。这里有可能会出现上面说的两种情况,所以生产端这边开一个定时器,定时检索消息表,将status=0并且超过固定时间后,还没收到确认的消息取出重发(第二种情况下这里会造成消息重复,消费者端要做幂等性),可能重发还会失败,所以可以做一个最大重发次数,超过就做另外的处理。

这样消息就可以可靠性投递到RabbitMQ中了,而生产端也可以感知到了。

注:本文案例没实现消息入库方案,可自行加逻辑,不难

消费端消息不丢失

默认情况下,以下3种情况会导致消息丢失:

  • 在RabbitMQ将消息发出后,消费端还没接收到消息之前,发生网络故障,消费端与RabbitMQ断开连接,此时消息会丢失;
  • 在RabbitMQ将消息发出后,消费端还没接收到消息之前,消费端挂了,此时消息会丢失;
  • 消费端正确接收到消息,但在处理消息的过程中发生异常或宕机了,消息也会丢失。

其实,上述3中情况导致消息丢失归根结底是因为RabbitMQ的自动ack机制,即默认RabbitMQ在消息发出后就立即将这条消息删除,而不管消费端是否接收到,是否处理完,导致消费端消息丢失时RabbitMQ自己又没有这条消息了。

所以就需要将自动ack机制改为手动ack机制。

对于RabbitMQ服务端而言,队列中的消息分成了两个部分:一部分是等待投递给消费端的消息一部分是已经投递给消费端,但是还没有收到消费端确认信号的消息。如果RabbitMQ一直没有收到消费端的确认信号,并且消费此消息的消费端已经断开连接或宕机(RabbitMQ会自己感知到),则RabbitMQ会安排该消息重新进入队列(放在队列头部),等待投递给下一个消费者,当然也有能还是原来的那个消费端,当然消费端也需要确保幂等性

之前一般采用死信队列+TTL过期时间来实现延迟队列,现在RabbitMQ 官方提供了延迟队列的插件,这个插件是实验性的,但相当稳定,废话不多说开整

死信队列

死信(Dead Letter) 是Rabbitmq 提供的一种机制。当一条消息满足下列条件之一那么它会成为死信:

  • 消息被否定确认 (如channel.basicNack) 并且此时requeue属性被设置为false
  • 消息在队列的存活时间超过设置的TTL时间
  • 消息队列的消息数量已经超过最大队列长度

若配置了死信队列,死信会被 Rabbitmq 投到死信队列中。

在Rabbitmq 中创建死信队列的操作流程大概是:

  • 创建一个交换机作为死信交换机
  • 在业务队列中配置 x-dead-letter-exchangex-dead-letter-routing-key,将第一步的交换机设为业务队列的死信交换机
  • 在死信交换机上创建队列,并监听此队列

死信队列的设计目的:为了存储没有被正常消费的消息,便于排查和重新投递

死信队列同样也没有对投递时间做出保证,在第一条消息成为死信之前,后面的消息即使过期也不会投递为死信。

为了解决这个问题,Rabbit 官方推出了延迟投递插件 rabbitmq-delayed-message-exchange ,推荐使用官方插件来做延时消息。

插件安装

注:延迟插件 rabbitmq-delayed-message-exchange 是在 RabbitMQ 3.5.7 及以上的版本才支持的,依赖 Erlang/OPT 18.0 及以上运行环境。

  • 此插件需要 Erlang 23.2 或更高版本。Erlang windows下载

  • 最新版本针对 RabbitMQ3.8.x,较早的系列已不受支持。RabbitMQ下载

  • RabbitMQ 官方提供的延迟队列插件,下载放置到 RabbitMQ 根目录下的 plugins 内。延迟队列插件下载

  • 进入RabbitMQ安装目录的sbin目录下,在cmd窗口使用命令启用延迟插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

  • 禁用插件,使用如下命令,但请注意,所有尚未交付的延迟消息都将丢失
rabbitmq-plugins disable rabbitmq_delayed_message_exchange

注:本demo在windows环境下模拟,插件采用rabbitmq-delayed-message-exchange v3.8.x版本;插件启用或禁用时,若Rabbitmq服务一直是启用状态的话,需重启使其生效

实现延迟推送信息

依赖文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring-boot-project</artifactId>
<groupId>cn.goitman</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>rabbitmq-delayed-demo</artifactId>

<dependencies>
<!--Web 项目开发starter-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!-- 消息队列依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.1.4.RELEASE</version>
</dependency>

<!-- log4j日志-->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>

<!-- fastJson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>

</dependencies>
</project>

配置文件

一般来说消息生产和消费是两个独立的项目,配置应该分开,这里为了方便就整合在一块啦

server:
port: 8080
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
# 连接超时,单位毫秒,0表示无穷大,不超时
connection-timeout: 15000
# 开启confirm确认机制
#publisher-confirms: true
publisher-confirm-type: correlated
# 开启return确认机制
publisher-returns: true
# 设置为true后,路由不到队列的消息不会被自动删除,从而被return消息模式监听到
template:
mandatory: true
listener:
simple:
# 表示消息确认方式,其有三种配置方式,分别是none(不确认)、manual(手动确认)和auto(自动确认);默认auto
acknowledge-mode: manual
# 最小消费者数量
concurrency: 1
# 最大消费者数量
max-concurrency: 10
# 在单个请求中处理的消息个数,必须大于等于transaction(事务)数量.
prefetch: 2
# 当ack模式为auto时,一个事务(ack间)处理的消息数量
#transaction-size:
  1. 在RabbitMQ中可以虚拟消息服务器VirtualHost,每个VirtualHost相当月一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通。
  2. 在springboot2.2.0.RELEASE版本之前是amqp正式支持的属性,用来配置消息发送到交换器之后是否触发回调方法,在2.2.0及之后该属性过期使用spring.rabbitmq.publisher-confirm-type属性配置代替,用来配置更多的确认类型;
  • NONE值是禁用发布确认模式,是默认值
  • CORRELATED值是发布消息成功到交换器后会触发回调方法
  • SIMPLE值经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker;

引导类

package cn.goitman;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
* @author Nicky
* @version 1.0
* @className RabbitmqApplication
* @blog goitman.cn | blog.csdn.net/minkeyto
* @description 引导类
* @date 2021/7/9 17:07
*/
@SpringBootApplication
public class RabbitmqApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitmqApplication.class, args);
}
}

实体类

模拟一个订单对象,发送、接收对象信息

package cn.goitman.pojo;

/**
* @author Nicky
* @version 1.0
* @className Order
* @blog goitman.cn | blog.csdn.net/minkeyto
* @description 订单对象
* @date 2021/7/9 15:43
*/
public class Order {

// 订单ID
public String orderId;

// 订单状态,0:投递中、1:消费中、2:消费成功
public String orderStatus;

public Order() {
}

public Order(String orderId, String orderStatus) {
this.orderId = orderId;
this.orderStatus = orderStatus;
}

public String getOrderId() {
return orderId;
}

public void setOrderId(String orderId) {
this.orderId = orderId;
}

public String getOrderStatus() {
return orderStatus;
}

public void setOrderStatus(String orderStatus) {
this.orderStatus = orderStatus;
}
}

配置类

package cn.goitman.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @author Nicky
* @version 1.0
* @className RabbitmqConfig
* @blog goitman.cn | blog.csdn.net/minkeyto
* @description 配置类,创建交换机、路由键、队列和之间的关联绑定
* @date 2021/7/9 17:06
*/
@Configuration
public class RabbitmqConfig {

private final static Logger logger = LoggerFactory.getLogger(RabbitmqConfig.class);

// 交换机名称
public static final String DELAYED_EXCHANGE = "delayedExchange";
// 队列名称
public static final String DELAYED_QUEUE = "delayedQueue";
// 路由键,#匹配一个或多个词
public static final String DELAYED_KEY = "delayed.#";

/**
* 创建主题模式交换机,
*/
@Bean
public TopicExchange delayedExchange() {
// 参数一:交换机名称;参数二:数据是否持久化;参数三:数据是否自动删除
TopicExchange exchange = new TopicExchange(DELAYED_EXCHANGE, true, false);
// 开启延迟队列
exchange.setDelayed(true);
return exchange;
}

/**
* 创建队列
*/
@Bean
public Queue delayedQueue() {
// 参数一:队列名称;参数二:数据是否持久化
return new Queue(DELAYED_QUEUE, true);
}

/**
* 绑定交换机和队列之间的联系,并配置路由键字符
*/
@Bean
public Binding delayedBinding(){
return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(DELAYED_KEY);
}

/**
* 消费者JSON数据反序列化
*/
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
/* 不设置手动确认,将会报错:
* Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
*/
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
}

消息生产者

package cn.goitman.component;

import cn.goitman.config.RabbitmqConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.UUID;

/**
* @author Nicky
* @version 1.0
* @className ProducerSender
* @blog goitman.cn | blog.csdn.net/minkeyto
* @description 消息生产者
* @date 2021/7/9 16:06
*/
@Component
public class ProducerSender {

private final static Logger logger = LoggerFactory.getLogger(ProducerSender.class);

@Autowired
private RabbitTemplate rabbitTemplate;

/**
* 消息发送确认回调方法,确保消息是否发送到交换机
*/
final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {

/**
* correlationData:SpringBoot提供的业务标识对象,封装业务ID信息,需要在发送消息时传入此参数,这里才能接收到,否则是null
* ack:消息发送的结果状态,成功是true,失败是false
* cause:发送失败的描述信息,如果发送成功是null。
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
logger.info("correlationData:{},ack:{},cause:{}",correlationData.toString(), ack, cause);
}
};

/**
* 消息发送失败回调方法,可能是队列或路由键不存在等等
*/
final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
/**
* message:发送的信息内容
* replyCode:状态码,200为成功
* replyText:失败信息
* exchange:交换机名称
* routingKey:路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
logger.info("returnedMessage:{},replyCode:{},replyText:{},exchange:{},routingKey:{}",
new String(message.getBody()), replyCode, replyText, exchange, routingKey);
}
};

/**
* 消息发送,一般定时任务配合
*/
public void sendMessage(Object message) {
// 消息发送确认,处理消息到交换机之间的逻辑
rabbitTemplate.setConfirmCallback(confirmCallback);
// 设为true,消息不会自动删除,而被return消息模式监听
rabbitTemplate.setMandatory(true);
// 消息失败监听,处理交换机到队列之间的逻辑
rabbitTemplate.setReturnCallback(returnCallback);
// 生产者JSON数据序列化
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());

// 消息发送,标准发送信息和延时发送差异在于MessagePostProcessor
// rabbitTemplate.convertAndSend(RabbitmqConfig.DELAYED_EXCHANGE,"delayed.boot",message,new CorrelationData(UUID.randomUUID().toString().replace("-","")));
rabbitTemplate.convertAndSend(RabbitmqConfig.DELAYED_EXCHANGE, "delayed.boot", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 设置消息持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
// 设置消息延迟发送时间,单位毫秒ms
message.getMessageProperties().setDelay(6000);
return message;
}
// 消息唯一ID
}, new CorrelationData(UUID.randomUUID().toString().replace("-", "")));
}
}

消息消费者

package cn.goitman.component;

import cn.goitman.pojo.Order;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
* @author Nicky
* @version 1.0
* @className ConsumerReceiver
* @blog goitman.cn | blog.csdn.net/minkeyto
* @description 消息消费者
* @date 2021/7/9 16:07
*/
@Component
public class ConsumerReceiver {

private final static Logger logger = LoggerFactory.getLogger(ConsumerReceiver.class);

/**
* ·@RabbitListener可以标注在类上,当在类上时需@RabbitHandler配合使用,
* 如有多个@RabbitHandler,根据MessageConverter转换后的对象来匹配哪个方法处理
*
* ·@RabbitListener(queues = "delayedQueue",containerFactory = "")
* ·containerFactory:可指定一个RabbitListenerContainerFactory的bean,默认为rabbitListenerContainerFactory的实例
* 也可在rabbitListenerContainerFactory实例上的@Bean注解中标记名称如:@Bean("rabbitListenerContainerFactory2")
*/
@RabbitListener(queues = "delayedQueue")
public void receiverMessage(Message msg , Channel channel) throws IOException {
// 应避免脏数据的接收,若数据一直消费失败而退回队列,队列又一直发送数据给消费者,将造成无限循环,导致内存溢出系统崩溃
Order order = JSONObject.parseObject(new String(msg.getBody(),"UTF-8"), Order.class);
logger.info("order:{}",order.toString());

// 获取消息数量,可和批量确认一起使用
// channel.basicQos(10);

// boolean flag = ****(); 在此做逻辑,返回boolean类型决定消息是走确认机制,还是退回机制
boolean flag = true;

if (flag) {
/*
* 确认机制,参数一:消息唯一标识;参数二:是否批量确认,false为不开启
* 若开启批量确认,最后一条确认的ID,会把之前未确认的消息一并确认
* 开启批量后需做好幂等性处理,若消息在未确认之前,连接中断会造成重复消费
*/
channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);
logger.info("消费成功,ID:{}",msg.getMessageProperties().getDeliveryTag());
}else {
/*
* 退回机制,参数一:唯一标识符;参数二:是否批量退回,false为单条退回;参数三:是否把消息退回队列中,false为废弃消息
* 若有多个消费者需做好幂等性处理,避免消息重复消费
*/
channel.basicNack(msg.getMessageProperties().getDeliveryTag(),false,true);
logger.info("消费失败,ID:{}",msg.getMessageProperties().getDeliveryTag());
}
}
}

测试类

当然也可以写个测试方法,这里就这样啦

package cn.goitman.controller;

import cn.goitman.component.ProducerSender;
import cn.goitman.pojo.Order;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

/**
* @author Nicky
* @version 1.0
* @className MessageSendTest
* @blog goitman.cn | blog.csdn.net/minkeyto
* @description 测试
* @date 2021/7/9 17:06
*/
@RestController
public class MessageSendTest {

@Autowired
private ProducerSender producerSender;

@PostMapping("/delayedSend")
public void delayedSend(@RequestBody Order order){
producerSender.sendMessage(order);
}
}

测试结果

交换机

绑定交换机和队列之间的联系,并配置路由键字符

队列

6秒后交换机将数据发送到队列,队列即时发送给消费端消费数据

源码地址:https://github.com/wangdaicong/spring-boot-project/tree/master/rabbitmq-delayed-demo