大纲
1.漏桶算法的实现对比
(1)普通思路的漏桶算法实现
(2)节省线程的漏桶算法实现
(3)Sentinel中的漏桶算法实现
(4)Sentinel中的漏桶算法与普通漏桶算法的区别
(5)Sentinel中的漏桶算法存在的问题
2.令牌桶算法的实现对比
(1)普通思路的令牌桶算法实现
(2)节省线程的令牌桶算法实现
(3)Guava中的令牌桶算法实现
(4)Sentinel中的令牌桶算法实现
(5)Sentinel中的令牌桶算法总结
1.漏桶算法的实现对比
(1)普通思路的漏桶算法实现
(2)节省线程的漏桶算法实现
(3)Sentinel中的漏桶算法实现
(4)Sentinel中的漏桶算法与普通漏桶算法的区别
(5)Sentinel中的漏桶算法存在的问题
(1)普通思路的漏桶算法实现
一.漏桶算法的处理流程
二.漏桶算法的主要特点
三.漏桶算法的优缺点
一.漏桶算法的处理流程
漏桶算法的核心思想是以固定速率流出。
步骤一:当新的请求到达时,会将新的请求放入缓冲区(请求队列)中,类似于往水桶里注水。
步骤二:系统会以固定的速度处理缓冲区中的请求,类似于水从窟窿中以固定的速度流出,比如开启一个后台线程定时以固定的速度从缓冲区中取出请求然后进行分发处理。
步骤三:如果缓冲区已满,则新的请求将被拒绝或丢弃,类似于水溢出。
二.漏桶算法的主要特点
特点一:固定速率
水从桶底的孔洞中以固定速率流出,类似于网络中以固定速率发送数据包。但写入速度不固定,也就是请求不是匀速产生的。相当于生产者生产消息不固定,消费者消费消息是匀速消费的。
特点二:有限容量
桶的容量有限,当桶满时,新到达的水会溢出,即拒绝超过容量的请求。
特点三:先进先出(FIFO)
水按照先进先出的顺序从桶中流出,类似于请求的处理顺序。
这种算法的一个重要特性是:无论请求的接收速率如何变化,请求的处理速率始终是稳定的,这就确保了系统的负载不会超过预设的阈值。但是由于请求的处理速率是固定的,所以无法处理突发流量。此外如果入口流量过大,漏桶可能会溢出,导致请求丢失。
三.漏桶算法的优缺点
优点一:平滑流量
由于以固定的速率处理请求,所以可以有效地平滑和整形流量,避免流量的突发和波动,类似于消息队列的削峰填谷的作用。
优点二:防止过载
当流入的请求超过桶的容量时,可以直接丢弃请求,防止系统过载。
缺点一:无法处理突发流量
由于漏桶的出口速度是固定的,无法处理突发流量。例如,即使在流量较小的时候,也无法以更快的速度处理请求。
缺点二:可能会丢失数据
如果入口流量过大,超过了桶的容量,那么就需要丢弃部分请求。在一些不能接受丢失请求的场景中,这可能是一个问题。
缺点三:不适合处理速率变化大的场景
如果处理速率变化大,或需要动态调整处理速率,则无法满足。
漏桶算法适用于需要以固定速率处理请求的场景。在多数业务场景中,其实并不需要按照严格的速率进行请求处理。而且多数业务场景都需要应对突发流量的能力,所以会使用令牌桶算法。
(2)节省线程的漏桶算法实现
漏桶算法可以通过延迟计算的方式来实现。延迟计算指的是不需要单独的线程来定时生成令牌或从漏桶中定时取请求,而是由调用限流器的线程自己计算是否有足够的令牌以及需要sleep的时间。延迟计算的方式可以节省一个线程资源。- public class LeakyBucketLimiter {
- //桶的最大容量
- public static long threshold = 10;
- //当前桶内的水量
- public static long count = 0;
- //漏水速率(每秒5次)
- public static long leakRate = 5;
- //上次漏水时间
- public static long lastLeakTime = System.currentTimeMillis();
- //限流方法,返回true表示通过
- public boolean canPass() {
- //调用漏水方法
- this.leak();
- //判断是否超过最大请求数量
- if (count < threshold) {
- count++;
- return true;
- }
- return false;
- }
- //漏水方法,计算并更新这段时间内漏水量
- private void leak() {
- //获取系统当前时间
- long currentTime = System.currentTimeMillis();
- //计算这段时间内,需要流出的水量
- long leakWater = (currentTime - lastLeakTime) * leakRate / 1000;
- count = Math.max(count - leakWater, 0);
- //更新最近一次的漏水时间
- lastLeakTime = currentTime;
- }
- }
复制代码 (3)Sentinel中的漏桶算法实现
在RateLimiterController的canPass()方法中,为了判断是否超出QPS阈值,通过原子类变量latestPassedTime简化成单线程让请求先后通过的处理模型。为了尽量让业务不受Sentinel影响,采用预估请求的被处理时间点的方式。也就是无需等前面的请求完全被处理完,才确定后面的请求被处理的时间。因为在普通的漏桶算法中,是处理完一个请求,才从漏桶取出水滴。而RateLimiterController的漏桶算法,则是假设请求已经被通过了。
具体的判断逻辑如下:首先获取系统的当前时间currentTime。然后计算在满足流控规则中限制的QPS阈值count的情况下,先后的两个请求被允许通过时的最小时间间隔costTime。接着计算当前请求最早的预期通过时间expectedTime,也就是此次请求预计会在几时几分几秒内通过。最后比较expectedTime和currentTime就可知当前请求是否允许通过了。
一.如果expectedTime小于等于currentTime
也就是当前请求最早的预期通过时间比系统当前时间小。如果在此时(currentTime)通过当前请求,则当前请求的通过时间就比它最早的预期通过时间(expectedTime)要晚,即当前请求和最近通过的请求的时间间隔变大了,所以此时不会超QPS阈值。于是返回true允许通过,同时更新最近允许请求通过的时间戳为当前时间。
二.如果expectedTime大于currentTime
也就是当前请求最早的预期通过时间比系统当前时间大。如果在此时(currentTime)通过当前请求,则当前请求的通过时间就比它最早的预期通过时间(expectedTime)要早,即当前请求和最近通过的请求的时间间隔变小了,比最小间隔时间costTime还小,所以此时必然会超QPS阈值。因此返回进行等待或者返回false不允许通过,等待的最小时间就是:最近通过请求的时间 + 先后两个请求允许通过时的最小间隔时间 - 当前时间。
需要注意:Sentinel流量控制的漏桶算法,只能限制在costTime内的流量激增,限制不了costTime外的流量激增。比如系统启动完一瞬间就涌入大量并发请求,此时的流量激增限制不了。又比如系统处理完正常流量的最后一个请求,隔了costTime+的时间后,突然涌入超QPS阈值的并发请求,此时也限制不了这种情况的流量激增。但如果系统处理完正常流量的最后一个请求,隔了costTime-的时间后,突然涌入超QPS阈值的并发请求,此时则可以限制这种情况的流量激增。
同时,为了避免等待的各个并发线程被同时唤醒,可以利用原子变量的addAndGet()方法 + 假设等待请求已被通过的方式,实现需要等待的并发请求进行睡眠等待的时间都不一样,从而实现并发请求排队等待的效果。
实现排队等待效果的核心逻辑:由于latestPassedTime的原子性,每个线程都会获得不一样的oldTime。接着根据oldTime - 当前时间,就可以得到每个线程需要睡眠等待的时间waitTime。此时的waitTime都将会不一样,从而避免并发线程同时被唤醒的情况。将latestPassedTime按costTime进行自增,其实相当于假设当前请求在不超过QPS阈值的情况下,被允许通过了。- public class RateLimiterController implements TrafficShapingController {
- //排队等待的意思是超出阈值后等待一段时间,maxQueueingTimeMs就是请求在队列中的最大等待时间
- private final int maxQueueingTimeMs;
- //流控规则中限制QPS的阈值,也就是QPS超出多少后会进行限制
- private final double count;
- //最近允许一个请求通过的时间,每次请求通过后就会更新此时间,可以根据该时间计算出当前请求最早的预期通过时间
- //注意:Sentinel是在业务前面的,尽量不要让业务受到Sentinel的影响,所以不需要等请求完全被处理完,才确定请求被通过的时间
- private final AtomicLong latestPassedTime = new AtomicLong(-1);
-
- public RateLimiterController(int timeOut, double count) {
- this.maxQueueingTimeMs = timeOut;
- this.count = count;
- }
-
- @Override
- public boolean canPass(Node node, int acquireCount) {
- return canPass(node, acquireCount, false);
- }
-
- @Override
- public boolean canPass(Node node, int acquireCount, boolean prioritized) {
- //acquireCount代表每次从桶底流出多少个请求
- //如果acquireCount小于等于0,则表示无需限流直接通过,不过acquireCount一般默认是1
- if (acquireCount <= 0) {
- return true;
- }
- //如果限流规则的count(即限制QPS的阈值)小于等于0,则直接拒绝,相当于一个请求也不能放行
- if (count <= 0) {
- return false;
- }
- //1.首先获取系统的当前时间
- long currentTime = TimeUtil.currentTimeMillis();
- //2.然后计算,在满足流控规则中限制的QPS阈值count的情况下,先后的两个请求被允许通过时的最小间隔时间(假设请求是单线程处理的)
- long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
- //3.接着计算当前请求最早的预期通过时间 = 满足QPS阈值下的两个请求的最小时间间隔 + 上次请求的通过时间
- long expectedTime = costTime + latestPassedTime.get();
- //4.最后判断当前请求最早的预期通过时间是否比系统当前时间小
- if (expectedTime <= currentTime) {//等价于没有超出QPS阈值
- //当前请求最早的预期通过时间比系统当前时间小
- //如果在此时(currentTime)通过当前请求,那么当前请求的实际通过时间就比它最早的预期通过时间(expectedTime)要晚
- //也就是当前请求和最近通过的请求的时间间隔变大了,所以此时不会超QPS阈值,返回true允许通过
- //由这里可知,latestPassedTime并不会影响costTime,也就是说,多个线程可以并发执行到这里而不受阈值的影响
- //这意味着,Sentinel流量控制的漏桶算法,只能限制在costTime时间内的流量激增,限制不了costTime时间外的流量激增
- //比如系统启动完的那一瞬间就涌入超出QPS阈值的并发请求,此时的这种流量激增是限制不了的;
- //又比如系统正常运行时处理完了正常流量的最后一个请求,隔了costTime+的时间后,突然涌入超出QPS阈值的并发请求,此时也限制不了;
- //只能限制住这样的一种情况:系统正常运行处理完正常流量的最后一个请求,隔了costTime-的时间,突然涌入超出QPS阈值的并发请求
- latestPassedTime.set(currentTime);
- return true;
- } else {
- //如果不是,即当前请求最早的预期通过时间比系统当前时间大
- //则说明latestPassedTime.get()大了,也就是上一个可能由于QPS超出阈值的原因导致请求处理慢了,所以需要进行等待
- //计算当前请求的等待时间,用于判断是否超出流控规则设置的最大等待时间
- long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
- if (waitTime > maxQueueingTimeMs) {
- //如果超出最大等待时间,则直接返回false
- return false;
- } else {//等价于超出了QPS阈值
- //当前请求最早的预期通过时间比系统当前时间大
- //如果在此时(currentTime)通过当前请求,那么当前请求的实际通过时间就比它最早的预期通过时间(expectedTime)要早
- //也就是当前请求和最近通过的请求的时间间隔变小了,比最小间隔时间costTime还小
- //所以此时必然会超QPS阈值,因此返回进行等待或者返回false不允许通过
- //而等待的最小时间,就是最近通过请求的时间 + 先后两个请求允许通过时的最小间隔时间 - 当前时间
-
- //首先通过latestPassedTime这个原子变量的addAndGet()方法
- //将最近通过请求的时间latestPassedTime,加上先后两次请求需要的最小间隔时间costTime,得到当前请求本来预期的通过时间
- //注意:
- //当多个并发线程执行到此处时,由于latestPassedTime的原子性,每个线程都会获得不一样的oldTime
- //接着根据oldTime - 当前时间,就可以得到每个线程需要睡眠等待的时间waitTime
- //此时的waitTime都将会不一样,从而避免并发线程同时被唤醒的情况
- //将latestPassedTime进行自增,其实相当于假设当前请求在不超过QPS阈值的情况下,被允许通过了
- long oldTime = latestPassedTime.addAndGet(costTime);
- try {
- //然后计算当前请求需要等待多久 = 当前请求最早的预期通过时间 - 当前系统时间
- waitTime = oldTime - TimeUtil.currentTimeMillis();
- //如果等待时间大于流控规则设置的最大等待时间,则需要回滚刚才更新的最近通过请求的时间
- //也就是将latestPassedTime减去costTime,然后返回false表示请求无法通过
- if (waitTime > maxQueueingTimeMs) {
- //如果发现新计算的等待时间 大于 最大等待时间,则需要回滚latestPassedTime
- latestPassedTime.addAndGet(-costTime);
- return false;
- }
- //in race condition waitTime may <= 0
- if (waitTime > 0) {
- //当前请求需要进行等待
- Thread.sleep(waitTime);
- }
- return true;
- } catch (InterruptedException e) {
- }
- }
- }
- return false;
- }
- }
复制代码 (3)Guava中的令牌桶算法实现
一.SmoothBursty的初始化
二.SmoothBursty的acquire()方法
三.SmoothWarmingUp的初始化
四.SmoothWarmingUp的acquire()方法
SmoothBursty和SmoothWarmingUp这两种限流器都使用了预支令牌的思路来实现,就是当前线程获取令牌的代价(阻塞时间)需要由下一个线程来支付。这样可以减少线程阻塞的概率,因为下一个请求不确定什么时候才来。如果下一个请求很久才来,那么这段时间产生的新令牌已满足下一个线程的需求,这样就不用阻塞了。
一.SmoothBursty的初始化
RateLimiter不保存上一次请求的时间,但是它保存下一次请求期望到达的时间。如果下一个请求的预期到达时间实际上已经过去了,并且假设下次请求期望到达的时间点是past,现在的时间点是now。那么now - past的这段时间表示RateLimiter没有被使用,所以在这段空闲的时间内RateLimiter就会增加storedPermits的数量。- long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
- long costTime = Math.round(1.0 * (1) / 1100 * 1000)约等于0.9ms;
复制代码 二.SmoothBursty的acquire()方法
- public class TokenBucketLimiter {
- //桶的最大容量
- public static long threshold = 10;
- //当前桶内的令牌数
- public static long count = 0;
- //令牌生成速率(每秒5次)
- public static long tokenRate = 5;
- //上次生成令牌的时间
- public static long lastRefillTime = System.currentTimeMillis();
- //限流方法,返回true表示通过
- public boolean canPass() {
- //调用生成令牌方法
- this.refillTokens();
- //判断桶内是否还有令牌
- if (count > 0) {
- count--;
- return true;
- }
- return false;
- }
-
- //生成令牌方法,计算并更新这段时间内生成的令牌数量
- private void refillTokens() {
- long currentTime = System.currentTimeMillis();
- //计算这段时间内,需要生成的令牌数量
- long refillTokens = (currentTime - lastRefillTime) * tokenRate / 1000;
- //更新桶内的令牌数
- count = Math.min(count + refillTokens, threshold);
- //更新令牌生成时间
- lastRefillTime = currentTime;
- }
- }
复制代码 四.SmoothWarmingUp的acquire()方法
[code]@Beta@GwtIncompatible@SuppressWarnings("GoodTime")public abstract class RateLimiter { ... //无限等待的获取 //Acquires the given number of permits from this RateLimiter, //blocking until the request can be granted. //Tells the amount of time slept, if any. //@param permits the number of permits to acquire,获取的令牌数量 //@return time spent sleeping to enforce rate, in seconds; 0.0 if not rate-limited @CanIgnoreReturnValue public double acquire(int permits) { //调用RateLimiter.reserve()方法 //预支令牌并获取需要阻塞的时间:即预定数量为permits的令牌数,并返回需要等待的时间 long microsToWait = reserve(permits); //将需要等待的时间补齐, 从而满足限流的需求,即根据microsToWait来让线程sleep(共性) stopwatch.sleepMicrosUninterruptibly(microsToWait); //返回这次调用使用了多少时间给调用者 return 1.0 * microsToWait / SECONDS.toMicros(1L); } //Reserves the given number of permits from this RateLimiter for future use, //returning the number of microseconds until the reservation can be consumed. //从这个RateLimiter限速器中保留给定数量的令牌,以备将来使用,返回可以使用保留前的微秒数 //@return time in microseconds to wait until the resource can be acquired, never negative final long reserve(int permits) { checkPermits(permits); //由于涉及并发操作,所以必须使用synchronized进行互斥处理 synchronized (mutex()) { //调用RateLimiter.reserveAndGetWaitLength()方法 return reserveAndGetWaitLength(permits, stopwatch.readMicros()); } } //Reserves next ticket and returns the wait time that the caller must wait for. //预定下一个ticket,并且返回需要等待的时间 final long reserveAndGetWaitLength(int permits, long nowMicros) { //调用SmoothRateLimiter.reserveEarliestAvailable()方法 long momentAvailable = reserveEarliestAvailable(permits, nowMicros); return max(momentAvailable - nowMicros, 0); } //Reserves the requested number of permits and returns the time that those permits can be used (with one caveat). //保留请求数量的令牌,并返回可以使用这些令牌的时间(有一个警告) //生产令牌、获取令牌、计算阻塞时间的具体细节由子类来实现 //@return the time that the permits may be used, or, if the permits may be used immediately, an arbitrary past or present time abstract long reserveEarliestAvailable(int permits, long nowMicros); ...}@GwtIncompatibleabstract class SmoothRateLimiter extends RateLimiter { //The currently stored permits. //令牌桶中当前缓存的未消耗的令牌数 double storedPermits; //The maximum number of stored permits. //令牌桶中允许存放的最大令牌数 double maxPermits; //The interval between two unit requests, at our stable rate. //E.g., a stable rate of 5 permits per second has a stable interval of 200ms. //按照我们稳定的速率,两个单位请求之间的时间间隔;例如,每秒5个令牌的稳定速率具有200ms的稳定间隔 double stableIntervalMicros; //The time when the next request (no matter its size) will be granted. //After granting a request, this is pushed further in the future. Large requests push this further than small requests. //下一个请求(无论大小)将被批准的时间. 在批准请求后,这将在未来进一步推进,大请求比小请求更能推动这一进程. private long nextFreeTicketMicros = 0L;//could be either in the past or future ... @Override final long reserveEarliestAvailable(int requiredPermits, long nowMicros) { //1.根据nextFreeTicketMicros计算新产生的令牌数,更新当前未使用的令牌数storedPermits //获取令牌时调用SmoothRateLimiter.resync()方法与初始化时的调用不一样. //此时会把"没有过期"的令牌存储起来. //但是如果计数时间nextFreeTicketMicros是在未来. 那就不做任何处理. resync(nowMicros); //下一个请求(无论大小)将被批准的时间 long returnValue = nextFreeTicketMicros; //2.计算需要阻塞等待的时间 //2.1.先从桶中取未消耗的令牌,如果桶中令牌数不足,看最多能取多少个 //存储的令牌可供消费的数量 double storedPermitsToSpend = min(requiredPermits, this.storedPermits); //2.2.计算是否需要等待新鲜的令牌(当桶中现有的令牌数不足时就需要等待新鲜的令牌),如果需要,则计算需要等待的令牌数 //需要等待的令牌:新鲜的令牌 double freshPermits = requiredPermits - storedPermitsToSpend; //计算需要等待的时间 //分两部分计算:waitMicros = 从桶中获取storedPermitsToSpend个现有令牌的代价 + 等待生成freshPermits个新鲜令牌的代价 //从桶中取storedPermitsToSpend个现有令牌也是有代价的,storedPermitsToWaitTime()方法是个抽象方法,会由SmoothBursty和SmoothWarmingUp实现 //对于SmoothBursty来说,storedPermitsToWaitTime()会返回0,表示已经存储的令牌不需要等待. //而生成新鲜令牌需要等待的代价是:新鲜令牌的个数freshPermits * 每个令牌的耗时stableIntervalMicros long waitMicros = storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend) + (long) (freshPermits * stableIntervalMicros); //3.更新nextFreeTicketMicros //由于新鲜的令牌可能已被预消费,所以nextFreeTicketMicros就得往后移,以表示这段时间被预消费了 this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros); //4.扣减令牌数,更新桶内剩余令牌 //最后把上面计算的可扣减的令牌数量从存储的令牌里减掉 this.storedPermits -= storedPermitsToSpend; //返回请求需要等待的时间 //需要注意returnValue被赋值的是上次的nextFreeTicketMicros,说明当前这次请求获取令牌的代价由下一个请求去支付 return returnValue; } //Updates storedPermits and nextFreeTicketMicros based on the current time. //根据当前时间,更新storedPermits和nextFreeTicketMicros变量 //计算nextFreeTicketMicros到当前时间内新产生的令牌数,这个就是延迟计算 void resync(long nowMicros) { //if nextFreeTicket is in the past, resync to now //一般当前的时间是大于下个请求被批准的时间 //此时:会把过去的时间换成令牌数存储起来,注意存储的令牌数不能大于最大的令牌数 //当RateLimiter初始化好后,可能刚开始没有流量,或者是一段时间没有流量后突然来了流量 //此时可以往"后"预存储一秒时间的令牌数. 也就是这里所说的burst能力 //如果nextFreeTicketMicros在未来的一个时间点,那这个if判断便不满足 //此时,不需要进行更新storedPermits和nextFreeTicketMicros变量 //此种情况发生在:"预借"了令牌的时候 if (nowMicros > nextFreeTicketMicros) { //时间差除以生成一个新鲜令牌的耗时,coolDownIntervalMicros()是抽象方法,由子类实现 double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros(); //更新令牌桶内已存储的令牌个数,注意不超过最大限制 storedPermits = min(maxPermits, storedPermits + newPermits); //更新nextFreeTicketMicros为当前时间 nextFreeTicketMicros = nowMicros; } } //Translates a specified portion of our currently stored permits which we want to spend/acquire, into a throttling time. //Conceptually, this evaluates the integral of the underlying function we use, for the range of [(storedPermits - permitsToTake), storedPermits]. //This always holds: 0 |