消息队列是一种异步的服务间通信方式,适用于无服务器和微服务架构。消息在被处理和删除之前一直存储在队列上。每条消息仅可被一位用户处理一次。消息队列可被用于分离重量级处理、缓冲或批。
_假设场景:_在业务中,我们异步
调用了其他**服务A
****,当服务A
处理成功完成后,回调到主业务
**流程正常处理完成;
当**服务A
出现异常时;主业务
通常是不知道,会一直等待服务A
的回调处理,线程得不到释放,引发线上故障,这个时候,我们就需要在主业务
中,增加超时机制
,来保证主业务
流程不受到其他业务
**的影响。
以下关于延迟消息的处理分别围绕:业务事件,超时时间,业务类型,业务阶段,回调处理 五个主体功能进行实践。
构建延时消息表
创建超时处理消息表,用户记录_业务事件相关的信息。_
/** 创建超时处理消息表 */
@Data
@Builder
@ToString
@NoArgsConstructor
@AllArgsConstructor
@JsonInclude(JsonInclude.Include.NON_NULL)
@TableName(value = "app_delay_message")
public class AppDelayMessage {
/** ID */
private String id;
/** 应用ID */
private String appId;
/** 超时时长(H) */
private Integer ttl;
/** type1, type2*/
private Type type;
/** 编译:COMPILE; 测试:TEST; */
private Stage stage;
/** 待处理:PENDING; 已处理:PROCESSED; 超时:TIMEOUT; 无效:INVALID; */
private Status status;
/** 备考 */
private String remark;
/** 回调函数 */
private String callback;
/** 创建时间 */
private LocalDateTime createTime;
/** 修改时间 */
private LocalDateTime modifyTime;
/** 删除标 */
private String deleteFlg;
/** 场景类型 */
public enum Type {
/** 场景1 */
TYPE1,
/** 场景2 */
TYPE2
}
/** 阶段 */
public enum Stage {
/** 编译 */
REAL_COMPILE
}
/** 状态 */
public enum Status {
/** 待处理 */
PENDING,
/** 已处理 */
PROCESSED,
/** 超时 */
TIMEOUT,
/** 无效 */
INVALID
}
}
延迟消息消费者
消息消费者由 DelayQueueConsumer 创建,启动异步线程用于消费以超时的消息,方法中设置延迟队列和超时错误处理。
用于记录消息和更新消息状态。
/**延迟消息消费者 */
@Slf4j
@Component
public class DelayQueueConsumer implements Runnable {
/** 延迟队列 */
private DelayQueue<DelayMessage> delayQueue;
/**
* 设置延迟队列
* @param delayQueue 延迟队列
*/
public void setDelayQueue(DelayQueue<DelayMessage> delayQueue) {
this.delayQueue = delayQueue;
}
/** 超时消息处理服务 */
@Autowired
private AppDelayMessageService service;
@Override
public void run() {
while (true) {
try {
log.info("@@ 启动异步线程 [{}] 消费以超时的消息", Thread.currentThread().getName());
// 如果暂时没有过期消息或者队列为空, 则 take 方法会被阻塞, 直到有过期的消息为止
DelayMessage delayMessage = delayQueue.take();
AppDelayMessage message = JSON.parseObject(delayMessage.getMessage(), AppDelayMessage.class);
// 处理 TIMEOUT 异常
handleTimeoutError(message);
log.info("@@ 以消费消息:{}", delayMessage.getMessage());
} catch (InterruptedException e) {
log.error("@@ 线程 [{}] 消费消息异常", Thread.currentThread().getName(), e);
}
}
}
/**
* 超时错误处理
* @param message 消息内容
*/
@Transactional
public void handleTimeoutError(AppDelayMessage message) {
log.info("@@ 处理超时错误, AppDelayMessage:{}", message);
// 更新消息状态 [PENDING -> TIMEOUT]
boolean update = service.lambdaUpdate()
.set(AppDelayMessage::getStatus, AppDelayMessage.Status.TIMEOUT)
.set(AppDelayMessage::getModifyTime, LocalDateTime.now())
.eq(AppDelayMessage::getId, message.getId())
.eq(AppDelayMessage::getStatus, AppDelayMessage.Status.PENDING)
.update();
if (update) {
log.info("@@ 处理超时调用回调函数, message:{}", JSON.toJSONString(message));
service.callback(message);
}
}
}
延迟消息生产者
消息生产者由 DelayQueueProducer 创建,并用于将消息发送到 DelayQueue队列中。可以调用消息生产者的方法(offer
或 obtainQueue 方法)创建延迟消息队列入队列和获取延迟消息队列。
/** 延迟消息生产者 */
@Slf4j
@Component
public class DelayQueueProducer {
/** 创建延迟消息队列 */
private static final DelayQueue<DelayMessage> DELAY_QUEUE = new DelayQueue<>();
/**
* 消息入队列
* @param delayMessage 消息内容
* @return 成功:{@code true}, 失败:{@code false}
*/
public boolean offer(DelayMessage delayMessage) {
return DELAY_QUEUE.offer(delayMessage);
}
/**
* 获取延迟消息队列
* @return {@link DelayQueue<DelayMessage>}
*/
public DelayQueue<DelayMessage> obtainQueue() {
return DELAY_QUEUE;
}
}
延迟消息体
构建消息的结构体内容,设置统一标准的消息格式和自定义超时时间的范围。
/** 延迟消息体 */
@Data
public class DelayMessage implements Delayed {
/** 消息内容 */
private String message; // 延迟任务中的任务数据
/** ttl */
private long ttl; // 延迟任务到期时间(过期时间)
/**
* 构造函数
* @param message 消息实体
* @param ttl 延迟时间,单位毫秒
*/
public DelayMessage(String message, long ttl) {
setMessage(message);
this.ttl = System.currentTimeMillis() + ttl;
}
/**
* 获取消息触发剩余时间
* @param unit the time unit
* @return {@link long}
*/
@Override
public long getDelay(TimeUnit unit) {
// 计算该任务距离过期还剩多少时间
long remaining = ttl - System.currentTimeMillis();
return unit.convert(remaining, TimeUnit.MILLISECONDS);
}
/**
* 比较消息延时时长
* @param o {@link Delayed}
* @return 延时时长
*/
@Override
public int compareTo(Delayed o) {
// 比较、排序: 对任务的延时大小进行排序,将延时时间最小的任务放到队列头部
return (int)(this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
}
}
定义回调函数
自定义**Callback
** 注解,定义延时消息回调注解, 将回调类添加改注解 {@link Callback},注入到 Spring Ioc 容器。
/** 定义延时消息回调注解, 将回调类添加改注解 {@link Callback},注入到 Spring Ioc 容器。 */
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface Callback {
/**
* The value may indicate a suggestion for a logical component name,
* to be turned into a Spring bean in case of an autodetected component.
* @return the suggested component name, if any (or empty String otherwise)
*/
@AliasFor(annotation = Component.class)
String value() default "";
}
定义超时接口
提供给实现类完成回调函数处理方法
/** 回调函数接口 */
public interface TimeoutCallback {
/**
* 提供给实现类完成回调函数处理方法
* @param message 消息
*/
void handle(AppDelayMessage message);
}
延迟消息超时事件
Spring的事件**ApplicationEvent
**为bean和bean之间的消息通信提供了支持。当bean处理完一个事件之后,希望另一个bean能够知道并做相应的处理。这时其他bean监听当前bean所发送的事件。
事件流程如下:
- 自己的event需要继承 ApplicationEvent,并且写相应的构造函数
- 定义一个监听器listener,监听器(listener)具体根据事件发生的业务处理模块,可以接收处理事件中封装的对象。
- 使用ApplicationContext容器发布事件
/** 延迟消息超时事件 */
public class InvokeTimeoutEvent extends ApplicationEvent {
/**
* Create a new {@code ApplicationEvent}.
* @param source the object on which the event initially occurred or with which the event is associated (never
* {@code null})
*/
public InvokeTimeoutEvent(Object source) {
super(source);
}
}
延迟消息超时事件监听器
事件监听处理方法
@EventListener
注解,实现对任意的方法都能监听事件。
在任意方法上标注@EventListener 注解,指定 classes,即需要处理的事件类型,一般就是 ApplicationEven 及其子类,可以设置多项。
package com.example.demo.delay;
import com.alibaba.fastjson.JSON;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.event.EventListener;
import org.springframework.stereotype.Component;
import java.util.Objects;
/**
* 延迟消息超时事件监听器
*/
@Component
public class DelayMessageListener {
/** 延迟消息生产者 */
@Autowired
private DelayQueueProducer producer;
/** 创建超时消息处理服务接口 */
@Autowired
private AppDelayMessageService service;
/**
* 事件监听处理方法
* @param event {@link InvokeTimeoutEvent}
*/
@EventListener
public void onApplicationEvent(InvokeTimeoutEvent event) {
// 监听延迟消息触发事件
AppDelayMessage source = (AppDelayMessage)event.getSource();
if (service.save(source)) {
// 转换为毫秒
long ttl = 2 * 60 * 60 * 1000;
if (Objects.nonNull(source.getTtl())) {
ttl = source.getTtl() * 60 * 60 * 1000;
}
producer.offer(new DelayMessage(JSON.toJSONString(source), ttl));
}
}
}
以上构建完成了消息的生产,消费,与监听,下面我们进行功能测试。
线程池配置类
@EnableAsync
注解:启用了Spring的异步方法执行支持。确保你的应用程序中有@EnableAsync
生效,否则异步方法可能不会被正确地处理。ThreadPoolConfig
类中的成员变量:core
、max
、queue
、keepAlive
分别表示核心线程数量、最大线程数、排队线程数和线程回收时间。这些值似乎是通过@Value
注解从配置文件中读取的。确保在你的配置文件中有这些属性的正确配置。@Bean("toolThreadPool")
:这个方法定义了一个名为 “toolThreadPool” 的 Bean,返回一个ThreadPoolExecutor
对象。该方法使用了 Google Guava 库中的ThreadFactoryBuilder
来创建一个带有自定义线程名称的线程工厂。- 线程池配置:确保你在配置文件中设置了适当的值,以满足你应用的需求。特别是要注意核心线程数量、最大线程数、排队线程数和线程回收时间的设置,这些值应该根据你的应用负载和性能需求来调整。
- 异步方法的使用:确保你的应用中有异步方法的定义和调用,以便线程池得以发挥作用。
/** 线程池配置类 */
@EnableAsync
@Configuration
public class ThreadPoolConfig {
/** 核心线程数量 */
@Value("${import.thread.core}")
private Integer core;
/** 最大线程数 */
@Value("${import.thread.max}")
private Integer max;
/** 排队线程数 */
@Value("${import.thread.queue}")
private Integer queue;
/** 线程回收时间 */
@Value("${import.thread.keepAlive}")
private Integer keepAlive;
/**
* toolsThreadPool
* @return {@link ThreadPoolExecutor} 线程池
*/
@Bean("toolThreadPool")
public ThreadPoolExecutor arxmlThreadPoolExecutor() {
ThreadFactory namedThreadFactory = new ThreadFactoryBuilder().setNameFormat("build-tool-%d").build();
return new ThreadPoolExecutor(core, max, keepAlive, TimeUnit.SECONDS, new LinkedBlockingQueue<>(queue),
namedThreadFactory);
}
}
测试消息队列
DelayController
/** 测试类 */
@RestController
public class DelayController {
@Autowired
private AppDelayMessageService appDelayMessageService;
/**发布一个超时的监听*/
@GetMapping("/testPublishDelay")
public void testPublishDelay (){
appDelayMessageService.publish("00000000011111111", 10,
AppDelayMessage.Type.TYPE1,
AppDelayMessage.Stage.REAL_COMPILE,
AppAuditCallback.class);
}
/**当业务未超时,修改消息状态*/
@GetMapping("/testChangeDelay")
public void testChangeDelay (){
// 修改延迟消息状态
appDelayMessageService.changeToProcessed("00000000011111111",
AppDelayMessage.Stage.REAL_COMPILE);
}
}
AppDelayMessageService
/** 应用创建超时消息处理服务接口 */
public interface AppDelayMessageService extends IService<AppDelayMessage> {
/**
* 发布延迟消息
* @param appId 应用ID
* @param timeout 超时时长(H)
* @param type TYPE_ONE; TYPE_TWO;
* @param stage 编译:COMPILE; 测试:TEST;
* @param callback 回调函数
*/
void publish(String appId, Integer timeout, AppDelayMessage.Type type, AppDelayMessage.Stage stage, Class callback);
/**
* 超时回调
* @param message 消息体
*/
void callback(AppDelayMessage message);
/**
* 修改延迟消息状态[PROCESSED]
* @param appId 应用ID
* @param stage 编译:COMPILE; 测试:TEST;
* @return 成功:true, 失败:false
*/
boolean changeToProcessed(String appId, AppDelayMessage.Stage stage);
}
AppDelayMessageServiceImpl
AppDelayMessageMapper
超时处理
使用自定义的**@Callback** 注入,实现自定义TimeoutCallback接口。根据回调接口,对自定义的回调方法做超时处理。
当发生业务流程发生超时,使用自定义的回调函数,对超时的问题进行处理。
以上是完整的功能实现,请结合自身业务进行实践。
原文链接:https://juejin.cn/post/7447118874627571764 作者:不惑_