百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术教程 > 正文

五分钟搞懂分布式流控算法原理和滑动窗口设计实现

csdh11 2025-03-12 13:41 2 浏览

流控的使用场景

  • 保护系统稳定性: 流控算法可以限制系统的请求流量,防止突发的大流量请求导致系统资源耗尽,从而保护系统的稳定性,避免系统崩溃或性能下降。
  • 避免资源竞争: 在高并发的情况下,如果不进行流控,多个请求可能会竞争有限的资源(如数据库连接、线程池等),导致资源竞争和资源耗尽,进而影响系统的响应时间和可用性。
  • 防止恶意攻击: 流控算法可以限制来自单个IP地址或用户的过多请求,防止恶意攻击和滥用系统资源,提高系统的安全性。
  • 提高服务质量: 通过合理的流控策略,可以确保系统能够正常处理合理范围内的请求,从而提高服务的质量和稳定性,减少服务的不可用或延迟现象。

固定窗口算法

图片来自互联网,如有侵权请联系作者删除

设计原理

维护一个单位时间内的计数值,每当一个请求通过时,就将计数值加1,当计数值超过预先设定的阈值时,就拒绝单位时间内的其他请求

问题

假设我们设定1秒内允许通过的请求阈值是99,如果有用户在时间窗口的最后几毫秒发送了99个请求,紧接着又在下一个时间窗口开始时发送了99个请求,那么这个用户其实在一秒内成功请求了198次,显然超过了阈值但并不会被限流(会有突刺问题)

滑动窗口算法

图片来自互联网,如有侵权请联系作者删除

设计原理

假设我们设定1秒内允许通过的请求是200个,但是在这里我们需要把1秒的时间分成多格,假设分成5格(格数越多,流量过渡越平滑),每格窗口的时间大小是200毫秒,每过200毫秒,就将窗口向前移动一格

问题

流量的过渡是否平滑依赖于我们设置的窗口格数也就是统计时间间隔,格数越多,统计越精确,但是具体要分多少呢?

小总结: 固定窗口 和 滑动窗口 解决:单位时间总流量

漏斗算法

图片来自互联网,如有侵权请联系作者删除


设计原理

漏斗算法以一个常量限制了出口流量速率,因此漏斗算法可以应对平滑突发的流量。其中漏斗作为流量容器我们可以看做一个FIFO的队列,当入口流量速率大于出口流量速率时,因为流量容器是有限的,当超出流量容器大小时,超出的流量会被丢弃

优点

  1. 可以平滑限制请求的处理速度,避免瞬间请求过多导致系统崩溃或者雪崩
  2. 可以控制请求的处理速度,使得系统可以适应不同的流量需求,避免过载或者过度闲置。
  3. 可以通过调整桶的大小和漏出速率来满足不同的限流需求,可以灵活地适应不同的场景。

问题

因为漏桶算法限制了流出速率是一个固定常量值,所以漏桶算法不支持出现突发流出流量。但是在实际情况下,流量往往是突发的。

令牌桶算法

图片来自互联网,如有侵权请联系作者删除

设计原理

以恒定速率往令牌桶里加入令牌,令牌桶被装满时,多余的令牌会被丢 弃。当请求到来时,会先尝试从令牌桶获取令牌(相当于从令牌桶移除一个令牌),获取成功则请求被放行,获取失败则阻塞或直接拒绝请求

算法实现

目前来说滑动窗口的实现分为两种【环形和线性】,环形的代表是Sentinel的LeapArray,线性的是EasyRetry自研的SlidingWindow,下面分别介绍这两种的设计与实现

LeapArray【Sentinel】

核心字段

javaprotected int windowLengthInMs;  // 窗口长度 (intervalInMs / sampleCount)
protected int sampleCount;  // 总窗口间隔
protected int intervalInMs; // 总窗口时间 单位毫秒
private double intervalInSecond; // 总窗口时间 单位秒

核心算法

计算起始时间【windowStart】 => 利用求余运算,保证区间内桶的开始时间是一致的

javaprotected long calculateWindowStart(long timeMillis) {
  return timeMillis - timeMillis % windowLengthInMs;
}

计算下标 【timeIdx】 => 利用除法取整和求余运算,保证区间内桶的下标位是一致的

javapublic int calculateTimeIdx(long timeMillis) {
  long timeId = timeMillis / windowLengthInMs;
  return (int) (timeId % array.length());
}

获取窗口对象

java// 根据给定的时间戳计算出在存储桶数组中的索引位置。存储桶数组是一个循环数组,存储了不同时间窗口的计数桶。
int idx = calculateTimeIdx(timeMillis);

// 根据给定的时间戳计算出对应时间窗口的开始时间。
long windowStart = calculateWindowStart(timeMillis);

while (true) {
    // 获取环形数组中对象下标的窗口对象
    WindowWrap old = array.get(idx);
    if (old == null) {
        // 不存在则创建窗口
        WindowWrap window = new WindowWrap(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
        // 通过CAS(Compare-And-Swap)原子操作来确保只有一个线程能够成功插入
        if (array.compareAndSet(idx, null, window)) {
           // 如果插入成功,返回新创建的计数桶;
            return window;
        } else {
        //  如果插入失败,线程将让出时间片,等待下一次循环。
            Thread.yield();
        }
    } else if (windowStart == old.windowStart()) {
    // 如果找到的计数桶的开始时间与给定时间戳的开始时间一致,说明该计数桶是最新的,并且当前时间戳处于这个时间窗口内。直接返回找到的计数桶。
        return old;
    } else if (windowStart > old.windowStart()) {
// 如果找到的计数桶的开始时间早于给定时间戳的开始时间,说明该计数桶已过时(不再使用)。
// 在这种情况下,算法会尝试获取一个更新锁(updateLock)来对这个过时的计数桶进行重置
//如果成功获取锁,就会调用 `resetWindowTo` 方法来重置过时的计数桶,并返回重置后的计数桶。如果获取锁失败,线程将让出时间片,等待下一次循环。
        if (updateLock.tryLock()) {
            try {
                return resetWindowTo(old, windowStart);
            } finally {
                updateLock.unlock();
            }
        } else {
            Thread.yield();
        }
    } else if (windowStart < old.windowStart()) {
    // 如果找到的计数桶的开始时间晚于给定时间戳的开始时间,这是一个不应该出现的情况,代码中并没有详细处理这种情况,只是返回一个新创建的空计数桶。
        return new WindowWrap(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
    }
}

完整代码实现:
https://github.com/alibaba/Sentinel/blob/v1.8.0/sentinel-core/src/main/java/com/alibaba/csp/sentinel/slots/statistic/base/LeapArray.java

SlidingWindow[【EasyRetry】]https://www.easyretry.com/)

核心字段

java/**  
* 这是一个用于存储数据的 TreeMap,其中的键为窗口期的开始时间(LocalDateTime 类型),
* 值为一个并发链表队列(ConcurrentLinkedQueue),用于存储在该窗口期内的数据。  
*/  
public final TreeMap<LocalDateTime, ConcurrentLinkedQueue> saveData = new TreeMap<>();  
  
/**  
* 总量窗口期阈值,指定一个窗口期内数据的最大数量。
*/  
private final Integer totalThreshold;  
  
/**  
* 开启的窗口数据预警阈值,当存活的窗口数量过多时会进行预警。
*/  
private final Integer windowTotalThreshold;  
  
/**  
* 一个监听器列表,用于处理达到窗口期阈值时的操作。
*/  
private final List<Listener> listeners;

/**  
* 新增窗口锁  
*/  
private static final ReentrantLock SAVE_LOCK = new ReentrantLock();  
  
/**  
* 到达时间窗口期或者总量窗口期锁  
*/  
private static final ReentrantLock NOTICE_LOCK = new ReentrantLock();

新增数据&&开启新窗口

javapublic void add(T data) {

    LocalDateTime now = LocalDateTime.now();
    if (isOpenNewWindow(now)) {

        SAVE_LOCK.lock();
        LocalDateTime windowPeriod = now.plus(duration, chronoUnit);
        try {

            // 防止开启两个间隔时间小于窗口期的窗口
            if (isOpenNewWindow(now)) {
                ConcurrentLinkedQueue list = new ConcurrentLinkedQueue<>();
                list.add(data);

                LogUtils
                    .info(log, "添加新数据 [{}] [{}] size:[{}]", windowPeriod, Thread.currentThread().getName(), list.size());
                saveData.put(windowPeriod, list);

                // 扫描n-1个窗口,是否过期,过期则删除
                removeInvalidWindow();

                // 超过窗口阈值预警
                alarmWindowTotal();

            } else {
                oldWindowAdd(data);
            }

        } finally {
            SAVE_LOCK.unlock();
        }

    } else {
        oldWindowAdd(data);
    }

}

往已存在的窗口期内添加数据

java    private void oldWindowAdd(T data) {

        LocalDateTime windowPeriod = getNewWindowPeriod();
        // 添加数据
        ConcurrentLinkedQueue list = saveData.get(windowPeriod);
        list.add(data);

        // 到达总量窗口期,将数据传递给监听器进行处理。
        if (list.size() >= totalThreshold) {
            doHandlerListener(windowPeriod);
        }

    }

处理到达窗口期的数据监听器

javaprivate void doHandlerListener(LocalDateTime windowPeriod) {

    NOTICE_LOCK.lock();

    try {

        ConcurrentLinkedQueue list = saveData.get(windowPeriod);
        if (CollectionUtils.isEmpty(list)) {
            return;
        }

        // 深拷贝
        ConcurrentLinkedQueue deepCopy = new ConcurrentLinkedQueue<>(list);
        clear(windowPeriod, deepCopy);

        if (CollectionUtils.isEmpty(deepCopy)) {
            return;
        }

        for (Listener listener : listeners) {
            // 用户自定义实现具体的数据处理逻辑
            listener.handler(new ArrayList<>(deepCopy));
        }

    } catch (Exception e) {
        log.error("到达总量窗口期通知异常", e);
    } finally {
        NOTICE_LOCK.unlock();
    }

}

删除过期窗口

扫描n-1个窗口,是否过期,过期则删除 过期条件为窗口期内无数据

java    private void removeInvalidWindow() {

        for (int i = 0; i < saveData.size() - 1; i++) {
            Map.Entry<LocalDateTime, ConcurrentLinkedQueue> firstEntry = saveData.firstEntry();
            if (CollectionUtils.isEmpty(firstEntry.getValue())) {
                saveData.remove(firstEntry.getKey());
            }
        }
    }

完成代码实现:
https://gitee.com/aizuda/easy-retry/blob/master/easy-retry-client-core/src/main/java/com/aizuda/easy/retry/client/core/window/SlidingWindow.java

项目实战

本篇文章多次提到EasyRetry,想必小伙伴们都很好奇,下面简单介绍一下EasyRetry

EasyRetry是致力提高分布式业务系统一致性的分布式重试平台,它提供了控制台任务观测、可配置的重试策略、重试后执行回调以及丰富地告警配置等功能。通过这些手段,可以对异常数据进行全面监测和回放,从而在确保系统高可用性的同时,大大提升数据的一致性。详细了解

EasyRetry为什么使用滑动窗口?

场景一 客户端批量上报异常数据

EasyRetry作为高性能的分布式重试平台,从设计之初就充分考虑了重试数据上报的集中性和异步特点.使用滑动窗口批量上报异常数据可以减少网络传输的频率,从而降低网络开销。滑动窗口可以将一定时间内的异常数据进行缓冲和合并,然后一次性发送,减少了频繁的网络通信。

具体使用案例

java// 滑动窗口的参数配置
SlidingWindowConfig slidingWindowConfig = easyRetryProperties.getSlidingWindow();  
  
slidingWindow = SlidingWindow  
    .Builder  
    .newBuilder()  
    .withTotalThreshold(slidingWindowConfig.getTotalThreshold())  
    .withWindowTotalThreshold(slidingWindowConfig.getWindowTotalThreshold())  
    .withDuration(slidingWindowConfig.getDuration(), slidingWindowConfig.getChronoUnit())  
    .withListener(new ReportListener())  
.build();  
  
slidingWindow.start();

场景二 重试流量的统计(计划在后期版本实现)

通过LeapArray对重试流量进行统计,EasyRetry将进一步实现重试风暴的管控,从而确保服务的稳定性并提升服务端的质量。

有兴趣的小伙伴可以关注EasyRetry

相关推荐

linux下的定时或计时操作(gettimeofday等的用法秒\微秒\纳秒

一、用select()函数实现非阻塞时的等待时间,用到结构体structtimeval{},这里就不多说了。二、用gettimeofday()可获得...

guava限流器RateLimiter使用简介(Springboot实现)

在大型分布式系统中,限流是一种重要的防护机制,可以帮助我们控制流量并减轻系统的负担。Google的Guava库提供了一种方便的限流器实现,可以帮助我们轻松地实现限流功能。本文将介绍Guava中限流器的...

Mybatis配置文件XML全貌详解,再不懂我也没招了

一、为什么要使用配置文件试想,如果没有配置文件,我们的应用程序将只能沿着固定的姿态运行,几乎不能做任何动态的调整,那么这不是一套完美的设计,因为我们希望拥有更宽更灵活的操作空间和更多的兼容度,同时也能...

五分钟搞懂分布式流控算法原理和滑动窗口设计实现

流控的使用场景保护系统稳定性:...

实时数据处理的性能瓶颈:如何用Flink实现毫秒级延迟?

在大数据实时处理领域,ApacheFlink凭借其低延迟、高吞吐的特性,成为了许多企业的首选框架。然而,在实际生产环境中,实现毫秒级延迟仍然是一个巨大的挑战。本文将从性能优化的角度出发,详细讲解如...

Java时间类介绍:Date的使用

在Java中用来表示日期的类有很多,最早使用的应该是Date类。Date类大概有两个,分别是:java.util.Date和java.sql.Date。前者对应我们日常生活中常用的日期类,而后者则表示...

不会用Redis做分布式流水号?

引言最近做项目,需要做单据编号,格式固定为:单据类型固定前缀+年月日时间戳+4位流水号,要求是每个单据类型的流水号唯一,方便后续业务使用。之前项目中使用的是UUID作为其他业务的单据编号,和组长沟通了...

用rabbitmq实现消息重发的功能

前言:在开发工作中,有很多时候会遇到要把数据同步给其他部门或三方的场景,这个时候光写一个同步接口是不太稳定的,因为有很多因素会导致同步接口运行失败或未运行,比如调接口之前的代码出现了bug,异常被th...

如何实现延迟队列(JDK/mysql/redis/Rabbit)

何为延迟队列队列,即先进先出的数据结构,就和食堂打饭一样,排在最前面的先打饭,打完饭就走;延迟队列即队列中的元素相比以往多了一个属性特征:延迟...

面试突击40:线程休眠的方法有几种?

在Java中,让线程休眠的方法有很多,这些方法大致可以分为两类,一类是设置时间,在一段时间后自动唤醒,而另一个类是提供了一对休眠和唤醒的方法,在线程休眠之后,可以在任意时间对线程进行唤醒。PS:休...

让java日期和时间处理变得更简单

时间和日期处理是Java编程中不可或缺的一部分。Java提供了许多内置的日期和时间类,可以用来处理日期和时间数据。但是,由于时间和日期的复杂性,处理时间和日期数据可能会变得非常棘手。在本文中,我...

Java如何获取当前日期?

Talkischeap,Showmethecode.--by:LinusTorvalds使用获取当时日期的方式如下:System.currentTimeMillis();获取当时...

流水号设计及Leaf的升级使用

一、需求背景由于业务需要对数据和消息等进行唯一的标识。需要进行唯一流水号的设计。...

糟糕,被SimpleDateFormat坑到啦!

1.问题背景问题的背景是这样的,在最近需求开发中遇到需要将给定目标数据通过某一固定的计量规则进行过滤并打标生成明细数据...

JDK8新特性解析:深入比较LocalDateTime和Date之间的区别

JDK8引入了java.time包,其中包含了LocalDateTime类,它与旧的Date类在表示日期和时间方面有着显著的区别。下面是对这两者之间区别的详细解释:...