大纲
1.关于限流的概述
2.高并发下的四大限流算法原理及实现
3.Sentinel使用的设计模式总结
1.关于限流的概述
保护高并发系统的三把利器:缓存、降级和限流。限流就是通过限制请求的流量以达到保护系统的目的,比如秒杀抢购。具体就是对并发请求进行限速,或对一个时间窗口内的的请求进行限速,一旦达到限制速率就会拒绝服务或进行流量整形。
常用的限流方式:
一.限制总请求数
如数据库连接池、线程池。
二.限制瞬时并发数
如Nginx的LimitConn模块,可以用来限制瞬时并发连接数。如Java的Semaphore也可以用来限制瞬时并发数。如果需要限制方法被调用的并发数不能超过100(同一时间并发数),则可以使用信号量Semaphore来实现。
三.限制时间窗口内的平均速率
如Guava的RateLimiter、Nginx的LimitReq模块,就可以用来限制每秒的请求速率。如果需要限制方法在一段时间内平均被调用次数不超过100,则可以使用RateLimiter来实现。Guava的RateLimiter只能用于单机的限流,如果想要集群限流,则需要引入Redis或者阿里开源的Sentinel中间件。
四.限制远程接口的调用速率
五.限制MQ的消费速率
六.根据网络、CPU或内存负载等来限流
2.高并发下的四大限流算法原理及实现
(1)固定窗口计数法
(2)滑动窗口计数法
(3)漏桶算法
(4)令牌桶算法
(5)四种限流算法的对比总结
(1)固定窗口计数法
一.实现原理
在一个固定长度的时间窗口内限制请求数量。每来一个请求,请求数加一,如果请求数超过最大限制,就拒绝该请求。
二.算法流程
三.算法存在的问题
问题一:限流不够平滑
例如设置的是每秒限流3个请求,第一毫秒就发送3个请求,达到限流。那么窗口剩余时间的请求都将会被拒绝,这样用户体验就不好。
问题二:存在突发流量的问题
由于在进行窗口切换时,当前窗口的访问总数会立即置为0,所以可能会导致流量突发的问题。
四.算法的代码实现- //注意:下面的实现并没有考虑并发的情况
- public class FixWindowLimiter {
- //窗口的大小,1000ms
- public static long windowUnit = 1000;
- //窗口的最大请求数
- public static long threshold = 10;
- //当前窗口内的请求数
- public static long count = 0;
- //当前窗口的开始时间
- public static long lastTime = 0;
- //限流方法,返回true表示通过
- public boolean canPass() {
- //获取当前时间
- long currentTime = System.currentTimeMillis();
- //判断当前时间与窗口的开始时间的时间差,是否大于窗口的大小
- if (currentTime - lastTime > windowUnit) {
- //当前窗口的请求数设置为0
- count = 0;
- //重置当前窗口的开始时间为当前时间
- lastTime = currentTime;
- }
- //判断当前窗口的请求数是否超过窗口的最大请求数
- if (count < threshold) {
- count++;
- return true;
- }
- return false;
- }
- }
复制代码 (2)滑动窗口计数法
为解决固定窗口计数法潜在的流量突发问题,可使用滑动窗口计数法。
一.实现原理
在滑动窗口算法中,窗口的开始时间是动态的,窗口大小是固定的。每来一个请求,就向后推一个时间窗口,计算该窗口内的请求数量。如果请求数超过限制就拒绝请求,否则处理请求 + 记录请求的时间戳。另外还需要一个任务清理过期的时间戳,滑动窗口没有划分固定的时间窗起点与终点。
二.算法存在的问题
虽然解决了流量突发的问题,但限流依然不够平滑。例如设置的是每秒限流3个请求,第一毫秒就发送3个请求,达到限流。那么窗口剩余时间的请求都将会被拒绝,这样用户体验就不好。
三.算法的代码实现- public class SlidingWindowLimiter {
- //每个窗口的最大请求数量
- public static long threshold = 10;
- //窗口大小,1000ms
- public static long windowUnit = 1000;
- //请求集合,用来存储窗口内的请求数量
- public static List<Long> requestList = new ArrayList<>();
- //限流方法,返回true表示通过
- public boolean canPass() {
- //获取当前时间
- long currentTime = System.currentTimeMillis();
- //统计当前时间对应的窗口,收到的请求的数量
- int sizeOfValid = this.sizeOfValid(currentTime);
- //判断请求数是否超过窗口的最大请求数量
- if (sizeOfValid < threshold) {
- //把当前请求添加到请求集合里
- requestList.add(currentTime);
- return true;
- }
- return false;
- }
- //统计当前时间对应的窗口的请求数
- private int sizeOfValid(long currentTime) {
- int sizeOfValid = 0;
- for (Long requestTime : requestList) {
- //判断是否在当前时间窗口内
- if (currentTime - requestTime <= windowUnit) {
- sizeOfValid++;
- }
- }
- return sizeOfValid;
- }
- //清理过期的请求:单独启动一个线程处理
- private void clean() {
- //判断是否超出当前时间窗口
- requestList.removeIf(requestTime -> System.currentTimeMillis() - requestTime > windowUnit);
- }
- }
复制代码 (3)漏桶算法
它是一种流量整形(Traffic Shaping)和流量控制(Traffic Policing)的算法,它可以有效地控制流量的处理速率以及防止网络拥塞。
一.实现原理
首先,一个固定容量的漏桶,按照固定速率流出水(处理请求)。然后,当流入水的速度过大会直接溢出(请求数量超过限制则直接拒绝)。最后,漏桶里的水不够则无法流出水(漏桶内没有请求则不处理)。
当请求流量正常或者较小时,请求能够得到正常的处理。当请求流量过大时,漏桶算法可通过丢弃部分请求来防止系统过载。
这种算法的一个重要特性是:无论请求的接收速率如何变化,请求的处理速率始终是稳定的,这就确保了系统的负载不会超过预设的阈值。但是由于请求的处理速率是固定的,所以无法处理突发流量。此外如果入口流量过大,漏桶可能会溢出,导致请求丢失。
二.算法的优缺点
优点一:平滑流量
由于以固定的速率处理请求,所以可以有效地平滑和整形流量,避免流量的突发和波动,类似于消息队列的削峰填谷的作用。
优点二:防止过载
当流入的请求超过桶的容量时,可以直接丢弃请求,防止系统过载。
缺点一:无法处理突发流量
由于漏桶的出口速度是固定的,无法处理突发流量。例如,即使在流量较小的时候,也无法以更快的速度处理请求。
缺点二:可能会丢失数据
如果入口流量过大,超过了桶的容量,那么就需要丢弃部分请求。在一些不能接受丢失请求的场景中,这可能是一个问题。
缺点三:不适合处理速率变化大的场景
如果处理速率变化大,或需要动态调整处理速率,则无法满足。
三.算法的代码实现- public class LeakyBucketLimiter {
- //桶的最大容量
- public static long threshold = 10;
- //当前桶内的水量
- public static long count = 0;
- //漏水速率(每秒5次)
- public static long leakRate = 5;
- //上次漏水时间
- public static long lastLeakTime = System.currentTimeMillis();
- //限流方法,返回true表示通过
- public boolean canPass() {
- //调用漏水方法
- this.leak();
- //判断是否超过最大请求数量
- if (count < threshold) {
- count++;
- return true;
- }
- return false;
- }
- //漏水方法,计算并更新这段时间内漏水量
- private void leak() {
- //获取系统当前时间
- long currentTime = System.currentTimeMillis();
- //计算这段时间内,需要流出的水量
- long leakWater = (currentTime - lastLeakTime) * leakRate / 1000;
- count = Math.max(count - leakWater, 0);
- //更新最近一次的漏水时间
- lastLeakTime = currentTime;
- }
- }
复制代码 (4)令牌桶算法
令牌桶限流算法也是一种常用的流量整形和限制请求处理速率的算法。
一.实现原理
首先,系统会以固定的速率向桶中添加令牌。然后,当有请求到来时,会尝试从桶中移除一个令牌。如果桶中有足够的令牌,则请求可以被处理。如果桶中没有令牌,那么请求将被拒绝。此外,桶中的令牌数不能超过桶的容量。如果新生成的令牌超过了桶的容量,那么新的令牌会被丢弃。
令牌桶算法的一个重要特性是,它能够处理突发流量。当桶中有足够的令牌时,可以一次性处理多个请求,这对于需要处理突发流量的应用场景非常有用。但是又不会无限制的增加处理速率导致压垮服务器,因为桶内令牌数量是有限制的。
二.算法的优缺点
优点一:可以处理突发流量
令牌桶算法可以处理突发流量。当桶满时,能够以最大速度处理请求。这对于需要处理突发流量的应用场景非常有用。
优点二:限制请求处理的平均速率
在长期运行中,请求的处理速率会被限制在预定义的平均速率下,也就是生成令牌的速率。
优点三:灵活性
与漏桶算法相比,令牌桶算法提供了更大的灵活性。例如,可以动态地调整生成令牌的速率。
缺点一:可能导致过载
如果令牌产生速度过快,可能会导致大量突发流量,使网络或服务过载。
缺点二:需要存储空间
令牌桶需要一定的存储空间来保存令牌,可能会导致内存资源的浪费。
三.算法的代码实现- public class TokenBucketLimiter {
- //桶的最大容量
- public static long threshold = 10;
- //当前桶内的令牌数
- public static long count = 0;
- //令牌生成速率(每秒5次)
- public static long tokenRate = 5;
- //上次生成令牌的时间
- public static long lastRefillTime = System.currentTimeMillis();
- //限流方法,返回true表示通过
- public boolean canPass() {
- //调用生成令牌方法
- this.refillTokens();
- //判断桶内是否还有令牌
- if (count > 0) {
- count--;
- return true;
- }
- return false;
- }
- //生成令牌方法,计算并更新这段时间内生成的令牌数量
- private void refillTokens() {
- long currentTime = System.currentTimeMillis();
- //计算这段时间内,需要生成的令牌数量
- long refillTokens = (currentTime - lastRefillTime) * tokenRate / 1000;
- //更新桶内的令牌数
- count = Math.min(count + refillTokens, threshold);
- //更新令牌生成时间
- lastRefillTime = currentTime;
- }
- }
复制代码 (5)四种限流算法的对比总结
一.四种算法的优缺点
固定窗口算法实现简单,但是限流不够平滑,存在突发流量的问题,适用于需要简单实现限流的场景。
滑动窗口算法虽然解决了突发流量的问题,但是还是存在限流不够平滑的问题,所以它适用于需要控制平均请求速率的场景。
漏桶算法的优点是流量处理更平滑,但是无法应对突发流量,适用于需要平滑流量的场景。
令牌桶算法既能平滑流量,又能处理突发流量,适用于需要处理突发流量的场景。
二.令牌桶算法和漏桶算法的对比总结
令牌桶算法就是以固定速率生成令牌放入桶中。每个请求都需要从桶中获取令牌,没有获取到令牌的请求会被阻塞限流。当令牌消耗速度小于生成速度时,令牌桶内就会预存这些未消耗的令牌。当有突发流量进来时,可以直接从桶中取出令牌,而不会被限流。
漏桶算法就是将请求放入桶中,然后以固定的速率从桶中取出请求来处理。当桶中等待的请求数超过桶的容量后,后续的请求就不再加入桶中。
漏桶算法适用于需要以固定速率处理请求的场景。在多数业务场景中,其实并不需要按照严格的速率进行请求处理。而且多数业务场景都需要应对突发流量的能力,所以会使用令牌桶。
但不管是令牌桶算法还是漏桶算法,都可以通过延迟计算的方式来实现。延迟计算指的是不需要单独的线程来定时生成令牌或从漏桶中定时取请求,而是由调用限流器的线程自己来计算是否有足够的令牌以及需要sleep的时间。使用延迟计算的方式,可以节省一个线程资源。
3.Sentinel使用的设计模式总结
(1)责任链模式
(2)监听器模式
(3)适配器模式
(4)模版方法模式
(5)策略模式
(6)观察者模式
(1)责任链模式
一.责任链接口ProcessorSlot
二.责任链接口的抽象实现类
三.责任链的构建
Sentinel的功能都是靠一条链式的ProcessorSlot来完成的,这些ProcessorSlot的初始化以及调用便使用了责任链模式。
一.责任链接口ProcessorSlot
entry()方法相当于AOP的before()方法,也就是入口方法,因此责任链执行时会调用entry()方法。
exit()方法相当于AOP的after()方法,也就是出口方法,因此责任链执行结束时会调用exit()方法。
fireEntry()方法相当于AOP在执行完before()方法后调用pjp.proceed()方法,也就是调用责任链上的下一个节点的entry()方法。
fireExit()方法相当于AOP在执行完exit()方法后调用pjp.proceed()方法,也就是调用责任链上的下一个节点的exit()方法。- //A container of some process and ways of notification when the process is finished.
- public interface ProcessorSlot<T> {
- //Entrance of this slot.
- //@param context current Context
- //@param resourceWrapper current resource
- //@param param generics parameter, usually is a com.alibaba.csp.sentinel.node.Node
- //@param count tokens needed
- //@param prioritized whether the entry is prioritized
- //@param args parameters of the original call
- //@throws Throwable blocked exception or unexpected error
- void entry(Context context, ResourceWrapper resourceWrapper, T param, int count, boolean prioritized, Object... args) throws Throwable;
- //Means finish of #entry(Context, ResourceWrapper, Object, int, boolean, Object...).
- //@param context current Context
- //@param resourceWrapper current resource
- //@param obj relevant object (e.g. Node)
- //@param count tokens needed
- //@param prioritized whether the entry is prioritized
- //@param args parameters of the original call
- //@throws Throwable blocked exception or unexpected error
- void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable;
- //Exit of this slot.
- //@param context current Context
- //@param resourceWrapper current resource
- //@param count tokens needed
- //@param args parameters of the original call
- void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
- //Means finish of #exit(Context, ResourceWrapper, int, Object...).
- //@param context current Context
- //@param resourceWrapper current resource
- //@param count tokens needed
- //@param args parameters of the original call
- void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);
- }
复制代码 二.责任链接口的抽象实现类- public abstract class AbstractLinkedProcessorSlot<T> implements ProcessorSlot<T> {
- //下一个节点,这里的责任链是一个单向链表,因此next就是当前节点所指向的下一个节点
- private AbstractLinkedProcessorSlot<?> next = null;
- //触发执行责任链下一个节点的entry()方法
- @Override
- public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable {
- if (next != null) {
- next.transformEntry(context, resourceWrapper, obj, count, prioritized, args);
- }
- }
- @SuppressWarnings("unchecked")
- void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args) throws Throwable {
- T t = (T)o;
- entry(context, resourceWrapper, t, count, prioritized, args);
- }
- //触发执行责任链下一个节点的exit()方法
- @Override
- public void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
- if (next != null) {
- next.exit(context, resourceWrapper, count, args);
- }
- }
- public AbstractLinkedProcessorSlot<?> getNext() {
- return next;
- }
- public void setNext(AbstractLinkedProcessorSlot<?> next) {
- this.next = next;
- }
- }
复制代码 三.责任链的构建
Sentinel在默认情况下会通过DefaultProcessorSlotChain类来实现责任链的构建,当然我们也可以通过SPI机制指定一个自定义的责任链构建类。- //Builder for a default {@link ProcessorSlotChain}.
- @Spi(isDefault = true)
- public class DefaultSlotChainBuilder implements SlotChainBuilder {
- @Override
- public ProcessorSlotChain build() {
- //创建一个DefaultProcessorSlotChain对象实例
- ProcessorSlotChain chain = new DefaultProcessorSlotChain();
- //通过SPI机制加载责任链的节点ProcessorSlot实现类
- //然后按照@Spi注解的order属性进行排序并进行实例化
- //最后将ProcessorSlot实例放到sortedSlotList中
- List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
- //遍历已排好序的ProcessorSlot集合
- for (ProcessorSlot slot : sortedSlotList) {
- //安全检查,防止业务系统也写了一个SPI文件,但没按规定继承AbstractLinkedProcessorSlot
- if (!(slot instanceof AbstractLinkedProcessorSlot)) {
- RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
- continue;
- }
- //调用DefaultProcessorSlotChain.addLast()方法构建单向链表
- //将责任链的节点ProcessorSlot实例放入DefaultProcessorSlotChain中
- chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
- }
- //返回单向链表
- return chain;
- }
- }
复制代码 DefaultProcessorSlotChain构建的责任链如下:
(2)监听器模式
一.监听器接口和具体实现
二.监听器管理器接口和具体实现
三.使用方如何基于这套监听器机制管理规则
Sentinel在加载和配置规则的时候就使用了监听器模式。监听器模式的实现分为三大部分:监听器、监听器管理器、使用方(比如规则管理器)。
一.监听器接口和具体实现- //This class holds callback method when SentinelProperty#updateValue(Object) need inform the listener
- //监听器接口,负责监听各个配置,包含两个方法:初始化方法以及更新方法
- public interface PropertyListener<T> {
- //Callback method when SentinelProperty#updateValue(Object) need inform the listener.
- //规则变更时触发的回调方法
- void configUpdate(T value);
- //The first time of the value's load.
- //首次加载规则时触发的回调方法
- void configLoad(T value);
- }
- //流控规则管理器
- public class FlowRuleManager {
- ...
- //监听器接口的具体实现:流控规则监听器
- private static final class FlowPropertyListener implements PropertyListener<List<FlowRule>> {
- //初始化规则
- @Override
- public synchronized void configUpdate(List<FlowRule> value) {
- Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(value);
- if (rules != null) {
- flowRules = rules;
- }
- RecordLog.info("[FlowRuleManager] Flow rules received: {}", rules);
- }
- //规则变更
- @Override
- public synchronized void configLoad(List<FlowRule> conf) {
- Map<String, List<FlowRule>> rules = FlowRuleUtil.buildFlowRuleMap(conf);
- if (rules != null) {
- flowRules = rules;
- }
- RecordLog.info("[FlowRuleManager] Flow rules loaded: {}", rules);
- }
- }
- }
复制代码 二.监听器管理器接口和具体实现- //监听器管理器接口
- public interface SentinelProperty<T> {
- //添加监听者
- void addListener(PropertyListener<T> listener);
- //移除监听者
- void removeListener(PropertyListener<T> listener);
- //当监听值有变化时,调用此方法进行通知
- boolean updateValue(T newValue);
- }
- //监听器管理器具体实现
- public class DynamicSentinelProperty<T> implements SentinelProperty<T> {
- //存放每个监听器
- protected Set<PropertyListener<T>> listeners = new CopyOnWriteArraySet<>();
- //要监听的值
- private T value = null;
- public DynamicSentinelProperty() {
- }
- //添加监听器到集合
- @Override
- public void addListener(PropertyListener<T> listener) {
- listeners.add(listener);
- //回调监听器的configLoad()方法初始化规则配置
- listener.configLoad(value);
- }
- //移除监听器
- @Override
- public void removeListener(PropertyListener<T> listener) {
- listeners.remove(listener);
- }
- //更新值
- @Override
- public boolean updateValue(T newValue) {
- //如果值没变化,直接返回
- if (isEqual(value, newValue)) {
- return false;
- }
- RecordLog.info("[DynamicSentinelProperty] Config will be updated to: {}", newValue);
- //如果值发生了变化,则遍历监听器,回调监听器的configUpdate()方法更新对应的值
- value = newValue;
- for (PropertyListener<T> listener : listeners) {
- listener.configUpdate(newValue);
- }
- return true;
- }
- //对比值是否发生了变化
- private boolean isEqual(T oldValue, T newValue) {
- if (oldValue == null && newValue == null) {
- return true;
- }
- if (oldValue == null) {
- return false;
- }
- return oldValue.equals(newValue);
- }
- //清空监听器集合
- public void close() {
- listeners.clear();
- }
- }
复制代码 三.使用方如何基于这套监听器机制管理规则
[code]//流控规则管理器public class FlowRuleManager { //维护每个资源的流控规则列表,key是资源名称,value是资源对应的规则 private static volatile Map flowRules = new HashMap(); //饿汉式单例模式实例化流控规则的监听器对象 private static final FlowPropertyListener LISTENER = new FlowPropertyListener(); //监听器对象的管理器 private static SentinelProperty currentProperty = new DynamicSentinelProperty(); //当FlowRuleManager类的静态方法首次被调用时,会执行这里的静态代码块(对应类加载的过程) static { //将流控规则监听器注册到监听器管理器中 currentProperty.addListener(LISTENER); startMetricTimerListener(); } //Load FlowRules, former rules will be replaced. //加载流控规则 public static void loadRules(List rules) { //通知监听器管理器中的每一个监听器,规则已发生变化,需要重新加载规则配置 //其实就是更新FlowRuleManager规则管理器中的流控规则列表flowRules currentProperty.updateValue(rules); } ...}//使用方:通过流控规则管理器FlowRuleManager加载和监听流控规则public class FlowQpsDemo { private static final String KEY = "abc"; private static AtomicInteger pass = new AtomicInteger(); private static AtomicInteger block = new AtomicInteger(); private static AtomicInteger total = new AtomicInteger(); private static volatile boolean stop = false; private static final int threadCount = 32; private static int seconds = 60 + 40; public static void main(String[] args) throws Exception { //初始化QPS的流控规则 initFlowQpsRule(); //启动线程定时输出信息 tick(); //first make the system run on a very low condition //模拟QPS为32时的访问场景 simulateTraffic(); System.out.println("===== begin to do flow control"); System.out.println("only 20 requests per second can pass"); } private static void initFlowQpsRule() { List rules = new ArrayList(); FlowRule rule1 = new FlowRule(); rule1.setResource(KEY); //设置QPS的限制为20 rule1.setCount(20); rule1.setGrade(RuleConstant.FLOW_GRADE_QPS); rule1.setLimitApp("default"); rules.add(rule1); //首次调用FlowRuleManager的静态方法会加载FlowRuleManager类执行其静态代码块 //加载流控规则 FlowRuleManager.loadRules(rules); } private static void simulateTraffic() { for (int i = 0; i < threadCount; i++) { Thread t = new Thread(new RunTask()); t.setName("simulate-traffic-Task"); t.start(); } } private static void tick() { Thread timer = new Thread(new TimerTask()); timer.setName("sentinel-timer-task"); timer.start(); } static class TimerTask implements Runnable { @Override public void run() { long start = System.currentTimeMillis(); System.out.println("begin to statistic!!!"); long oldTotal = 0; long oldPass = 0; long oldBlock = 0; while (!stop) { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { } long globalTotal = total.get(); long oneSecondTotal = globalTotal - oldTotal; oldTotal = globalTotal; long globalPass = pass.get(); long oneSecondPass = globalPass - oldPass; oldPass = globalPass; long globalBlock = block.get(); long oneSecondBlock = globalBlock - oldBlock; oldBlock = globalBlock; System.out.println(seconds + " send qps is: " + oneSecondTotal); System.out.println(TimeUtil.currentTimeMillis() + ", total:" + oneSecondTotal + ", pass:" + oneSecondPass + ", block:" + oneSecondBlock); if (seconds-- |