百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

实现延迟队列,这些你知道吗?

csdh11 2025-03-12 13:40 1 浏览

何为延迟队列

队列,即先进先出的数据结构,就和食堂打饭一样,排在最前面的先打饭,打完饭就走;延迟队列即队列中的元素相比以往多了一个属性特征:延迟。延迟队列中的每个元素都指定了延迟时间,表示该元素到达指定时间之后将出队进行处理。其实从上述定义来看,与其说是延迟队列,不如说它是一个以时间为权重最小堆结构

那么延迟队列有什么用呢?我们生活中其实平时接触到很多可以使用延迟队列来解决的例子:

  • 订单超时30分钟未付款将自动关闭
  • 会议系统中,会议开始前10分钟,发送会议提醒
  • 夏天晚上时,我们经常会给空调设置指定时长的时间,到时空调自动关闭
  • 再比如微波炉、烤箱、等等

可以发现延迟队列想要实现的功能其实就是一个定时任务调度的一种。

延迟队列实现方式

延迟队列实现的方式有很多种,具体采用哪种去实现,和我们的业务背景、业务诉求都息息相关,不同的实现方式都有其适用的应用场景,我这里将延迟队列分为两类:单机延迟队列分布式延迟队列

单机实现

JDK 提供了DelayedQueue可以实现延迟队列的目的。其类图如下:

可以看到DelayedQueue是一个阻塞队列,其队列中的元素必须实现Delayed接口:

public interface Delayed extends Comparable {
    long getDelay(TimeUnit unit);
}
复制代码

其中getDelay返回代表该元素的一个在队列中可存在的时间,通过这种方式来实现元素的延迟弹出。接下来看订单超时30秒将自动关闭的实际例子:

public class JDKDelayQueueTest {
    private static final DateTimeFormatter FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    private static final DelayQueue DELAY_QUEUE = new DelayQueue<>();

    private static final ExecutorService EXECUTOR_SERVICE = Executors.newFixedThreadPool(2);

    public static void main(String[] args) throws Exception {

        EXECUTOR_SERVICE.submit(() -> {
            while (true) {
                if (!DELAY_QUEUE.isEmpty()) {
                    Order order = DELAY_QUEUE.poll();
                    if (order != null) {
                        System.out.println(order.getOrderId() + " 超时关闭与:" + FORMATTER.format(LocalDateTime.now()));
                    }

                }
                TimeUnit.MILLISECONDS.sleep(1000);
            }
        });
        EXECUTOR_SERVICE.submit(() -> {
            try {
                DELAY_QUEUE.add(new Order("黄焖鸡订单"));
                TimeUnit.SECONDS.sleep(5);
                DELAY_QUEUE.add(new Order("麻辣香锅订单"));
                TimeUnit.SECONDS.sleep(10);
                DELAY_QUEUE.add(new Order("石锅拌饭订单"));
            } catch (Exception e) {

            }

        });

    }

    public static class Order implements Delayed {

        private final LocalDateTime expireTime;
        private final String orderId;

        public Order(String orderId) {
            this.expireTime = LocalDateTime.now().plusSeconds(30);
            this.orderId = orderId;
            System.out.println(orderId + " 创建于:" + FORMATTER.format(LocalDateTime.now()));
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return LocalDateTime.now().isAfter(expireTime) ? -1 : 1;
        }

        @Override
        public int compareTo(Delayed targetOrder) {
            // 谁的过期时间最早谁就排最前面
            return this.expireTime.isBefore(((Order) targetOrder).getExpireTime()) ? -1 : 1;
        }

        public String getOrderId() {
            return orderId;
        }

        public LocalDateTime getExpireTime() {
            return expireTime;
        }
    }
}
复制代码

输出:

黄焖鸡订单 创建于:2021-08-21 18:26:30
麻辣香锅订单 创建于:2021-08-21 18:26:35
石锅拌饭订单 创建于:2021-08-21 18:26:45
黄焖鸡订单 超时关闭与:2021-08-21 18:27:00
麻辣香锅订单 超时关闭与:2021-08-21 18:27:05
石锅拌饭订单 超时关闭与:2021-08-21 18:27:15
复制代码

DelayQueue实现方式小结

这种方式的优点就是实现简单,不复杂,但是其缺点也比较多:不具备可扩展性内存限制无持久化机制,数据容易丢失。

分布式实现

数据库轮询

数据库论询的方式相对而言也比较好理解,后台启动定时任务每隔一段时间扫描指定的数据库表每一行数据,获取出到达指定延迟时间的行进行处理,所以使用该方式重要的就三个要素:

1)捞取任务
扫描数据库的后台任务,可以使用分布式任务去扫,比如A任务扫描limit 0,100满足条件的数据行,B任务扫描limit 100,200满足条件的数据行

2)执行任务
一般来说讲究分工协作,第一步中的分布式线程任务专门用来捞取任务,那么捞取到的任务可以再次扔给另外专门用户处理任务的线程中

3)数据库表设计
可以在表中增加一个字段来表示延迟时间,比如针对上面的订单超时30秒关闭,我们可以增加一个字段timeout,可以是此时间的毫秒数来记录订单的超时时间,那么此时我们的SQL就可以是:

select * from order where ${now} >= timeout limit ${start},100;
复制代码

数据库轮询实现方式小结

采用这种方式可以看到首先我们需要查询数据库,那么查询数据库就意味着存在查询耗时,那么可能最终导致的就是实时性不高,但是它的优点在于天生满足任务持久化机制,不用担心延迟任务丢失。

通过Redis实现

Redis的五大数据类型中的zset数据类型中,包含一个称为score的属性,该数据类型中所有元素都会按照score进行排序,所以如果将score作为我们的延迟时间的时间戳,那么我们可以通过命令Zrangebyscore来获取满足条件的数据,然后交给我们的任务处理线程去处理,其实整个实现思想和数据库轮循是一样的,只不过数据存储结构由数据库转变成了redis,准确来说redis也是数据库,只不过不同的存储结构带来的影响就是适用场景的不同罢了。

那么如果通过Redis来实现延迟队列,大概会有如下几步:

1) 增加任务

zadd tasks ${过期时间戳} ${任务相关数据}
复制代码

2)捞取任务

ZRANGEBYSCORE tasks -inf ${当前时间戳} WITHSCORES
复制代码

捞取过期时间早于当前时间的这部分任务

3)执行任务
接下来就是执行,这个就没什么好说的了

关于redis zset数据结构以及命令可以看这里:
www.runoob.com/redis/redis…

一些优化点

1.在添加延迟任务时,可以通过对任务id进行hash分散至多个redis key,可以避免所有任务存储在一个key中导致大key从而影响元素的添加和查找性能

2.每个key独自拥有一个线程处理

3.每个key的线程只负责拉取需要处理的数据,然后再转发至消息队列中,不做任何其他处理,可以提升处理速度,消息消费者可扩展性好,性能不够,机器来凑

redis实现方式小结

redis因为其都是内存中操作,所以查询插入速度和mysql来比都是非常快的,所以实时性会比mysql高,虽然redis也能满足任务数据的持久化,但是无法保证任务不丢失,所以这里持久性会比mysql稍弱一点

不同实现方式的对比

实现方式

复杂度

数据量

持久化,数据丢失

扩展性

实时性

jdk DelayQueue

简单

由于程序内存限制,适用于少数据量

无持久化

mysql 轮询

稍微复杂

可支持大数据量

可保证持久化,保证任务不丢失

可扩展

由于查询开销,稍弱

redis zet

稍微复杂

可支持大数据量

可尽量保证持久化,不保证任务不丢失

可扩展

结语

除了以上实现方式,还有其他比如通过Rabbit MQTTL死信队列来实现:每一个消息带有TTL属性,该TTL即延迟任务的延迟时间,只要超过指定时间没被消费,此消息将被转至死信队列中,我们可以监听死信队列消费消息进而达到延迟任务的目的;还有时间轮转算法等,时间有限,日后再学,日后再讲。


作者:我有一只喵喵
链接:
https://juejin.cn/post/6998849679353970701

来源:掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

相关推荐

最具标志性的电影服饰

在1933年的电影《晚宴》中,一袭白色斜裁缎质礼服和一头染成银灰色的卷发将珍·哈露变成了片中孤独骄傲的凯蒂·帕卡德(KittyPackard)。这条白色礼服风靡一时,与它类似的款式统称为”珍·哈露式...

呼吸系统及其他相关疾病英文词汇

allergic/alls:rd3ik/a.过敏的同根alergyn.辻敏症allergenn.变应原(引起过敏反应的物质)...

itch 一周游戏汇:8月19日-8月25日(上)

...

python实现微信群消息自动转发简明教程

基本思路是,用Python模拟微信网页版登陆,接收到群里消息后,对文本、图片、分享等各类消息类型分别处理,并转发至其他群。主要源头是使用itchat,itchat是一个开源的微信个人号接口,pyt...

python实践——如何批量向好友发送消息,亲测有效!

今天为大家介绍如何像指定好友或微信群批量,一共两个Python模块,第一个是itchat,...

用python分析你的朋友圈,很好玩~

设计喵的内心OS:我一脸懵逼点进来,一脸懵逼走出去,你说了什么?事情是这么来的,我看到有朋友在做朋友圈的电子书,也想自己研究一下,发现原来有个itchart这个微信第三方API可以读取微信数据,抱着好...

探索Python技术在电脑端自动化中:从群发消息到智能朋友圈秒赞

一、技术革新与基础库的安装首先,确保Python环境已如诗如画般配置完毕,随后安装那些将赋予我们魔力的第三方库:...

使用Python实现微信电脑端自动化:群发消息、朋友圈点赞与秒赞

一、技术准备与基础库安装首先,确保Python环境已正确配置,随后安装必要的第三方库:...

Python 查看微信撤回的消息(完整代码)

看到了一个基于python的微信开源库:itchat,玩了一天,做了一个程序,把私聊撤回的信息可以收集起来并发送到个人微信的文件传输助手,包括:...

用 Python 玩转微信就是这么简单

前言wxpy在itchat的基础上,通过大量接口优化提升了模块的易用性,并进行丰富的功能扩展。用来干啥一些常见的场景控制路由器、智能家居等具有开放接口的玩意儿...

itch 一周游戏汇:2月17日-2月23日(下)

...

Python开源项目合集(第三方平台)

wechat-python-sdk-wechat-python-sdk微信公众平台Python开发包http://wechat-python-sdk.readthedocs.org/,非官方...

78行Python代码帮你复现微信撤回消息!

来源:悟空智能科技本文约700字,建议阅读5分钟。本文基于python的微信开源库itchat,教你如何收集私聊撤回的信息。...

女朋友老喜欢撤回消息?看我如何利用Python识破她的心理

一、pipInstallitchat既然都用python解决问题了,第一步当然是明确用什么库啦,直接执行pipinstallitchat:...

打游戏老是被女朋友骚扰怎么办?教你用Python智能回复消息

Python中itchat模块对于操作微信来说是个福音,今天,我们就用Python实现微信自动回复,同时调用图灵机器人,实现智能回复。具体代码如下:#导入...