首页 热点 业界 科技快讯 数码 电子消费 通信 前沿动态 电商

springboot~kafka中延时消息的实现

2023-08-23 04:59:59 来源 : 博客园


(资料图片)

应用场景

  • 用户下单5分钟后,给他发短信
  • 用户下单30分钟后,如果用户不付款就自动取消订单

kafka无死信队列

kafka本身没有这种延时队列的机制,像rabbitmq有自己的死信队列,当一些消息在一定时间不消费时会发到死信队列,由死信队列来处理它们,上面的两个需求如果是rabbitmq可以通过死信队列实现的。

kafka有生产者拦截器

通过对生产者拦截器实现一个TTL的检查,然后再通过类似netty里的延时队列组件来实现消息的延时发送,发到咱们的死信队列里

  • ProducerInterceptorTTL源码
public class ProducerInterceptorTTL implements ProducerInterceptor, ApplicationContextAware {// 消息延时,单位秒public static String TTL = "ttl";// 死信队列,延时后发送到的队列,我们称为死信队列public static String DEAD_TOPIC = "dead_topic";// 静态化的上下文,用于获取bean,因为ConsumerInterceptor是通过反射创建的,所以无法通过注入的方式获取beanprivate static ApplicationContext applicationContext;// 时间轮,用于延时发送消息private static LindTimeWheel timeWheel = new LindTimeWheel(1000, 8);@Overridepublic ProducerRecord onSend(ProducerRecord record) {final String topic = record.topic();final Integer partition = record.partition();final Integer key = record.key();final String value = record.value();final Long timestamp = record.timestamp();final Headers headers = record.headers();long ttl = -1;String deadTopic = null;for (Header header : headers) {if (header.key().equals(TTL)) {ttl = toLong(header.value());}if (header.key().equals(DEAD_TOPIC)) {deadTopic = new String(header.value());}}// 消息超时判定if (deadTopic != null && ttl > 0) {// 可以放在死信队列中String finalDeadTopic = deadTopic;long finalTtl = ttl * 1000;timeWheel.addTask(() -> {System.out.println("消息超时了," + finalTtl + "需要发到topic:" + record.key());KafkaTemplate kafkaTemplate = applicationContext.getBean(KafkaTemplate.class);kafkaTemplate.send(finalDeadTopic, record.value());}, finalTtl);}// 拦截器拦下来之后改变原来的消息内容ProducerRecord newRecord = new ProducerRecord(topic, partition, timestamp,key, value, headers);// 传递新的消息return newRecord;}@Overridepublic void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {}@Overridepublic void close() {}@Overridepublic void configure(Map map) {}@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;}}
  • 注册拦截器
spring:  kafka:    producer:      properties:        interceptor.classes: com.ruoyi.lawyer.delay.ProducerInterceptorTTL
  • 延时消息在某个时间段之后会送出

标签:

相关文章

最近更新
( )地回答 2023-08-23 03:43:49
董芸(关于董芸简述) 2023-08-23 03:41:38
三星930qcg(三星939d) 2023-08-23 03:38:13
董群(关于董群简述) 2023-08-23 03:36:49