Sentinel源码—8.限流算法和设计模式总结

360影视 动漫周边 2025-04-23 00:20 4

摘要:保护高并发系统的三把利器:缓存、降级和限流。限流就是通过限制请求的流量以达到保护系统的目的,比如秒杀抢购。具体就是对并发请求进行限速,或对一个时间窗口内的的请求进行限速,一旦达到限制速率就会拒绝服务或进行流量整形。

大纲

1.关于限流的概述

2.高并发下的四大限流算法原理及实现

3.Sentinel使用的设计模式总结

1.关于限流的概述

保护高并发系统的三把利器:缓存、降级和限流。限流就是通过限制请求的流量以达到保护系统的目的,比如秒杀抢购。具体就是对并发请求进行限速,或对一个时间窗口内的的请求进行限速,一旦达到限制速率就会拒绝服务或进行流量整形。

常用的限流方式:

一.限制总请求数

如数据库连接池、线程池。

二.限制瞬时并发数

如Nginx的LimitConn模块,可以用来限制瞬时并发连接数。如Java的Semaphore也可以用来限制瞬时并发数。如果需要限制方法被调用的并发数不能超过100(同一时间并发数),则可以使用信号量Semaphore来实现。

三.限制时间窗口内的平均速率

如Guava的RateLimiter、Nginx的LimitReq模块,就可以用来限制每秒的请求速率。如果需要限制方法在一段时间内平均被调用次数不超过100,则可以使用RateLimiter来实现。Guava的RateLimiter只能用于单机的限流,如果想要集群限流,则需要引入Redis或者阿里开源的Sentinel中间件。

四.限制远程接口的调用速率

五.限制MQ的消费速率

六.根据网络、CPU或内存负载等来限流

2.高并发下的四大限流算法原理及实现

(1)固定窗口计数法

(2)滑动窗口计数法

(3)漏桶算法

(4)令牌桶算法

(5)四种限流算法的对比总结

(1)固定窗口计数法

一.实现原理

在一个固定长度的时间窗口内限制请求数量。每来一个请求,请求数加一,如果请求数超过最大限制,就拒绝该请求。

二.算法流程

三.算法存在的问题

问题一:限流不够平滑

例如设置的是每秒限流3个请求,第一毫秒就发送3个请求,达到限流。那么窗口剩余时间的请求都将会被拒绝,这样用户体验就不好。

问题二:存在突发流量的问题

由于在进行窗口切换时,当前窗口的访问总数会立即置为0,所以可能会导致流量突发的问题。

四.算法的代码实现

//注意:下面的实现并没有考虑并发的情况public class FixWindowLimiter { //窗口的大小,1000ms public static long windowUnit = 1000; //窗口的最大请求数 public static long threshold = 10; //当前窗口内的请求数 public static long count = 0; //当前窗口的开始时间 public static long lastTime = 0; //限流方法,返回true表示通过 public boolean canPass { //获取当前时间 long currentTime = System.currentTimeMillis; //判断当前时间与窗口的开始时间的时间差,是否大于窗口的大小 if (currentTime - lastTime > windowUnit) { //当前窗口的请求数设置为0 count = 0; //重置当前窗口的开始时间为当前时间 lastTime = currentTime; } //判断当前窗口的请求数是否超过窗口的最大请求数 if (count

(2)滑动窗口计数法

为解决固定窗口计数法潜在的流量突发问题,可使用滑动窗口计数法。

一.实现原理

在滑动窗口算法中,窗口的开始时间是动态的,窗口大小是固定的。每来一个请求,就向后推一个时间窗口,计算该窗口内的请求数量。如果请求数超过限制就拒绝请求,否则处理请求 + 记录请求的时间戳。另外还需要一个任务清理过期的时间戳,滑动窗口没有划分固定的时间窗起点与终点。

二.算法存在的问题

虽然解决了流量突发的问题,但限流依然不够平滑。例如设置的是每秒限流3个请求,第一毫秒就发送3个请求,达到限流。那么窗口剩余时间的请求都将会被拒绝,这样用户体验就不好。

三.算法的代码实现

public class SlidingWindowLimiter { //每个窗口的最大请求数量 public static long threshold = 10; //窗口大小,1000ms public static long windowUnit = 1000; //请求集合,用来存储窗口内的请求数量 public static List requestList = new ArrayList; //限流方法,返回true表示通过 public boolean canPass { //获取当前时间 long currentTime = System.currentTimeMillis; //统计当前时间对应的窗口,收到的请求的数量 int sizeOfValid = this.sizeOfValid(currentTime); //判断请求数是否超过窗口的最大请求数量 if (sizeOfValid System.currentTimeMillis - requestTime > windowUnit); }}

(3)漏桶算法

它是一种流量整形(Traffic Shaping)和流量控制(Traffic Policing)的算法,它可以有效地控制流量的处理速率以及防止网络拥塞。

一.实现原理

首先,一个固定容量的漏桶,按照固定速率流出水(处理请求)。然后,当流入水的速度过大会直接溢出(请求数量超过限制则直接拒绝)。最后,漏桶里的水不够则无法流出水(漏桶内没有请求则不处理)。

当请求流量正常或者较小时,请求能够得到正常的处理。当请求流量过大时,漏桶算法可通过丢弃部分请求来防止系统过载。

这种算法的一个重要特性是:无论请求的接收速率如何变化,请求的处理速率始终是稳定的,这就确保了系统的负载不会超过预设的阈值。但是由于请求的处理速率是固定的,所以无法处理突发流量。此外如果入口流量过大,漏桶可能会溢出,导致请求丢失。

二.算法的优缺点

优点一:平滑流量

由于以固定的速率处理请求,所以可以有效地平滑和整形流量,避免流量的突发和波动,类似于消息队列的削峰填谷的作用。

优点二:防止过载

当流入的请求超过桶的容量时,可以直接丢弃请求,防止系统过载。

缺点一:无法处理突发流量

由于漏桶的出口速度是固定的,无法处理突发流量。例如,即使在流量较小的时候,也无法以更快的速度处理请求。

缺点二:可能会丢失数据

如果入口流量过大,超过了桶的容量,那么就需要丢弃部分请求。在一些不能接受丢失请求的场景中,这可能是一个问题。

缺点三:不适合处理速率变化大的场景

如果处理速率变化大,或需要动态调整处理速率,则无法满足。

三.算法的代码实现

public class LeakyBucketLimiter { //桶的最大容量 public static long threshold = 10; //当前桶内的水量 public static long count = 0; //漏水速率(每秒5次) public static long leakRate = 5; //上次漏水时间 public static long lastLeakTime = System.currentTimeMillis; //限流方法,返回true表示通过 public boolean canPass { //调用漏水方法 this.leak; //判断是否超过最大请求数量 if (count

(4)令牌桶算法

令牌桶限流算法也是一种常用的流量整形和限制请求处理速率的算法。

一.实现原理

首先,系统会以固定的速率向桶中添加令牌。然后,当有请求到来时,会尝试从桶中移除一个令牌。如果桶中有足够的令牌,则请求可以被处理。如果桶中没有令牌,那么请求将被拒绝。此外,桶中的令牌数不能超过桶的容量。如果新生成的令牌超过了桶的容量,那么新的令牌会被丢弃。

令牌桶算法的一个重要特性是,它能够处理突发流量。当桶中有足够的令牌时,可以一次性处理多个请求,这对于需要处理突发流量的应用场景非常有用。但是又不会无限制的增加处理速率导致压垮服务器,因为桶内令牌数量是有限制的。

二.算法的优缺点

优点一:可以处理突发流量

令牌桶算法可以处理突发流量。当桶满时,能够以最大速度处理请求。这对于需要处理突发流量的应用场景非常有用。

优点二:限制请求处理的平均速率

在长期运行中,请求的处理速率会被限制在预定义的平均速率下,也就是生成令牌的速率。

优点三:灵活性

与漏桶算法相比,令牌桶算法提供了更大的灵活性。例如,可以动态地调整生成令牌的速率。

缺点一:可能导致过载

如果令牌产生速度过快,可能会导致大量突发流量,使网络或服务过载。

缺点二:需要存储空间

令牌桶需要一定的存储空间来保存令牌,可能会导致内存资源的浪费。

三.算法的代码实现

public class TokenBucketLimiter { //桶的最大容量 public static long threshold = 10; //当前桶内的令牌数 public static long count = 0; //令牌生成速率(每秒5次) public static long tokenRate = 5; //上次生成令牌的时间 public static long lastRefillTime = System.currentTimeMillis; //限流方法,返回true表示通过 public boolean canPass { //调用生成令牌方法 this.refillTokens; //判断桶内是否还有令牌 if (count > 0) { count--; return true; } return false; } //生成令牌方法,计算并更新这段时间内生成的令牌数量 private void refillTokens { long currentTime = System.currentTimeMillis; //计算这段时间内,需要生成的令牌数量 long refillTokens = (currentTime - lastRefillTime) * tokenRate / 1000; //更新桶内的令牌数 count = Math.min(count + refillTokens, threshold); //更新令牌生成时间 lastRefillTime = currentTime; }}

(5)四种限流算法的对比总结

一.四种算法的优缺点

固定窗口算法实现简单,但是限流不够平滑,存在突发流量的问题,适用于需要简单实现限流的场景。

滑动窗口算法虽然解决了突发流量的问题,但是还是存在限流不够平滑的问题,所以它适用于需要控制平均请求速率的场景。

漏桶算法的优点是流量处理更平滑,但是无法应对突发流量,适用于需要平滑流量的场景。

令牌桶算法既能平滑流量,又能处理突发流量,适用于需要处理突发流量的场景。

二.令牌桶算法和漏桶算法的对比总结

令牌桶算法就是以固定速率生成令牌放入桶中。每个请求都需要从桶中获取令牌,没有获取到令牌的请求会被阻塞限流。当令牌消耗速度小于生成速度时,令牌桶内就会预存这些未消耗的令牌。当有突发流量进来时,可以直接从桶中取出令牌,而不会被限流。

漏桶算法就是将请求放入桶中,然后以固定的速率从桶中取出请求来处理。当桶中等待的请求数超过桶的容量后,后续的请求就不再加入桶中。

漏桶算法适用于需要以固定速率处理请求的场景。在多数业务场景中,其实并不需要按照严格的速率进行请求处理。而且多数业务场景都需要应对突发流量的能力,所以会使用令牌桶。

但不管是令牌桶算法还是漏桶算法,都可以通过延迟计算的方式来实现。延迟计算指的是不需要单独的线程来定时生成令牌或从漏桶中定时取请求,而是由调用限流器的线程自己来计算是否有足够的令牌以及需要sleep的时间。使用延迟计算的方式,可以节省一个线程资源。

3.Sentinel使用的设计模式总结

(1)责任链模式

(2)监听器模式

(3)适配器模式

(4)模版方法模式

(5)策略模式

(6)观察者模式

(1)责任链模式

一.责任链接口ProcessorSlot

二.责任链接口的抽象实现类

三.责任链的构建

Sentinel的功能都是靠一条链式的ProcessorSlot来完成的,这些ProcessorSlot的初始化以及调用便使用了责任链模式。

一.责任链接口ProcessorSlot

entry方法相当于AOP的before方法,也就是入口方法,因此责任链执行时会调用entry方法。

exit方法相当于AOP的after方法,也就是出口方法,因此责任链执行结束时会调用exit方法。

fireEntry方法相当于AOP在执行完before方法后调用pjp.proceed方法,也就是调用责任链上的下一个节点的entry方法。

fireExit方法相当于AOP在执行完exit方法后调用pjp.proceed方法,也就是调用责任链上的下一个节点的exit方法。

//A container of some process and ways of notification when the process is finished.public interface ProcessorSlot { //Entrance of this slot. //@param context current Context //@param ResourceWrapper current resource //@param param generics parameter, usually is a com.alibaba.csp.sentinel.node.Node //@param count tokens needed //@param prioritized whether the entry is prioritized //@param args parameters of the original call //@throws Throwable blocked exception or unexpected error void entry(Context context, ResourceWrapper resourceWrapper, T param, int count, boolean prioritized, Object... args) throws Throwable; //Means finish of #entry(Context, ResourceWrapper, Object, int, boolean, Object...). //@param context current Context //@param resourceWrapper current resource //@param obj relevant object (e.g. Node) //@param count tokens needed //@param prioritized whether the entry is prioritized //@param args parameters of the original call //@throws Throwable blocked exception or unexpected error void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable; //Exit of this slot. //@param context current Context //@param resourceWrapper current resource //@param count tokens needed //@param args parameters of the original call void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args); //Means finish of #exit(Context, ResourceWrapper, int, Object...). //@param context current Context //@param resourceWrapper current resource //@param count tokens needed //@param args parameters of the original call void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args);}

二.责任链接口的抽象实现类

public abstract class AbstractLinkedProcessorSlot implements ProcessorSlot { //下一个节点,这里的责任链是一个单向链表,因此next就是当前节点所指向的下一个节点 private AbstractLinkedProcessorSlot next = null; //触发执行责任链下一个节点的entry方法 @Override public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable { if (next != null) { next.transformEntry(context, resourceWrapper, obj, count, prioritized, args); } } @SuppressWarnings("unchecked") void transformEntry(Context context, ResourceWrapper resourceWrapper, Object o, int count, boolean prioritized, Object... args) throws Throwable { T t = (T)o; entry(context, resourceWrapper, t, count, prioritized, args); } //触发执行责任链下一个节点的exit方法 @Override public void fireExit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { if (next != null) { next.exit(context, resourceWrapper, count, args); } } public AbstractLinkedProcessorSlot getNext { return next; } public void setNext(AbstractLinkedProcessorSlot next) { this.next = next; }}

三.责任链的构建

Sentinel在默认情况下会通过DefaultProcessorSlotChain类来实现责任链的构建,当然我们也可以通过SPI机制指定一个自定义的责任链构建类。

//Builder for a default {@link ProcessorSlotChain}.@Spi(isDefault = true)public class DefaultSlotChainBuilder implements SlotChainBuilder { @Override public ProcessorSlotChain build { //创建一个DefaultProcessorSlotChain对象实例 ProcessorSlotChain chain = new DefaultProcessorSlotChain; //通过SPI机制加载责任链的节点ProcessorSlot实现类 //然后按照@Spi注解的order属性进行排序并进行实例化 //最后将ProcessorSlot实例放到sortedSlotList中 List sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted; //遍历已排好序的ProcessorSlot集合 for (ProcessorSlot slot : sortedSlotList) { //安全检查,防止业务系统也写了一个SPI文件,但没按规定继承AbstractLinkedProcessorSlot if (!(slot instanceof AbstractLinkedProcessorSlot)) { RecordLog.warn("The ProcessorSlot(" + slot.getClass.getCanonicalName + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain"); continue; } //调用DefaultProcessorSlotChain.addLast方法构建单向链表 //将责任链的节点ProcessorSlot实例放入DefaultProcessorSlotChain中 chain.addLast((AbstractLinkedProcessorSlot) slot); } //返回单向链表 return chain; }}

DefaultProcessorSlotChain构建的责任链如下:

(2)监听器模式

一.监听器接口和具体实现

二.监听器管理器接口和具体实现

三.使用方如何基于这套监听器机制管理规则

Sentinel在加载和配置规则的时候就使用了监听器模式。监听器模式的实现分为三大部分:监听器、监听器管理器、使用方(比如规则管理器)。

一.监听器接口和具体实现

//This class holds callback method when SentinelProperty#updateValue(Object) need inform the listener//监听器接口,负责监听各个配置,包含两个方法:初始化方法以及更新方法public interface PropertyListener { //Callback method when SentinelProperty#updateValue(Object) need inform the listener. //规则变更时触发的回调方法 void configUpdate(T value); //The first time of the value's load. //首次加载规则时触发的回调方法 void configLoad(T value);}//流控规则管理器public class FlowRuleManager { ... //监听器接口的具体实现:流控规则监听器 private static final class FlowPropertyListener implements PropertyListener> { //初始化规则 @Override public synchronized void configUpdate(List value) { Map> rules = FlowRuleUtil.buildFlowRuleMap(value); if (rules != null) { flowRules = rules; } RecordLog.info("[FlowRuleManager] Flow rules received: {}", rules); } //规则变更 @Override public synchronized void configLoad(List conf) { Map> rules = FlowRuleUtil.buildFlowRuleMap(conf); if (rules != null) { flowRules = rules; } RecordLog.info("[FlowRuleManager] Flow rules loaded: {}", rules); } }}

二.监听器管理器接口和具体实现

//监听器管理器接口public interface SentinelProperty { //添加监听者 void addListener(PropertyListener listener); //移除监听者 void removeListener(PropertyListener listener); //当监听值有变化时,调用此方法进行通知 boolean updateValue(T newValue);}//监听器管理器具体实现public class DynamicSentinelProperty implements SentinelProperty { //存放每个监听器 protected Set> listeners = new CopyOnWriteArraySet; //要监听的值 private T value = null; public DynamicSentinelProperty { } //添加监听器到集合 @Override public void addListener(PropertyListener listener) { listeners.add(listener); //回调监听器的configLoad方法初始化规则配置 listener.configLoad(value); } //移除监听器 @Override public void removeListener(PropertyListener listener) { listeners.remove(listener); } //更新值 @Override public boolean updateValue(T newValue) { //如果值没变化,直接返回 if (isEqual(value, newValue)) { return false; } RecordLog.info("[DynamicSentinelProperty] Config will be updated to: {}", newValue); //如果值发生了变化,则遍历监听器,回调监听器的configUpdate方法更新对应的值 value = newValue; for (PropertyListener listener : listeners) { listener.configUpdate(newValue); } return true; } //对比值是否发生了变化 private boolean isEqual(T oldValue, T newValue) { if (oldValue == null && newValue == null) { return true; } if (oldValue == null) { return false; } return oldValue.equals(newValue); } //清空监听器集合 public void close { listeners.clear; }}

三.使用方如何基于这套监听器机制管理规则

//流控规则管理器public class FlowRuleManager { //维护每个资源的流控规则列表,key是资源名称,value是资源对应的规则 private static volatile Map> flowRules = new HashMap; //饿汉式单例模式实例化流控规则的监听器对象 private static final FlowPropertyListener LISTENER = new FlowPropertyListener; //监听器对象的管理器 private static SentinelProperty> currentProperty = new DynamicSentinelProperty>; //当FlowRuleManager类的静态方法首次被调用时,会执行这里的静态代码块(对应类加载的过程) static { //将流控规则监听器注册到监听器管理器中 currentProperty.addListener(LISTENER); startMetricTimerListener; } //Load FlowRules, former rules will be replaced. //加载流控规则 public static void loadRules(List rules) { //通知监听器管理器中的每一个监听器,规则已发生变化,需要重新加载规则配置 //其实就是更新FlowRuleManager规则管理器中的流控规则列表flowRules currentProperty.updateValue(rules); } ...}//使用方:通过流控规则管理器FlowRuleManager加载和监听流控规则public class FlowQpsDemo { private static final String KEY = "abc"; private static AtomicInteger pass = new AtomicInteger; private static AtomicInteger block = new AtomicInteger; private static AtomicInteger total = new AtomicInteger; private static volatile boolean stop = false; private static final int threadCount = 32; private static int seconds = 60 + 40; public static void main(String args) throws Exception { //初始化QPS的流控规则 initFlowQpsRule; //启动线程定时输出信息 tick; //first make the system run on a very low condition //模拟QPS为32时的访问场景 simulateTraffic; System.out.println("===== begin to do flow control"); System.out.println("only 20 requests per second can pass"); } private static void initFlowQpsRule { List rules = new ArrayList; FlowRule rule1 = new FlowRule; rule1.setResource(KEY); //设置QPS的限制为20 rule1.setCount(20); rule1.setGrade(RuleConstant.FLOW_GRADE_QPS); rule1.setLimitApp("default"); rules.add(rule1); //首次调用FlowRuleManager的静态方法会加载FlowRuleManager类执行其静态代码块 //加载流控规则 FlowRuleManager.loadRules(rules); } private static void simulateTraffic { for (int i = 0; i

(3)适配器模式

适配器模式是一种结构型设计模式,它允许将一个类的接口转换为客户端期望的另一个接口。在Sentinel中,使用适配器模式将不同框架和库的接口适配为统一的接口,如SphU类。SphU类提供了统一的入口,用于执行不同的资源保护逻辑。

public class SphU { private static final Object OBJECTS0 = new Object[0]; private SphU { } //Record statistics and perform rule checking for the given resource. //@param name the unique name of the protected resource //@return the Entry of this invocation (used for mark the invocation complete and get context data) public static Entry entry(String name) throws BlockException { //调用CtSph.entry方法创建一个Entry资源访问对象 return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0); } //Checking all Rules about the protected method. //@param method the protected method public static Entry entry(Method method) throws BlockException { return Env.sph.entry(method, EntryType.OUT, 1, OBJECTS0); } //Checking all Rules about the protected method. //@param method the protected method //@param batchCount the amount of calls within the invocation (e.g. batchCount=2 means request for 2 tokens) public static Entry entry(Method method, int batchCount) throws BlockException { return Env.sph.entry(method, EntryType.OUT, batchCount, OBJECTS0); } //Record statistics and perform rule checking for the given resource. //@param name the unique string for the resource //@param batchCount the amount of calls within the invocation (e.g. batchCount=2 means request for 2 tokens) public static Entry entry(String name, int batchCount) throws BlockException { return Env.sph.entry(name, EntryType.OUT, batchCount, OBJECTS0); } //Checking all Rules about the protected method. //@param method the protected method //@param trafficType the traffic type (inbound, outbound or internal). //This is used to mark whether it can be blocked when the system is unstable, only inbound traffic could be blocked by SystemRule. public static Entry entry(Method method, EntryType trafficType) throws BlockException { return Env.sph.entry(method, trafficType, 1, OBJECTS0); } //Record statistics and perform rule checking for the given resource. public static Entry entry(String name, EntryType trafficType) throws BlockException { //调用CtSph.entry方法创建一个Entry资源访问对象 return Env.sph.entry(name, trafficType, 1, OBJECTS0); } //Checking all Rules about the protected method. //@param method the protected method //@param trafficType the traffic type (inbound, outbound or internal). //This is used to mark whether it can be blocked when the system is unstable, only inbound traffic could be blocked by SystemRule. //@param batchCount the amount of calls within the invocation (e.g. batchCount=2 means request for 2 tokens) public static Entry entry(Method method, EntryType trafficType, int batchCount) throws BlockException { return Env.sph.entry(method, trafficType, batchCount, OBJECTS0); } //Record statistics and perform rule checking for the given resource. public static Entry entry(String name, EntryType trafficType, int batchCount) throws BlockException { return Env.sph.entry(name, trafficType, batchCount, OBJECTS0); } //Checking all Rules about the protected method. //@param method the protected method //@param trafficType the traffic type (inbound, outbound or internal). //This is used to mark whether it can be blocked when the system is unstable, only inbound traffic could be blocked by SystemRule. //@param batchCount the amount of calls within the invocation (e.g. batchCount=2 means request for 2 tokens) //@param args args for parameter flow control or customized slots //@return the Entry of this invocation (used for mark the invocation complete and get context data) public static Entry entry(Method method, EntryType trafficType, int batchCount, Object... args) throws BlockException { return Env.sph.entry(method, trafficType, batchCount, args); } //Record statistics and perform rule checking for the given resource. public static Entry entry(String name, EntryType trafficType, int batchCount, Object... args) throws BlockException { return Env.sph.entry(name, trafficType, batchCount, args); } //Record statistics and check all rules of the resource that indicates an async invocation. //@param name the unique name of the protected resource public static AsyncEntry asyncEntry(String name) throws BlockException { return Env.sph.asyncEntry(name, EntryType.OUT, 1, OBJECTS0); } //Record statistics and check all rules of the resource that indicates an async invocation. //@param name the unique name for the protected resource //@param trafficType the traffic type (inbound, outbound or internal). //This is used to mark whether it can be blocked when the system is unstable, only inbound traffic could be blocked by SystemRule. //@return the Entry of this invocation (used for mark the invocation complete and get context data) public static AsyncEntry asyncEntry(String name, EntryType trafficType) throws BlockException { return Env.sph.asyncEntry(name, trafficType, 1, OBJECTS0); } public static AsyncEntry asyncEntry(String name, EntryType trafficType, int batchCount, Object... args) throws BlockException { return Env.sph.asyncEntry(name, trafficType, batchCount, args); } //Record statistics and perform rule checking for the given resource. The entry is prioritized. public static Entry entryWithPriority(String name) throws BlockException { return Env.sph.entryWithPriority(name, EntryType.OUT, 1, true); } //Record statistics and perform rule checking for the given resource. The entry is prioritized. public static Entry entryWithPriority(String name, EntryType trafficType) throws BlockException { return Env.sph.entryWithPriority(name, trafficType, 1, true); } //Record statistics and perform rule checking for the given resource. public static Entry entry(String name, int resourceType, EntryType trafficType) throws BlockException { return Env.sph.entryWithType(name, resourceType, trafficType, 1, OBJECTS0); } //Record statistics and perform rule checking for the given resource. public static Entry entry(String name, int resourceType, EntryType trafficType, Object args) throws BlockException { return Env.sph.entryWithType(name, resourceType, trafficType, 1, args); } //Record statistics and perform rule checking for the given resource that indicates an async invocation. public static AsyncEntry asyncEntry(String name, int resourceType, EntryType trafficType) throws BlockException { return Env.sph.asyncEntryWithType(name, resourceType, trafficType, 1, false, OBJECTS0); } //Record statistics and perform rule checking for the given resource that indicates an async invocation. public static AsyncEntry asyncEntry(String name, int resourceType, EntryType trafficType, Object args) throws BlockException { return Env.sph.asyncEntryWithType(name, resourceType, trafficType, 1, false, args); } //Record statistics and perform rule checking for the given resource that indicates an async invocation. public static AsyncEntry asyncEntry(String name, int resourceType, EntryType trafficType, int batchCount, Object args) throws BlockException { return Env.sph.asyncEntryWithType(name, resourceType, trafficType, batchCount, false, args); }}

(4)模版方法模式

模板方法模式是一种行为型设计模式,它在一个方法中定义一个算法的骨架,将一些步骤延迟到子类中实现。Sentinel便使用了类似模板方法模式来处理熔断策略,但不是严格意义上的模板模式,因为模板方法模式一般会有一个final修饰的模板方法来定义整个流程。例如AbstractCircuitBreaker类定义了熔断策略的基本结构,具体的细节需要继承它并实现对应的方法。

public abstract class AbstractCircuitBreaker implements CircuitBreaker { @Override public boolean tryPass(Context context) { ... } //提供抽象方法供子类实现 abstract void resetStat;}//子类public class ExceptionCircuitBreaker extends AbstractCircuitBreaker { @Override protected void resetStat { stat.currentWindow.value.reset; }}

(5)策略模式

策略模式是一种行为型设计模式,定义了一系列的算法,并将每个算法封装起来,使它们可以互相替换。Sentinel便在构建流控规则对象时使用了策略模式来设置不同的流控策略。例如TrafficShapingController接口定义了流控策略的方法,具体的实现类负责实现不同的流控策略。

//流控效果接口public interface TrafficShapingController { //Check whether given resource entry can pass with provided count. //@param node resource node //@param acquireCount count to acquire //@param prioritized whether the request is prioritized //@return true if the resource entry can pass; false if it should be blocked boolean canPass(Node node, int acquireCount, boolean prioritized); //Check whether given resource entry can pass with provided count. //@param node resource node //@param acquireCount count to acquire //@return true if the resource entry can pass; false if it should be blocked boolean canPass(Node node, int acquireCount);}//流控规则管理器public class FlowRuleManager { ... private static final class FlowPropertyListener implements PropertyListener> { //初始化规则 @Override public synchronized void configUpdate(List value) { //构建流控规则对象 Map> rules = FlowRuleUtil.buildFlowRuleMap(value); if (rules != null) { flowRules = rules; } RecordLog.info("[FlowRuleManager] Flow rules received: {}", rules); } //规则变更 @Override public synchronized void configLoad(List conf) { //构建流控规则对象 Map> rules = FlowRuleUtil.buildFlowRuleMap(conf); if (rules != null) { flowRules = rules; } RecordLog.info("[FlowRuleManager] Flow rules loaded: {}", rules); } }}public final class FlowRuleUtil { ... public static Map> buildFlowRuleMap(List list, Function groupFunction, Predicate filter, boolean shouldSort) { Map> newRuleMap = new ConcurrentHashMap; if (list == null || list.isEmpty) { return newRuleMap; } Map> tmpMap = new ConcurrentHashMap; for (FlowRule rule : list) { if (!isValidRule(rule)) { RecordLog.warn("[FlowRuleManager] Ignoring invalid flow rule when loading new flow rules: " + rule); continue; } if (filter != null && !filter.test(rule)) { continue; } if (StringUtil.isBlank(rule.getLimitApp)) { rule.setLimitApp(RuleConstant.LIMIT_APP_DEFAULT); } //获取[流控效果]处理器 TrafficShapingController rater = generateRater(rule); rule.setRater(rater); //获取资源名 K key = groupFunction.apply(rule); if (key == null) { continue; } //获取资源名对应的流控规则列表 Set flowRules = tmpMap.get(key); //将规则放到Map里,和当前资源绑定 if (flowRules == null) { //Use hash set here to remove duplicate rules. flowRules = new HashSet; tmpMap.put(key, flowRules); } flowRules.add(rule); } Comparator comparator = new FlowRuleComparator; for (Entry> entries : tmpMap.entrySet) { List rules = new ArrayList(entries.getValue); if (shouldSort) { //Sort the rules. Collections.sort(rules, comparator); } newRuleMap.put(entries.getKey, rules); } return newRuleMap; } private static TrafficShapingController generateRater(FlowRule rule) { //判断只有当阈值类型为QPS时才生效 if (rule.getGrade == RuleConstant.FLOW_GRADE_QPS) { //根据流控效果选择不同的流量整形控制器TrafficShapingController switch (rule.getControlBehavior) { case RuleConstant.CONTROL_BEHAVIOR_WARM_UP://Warm Up预热模式——冷启动模式 return new WarmUpController(rule.getCount, rule.getWarmUpPeriodSec, ColdFactorProperty.coldFactor); case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER://排队等待模式 return new RateLimiterController(rule.getMaxQueueingTimeMs, rule.getCount); case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER://Warm Up + 排队等待模式 return new WarmUpRateLimiterController(rule.getCount, rule.getWarmUpPeriodSec, rule.getMaxQueueingTimeMs, ColdFactorProperty.coldFactor); case RuleConstant.CONTROL_BEHAVIOR_DEFAULT://快速失败模式——Default默认模式 default: //Default mode or unknown mode: default traffic shaping controller (fast-reject). } } //默认模式:快速失败用的是DefaultController return new DefaultController(rule.getCount, rule.getGrade); } ...}public class FlowRuleChecker { ... private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) { //选择Node作为限流计算的依据 Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node); if (selectedNode == null) { return true; } //先通过FlowRule.getRater方法获取流控规则对应的流量整形控制器 //然后调用TrafficShapingController.canPass方法对请求进行检查 return rule.getRater.canPass(selectedNode, acquireCount, prioritized); } ...}

(6)观察者模式

Sentinel实现熔断功能使用了观察者模式。具体接口是CircuitBreakerStateChangeObserver,它负责感知熔断器状态发生变化后通知到各个观察者。

//1.首先定义观察者接口CircuitBreakerStateChangeObserverpublic interface CircuitBreakerStateChangeObserver { void onStateChange(CircuitBreaker oldCircuitBreaker, CircuitBreaker newCircuitBreaker);}//2.在熔断器事件注册类EventObserverRegistry中://定义一个观察者Map(stateChangeObserverMap)用于存放观察者实例,并提供注册、移除和获取全部观察者的方法public class EventObserverRegistry { private final Map stateChangeObserverMap = new HashMap; //注册观察者 public void addStateChangeObserver(String name, CircuitBreakerStateChangeObserver observer) { stateChangeObserverMap.put(name, observer); } //移除观察者 public boolean removeStateChangeObserver(String name) { return stateChangeObserverMap.remove(name) != null; } //获取全部观察者 public List getStateChangeObservers { return new ArrayList(stateChangeObserverMap.values); } ...}//3.当熔断器状态发生变化时,通知所有已注册的观察者。//比如在AbstractCircuitBreaker类中的notifyObservers方法中实现:public abstract class AbstractCircuitBreaker implements CircuitBreaker { private void notifyObservers(CircuitBreaker.State prevState, CircuitBreaker.State newState, Double snapshotValue) { for (CircuitBreakerStateChangeObserver observer : observerRegistry.getStateChangeObservers) { observer.onStateChange(prevState, newState, rule, snapshotValue); } }}

来源:墨码行者

相关推荐