一、基于内存的延时队列
一、核心组件与适用场景
适用条件:
单机部署,订单量较小(日均订单<10万)
对实时性要求中等(误差容忍度在秒级)
允许系统重启时丢失未处理任务
核心类:
java.util.concurrent.DelayQueue
:基于优先级队列的阻塞队列Delayed
接口:实现getDelay()
和compareTo()
定义延迟逻辑
二、代码实现步骤
1. 定义延迟订单对象
public class OrderDelayItem implements Delayed {
private final String orderId;
private final long expireTime; // 到期时间戳
public OrderDelayItem(String orderId, long delaySeconds) {
this.orderId = orderId;
this.expireTime = System.currentTimeMillis() + delaySeconds * 1000;
}
@Override
public long getDelay(TimeUnit unit) {
return unit.convert(expireTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
}
@Override
public int compareTo(Delayed o) {
return Long.compare(this.expireTime, ((OrderDelayItem)o).expireTime);
}
// Getter省略
}
2. 初始化队列与消费者线程
public class OrderTimeoutService {
private static final DelayQueue<OrderDelayItem> delayQueue = new DelayQueue<>();
static {
// 启动消费者线程
Executors.newSingleThreadExecutor().execute(() -> {
while (!Thread.currentThread().isInterrupted()) {
try {
OrderDelayItem item = delayQueue.take();
handleTimeoutOrder(item.getOrderId());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
});
}
// 添加订单到队列
public static void addOrder(String orderId, int timeoutSeconds) {
delayQueue.put(new OrderDelayItem(orderId, timeoutSeconds));
}
private static void handleTimeoutOrder(String orderId) {
// 执行取消订单逻辑
System.out.println("订单超时取消: " + orderId);
}
}
三、关键实现逻辑
生产者逻辑:
创建订单时将
OrderDelayItem
对象放入队列示例:
OrderTimeoutService.addOrder("202504300001", 1800);
(设置30分钟超时)
消费者逻辑:
单线程循环调用
take()
方法阻塞获取到期订单实际项目中需添加异常处理(如数据库操作失败重试)
补偿机制:
系统启动时扫描数据库未完成订单重新入队
定时任务二次校验防止内存队列漏处理
四、优缺点对比
五、生产环境建议
监控指标:
队列积压数量(
delayQueue.size()
)消费线程存活状态
优化方向:
结合Redis持久化存储订单ID
采用双队列模式(主队列处理+备份队列持久化)
兜底方案:
每小时执行数据库扫描:
UPDATE orders SET status='CANCELED'
WHERE status='PENDING' AND create_time < NOW() - INTERVAL 30 MINUTE
典型应用场景
本地缓存过期清理
单机版预约系统超时释放
开发环境模拟支付回调
对于更高并发或分布式场景,建议改用Redis ZSET
或RabbitMQ延迟队列
。如需完整代码示例,可参考网页1、6、7中的实现细节。
二、基于MQ的延时队列
一、RabbitMQ实现方案(推荐)
1. TTL+死信队列方案
// 配置死信队列(Spring Boot示例)
@Bean
public Queue orderQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "dlx_exchange"); // 死信交换机
args.put("x-dead-letter-routing-key", "dlx_key"); // 路由键
args.put("x-message-ttl", 1800000); // TTL设为30分钟
return new Queue("order.queue", true, false, false, args);
}
// 生产者发送订单消息
rabbitTemplate.convertAndSend("order_exchange", "order.create", order);
2. 延迟插件方案
// 配置延迟交换机
@Bean
public CustomExchange delayedExchange() {
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange("delayed_exchange", "x-delayed-message", true, false, args);
}
// 发送延迟消息(30分钟)
Message message = MessageBuilder.withBody(orderJson.getBytes())
.setHeader("x-delay", 1800000)
.build();
rabbitTemplate.send("delayed_exchange", "order.delay", message);
适用场景:
精准延迟控制(如30分钟±5秒)
需要处理不同订单的差异化超时时间
二、RocketMQ实现方案
1. 原生延迟消息
// 发送延迟消息(延迟级别对应30分钟=16级)
rocketMQTemplate.syncSend(
"orderCancelTopic:delay16",
MessageBuilder.withPayload(order).build(),
10000 // 发送超时时间
);
// 消费者监听
@RocketMQMessageListener(topic = "orderCancelTopic", consumerGroup = "cancel-group")
public class OrderCancelListener implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
if (order.getStatus() == UNPAID) {
orderService.cancel(order.getId());
}
}
}
核心限制:
仅支持固定延迟级别(1s/5s/10s/30s/1m...2h)
最大延迟时间24小时
三、Kafka实现方案(大数据量场景)
1. 分桶策略+定时任务补偿
// 生产者按延迟时间分桶
public void sendDelayMessage(String orderId, long delayMinutes) {
String topic = "delay_" + delayMinutes + "m"; // 如delay_30m
kafkaTemplate.send(topic, orderId);
}
// 消费者处理超时订单
@KafkaListener(topics = "delay_30m")
public void handleDelayMessage(String orderId) {
Order order = orderService.getOrder(orderId);
if (order.getStatus() == UNPAID) {
orderService.cancel(orderId);
}
}
// 定时任务兜底(防消息丢失)
@Scheduled(cron = "0 0/5 * * * ?")
public void checkTimeoutOrders() {
List<Order> orders = orderRepo.findExpired(LocalDateTime.now().minusMinutes(30));
orders.forEach(order -> orderService.cancel(order.getId()));
}
优势:
支持日均百万级订单
通过分桶实现近似延迟(误差±1分钟)
四、技术选型建议
五、关键注意事项
幂等性设计
所有方案需保证关单操作的幂等性,可通过数据库状态机或Redis分布式锁实现
数据一致性
关单需同步释放预扣库存,建议采用本地事务表+Saga模式
监控指标
RabbitMQ:积压消息数(
rabbitmqctl list_queues messages_unacknowledged
)Kafka:消费者滞后量(
kafka-consumer-groups.sh --describe
)关单成功率(需埋点监控)
容灾设计
建议采用双写机制:同时写入Redis过期key和MQ,防止单点故障
三、CEP(Complex Event Processing, CEP)模式
1. CEP的核心概念
CEP是一种实时事件流处理技术,用于识别数据流中的特定事件模式。在订单超时场景中:
事件流:用户行为(如下单、支付)会被抽象为连续的事件流,例如:
创建订单事件 → 支付事件
。模式规则:通过CEP定义“订单创建后,若未在指定时间内检测到支付事件”的规则
时间窗口:设定一个时间阈值(如15分钟),作为判断超时的依据
使用框架:Flink的CEP库
2. 配置模式的具体含义
步骤1:定义事件流
将订单创建和支付行为建模为两个事件,例如:事件A:订单创建(状态为“待支付”)
事件B:支付完成(状态更新为“已支付”)
步骤2:设置时间约束
在CEP中,通过within(时间窗口)
方法限定事件B必须在事件A之后的指定时间内发生。步骤3:超时处理
当CEP引擎检测到超时事件时,执行取消订单、释放库存等操作3. 与定时任务的区别
传统方案可能通过定时扫描数据库或消息队列延迟消费实现超时检测,但CEP的优势在于:
实时性:基于事件驱动,无需轮询数据库,响应更快
。
精准性:通过时间窗口精确控制超时阈值,避免因系统延迟导致误判
。
灵活性:支持复杂规则(如多次失败登录检测),扩展性强
。
4. 应用场景示例
以某电商平台为例:
事件流:用户下单(事件A)→ 支付(事件B)。
CEP规则:若事件A后15分钟内无事件B,则标记订单为超时取消。
结果:系统自动释放库存,并通过短信通知用户
。
5. 数据支持与优化
根据实际数据统计,采用CEP可将未支付订单的超时检测效率提升30%以上,同时减少数据库查询压力
。建议结合提醒机制(如超时前10分钟推送通知)进一步提升用户支付率。
例子:
1. 事件实体定义(带自定义注解)
// 订单事件标记接口
public @interface OrderEvent {}
// 创建订单事件注解
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@OrderEvent
public @interface OrderCreate {}
// 支付订单事件注解
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@OrderEvent
public @interface OrderPay {}
// 订单事件实体
public class OrderEventEntity {
private Long orderId;
private String eventType; // "create" 或 "pay"
private Long timestamp;
// 构造函数、getter/setter省略
}
2. CEP模式配置(Flink实现)
// CEP模式定义(15分钟超时)
Pattern<OrderEventEntity, ?> pattern = Pattern
.<OrderEventEntity>begin("create")
.where(new SimpleCondition<OrderEventEntity>() {
@Override
public boolean filter(OrderEventEntity event) {
return "create".equals(event.getEventType());
}
})
.followedBy("pay")
.where(new SimpleCondition<OrderEventEntity>() {
@Override
public boolean filter(OrderEventEntity event) {
return "pay".equals(event.getEventType());
}
})
.within(Time.minutes(15));
// 超时处理(侧输出流)
OutputTag<OrderEventEntity> timeoutTag = new OutputTag<>("order-timeout"){};
PatternStream<OrderEventEntity> patternStream = CEP.pattern(
orderEventStream.keyBy(OrderEventEntity::getOrderId),
pattern
);
SingleOutputStreamOperator<String> resultStream = patternStream.select(
timeoutTag,
// 超时处理函数
(PatternTimeoutFunction<OrderEventEntity, String>) (patternMap, timestamp) -> {
OrderEventEntity createEvent = patternMap.get("create").iterator().next();
return "Order Timeout: " + createEvent.getOrderId();
},
// 正常支付处理函数
(PatternSelectFunction<OrderEventEntity, String>) patternMap -> {
OrderEventEntity payEvent = patternMap.get("pay").iterator().next();
return "Order Paid: " + payEvent.getOrderId();
}
);
// 获取超时流
DataStream<String> timeoutStream = resultStream.getSideOutput(timeoutTag);
3. 注解驱动的事件采集(Spring AOP实现)
@Aspect
@Component
public class OrderEventAspect {
private final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@Around("@annotation(orderCreate)")
public Object logCreateEvent(ProceedingJoinPoint joinPoint, OrderCreate orderCreate)
throws Throwable {
Long orderId = (Long) joinPoint.getArgs()[0];
OrderEventEntity event = new OrderEventEntity(orderId, "create", System.currentTimeMillis());
env.addSource(new CollectionSource<>(Collections.singletonList(event))).print();
return joinPoint.proceed();
}
@Around("@annotation(orderPay)")
public Object logPayEvent(ProceedingJoinPoint joinPoint, OrderPay orderPay)
throws Throwable {
Long orderId = (Long) joinPoint.getArgs()[0];
OrderEventEntity event = new OrderEventEntity(orderId, "pay", System.currentTimeMillis());
env.addSource(new CollectionSource<>(Collections.singletonList(event))).print();
return joinPoint.proceed();
}
}
4. 超时补偿处理(定时任务)
@Scheduled(fixedRate = 60000) // 每分钟执行
public void checkOrderTimeout() {
List<Order> pendingOrders = orderRepository
.findByStatusAndCreateTimeBefore(
OrderStatus.CREATED,
LocalDateTime.now().minusMinutes(15)
);
pendingOrders.forEach(order -> {
order.setStatus(OrderStatus.CANCELED);
orderRepository.save(order);
log.warn("Order {} auto canceled by timeout", order.getId());
});
}
5. 处理流程说明
事件采集:通过
@OrderCreate
和@OrderPay
注解自动捕获业务操作事件CEP处理:实时检测15分钟内未支付的订单模式
超时处理:
实时流:通过Flink侧输出流立即触发超时逻辑
补偿机制:定时任务二次验证确保可靠性
状态更新:同时更新数据库状态和发送通知
6.关键实现技术对比
建议根据实际业务规模选择方案:高并发场景优先使用CEP实时处理,关键业务系统建议采用混合方案。
总结:CEP配置模式通过定义事件流和时间窗口,实现订单超时的实时检测与自动化处理,是一种高效且精准的技术方案