应用场景

目前常见的应用软件都有消息的延迟推送的影子,应用也极为广泛,例如:

  • 淘宝七天自动确认收货。在签收商品后,物流系统会在七天后延时发送一个消息给支付系统,通知支付系统将款打给商家,这个过程持续七天,就是使用了消息中间件的延迟推送功能。
  • 12306 购票支付确认页面。在选好票点击确定跳转的页面中往往都会有倒计时,代表着 30分钟内订单不确认的话将会自动取消订单。其实在下订单那一刻开始购票业务系统就会发送一个延时消息给订单系统,延时30分钟,告诉订单系统订单未完成,如果我们在30分钟内完成了订单,则可以通过逻辑代码判断来忽略掉收到的消息。

在上面两种场景中,如果使用下面两种传统解决方案无疑大大降低了系统的整体性能和吞吐量:

  • 使用 redis 给订单设置过期时间,最后通过判断 redis中是否还有该订单来决定订单是否已经完成。这种解决方案相较于消息的延迟推送性能较低,因为我们知道redis都是存储于内存中,我们遇到恶意下单或者刷单的将会给内存带来巨大压力。
  • 使用传统的数据库轮询来判断数据库表中订单的状态,这无疑增加了IO次数,性能极低。
  • 使用 jvm 原生的 DelayQueue,也是大量占用内存,而且没有持久化策略,系统宕机或者重启都会丢失订单信息。

之前一般采用死信队列+TTL过期时间来实现延迟队列,现在RabbitMQ 官方提供了延迟队列的插件,这个插件是实验性的,但相当稳定,废话不多说开整

插件安装

  • 此插件需要 Erlang 23.2 或更高版本。Erlang windows下载

  • 最新版本针对 RabbitMQ3.8.x,较早的系列已不受支持。RabbitMQ下载

  • RabbitMQ 官方提供的延迟队列插件,下载放置到 RabbitMQ 根目录下的 plugins 内。延迟队列插件下载

  • 进入RabbitMQ安装目录的sbin目录下,在cmd窗口使用命令启用延迟插件

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

  • 禁用插件,使用如下命令,但请注意,所有尚未交付的延迟消息都将丢失
rabbitmq-plugins disable rabbitmq_delayed_message_exchange

注:本demo在windows环境下模拟,插件采用rabbitmq-delayed-message-exchange v3.8.x版本;插件启用或禁用时,若Rabbitmq服务一直是启用状态的话,需重启使其生效

实现延迟推送信息

依赖文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>spring-boot-project</artifactId>
<groupId>cn.goitman</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>rabbitmq-delayed-demo</artifactId>

<dependencies>
<!--Web 项目开发starter-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>

<!-- 消息队列依赖 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.1.4.RELEASE</version>
</dependency>

<!-- log4j日志-->
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>

<!-- fastJson -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.76</version>
</dependency>

</dependencies>
</project>

配置文件

一般来说消息生产和消费是两个独立的项目,配置应该分开,这里为了方便就整合在一块啦

server:
port: 8080
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
# 连接超时,单位毫秒,0表示无穷大,不超时
connection-timeout: 15000
# 开启confirm确认机制
#publisher-confirms: true
publisher-confirm-type: correlated
# 开启return确认机制
publisher-returns: true
# 设置为true后,路由不到队列的消息不会被自动删除,从而被return消息模式监听到
template:
mandatory: true
listener:
simple:
# 表示消息确认方式,其有三种配置方式,分别是none(不确认)、manual(手动确认)和auto(自动确认);默认auto
acknowledge-mode: manual
# 最小消费者数量
concurrency: 1
# 最大消费者数量
max-concurrency: 10
# 在单个请求中处理的消息个数,必须大于等于transaction(事务)数量.
prefetch: 2
# 当ack模式为auto时,一个事务(ack间)处理的消息数量
#transaction-size:
  1. 在RabbitMQ中可以虚拟消息服务器VirtualHost,每个VirtualHost相当月一个相对独立的RabbitMQ服务器,每个VirtualHost之间是相互隔离的。exchange、queue、message不能互通。
  2. 在springboot2.2.0.RELEASE版本之前是amqp正式支持的属性,用来配置消息发送到交换器之后是否触发回调方法,在2.2.0及之后该属性过期使用spring.rabbitmq.publisher-confirm-type属性配置代替,用来配置更多的确认类型;
    • NONE值是禁用发布确认模式,是默认值
    • CORRELATED值是发布消息成功到交换器后会触发回调方法
    • SIMPLE值经测试有两种效果,其一效果和CORRELATED值一样会触发回调方法,其二在发布消息成功后使用rabbitTemplate调用waitForConfirms或waitForConfirmsOrDie方法等待broker节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是waitForConfirmsOrDie方法如果返回false则会关闭channel,则接下来无法发送消息到broker;

引导类

package cn.goitman;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
* @author Nicky
* @version 1.0
* @className RabbitmqApplication
* @blog goitman.cn | blog.csdn.net/minkeyto
* @description 引导类
* @date 2021/7/9 17:07
*/
@SpringBootApplication
public class RabbitmqApplication {
public static void main(String[] args) {
SpringApplication.run(RabbitmqApplication.class, args);
}
}

实体类

模拟一个订单对象,发送、接收对象信息

package cn.goitman.pojo;

/**
* @author Nicky
* @version 1.0
* @className Order
* @blog goitman.cn | blog.csdn.net/minkeyto
* @description 订单对象
* @date 2021/7/9 15:43
*/
public class Order {

// 订单ID
public String orderId;

// 订单状态,0:投递中、1:消费中、2:消费成功
public String orderStatus;

public Order() {
}

public Order(String orderId, String orderStatus) {
this.orderId = orderId;
this.orderStatus = orderStatus;
}

public String getOrderId() {
return orderId;
}

public void setOrderId(String orderId) {
this.orderId = orderId;
}

public String getOrderStatus() {
return orderStatus;
}

public void setOrderStatus(String orderStatus) {
this.orderStatus = orderStatus;
}
}

配置类

package cn.goitman.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.RabbitListenerContainerFactory;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* @author Nicky
* @version 1.0
* @className RabbitmqConfig
* @blog goitman.cn | blog.csdn.net/minkeyto
* @description 配置类,创建交换机、路由键、队列和之间的关联绑定
* @date 2021/7/9 17:06
*/
@Configuration
public class RabbitmqConfig {

private final static Logger logger = LoggerFactory.getLogger(RabbitmqConfig.class);

// 交换机名称
public static final String DELAYED_EXCHANGE = "delayedExchange";
// 队列名称
public static final String DELAYED_QUEUE = "delayedQueue";
// 路由键,#匹配一个或多个词
public static final String DELAYED_KEY = "delayed.#";

/**
* 创建主题模式交换机,
*/
@Bean
public TopicExchange delayedExchange() {
// 参数一:交换机名称;参数二:数据是否持久化;参数三:数据是否自动删除
TopicExchange exchange = new TopicExchange(DELAYED_EXCHANGE, true, false);
// 开启延迟队列
exchange.setDelayed(true);
return exchange;
}

/**
* 创建队列
*/
@Bean
public Queue delayedQueue() {
// 参数一:队列名称;参数二:数据是否持久化
return new Queue(DELAYED_QUEUE, true);
}

/**
* 绑定交换机和队列之间的联系,并配置路由键字符
*/
@Bean
public Binding delayedBinding(){
return BindingBuilder.bind(delayedQueue()).to(delayedExchange()).with(DELAYED_KEY);
}

/**
* 消费者JSON数据反序列化
*/
@Bean
public RabbitListenerContainerFactory<?> rabbitListenerContainerFactory(ConnectionFactory connectionFactory){
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
/* 不设置手动确认,将会报错:
* Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80)
*/
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
factory.setMessageConverter(new Jackson2JsonMessageConverter());
return factory;
}
}

消息生产者

package cn.goitman.component;

import cn.goitman.config.RabbitmqConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.UUID;

/**
* @author Nicky
* @version 1.0
* @className ProducerSender
* @blog goitman.cn | blog.csdn.net/minkeyto
* @description 消息生产者
* @date 2021/7/9 16:06
*/
@Component
public class ProducerSender {

private final static Logger logger = LoggerFactory.getLogger(ProducerSender.class);

@Autowired
private RabbitTemplate rabbitTemplate;

/**
* 消息发送确认回调方法,确保消息是否发送到交换机
*/
final RabbitTemplate.ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {

/**
* correlationData:SpringBoot提供的业务标识对象,封装业务ID信息,需要在发送消息时传入此参数,这里才能接收到,否则是null
* ack:消息发送的结果状态,成功是true,失败是false
* cause:发送失败的描述信息,如果发送成功是null。
*/
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
logger.info("correlationData:{},ack:{},cause:{}",correlationData.toString(), ack, cause);
}
};

/**
* 消息发送失败回调方法,可能是队列或路由键不存在等等
*/
final RabbitTemplate.ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
/**
* message:发送的信息内容
* replyCode:状态码,200为成功
* replyText:失败信息
* exchange:交换机名称
* routingKey:路由键
*/
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
logger.info("returnedMessage:{},replyCode:{},replyText:{},exchange:{},routingKey:{}",
new String(message.getBody()), replyCode, replyText, exchange, routingKey);
}
};

/**
* 消息发送,一般定时任务配合
*/
public void sendMessage(Object message) {
// 消息发送确认,处理消息到交换机之间的逻辑
rabbitTemplate.setConfirmCallback(confirmCallback);
// 设为true,消息不会自动删除,而被return消息模式监听
rabbitTemplate.setMandatory(true);
// 消息失败监听,处理交换机到队列之间的逻辑
rabbitTemplate.setReturnCallback(returnCallback);
// 生产者JSON数据序列化
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());

// 消息发送,标准发送信息和延时发送差异在于MessagePostProcessor
// rabbitTemplate.convertAndSend(RabbitmqConfig.DELAYED_EXCHANGE,"delayed.boot",message,new CorrelationData(UUID.randomUUID().toString().replace("-","")));
rabbitTemplate.convertAndSend(RabbitmqConfig.DELAYED_EXCHANGE, "delayed.boot", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 设置消息持久化
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
// 设置消息延迟发送时间,单位毫秒ms
message.getMessageProperties().setDelay(6000);
return message;
}
// 消息唯一ID
}, new CorrelationData(UUID.randomUUID().toString().replace("-", "")));
}
}

消息消费者

package cn.goitman.component;

import cn.goitman.pojo.Order;
import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;

/**
* @author Nicky
* @version 1.0
* @className ConsumerReceiver
* @blog goitman.cn | blog.csdn.net/minkeyto
* @description 消息消费者
* @date 2021/7/9 16:07
*/
@Component
public class ConsumerReceiver {

private final static Logger logger = LoggerFactory.getLogger(ConsumerReceiver.class);

/**
* ·@RabbitListener可以标注在类上,当在类上时需@RabbitHandler配合使用,
* 如有多个@RabbitHandler,根据MessageConverter转换后的对象来匹配哪个方法处理
*
* ·@RabbitListener(queues = "delayedQueue",containerFactory = "")
* ·containerFactory:可指定一个RabbitListenerContainerFactory的bean,默认为rabbitListenerContainerFactory的实例
* 也可在rabbitListenerContainerFactory实例上的@Bean注解中标记名称如:@Bean("rabbitListenerContainerFactory2")
*/
@RabbitListener(queues = "delayedQueue")
public void receiverMessage(Message msg , Channel channel) throws IOException {
// 应避免脏数据的接收,若数据一直消费失败而退回队列,队列又一直发送数据给消费者,将造成无限循环,导致内存溢出系统崩溃
Order order = JSONObject.parseObject(new String(msg.getBody(),"UTF-8"), Order.class);
logger.info("order:{}",order.toString());

// 获取消息数量,可和批量确认一起使用
// channel.basicQos(10);

// boolean flag = ****(); 在此做逻辑,返回boolean类型决定消息是走确认机制,还是退回机制
boolean flag = true;

if (flag) {
/*
* 确认机制,参数一:消息唯一标识;参数二:是否批量确认,false为不开启
* 若开启批量确认,最后一条确认的ID,会把之前未确认的消息一并确认
* 开启批量后需做好幂等性处理,若消息在未确认之前,连接中断会造成重复消费
*/
channel.basicAck(msg.getMessageProperties().getDeliveryTag(),false);
logger.info("消费成功,ID:{}",msg.getMessageProperties().getDeliveryTag());
}else {
/*
* 退回机制,参数一:唯一标识符;参数二:是否批量退回,false为单条退回;参数三:是否把消息退回队列中,false为废弃消息
* 若有多个消费者需做好幂等性处理,避免消息重复消费
*/
channel.basicNack(msg.getMessageProperties().getDeliveryTag(),false,true);
logger.info("消费失败,ID:{}",msg.getMessageProperties().getDeliveryTag());
}
}
}

测试类

当然也可以写个测试方法,这里就这样啦

package cn.goitman.controller;

import cn.goitman.component.ProducerSender;
import cn.goitman.pojo.Order;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

/**
* @author Nicky
* @version 1.0
* @className MessageSendTest
* @blog goitman.cn | blog.csdn.net/minkeyto
* @description 测试
* @date 2021/7/9 17:06
*/
@RestController
public class MessageSendTest {

@Autowired
private ProducerSender producerSender;

@PostMapping("/delayedSend")
public void delayedSend(@RequestBody Order order){
producerSender.sendMessage(order);
}
}

测试结果

交换机

绑定交换机和队列之间的联系,并配置路由键字符

队列

6秒后交换机将数据发送到队列,队列即时发送给消费端消费数据

源码地址:https://github.com/wangdaicong/spring-boot-project/tree/master/rabbitmq-delayed-demo