百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

如何实现延迟队列(JDK/mysql/redis/Rabbit)

csdh11 2025-03-12 13:40 2 浏览

何为延迟队列

队列,即先进先出的数据结构,就和食堂打饭一样,排在最前面的先打饭,打完饭就走;延迟队列即队列中的元素相比以往多了一个属性特征:延迟。延迟队列中的每个元素都指定了延迟时间,表示该元素到达指定时间之后将出队进行处理。其实从上述定义来看,与其说是延迟队列,不如说它是一个以时间为权重最小堆结构

那么延迟队列有什么用呢?我们生活中其实平时接触到很多可以使用延迟队列来解决的例子:

  • 订单超时30分钟未付款将自动关闭
  • 会议系统中,会议开始前10分钟,发送会议提醒
  • 夏天晚上时,我们经常会给空调设置指定时长的时间,到时空调自动关闭
  • 再比如微波炉、烤箱、等等

可以发现延迟队列想要实现的功能其实就是一个定时任务调度的一种。

延迟队列实现方式

延迟队列实现的方式有很多种,具体采用哪种去实现,和我们的业务背景、业务诉求都息息相关,不同的实现方式都有其适用的应用场景,我这里将延迟队列分为两类:单机延迟队列分布式延迟队列

1. 单机实现

JDK 提供了DelayedQueue可以实现延迟队列的目的。其类图如下:

可以看到DelayedQueue是一个阻塞队列,其队列中的元素必须实现Delayed接口:

public interface Delayed extends Comparable {
    long getDelay(TimeUnit unit);
}

其中getDelay返回代表该元素的一个在队列中可存在的时间,通过这种方式来实现元素的延迟弹出。接下来看订单超时30秒将自动关闭的实际例子:

public class JDKDelayQueueTest {
    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private static final DelayQueue DELAY_QUEUE = new DelayQueue<>();

    private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(2);

    public static void main(String[] args) throws Exception {

        EXECUTOR_SERVICE.submit(() -> {
            while (true) {
                if (!DELAY_QUEUE.isEmpty()) {
                    Order order = DELAY_QUEUE.poll();
                    if (order != null) {
                        System.out.println(order.getOrderId() + " 超时关闭与:" + FORMATTER.format(LocalDateTime.now()));
                    }

                }
                TimeUnit.MILLISECONDS.sleep(1000);
            }
        });
        EXECUTOR_SERVICE.submit(() -> {
            try {
                DELAY_QUEUE.add(new Order("黄焖鸡订单"));
                TimeUnit.SECONDS.sleep(5);
                DELAY_QUEUE.add(new Order("麻辣香锅订单"));
                TimeUnit.SECONDS.sleep(10);
                DELAY_QUEUE.add(new Order("石锅拌饭订单"));
            } catch (Exception e) {

            }

        });

    }

    public static class Order implements Delayed {

        private final LocalDateTime expireTime;
        private final String orderId;

        public Order(String orderId) {
            this.expireTime = LocalDateTime.now().plusSeconds(30);
            this.orderId = orderId;
            System.out.println(orderId + " 创建于:" + FORMATTER.format(LocalDateTime.now()));
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return LocalDateTime.now().isAfter(expireTime) ? -1 : 1;
        }

        @Override
        public int compareTo(Delayed targetOrder) {
            // 谁的过期时间最早谁就排最前面
            return this.expireTime.isBefore(((Order) targetOrder).getExpireTime()) ? -1 : 1;
        }

        public String getOrderId() {
            return orderId;
        }

        public LocalDateTime getExpireTime() {
            return expireTime;
        }
    }
}

输出:

黄焖鸡订单 创建于:2021-08-21 18:26:30
麻辣香锅订单 创建于:2021-08-21 18:26:35
石锅拌饭订单 创建于:2021-08-21 18:26:45
黄焖鸡订单 超时关闭与:2021-08-21 18:27:00
麻辣香锅订单 超时关闭与:2021-08-21 18:27:05
石锅拌饭订单 超时关闭与:2021-08-21 18:27:15

DelayQueue实现方式小结

这种方式的优点就是实现简单,不复杂,但是其缺点也比较多:不具备可扩展性内存限制无持久化机制,数据容易丢失。


分布式实现

2. 数据库轮询

数据库论询的方式相对而言也比较好理解,后台启动定时任务每隔一段时间扫描指定的数据库表每一行数据,获取出到达指定延迟时间的行进行处理,所以使用该方式重要的就三个要素:

1)捞取任务
扫描数据库的后台任务,可以使用分布式任务去扫,比如A任务扫描limit 0,100满足条件的数据行,B任务扫描limit 100,200满足条件的数据行

2)执行任务
一般来说讲究分工协作,第一步中的分布式线程任务专门用来捞取任务,那么捞取到的任务可以再次扔给另外专门用户处理任务的线程中

3)数据库表设计
可以在表中增加一个字段来表示延迟时间,比如针对上面的订单超时30秒关闭,我们可以增加一个字段timeout,可以是此时间的毫秒数来记录订单的超时时间,那么此时我们的SQL就可以是:

select * from order where ${now} >= timeout limit ${start},100;

数据库轮询实现方式小结

采用这种方式可以看到首先我们需要查询数据库,那么查询数据库就意味着存在查询耗时,那么可能最终导致的就是实时性不高,但是它的优点在于天生满足任务持久化机制,不用担心延迟任务丢失。

3.通过Redis实现

Redis的五大数据类型中的zset数据类型中,包含一个称为score的属性,该数据类型中所有元素都会按照score进行排序,所以如果将score作为我们的延迟时间的时间戳,那么我们可以通过命令Zrangebyscore来获取满足条件的数据,然后交给我们的任务处理线程去处理,其实整个实现思想和数据库轮循是一样的,只不过数据存储结构由数据库转变成了redis,准确来说redis也是数据库,只不过不同的存储结构带来的影响就是适用场景的不同罢了。

那么如果通过Redis来实现延迟队列,大概会有如下几步:

1) 增加任务

zadd tasks ${过期时间戳} ${任务相关数据}

2)捞取任务

ZRANGEBYSCORE tasks -inf ${当前时间戳} WITHSCORES

捞取过期时间早于当前时间的这部分任务

3)执行任务 接下来就是执行,这个就没什么好说的了

关于redis zset数据结构以及命令可以看这里:
https://www.runoob.com/redis/redis-sorted-sets.html

一些优化点

1.在添加延迟任务时,可以通过对任务id进行hash分散至多个redis key,可以避免所有任务存储在一个key中导致大key从而影响元素的添加和查找性能

2.每个key独自拥有一个线程处理

3.每个key的线程只负责拉取需要处理的数据,然后再转发至消息队列中,不做任何其他处理,可以提升处理速度,消息消费者可扩展性好,性能不够,机器来凑

redis实现方式小结

redis因为其都是内存中操作,所以查询插入速度和mysql来比都是非常快的,所以实时性会比mysql高,虽然redis也能满足任务数据的持久化,但是无法保证任务不丢失,所以这里持久性会比mysql稍弱一点


4.使用消息队列

我们可以采用rabbitMQ的延时队列。RabbitMQ具有以下两个特性,可以实现延迟队列

  • RabbitMQ可以针对Queue和Message设置 x-message-tt,来控制消息的生存时间,如果超时,则消息变为dead letter
  • lRabbitMQ的Queue可以配置x-dead-letter-exchange 和x-dead-letter-routing-key(可选)两个参数,用来控制队列内出现了deadletter,则按照这两个参数重新路由。结合以上两个特性,就可以模拟出延迟消息的功能.

Time-To-Live Extensions

RabbitMQ允许我们为消息或者队列设置TTL(time to live),也就是过期时间。TTL表明了一条消息可在队列中存活的最大时间,单位为毫秒。也就是说,当某条消息被设置了TTL或者当某条消息进入了设置了TTL的队列时,这条消息会在经过TTL秒后“死亡”,成为Dead Letter。如果既配置了消息的TTL,又配置了队列的TTL,那么较小的那个值会被取用。更多资料请查阅官方文档。

Dead Letter Exchange

刚才提到了,被设置了TTL的消息在过期后会成为Dead Letter。其实在RabbitMQ中,一共有三种消息的“死亡”形式:

  1. 消息被拒绝。通过调用basic.reject或者basic.nack并且设置的requeue参数为false。
  2. 消息因为设置了TTL而过期。
  3. 消息进入了一条已经达到最大长度的队列。

如果队列设置了Dead Letter Exchange(DLX),那么这些Dead Letter就会被重新publish到Dead Letter Exchange,通过Dead Letter Exchange路由到其他队列。

不同实现方式的对比

实现方式

复杂度

数据量

持久化,数据丢失

扩展性

实时性

jdk DelayQueue

简单

由于程序内存限制,适用于少数据量

无持久化

mysql 轮询

稍微复杂

可支持大数据量

可保证持久化,保证任务不丢失

可扩展

由于查询开销,稍弱

redis zet

稍微复杂

可支持大数据量

可尽量保证持久化,不保证任务不丢失

可扩展

RabbitMQ

稍微复杂

可支持大数据量

可保证持久化,保证任务不丢失

可扩展

结语

除了以上实现方式,还有其他比如通过Rabbit MQTTL死信队列来实现:每一个消息带有TTL属性,该TTL即延迟任务的延迟时间,只要超过指定时间没被消费,此消息将被转至死信队列中,我们可以监听死信队列消费消息进而达到延迟任务的目的;还有时间轮转算法等,时间有限,日后再学,日后再讲。

相关推荐

linux下的定时或计时操作(gettimeofday等的用法秒\微秒\纳秒

一、用select()函数实现非阻塞时的等待时间,用到结构体structtimeval{},这里就不多说了。二、用gettimeofday()可获得...

guava限流器RateLimiter使用简介(Springboot实现)

在大型分布式系统中,限流是一种重要的防护机制,可以帮助我们控制流量并减轻系统的负担。Google的Guava库提供了一种方便的限流器实现,可以帮助我们轻松地实现限流功能。本文将介绍Guava中限流器的...

Mybatis配置文件XML全貌详解,再不懂我也没招了

一、为什么要使用配置文件试想,如果没有配置文件,我们的应用程序将只能沿着固定的姿态运行,几乎不能做任何动态的调整,那么这不是一套完美的设计,因为我们希望拥有更宽更灵活的操作空间和更多的兼容度,同时也能...

五分钟搞懂分布式流控算法原理和滑动窗口设计实现

流控的使用场景保护系统稳定性:...

实时数据处理的性能瓶颈:如何用Flink实现毫秒级延迟?

在大数据实时处理领域,ApacheFlink凭借其低延迟、高吞吐的特性,成为了许多企业的首选框架。然而,在实际生产环境中,实现毫秒级延迟仍然是一个巨大的挑战。本文将从性能优化的角度出发,详细讲解如...

Java时间类介绍:Date的使用

在Java中用来表示日期的类有很多,最早使用的应该是Date类。Date类大概有两个,分别是:java.util.Date和java.sql.Date。前者对应我们日常生活中常用的日期类,而后者则表示...

不会用Redis做分布式流水号?

引言最近做项目,需要做单据编号,格式固定为:单据类型固定前缀+年月日时间戳+4位流水号,要求是每个单据类型的流水号唯一,方便后续业务使用。之前项目中使用的是UUID作为其他业务的单据编号,和组长沟通了...

用rabbitmq实现消息重发的功能

前言:在开发工作中,有很多时候会遇到要把数据同步给其他部门或三方的场景,这个时候光写一个同步接口是不太稳定的,因为有很多因素会导致同步接口运行失败或未运行,比如调接口之前的代码出现了bug,异常被th...

如何实现延迟队列(JDK/mysql/redis/Rabbit)

何为延迟队列队列,即先进先出的数据结构,就和食堂打饭一样,排在最前面的先打饭,打完饭就走;延迟队列即队列中的元素相比以往多了一个属性特征:延迟...

面试突击40:线程休眠的方法有几种?

在Java中,让线程休眠的方法有很多,这些方法大致可以分为两类,一类是设置时间,在一段时间后自动唤醒,而另一个类是提供了一对休眠和唤醒的方法,在线程休眠之后,可以在任意时间对线程进行唤醒。PS:休...

让java日期和时间处理变得更简单

时间和日期处理是Java编程中不可或缺的一部分。Java提供了许多内置的日期和时间类,可以用来处理日期和时间数据。但是,由于时间和日期的复杂性,处理时间和日期数据可能会变得非常棘手。在本文中,我...

Java如何获取当前日期?

Talkischeap,Showmethecode.--by:LinusTorvalds使用获取当时日期的方式如下:System.currentTimeMillis();获取当时...

流水号设计及Leaf的升级使用

一、需求背景由于业务需要对数据和消息等进行唯一的标识。需要进行唯一流水号的设计。...

糟糕,被SimpleDateFormat坑到啦!

1.问题背景问题的背景是这样的,在最近需求开发中遇到需要将给定目标数据通过某一固定的计量规则进行过滤并打标生成明细数据...

JDK8新特性解析:深入比较LocalDateTime和Date之间的区别

JDK8引入了java.time包,其中包含了LocalDateTime类,它与旧的Date类在表示日期和时间方面有着显著的区别。下面是对这两者之间区别的详细解释:...