大纲
1.FlowSlot根据流控规则对请求进行限流
2.FlowSlot实现流控规则的快速失败效果的原理
3.FlowSlot实现流控规则中排队等待效果的原理
4.FlowSlot实现流控规则中Warm Up效果的原理
1.FlowSlot根据流控规则对请求进行限流
(1)流控规则FlowRule的配置Demo
(2)注册流控监听器和加载流控规则
(3)FlowSlot根据流控规则对请求进行限流
(1)流控规则FlowRule的配置Demo
从下图可知,流控规则包含以下属性:
一.规则id、资源名、针对来源
这三个属性是所有规则共有的属性,会分别封装到AbstractRule的id、resource、limitApp字段中,各个具体的规则子类都会继承AbstractRule类。
二.阈值类型
阈值类型包括QPS和并发线程数两个选项,对应FlowRule中的字段为grade。
三.单机阈值
单机阈值也就是限流阈值。无论是基于QPS还是并发线程数,都要设置限流阈值,对应FlowRule中的字段为count。
四.是否集群
是否集群是一个boolean类型的字段,对应FlowRule中的字段为clusterMode。true表示开启集群模式,false表示单机模式;
五.流控模式
流控模式有三种:直接模式、关联模式和链路模式,对应FlowRule中的字段为strategy。
六.关联资源
当流控模式选择为关联时,此值含义是关联资源,当流控模式选择为链路时,此值含义是入口资源。所以仅当流控模式选择关联和链路时,才对应FlowRule中的字段为refResource。
七.流控效果
流控效果有三种:快速失败、Warm Up、排队等待。还有一个选项未在页面上显示,即Warm Up和排队等待的结合体。也就是Warm Up + 排队等待,对应FlowRule中的字段为controlBehavior。
八.流控控制器
由于流控有多种模式,而每种模式都会对应一个模式流量整形控制器。所以流控规则FlowRule中会有一个字段TrafficShapingController,用来实现不同流控模式下的不同流控效果。
九.预热时长
此选项仅在流控效果选择Warm Up时出现,表示Warm Up的时长,对应FlowRule中的字段为warmUpPeriodSec。
十.超时时间
此选项仅在流控效果选择排队等待时出现,表示超出流控阈值后,排队等待多久才抛出异常,对应FlowRule中的字段为maxQueueingTimeMs。
public abstract class AbstractRule implements Rule {
//rule id. 规则id
private Long id;
//Resource name. 资源名称
private String resource;
//针对来源,默认是default
//多个来源使用逗号隔开,比如黑名单规则,限制userId是1和3的访问,那么就设置limitApp为"1,3"
//Application name that will be limited by origin.
//The default limitApp is {@code default}, which means allowing all origin apps.
//For authority rules, multiple origin name can be separated with comma (',').
private String limitApp;
...
}
//规则id、资源名(resource)、针对来源(limitApp),这三个字段在父类AbstractRule里
//Each flow rule is mainly composed of three factors: grade, strategy and controlBehavior:
//The grade represents the threshold type of flow control (by QPS or thread count).
//The strategy represents the strategy based on invocation relation.
//The controlBehavior represents the QPS shaping behavior (actions on incoming request when QPS exceeds the threshold.
public class FlowRule extends AbstractRule {
public FlowRule() {
super();
setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
}
public FlowRule(String resourceName) {
super();
setResource(resourceName);
setLimitApp(RuleConstant.LIMIT_APP_DEFAULT);
}
//The threshold type of flow control (0: thread count, 1: QPS).
//阈值类型:1代表QPS;0代表并发线程数
private int grade = RuleConstant.FLOW_GRADE_QPS;
//Flow control threshold count.
//单机阈值:也就是限流数
private double count;
//Flow control strategy based on invocation chain.
//流控模式:0代表直接;1代表关联;2代表链路
//RuleConstant#STRATEGY_DIRECT for direct flow control (by origin);
//RuleConstant#STRATEGY_RELATE for relevant flow control (with relevant resource);
//RuleConstant#STRATEGY_CHAIN for chain flow control (by entrance resource).
private int strategy = RuleConstant.STRATEGY_DIRECT;
//关联资源,当流控模式选择为关联时,此值含义是关联资源,当流控模式选择为链路时,此值含义是入口资源
//Reference resource in flow control with relevant resource or context.
private String refResource;
//Rate limiter control behavior.
//0. default(reject directly), 1. warm up, 2. rate limiter, 3. warm up + rate limiter
//流控效果:0代表快速失败, 1代表Warm Up, 2代表排队等待, 3代表Warm Up + 排队等待
private int controlBehavior = RuleConstant.CONTROL_BEHAVIOR_DEFAULT;
//预热时长:只有当流控效果选择为Warm Up时才会出现
private int warmUpPeriodSec = 10;
//Max queueing time in rate limiter behavior.
//超时时间:只有当流控效果选择排队等待时才会出现
private int maxQueueingTimeMs = 500;
//是否集群:默认false表示单机
private boolean clusterMode;
//Flow rule config for cluster mode.
//集群配置
private ClusterFlowConfig clusterConfig;
//The traffic shaping (throttling) controller.
//流量整形控制器:实现[流控效果]的四种不同模式
private TrafficShapingController controller;
...
}
public class FlowQpsDemo {
private static final String KEY = "abc";
private static AtomicInteger pass = new AtomicInteger();
private static AtomicInteger block = new AtomicInteger();
private static AtomicInteger total = new AtomicInteger();
private static volatile boolean stop = false;
private static final int threadCount = 32;
private static int seconds = 60 + 40;
public static void main(String[] args) throws Exception {
//初始化QPS的流控规则
initFlowQpsRule();
//启动线程定时输出信息
tick();
//first make the system run on a very low condition
//模拟QPS为32时的访问场景
simulateTraffic();
System.out.println("===== begin to do flow control");
System.out.println("only 20 requests per second can pass");
}
private static void initFlowQpsRule() {
List rules = new ArrayList();
FlowRule rule1 = new FlowRule();
rule1.setResource(KEY);
//设置QPS的限制为20
rule1.setCount(20);
rule1.setGrade(RuleConstant.FLOW_GRADE_QPS);
rule1.setLimitApp("default");
rules.add(rule1);
//加载流控规则
FlowRuleManager.loadRules(rules);
}
private static void simulateTraffic() {
for (int i = 0; i < threadCount xss=removed xss=removed xss=removed xss=removed xss=removed xss=removed xss=removed xss=removed xss=removed xss=removed xss=removed xss=removed xss=removed xss=removed xss=removed xss=removed xss=removed xss=removed xss=removed xss=removed xss=removed xss=removed>
(2)注册流控监听器和加载流控规则
一.Sentinel监听器模式的处理流程
Sentinel监听器模式会包含三大角色:
角色一:监听器PropertyListener
角色二:监听器管理器SentinelProperty
角色三:规则管理器RuleManager
首先,规则管理器RuleManager在初始化时,会调用监听器管理器SentinelProperty的addListener()方法将监听器PropertyListener注册到监听器管理器SentinelProperty中。
然后,使用方使用具体的规则时,可以通过调用规则管理器RuleManager的loadRules()方法加载规则。加载规则时会调用监听器管理器SentinelProperty的updateValue()方法通知每一个监听器PropertyListener,也就是通过监听器PropertyListener的configUpdate()方法把规则加载到规则管理器RuleManager的本地中。
二.注册流控监听器和加载流控规则
需要注意的是:加载流控规则时会调用FlowRuleUtil的generateRater()方法,根据不同的流控效果选择不同的流量整形控制器TrafficShapingController。
//One resources can have multiple rules.
//And these rules take effects in the following order:
//requests from specified caller
//no specified caller
public class FlowRuleManager {
//维护每个资源的流控规则列表,key是资源名称,value是资源对应的规则
private static volatile Map> flowRules = new HashMap<>();
//饿汉式单例模式实例化流控规则的监听器对象
private static final FlowPropertyListener LISTENER = new FlowPropertyListener();
//监听器对象的管理器
private static SentinelProperty> currentProperty = new DynamicSentinelProperty>();
...
static {
//将流控规则监听器注册到监听器管理器中
currentProperty.addListener(LISTENER);
startMetricTimerListener();
}
//Load FlowRules, former rules will be replaced.
//加载流控规则
public static void loadRules(List rules) {
//通知监听器管理器中的每一个监听器,规则已发生变化,需要重新加载规则配置
//其实就是更新FlowRuleManager规则管理器中的流控规则列表flowRules
currentProperty.updateValue(rules);
}
private static final class FlowPropertyListener implements PropertyListener> {
@Override
public synchronized void configUpdate(List value) {
Map> rules = FlowRuleUtil.buildFlowRuleMap(value);
if (rules != null) {
flowRules = rules;
}
RecordLog.info("[FlowRuleManager] Flow rules received: {}", rules);
}
@Override
public synchronized void configLoad(List conf) {
Map> rules = FlowRuleUtil.buildFlowRuleMap(conf);
if (rules != null) {
flowRules = rules;
}
RecordLog.info("[FlowRuleManager] Flow rules loaded: {}", rules);
}
}
...
}
public final class FlowRuleUtil {
private static final Function extractResource = new Function() {
@Override
public String apply(FlowRule rule) {
return rule.getResource();
}
};
...
//Build the flow rule map from raw list of flow rules, grouping by resource name.
public static Map> buildFlowRuleMap(List list) {
return buildFlowRuleMap(list, null);
}
//Build the flow rule map from raw list of flow rules, grouping by resource name.
public static Map> buildFlowRuleMap(List list, Predicate filter) {
return buildFlowRuleMap(list, filter, true);
}
//Build the flow rule map from raw list of flow rules, grouping by resource name.
public static Map> buildFlowRuleMap(List list, Predicate filter, boolean shouldSort) {
return buildFlowRuleMap(list, extractResource, filter, shouldSort);
}
//Build the flow rule map from raw list of flow rules, grouping by provided group function.
public static Map> buildFlowRuleMap(List list, Function groupFunction, Predicate filter, boolean shouldSort) {
Map> newRuleMap = new ConcurrentHashMap<>();
if (list == null || list.isEmpty()) {
return newRuleMap;
}
Map> tmpMap = new ConcurrentHashMap<>();
for (FlowRule rule : list) {
...
//获取[流控效果]流量整形控制器TrafficShapingController
TrafficShapingController rater = generateRater(rule);
rule.setRater(rater);
//获取资源名
K key = groupFunction.apply(rule);
if (key == null) {
continue;
}
//获取资源名对应的流控规则列表
Set flowRules = tmpMap.get(key);
//将规则放到Map里,和当前资源绑定
if (flowRules == null) {
//Use hash set here to remove duplicate rules.
flowRules = new HashSet<>();
tmpMap.put(key, flowRules);
}
flowRules.add(rule);
}
Comparator comparator = new FlowRuleComparator();
for (Entry> entries : tmpMap.entrySet()) {
List rules = new ArrayList<>(entries.getValue());
if (shouldSort) {
//Sort the rules.
Collections.sort(rules, comparator);
}
newRuleMap.put(entries.getKey(), rules);
}
return newRuleMap;
}
private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) {
//判断只有当阈值类型为QPS时才生效
if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
//根据流控效果选择不同的流量整形控制器TrafficShapingController
switch (rule.getControlBehavior()) {
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:
//Warm Up预热模式——冷启动模式
return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(), ColdFactorProperty.coldFactor);
case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:
//排队等待模式
return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:
//Warm Up + 排队等待模式
return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(), rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);
case RuleConstant.CONTROL_BEHAVIOR_DEFAULT:
//快速失败模式——Default默认模式
default:
//Default mode or unknown mode: default traffic shaping controller (fast-reject).
}
}
//默认模式:快速失败用的是DefaultController
return new DefaultController(rule.getCount(), rule.getGrade());
}
...
}
(3)FlowSlot根据流控规则对请求进行限流
FlowSlot中处理限流的核心方法其实是FlowRuleChecker.checkFlow()。该方法首先会从FlowRuleManager.flowRules中获取资源对应的流控规则列表,然后再遍历规则列表并调用canPassCheck()方法验证当前请求是否命中规则。如果命中规则,则抛出异常。如果没命中规则,则允许请求通过。
FlowRuleChecker的canPassCheck()方法会判断规则是否是集群模式。如果流控规则是集群模式,则调用passClusterCheck()方法。如果流控规则不是集群模式,则调用passLocalCheck()方法。
在FlowRuleChecker的passLocalCheck()方法中,首先会根据流控规则选择合适的Node作为限流计算的依据,然后再通过TrafficShapingController的canPass()方法判断是否放行请求。
其中选择合适的Node作为限流计算的依据时,会调用selectNodeByRequesterAndStrategy()方法根据流控规则的针对来源、流控模式和当前请求的来源,从下面三个节点中选择一个作为合适的Node:
节点一:上下文中的来源节点OriginNode
节点二:当前请求的集群节点ClusterNode
节点三:默认节点DefaultNode
注意:limitApp是流控规则里的字段,代表此规则生效时的针对来源。而origin是Context里的字段,代表当前请求的来源是什么。假设limitApp配置的是shop,某个请求的origin也是shop,则规则生效。如果limitApp采取默认值default,则代表全部origin都生效。
@Spi(order = Constants.ORDER_FLOW_SLOT)
public class FlowSlot extends AbstractLinkedProcessorSlot {
private final FlowRuleChecker checker;
private final Function> ruleProvider = new Function>() {
@Override
public Collection apply(String resource) {
//Flow rule map should not be null.
Map> flowRules = FlowRuleManager.getFlowRuleMap();
return flowRules.get(resource);
}
};
public FlowSlot() {
this(new FlowRuleChecker());
}
FlowSlot(FlowRuleChecker checker) {
AssertUtil.notNull(checker, "flow checker should not be null");
this.checker = checker;
}
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
//检查流控规则,count默认是1
checkFlow(resourceWrapper, context, node, count, prioritized);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
//调用规则检查器FlowRuleChecker的checkFlow()方法进行检查,count默认是1
checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
}
}
//Rule checker for flow control rules.
public class FlowRuleChecker {
public void checkFlow(Function> ruleProvider, ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
if (ruleProvider == null || resource == null) {
return;
}
//从Map中获取resource资源对应的流控规则列表
Collection rules = ruleProvider.apply(resource.getName());
if (rules != null) {
//循环遍历每一个流控规则
for (FlowRule rule : rules) {
//调用canPassCheck方法进行流控规则验证,判断此次请求是否命中针对resource资源配置的流控规则
//传入的参数count默认是1
if (!canPassCheck(rule, context, node, count, prioritized)) {
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
}
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount) {
return canPassCheck(rule, context, node, acquireCount, false);
}
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
//参数校验
String limitApp = rule.getLimitApp();
if (limitApp == null) {
return true;
}
//如果是集群模式,则执行passClusterCheck()方法
if (rule.isClusterMode()) {
return passClusterCheck(rule, context, node, acquireCount, prioritized);
}
//如果是单机模式,则执行passLocalCheck()方法,acquireCount默认是1
return passLocalCheck(rule, context, node, acquireCount, prioritized);
}
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
//选择Node作为限流计算的依据
Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
if (selectedNode == null) {
return true;
}
//先通过FlowRule.getRater()方法获取流控规则对应的流量整形控制器
//然后调用TrafficShapingController.canPass()方法对请求进行检查,acquireCount默认是1
return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
//选择Node作为限流计算的依据
static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
//The limit app should not be empty.
//获取流控规则中配置的"针对来源",默认是default
String limitApp = rule.getLimitApp();
//获取流控规则中配置的"流控模式",0代表直接,1代表关联,2代表链路
int strategy = rule.getStrategy();
//从context对象中获取当前请求的来源
String origin = context.getOrigin();
//情形一:当流控规则的针对来源(limitApp)与当前请求的来源(origin)相同时
//这种情况表示该限流规则针对特定来源进行限流
//如果配置了针对app1进行限流,那么app2就不会生效,这就是针对特定来源进行限流
if (limitApp.equals(origin) && filterOrigin(origin)) {
//如果流控规则中配置的"流控模式"是直接(RuleConstant.STRATEGY_DIRECT),则返回上下文中的Origin Node
//因为这种模式要求根据调用方的情况进行限流,而Origin Node包含了调用方的统计信息,所以选择Origin Node作为限流计算的依据
if (strategy == RuleConstant.STRATEGY_DIRECT) {
//Matches limit origin, return origin statistic node.
return context.getOriginNode();
}
//如果流控规则中配置的"流控模式"是关联、链路(RuleConstant.STRATEGY_RELATE或RuleConstant.STRATEGY_CHAIN),则调用selectReferenceNode()方法
//此方法会判断:
//如果"流控模式"是关联(RuleConstant.STRATEGY_RELATE),则返回关联资源的ClusterNode
//如果"流控模式"是链路(RuleConstant.STRATEGY_CHAIN),则返回DefaultNode
return selectReferenceNode(rule, context, node);
}
//情况二:当流控规则的针对来源(limitApp)是默认值(RuleConstant.LIMIT_APP_DEFAULT)时
//这种情况表示该流控规则对所有来源都生效
else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {
//如果流控规则中配置的"流控模式"是直接(RuleConstant.STRATEGY_DIRECT),则返回当前请求的集群节点ClusterNode
//因为此时要求的是根据被调用资源的情况进行限流,而集群节点包含了被调用资源的统计信息,所以选择集群节点作为限流计算的依据
if (strategy == RuleConstant.STRATEGY_DIRECT) {
//Return the cluster node.
return node.getClusterNode();
}
//如果流控规则中配置的"流控模式"是关联、链路(RuleConstant.STRATEGY_RELATE或RuleConstant.STRATEGY_CHAIN),则调用selectReferenceNode()方法
//此方法会判断:
//如果"流控模式"是关联(RuleConstant.STRATEGY_RELATE),则返回关联资源的ClusterNode
//如果"流控模式"是链路(RuleConstant.STRATEGY_CHAIN),则返回DefaultNode
return selectReferenceNode(rule, context, node);
}
//情况三:当流控规则的针对来源(limitApp)是其他(RuleConstant.LIMIT_APP_OTHER),且当前请求的来源(origin)与流控规则的资源名(rule.getResource())不同时
//这种情况表示该流控规则针对除默认来源以外的其他来源进行限流,可实现个性化限流
//比如可以对app1进行个性化限流,对其他所有app进行整体限流
else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp) && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
//如果流控规则中配置的"流控模式"是直接(RuleConstant.STRATEGY_DIRECT),则返回上下文中的来源节点(Origin Node)
//因为这种"流控模式"要求根据调用方的情况进行限流,而来源节点包含了调用方的统计信息,所以选择来源节点作为限流计算的依据
if (strategy == RuleConstant.STRATEGY_DIRECT) {
return context.getOriginNode();
}
//如果流控规则中配置的"流控模式"是关联、链路(RuleConstant.STRATEGY_RELATE或RuleConstant.STRATEGY_CHAIN),则调用selectReferenceNode()方法
//此方法会判断:
//如果"流控模式"是关联(RuleConstant.STRATEGY_RELATE),则返回关联资源的ClusterNode
//如果"流控模式"是链路(RuleConstant.STRATEGY_CHAIN),则返回DefaultNode
return selectReferenceNode(rule, context, node);
}
return null;
}
//如果流控规则中配置的"流控模式"是关联(RuleConstant.STRATEGY_RELATE),则返回关联资源的ClusterNode
//如果流控规则中配置的"流控模式"是链路(RuleConstant.STRATEGY_CHAIN),则返回DefaultNode
static Node selectReferenceNode(FlowRule rule, Context context, DefaultNode node) {
String refResource = rule.getRefResource();
int strategy = rule.getStrategy();
if (StringUtil.isEmpty(refResource)) {
return null;
}
if (strategy == RuleConstant.STRATEGY_RELATE) {
//关联资源的ClusterNode
return ClusterBuilderSlot.getClusterNode(refResource);
}
if (strategy == RuleConstant.STRATEGY_CHAIN) {
if (!refResource.equals(context.getName())) {
return null;
}
return node;
}
//No node.
return null;
}
private static boolean filterOrigin(String origin) {
// Origin cannot be `default` or `other`.
return !RuleConstant.LIMIT_APP_DEFAULT.equals(origin) && !RuleConstant.LIMIT_APP_OTHER.equals(origin);
}
...
}
2.FlowSlot实现流控规则的快速失败效果的原理
(1)流控效果为快速失败时对应的流量整形控制器
(2)流量整形控制器DefaultController执行分析
(3)流控模式中的关联模式和链路模式说明
(1)流控效果为快速失败时对应的流量整形控制器
流控效果有:快速失败、Warm Up、排队等待。
调用FlowRuleManager的loadRules()方法加载流控规则时,会触发执行FlowPropertyListener的configUpdate()方法,该方法又会调用FlowRuleUtil的generateRater()方法。在FlowRuleUtil的generateRater()方法中,便会根据不同的流控效果选择不同的流量整形控制器。
其中当流控规则的流控效果为快速失败时,对应的流量整形控制器为TrafficShapingController的子类DefaultController。
public class FlowRuleManager {
//维护每个资源的流控规则列表,key是资源名称,value是资源对应的规则
private static volatile Map> flowRules = new HashMap<>();
//饿汉式单例模式实例化流控规则的监听器对象
private static final FlowPropertyListener LISTENER = new FlowPropertyListener();
//监听器对象的管理器
private static SentinelProperty> currentProperty = new DynamicSentinelProperty>();
...
static {
//将流控规则监听器注册到监听器管理器中
currentProperty.addListener(LISTENER);
startMetricTimerListener();
}
//Load FlowRules, former rules will be replaced.
//加载流控规则
public static void loadRules(List rules) {
//通知监听器管理器中的每一个监听器,规则已发生变化,需要重新加载规则配置
//其实就是更新FlowRuleManager规则管理器中的流控规则列表flowRules
currentProperty.updateValue(rules);
}
private static final class FlowPropertyListener implements PropertyListener> {
@Override
public synchronized void configUpdate(List value) {
Map> rules = FlowRuleUtil.buildFlowRuleMap(value);
if (rules != null) {
flowRules = rules;
}
RecordLog.info("[FlowRuleManager] Flow rules received: {}", rules);
}
@Override
public synchronized void configLoad(List conf) {
Map> rules = FlowRuleUtil.buildFlowRuleMap(conf);
if (rules != null) {
flowRules = rules;
}
RecordLog.info("[FlowRuleManager] Flow rules loaded: {}", rules);
}
}
...
}
public final class FlowRuleUtil {
private static final Function extractResource = new Function() {
@Override
public String apply(FlowRule rule) {
return rule.getResource();
}
};
...
//Build the flow rule map from raw list of flow rules, grouping by provided group function.
public static Map> buildFlowRuleMap(List list, Function groupFunction, Predicate filter, boolean shouldSort) {
Map> newRuleMap = new ConcurrentHashMap<>();
if (list == null || list.isEmpty()) {
return newRuleMap;
}
Map> tmpMap = new ConcurrentHashMap<>();
for (FlowRule rule : list) {
...
//获取[流控效果]流量整形控制器TrafficShapingController
TrafficShapingController rater = generateRater(rule);
rule.setRater(rater);
//获取资源名
K key = groupFunction.apply(rule);
if (key == null) {
continue;
}
//获取资源名对应的流控规则列表
Set flowRules = tmpMap.get(key);
//将规则放到Map里,和当前资源绑定
if (flowRules == null) {
// Use hash set here to remove duplicate rules.
flowRules = new HashSet<>();
tmpMap.put(key, flowRules);
}
flowRules.add(rule);
}
Comparator comparator = new FlowRuleComparator();
for (Entry> entries : tmpMap.entrySet()) {
List rules = new ArrayList<>(entries.getValue());
if (shouldSort) {
//Sort the rules.
Collections.sort(rules, comparator);
}
newRuleMap.put(entries.getKey(), rules);
}
return newRuleMap;
}
private static TrafficShapingController generateRater(/*@Valid*/ FlowRule rule) {
//判断只有当阈值类型为QPS时才生效
if (rule.getGrade() == RuleConstant.FLOW_GRADE_QPS) {
//根据流控效果选择不同的流量整形控制器TrafficShapingController
switch (rule.getControlBehavior()) {
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP:
//Warm Up预热模式——冷启动模式
return new WarmUpController(rule.getCount(), rule.getWarmUpPeriodSec(), ColdFactorProperty.coldFactor);
case RuleConstant.CONTROL_BEHAVIOR_RATE_LIMITER:
//排队等待模式
return new RateLimiterController(rule.getMaxQueueingTimeMs(), rule.getCount());
case RuleConstant.CONTROL_BEHAVIOR_WARM_UP_RATE_LIMITER:
//Warm Up + 排队等待模式
return new WarmUpRateLimiterController(rule.getCount(), rule.getWarmUpPeriodSec(), rule.getMaxQueueingTimeMs(), ColdFactorProperty.coldFactor);
case RuleConstant.CONTROL_BEHAVIOR_DEFAULT:
//快速失败模式——Default默认模式
default:
// Default mode or unknown mode: default traffic shaping controller (fast-reject).
}
}
//默认模式:快速失败用的是DefaultController
return new DefaultController(rule.getCount(), rule.getGrade());
}
...
}
当FlowSlot的entry()方法对请求进行流控规则验证时,会调用规则检查器FlowRuleChecker的checkFlow()方法进行检查,最终会通过FlowRule的getRater()方法获取流控规则对应的流量整形控制器,然后调用TrafficShapingController的canPass()方法对请求进行检查。当流控效果为快速失败时,具体调用的其实就是DefaultController的canPass()方法。
@Spi(order = Constants.ORDER_FLOW_SLOT)
public class FlowSlot extends AbstractLinkedProcessorSlot {
private final FlowRuleChecker checker;
private final Function> ruleProvider = new Function>() {
@Override
public Collection apply(String resource) {
//Flow rule map should not be null.
Map> flowRules = FlowRuleManager.getFlowRuleMap();
return flowRules.get(resource);
}
};
...
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable {
//检查流控规则
checkFlow(resourceWrapper, context, node, count, prioritized);
fireEntry(context, resourceWrapper, node, count, prioritized, args);
}
void checkFlow(ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
//调用规则检查器FlowRuleChecker的checkFlow()方法进行检查
checker.checkFlow(ruleProvider, resource, context, node, count, prioritized);
}
@Override
public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) {
fireExit(context, resourceWrapper, count, args);
}
}
//Rule checker for flow control rules.
public class FlowRuleChecker {
public void checkFlow(Function> ruleProvider, ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
if (ruleProvider == null || resource == null) {
return;
}
//从Map中获取resource资源对应的流控规则列表
Collection rules = ruleProvider.apply(resource.getName());
if (rules != null) {
//循环遍历每一个流控规则
for (FlowRule rule : rules) {
//调用canPassCheck方法进行流控规则验证,判断此次请求是否命中针对resource资源配置的流控规则
if (!canPassCheck(rule, context, node, count, prioritized)) {
throw new FlowException(rule.getLimitApp(), rule);
}
}
}
}
public boolean canPassCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount) {
return canPassCheck(rule, context, node, acquireCount, false);
}
public boolean canPassCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
//参数校验
String limitApp = rule.getLimitApp();
if (limitApp == null) {
return true;
}
//如果是集群模式,则执行passClusterCheck()方法
if (rule.isClusterMode()) {
return passClusterCheck(rule, context, node, acquireCount, prioritized);
}
//如果是单机模式,则执行passLocalCheck()方法
return passLocalCheck(rule, context, node, acquireCount, prioritized);
}
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount, boolean prioritized) {
//选择Node作为限流计算的依据
Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
if (selectedNode == null) {
return true;
}
//先通过FlowRule.getRater()方法获取流控规则对应的流量整形控制器
//然后调用TrafficShapingController.canPass()方法对请求进行检查
return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
...
}
//Default throttling controller (immediately reject strategy).
public class DefaultController implements TrafficShapingController {
private static final int DEFAULT_AVG_USED_TOKENS = 0;
private double count;
private int grade;
public DefaultController(double count, int grade) {
this.count = count;
this.grade = grade;
}
@Override
public boolean canPass(Node node, int acquireCount) {
return canPass(node, acquireCount, false);
}
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
//获取当前请求数
int curCount = avgUsedTokens(node);
//如果当前请求数 + 1超出阈值,那么返回失败
if (curCount + acquireCount > count) {
//进行优先级逻辑处理
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
long currentTime;
long waitInMs;
currentTime = TimeUtil.currentTimeMillis();
waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
if (waitInMs < OccupyTimeoutProperty xss=removed xss=removed>
(2)流量整形控制器DefaultController执行分析
流控效果为快速失败时,会调用DefaultController的canPass()方法。
问题一:其中的入参Node是如何选择的
Node的选择主要依赖于三个参数:FlowRule的limitApp、FlowRule的strategy、Context的origin,这三个参数分别表示流控规则的针对来源、流控模式和当前请求的来源。
情形一:如果limitApp和origin相等并且limitApp不是默认值default
此时的流控规则便是针对特定调用方的
如果流控模式为直接,那么选择的Node是OriginNode
如果流控模式为关联,那么选择的Node是关联资源的ClusterNode
如果流控模式为链路,那么选择的Node是DefaultNode
情形二:limitApp值为默认的default
如果流控模式为直接,那么选择的Node是当前资源的ClusterNode
如果流控模式为关联,那么选择的Node是关联资源的ClusterNode
如果流控模式为链路,那么选择的Node是DefaultNode
情形三:limitApp值为other且origin与FlowRule.getResource()不同
如果流控模式为直接,那么选择的Node是OriginNode
如果流控模式为关联,那么选择的Node是关联资源的ClusterNode
如果流控模式为链路,那么选择的Node是DefaultNode
问题二:如何判断阈值类型是QPS还是线程
FlowRule规则类里有个名为grade的字段,代表着阈值类型。所以在初始化DefaultController时就会传入流控规则FlowRule的grade值,这样在DefaultController的avgUsedTokens()方法中,就可以根据grade字段的值判断出阈值类型是QPS还是线程数了。
问题三:其中的入参prioritized的作用是什么
DefaultController的canPass()的入参prioritized表示是否对请求设置优先级。当prioritized为true时,表示该请求具有较高优先级的请求。当prioritized为false时,表示该请求是普通请求。而高优先级的请求需要尽量保证其可以通过限流检查。不过一般情况下,prioritized的值默认为false,除非手动指定为true,毕竟限流的目的是不想让超出的流量通过。
(3)流控模式中的关联模式和链路模式说明
一.关联模式
关联流控模式中,可以将两个资源进行关联。当某个资源的流量超限时,可以触发其他资源的流控规则。
比如用户下单购物时会涉及下单资源和支付资源,如果支付资源达到流控阈值,那么应该要同时禁止下单,也就是通过支付资源来关联到下单资源。
注意:如果采取关联模式,那么设置的QPS阈值是被关联者的,而非关联者的。在如下图示中配置了QPS的阈值为3,这是针对testPay资源设置的,而不是针对testOrder资源设置的。也就是testOrder被流控的时机就是当testPay的QPS达到3的时候,3并不是testOrder所访问的次数,而是testPay这个接口被访问的次数。
二.链路模式
一个资源可能会被多个业务链路调用,不同的业务链路需要进行不同的流控,这时就可以使用链路模式。
下图便为testTrace资源创建了一条链路模式的流控规则,规则为QPS限制到1 ,且链路入口资源为/trace/test2。
这意味着:/trace/test1链路可以随便访问testTrace资源,不受任何限制。/trace/test2链路访问testTrace资源时会限制QPS为1,超出限制被流控。
3.FlowSlot实现流控规则中排队等待效果的原理
(1)实现排队等待流控效果的普通漏桶算法介绍
(2)RateLimiterController如何实现排队等待效果
(3)RateLimiterController如何判断是否超出阈值
(4)RateLimiterController如何实现排队等待
(5)RateLimiterController实现的漏桶算法与普通漏桶算法的区别
(6)RateLimiterController流控存在的问题
(1)实现排队等待流控效果的普通漏桶算法介绍
一.漏桶算法的基本原理
漏桶算法的核心思想是以固定速率流出。
假设有一个水桶,水桶底上有几个孔洞。因为孔洞个数和孔洞大小是固定的,因此水从水桶流出的速度是固定的。那么就会有以下两种情况:
情况一:往水桶注水的速度小于等于水从孔洞流出的速度,那么水桶中将不会有剩余的水,因为"消费大于等于生产"。
情况二:往水桶注水的速度大于水从孔洞流出的速度,那么随着时间推移,水桶会被装满,水会溢出,因为"生产大于消费"。
在请求 / 响应场景中,水桶可以被理解为系统处理请求的能力。水对应成请求,水桶对应成一个有限的缓冲区(请求队列)用于存储请求。那么水桶的容量就代表了系统能够处理的最大并发请求数,当水桶满时(请求队列达到上限),新请求将被拒绝,从而实现流量控制。
二.漏桶算法的处理流程
步骤一:当新的请求到达时,会将新的请求放入缓冲区(请求队列)中,类似于往水桶里注水。
步骤二:系统会以固定的速度处理缓冲区中的请求,类似于水从窟窿中以固定的速度流出,比如开启一个后台线程定时以固定的速度从缓冲区中取出请求然后进行分发处理。
步骤三:如果缓冲区已满,则新的请求将被拒绝或丢弃,类似于水溢出。
三.漏桶算法的主要特点
特点一:固定速率
水从桶底的孔洞中以固定速率流出,类似于网络中以固定速率发送数据包。但写入速度不固定,也就是请求不是匀速产生的。相当于生产者生产消息不固定,消费者消费消息是匀速消费的。
特点二:有限容量
桶的容量有限,当桶满时,新到达的水会溢出,即拒绝超过容量的请求。
特点三:先进先出(FIFO)
水按照先进先出的顺序从桶中流出,类似于请求的处理顺序。
四.漏桶算法的基本实现
应用场景一:假设有一个视频上传服务,为了确保服务器稳定,希望限制用户在1分钟内最多只能上传5个视频。此时可以使用漏桶算法,将每个视频上传请求视为一个"水滴"。桶的容量设为5个水滴,出水速率设为每分钟5个水滴。当用户上传速度超过限制时,多余的请求将被拒绝。
应用场景二:假设有一个向用户发送电子邮件的后台任务,为了确保邮件发送系统的稳定性,希望限制每秒钟最多发送10封邮件。此时可以使用漏桶算法来限制消费MQ的处理速率,将桶的容量设为10个水滴,出水速率设为每秒钟10个水滴。这样,邮件发送系统每秒最多只会处理10封电子邮件。
漏桶算法的代码实现如下:
public class RateLimiter {
//用于记录最后流出水滴的时间,以便计算流出的水滴数量
private static long lastOutTime = System.currentTimeMillis();
//流出速率(每秒10个水滴),即每秒允许流出的水滴数量(对应请求)
private static int outRate = 10;
//桶的最大容量是10个水滴,即桶能容纳的最大水量(对应请求)
private static int maxCapacity = 10;
//当前桶内的水滴数量(对应请求)
private static AtomicInteger currentWater = new AtomicInteger(0);
//调用isLimited()方法往漏桶中注入一滴水
//返回值说明:false:未受到限制;true:受到限制
public static synchronized boolean isLimited(long taskId, int turn) {
//1.如果当前桶是空的,则使用当前时间作为最后一次流出的时间,并将水量加1
if (currentWater.get() == 0) {
lastOutTime = System.currentTimeMillis();
currentWater.incrementAndGet();
return false;
}
//2.模拟补充水从漏桶中匀速流出的处理过程
//计算漏桶中已流出的水量
int leakedWater = (int) ((System.currentTimeMillis() - lastOutTime) / 1000) * outRate;
//计算漏桶中剩余的水量
int remainingWater = currentWater.get() - leakedWater;
//更新当前漏桶的水量
currentWater.set(Math.max(0, remainingWater));
//因为上面将计算出的剩余水量更新为漏桶的当前水量,所以要更新最后流出水滴的时间为当前时间戳
lastOutTime = System.currentTimeMillis();
//3.如果漏桶内的水量未满,则加水以及放行
if (currentWater.get() < maxCapacity>
流程图如下:
(2)RateLimiterController如何实现排队等待效果
当流控规则中指定的流控效果是排队等待时,对应的流量整形控制器是RateLimiterController。
当调用FlowSlot的entry()方法对请求进行流控规则验证时,最终会调用RateLimiterController的canPass()方法来对请求进行检查。
RateLimiterController实现排队等待的效果时使用了漏桶算法。既然使用了漏桶算法,那么就一定包含如下字段:
字段一:count,表示QPS阈值,即QPS超出多少后会进行限流。
字段二:latestPassedTime,表示最近允许请求通过的时间。有了这个参数,就能计算出当前请求最早的预期通过时间。
字段三:maxQueueingTimeMs,表示排队时的最大等待时间。
(3)RateLimiterController如何判断是否超出阈值
在RateLimiterController的canPass()方法中,为了判断是否超出QPS阈值,通过原子类变量latestPassedTime简化成单线程让请求先后通过的处理模型。为了尽量让业务不受Sentinel影响,采用预估请求的被处理时间点的方式。也就是无需等前面的请求完全被处理完,才确定后面的请求被处理的时间。因为在普通的漏桶算法中,是处理完一个请求,才从漏桶取出水滴。而RateLimiterController的漏桶算法,则是假设请求已经被通过了。
具体的判断逻辑如下:首先获取系统的当前时间currentTime。然后计算在满足流控规则中限制的QPS阈值count的情况下,先后的两个请求被允许通过时的最小时间间隔costTime。接着计算当前请求最早的预期通过时间expectedTime,也就是此次请求预计会在几时几分几秒内通过。最后比较expectedTime和currentTime就可知当前请求是否允许通过了。
一.如果expectedTime小于等于currentTime
也就是当前请求最早的预期通过时间比系统当前时间小。如果在此时(currentTime)通过当前请求,则当前请求的通过时间就比它最早的预期通过时间(expectedTime)要晚,即当前请求和最近通过的请求的时间间隔变大了,所以此时不会超QPS阈值。于是返回true允许通过,同时更新最近允许请求通过的时间戳为当前时间。
二.如果expectedTime大于currentTime
也就是当前请求最早的预期通过时间比系统当前时间大。如果在此时(currentTime)通过当前请求,则当前请求的通过时间就比它最早的预期通过时间(expectedTime)要早,即当前请求和最近通过的请求的时间间隔变小了,比最小间隔时间costTime还小,所以此时必然会超QPS阈值。因此返回进行等待或者返回false不允许通过,等待的最小时间就是:最近通过请求的时间 + 先后两个请求允许通过时的最小间隔时间 - 当前时间。
需要注意:Sentinel流量控制的漏桶算法,只能限制在costTime内的流量激增,限制不了costTime外的流量激增。比如系统启动完一瞬间就涌入大量并发请求,此时的流量激增限制不了。又比如系统处理完正常流量的最后一个请求,隔了costTime+的时间后,突然涌入超QPS阈值的并发请求,此时也限制不了这种情况的流量激增。但如果系统处理完正常流量的最后一个请求,隔了costTime-的时间后,突然涌入超QPS阈值的并发请求,此时则可以限制这种情况的流量激增。
public class RateLimiterController implements TrafficShapingController {
//排队等待的意思是超出阈值后等待一段时间,maxQueueingTimeMs就是请求在队列中的最大等待时间
private final int maxQueueingTimeMs;
//流控规则中限制QPS的阈值,也就是QPS超出多少后会进行限制
private final double count;
//最近允许一个请求通过的时间,每次请求通过后就会更新此时间,可以根据该时间计算出当前请求最早的预期通过时间
//注意:Sentinel是在业务前面的,尽量不要让业务受到Sentinel的影响,所以不需要等请求完全被处理完,才确定请求被通过的时间
private final AtomicLong latestPassedTime = new AtomicLong(-1);
public RateLimiterController(int timeOut, double count) {
this.maxQueueingTimeMs = timeOut;
this.count = count;
}
@Override
public boolean canPass(Node node, int acquireCount) {
return canPass(node, acquireCount, false);
}
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
//Pass when acquire count is less or equal than 0.
//acquireCount代表每次从桶底流出多少个请求
//如果acquireCount小于等于0,则表示无需限流直接通过,不过acquireCount一般默认是1
if (acquireCount <= 0) {
return true;
}
//Reject when count is less or equal than 0.
//Otherwise, the costTime will be max of long and waitTime will overflow in some cases.
//如果限流规则的count(即限制QPS的阈值)小于等于0,则直接拒绝,相当于一个请求也不能放行
if (count <= 0) {
return false;
}
//1.首先获取系统的当前时间
long currentTime = TimeUtil.currentTimeMillis();
//Calculate the interval between every two requests.
//2.然后计算,在满足流控规则中限制的QPS阈值count的情况下,先后的两个请求被允许通过时的最小间隔时间(假设请求是单线程处理的)
long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
//Expected pass time of this request.
//3.接着计算当前请求最早的预期通过时间 = 满足QPS阈值下的两个请求的最小时间间隔 + 上次请求的通过时间
long expectedTime = costTime + latestPassedTime.get();
//4.最后判断当前请求最早的预期通过时间是否比系统当前时间小
if (expectedTime <= currentTime) {//等价于没有超出QPS阈值
//当前请求最早的预期通过时间比系统当前时间小
//如果在此时(currentTime)通过当前请求,那么当前请求的实际通过时间就比它最早的预期通过时间(expectedTime)要晚
//也就是当前请求和最近通过的请求的时间间隔变大了,所以此时不会超QPS阈值,返回true允许通过
//Contention may exist here, but it's okay.
//latestPassedTime并不会影响costTime,也就是说,多个线程可以并发执行到这里而不受阈值的影响
//这意味着,Sentinel流量控制的漏桶算法,只能限制在costTime时间内的流量激增,限制不了costTime时间外的流量激增
//比如系统启动完的那一瞬间就涌入超出QPS阈值的并发请求,此时的这种流量激增是限制不了的
//又比如系统正常运行时处理完了正常流量的最后一个请求,隔了costTime+的时间后,突然涌入超出QPS阈值的并发请求,此时也限制不了
//只能限制住这样的一种情况:系统正常运行处理完正常流量的最后一个请求,隔了costTime-的时间,突然涌入超出QPS阈值的并发请求
latestPassedTime.set(currentTime);
return true;
} else {//等价于超出了QPS阈值
//当前请求最早的预期通过时间比系统当前时间大
//如果在此时(currentTime)通过当前请求,那么当前请求的实际通过时间就比它最早的预期通过时间(expectedTime)要早
//也就是当前请求和最近通过的请求的时间间隔变小了,比最小间隔时间costTime还小
//所以此时必然会超QPS阈值,因此返回进行等待或者返回false不允许通过
//而等待的最小时间,就是最近通过请求的时间 + 先后两个请求允许通过时的最小间隔时间 - 当前时间
...
}
}
}
(4)RateLimiterController如何实现排队等待
一.实现排队等待效果的初版
首先计算当前请求预估的等待时间,用于判断是否超出最大等待时间。当前请求预估的等待时间 = 当前请求本来预期通过的时间 - 当前时间。
如果预估的等待时间超出流控规则设置的最大等待时间,则直接返回false,相当于桶内的水溢出了,处理不完,抛出异常。如果预估的等待时间没有超出流控规则设置的最大等待时间,则调用Thread.sleep()进行等待,睡眠时间就是预估的等待时间。
但是直接进行睡眠在并发场景下是无法实现排队等待效果的。因为如果多个并发请求因为在costTime内进入系统导致超QPS而等待时,canPass()方法对各个请求进行验证时,计算出来的waitTime可能都一样。这样睡眠了waitTime的时间后,等待的各个并发线程就会同时被唤醒。这样就没法实现排队等待的效果,也就是让这等待的请求按排队顺序唤醒。
public class RateLimiterController implements TrafficShapingController {
//排队等待的意思是超出阈值后等待一段时间,maxQueueingTimeMs就是请求在队列中的最大等待时间
private final int maxQueueingTimeMs;
//流控规则中限制QPS的阈值,也就是QPS超出多少后会进行限制
private final double count;
//最近允许一个请求通过的时间,每次请求通过后就会更新此时间,可以根据该时间计算出当前请求最早的预期通过时间
//注意:Sentinel是在业务前面的,尽量不要让业务受到Sentinel的影响,所以不需要等请求完全被处理完,才确定请求被通过的时间
private final AtomicLong latestPassedTime = new AtomicLong(-1);
public RateLimiterController(int timeOut, double count) {
this.maxQueueingTimeMs = timeOut;
this.count = count;
}
@Override
public boolean canPass(Node node, int acquireCount) {
return canPass(node, acquireCount, false);
}
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
if (acquireCount <= 0) {
return true;
}
if (count <= 0) {
return false;
}
//1.首先获取系统的当前时间
long currentTime = TimeUtil.currentTimeMillis();
//2.然后计算,在满足流控规则中限制的QPS阈值count的情况下,先后的两个请求被允许通过时的最小间隔时间(假设请求是单线程处理的)
long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
//3.接着计算当前请求最早的预期通过时间 = 满足QPS阈值下的两个请求的最小时间间隔 + 上次请求的通过时间
long expectedTime = costTime + latestPassedTime.get();
//4.最后判断当前请求最早的预期通过时间是否比系统当前时间小
if (expectedTime <= currentTime) {//等价于没有超出QPS阈值
//当前请求最早的预期通过时间比系统当前时间小
//如果在此时(currentTime)通过当前请求,那么当前请求的实际通过时间就比它最早的预期通过时间(expectedTime)要晚
//也就是当前请求和最近通过的请求的时间间隔变大了,所以此时不会超QPS阈值,返回true允许通过
latestPassedTime.set(currentTime);
return true;
} else {//等价于超出了QPS阈值
//当前请求最早的预期通过时间比系统当前时间大
//如果在此时(currentTime)通过当前请求,那么当前请求的实际通过时间就比它最早的预期通过时间(expectedTime)要早
//也就是当前请求和最近通过的请求的时间间隔变小了,比最小间隔时间costTime还小
//所以此时必然会超QPS阈值,因此返回进行等待或者返回false不允许通过
//而等待的最小时间,就是最近通过请求的时间 + 先后两个请求允许通过时的最小间隔时间 - 当前时间
//计算当前请求预估的等待时间,用于判断是否超出流控规则设置的最大等待时间
long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
if (waitTime > maxQueueingTimeMs) {
//如果超出最大等待时间,则直接返回false
return false;
} else {
//如果按如下3行代码让当前请求进行等待,那么执行到此处的多个并发请求,具有的waitTime都是一样的
//这样睡眠了waitTime的时间后,等待的多个并发请求就会同时被唤醒
//这样就没法实现排队等待的效果,不能让这等待的请求按排队顺序唤醒
if (waitTime > 0) {
Thread.sleep(waitTime);
}
return true;
}
}
return false;
}
}
二.实现排队等待效果的优化版
为了避免等待的各个并发线程被同时唤醒,可通过原子变量latestPassedTime的addAndGet(costTime)方法 + 假设进行睡眠等待的请求已被通过,实现需要等待的并发请求进行睡眠等待的时间都不一样,从而实现并发请求排队等待的效果。
实现排队等待效果的核心逻辑:由于latestPassedTime的原子性,每个线程都会获得不一样的oldTime。接着根据oldTime - 当前时间,就可以得到每个线程需要睡眠等待的时间waitTime。此时的waitTime都将会不一样,从而避免并发线程同时被唤醒的情况。将latestPassedTime按costTime进行自增,其实相当于假设当前请求在不超过QPS阈值的情况下,被允许通过了。
public class RateLimiterController implements TrafficShapingController {
//排队等待的意思是超出阈值后等待一段时间,maxQueueingTimeMs就是请求在队列中的最大等待时间
private final int maxQueueingTimeMs;
//流控规则中限制QPS的阈值,也就是QPS超出多少后会进行限制
private final double count;
//最近允许一个请求通过的时间,每次请求通过后就会更新此时间,可以根据该时间计算出当前请求最早的预期通过时间
//注意:Sentinel是在业务前面的,尽量不要让业务受到Sentinel的影响,所以不需要等请求完全被处理完,才确定请求被通过的时间
private final AtomicLong latestPassedTime = new AtomicLong(-1);
public RateLimiterController(int timeOut, double count) {
this.maxQueueingTimeMs = timeOut;
this.count = count;
}
@Override
public boolean canPass(Node node, int acquireCount) {
return canPass(node, acquireCount, false);
}
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
//acquireCount代表每次从桶底流出多少个请求
//如果acquireCount小于等于0,则表示无需限流直接通过,不过acquireCount一般默认是1
if (acquireCount <= 0) {
return true;
}
//如果限流规则的count(即限制QPS的阈值)小于等于0,则直接拒绝,相当于一个请求也不能放行
if (count <= 0) {
return false;
}
//1.首先获取系统的当前时间
long currentTime = TimeUtil.currentTimeMillis();
//2.然后计算,在满足流控规则中限制的QPS阈值count的情况下,先后的两个请求被允许通过时的最小间隔时间(假设请求是单线程处理的)
long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
//3.接着计算当前请求最早的预期通过时间 = 满足QPS阈值下的两个请求的最小时间间隔 + 上次请求的通过时间
long expectedTime = costTime + latestPassedTime.get();
//4.最后判断当前请求最早的预期通过时间是否比系统当前时间小
if (expectedTime <= currentTime) {//等价于没有超出QPS阈值
//当前请求最早的预期通过时间比系统当前时间小
//如果在此时(currentTime)通过当前请求,那么当前请求的实际通过时间就比它最早的预期通过时间(expectedTime)要晚
//也就是当前请求和最近通过的请求的时间间隔变大了,所以此时不会超QPS阈值,返回true允许通过
//由这里可知,latestPassedTime并不会影响costTime,也就是说,多个线程可以并发执行到这里而不受阈值的影响
//这意味着,Sentinel流量控制的漏桶算法,只能限制在costTime时间内的流量激增,限制不了costTime时间外的流量激增
//比如系统启动完的那一瞬间就涌入超出QPS阈值的并发请求,此时的这种流量激增是限制不了的;
//又比如系统正常运行时处理完了正常流量的最后一个请求,隔了costTime+的时间后,突然涌入超出QPS阈值的并发请求,此时也限制不了;
//只能限制住这样的一种情况:系统正常运行处理完正常流量的最后一个请求,隔了costTime-的时间,突然涌入超出QPS阈值的并发请求
latestPassedTime.set(currentTime);
return true;
} else {
//如果不是,即当前请求最早的预期通过时间比系统当前时间大
//则说明latestPassedTime.get()大了,也就是上一个可能由于QPS超出阈值的原因导致请求处理慢了,所以需要进行等待
//计算当前请求的等待时间,用于判断是否超出流控规则设置的最大等待时间
long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
if (waitTime > maxQueueingTimeMs) {
//如果超出最大等待时间,则直接返回false
return false;
} else {//等价于超出了QPS阈值
//当前请求最早的预期通过时间比系统当前时间大
//如果在此时(currentTime)通过当前请求,那么当前请求的实际通过时间就比它最早的预期通过时间(expectedTime)要早
//也就是当前请求和最近通过的请求的时间间隔变小了,比最小间隔时间costTime还小
//所以此时必然会超QPS阈值,因此返回进行等待或者返回false不允许通过
//而等待的最小时间,就是最近通过请求的时间 + 先后两个请求允许通过时的最小间隔时间 - 当前时间
//首先通过latestPassedTime这个原子变量的addAndGet()方法
//将最近通过请求的时间latestPassedTime,加上先后两次请求需要的最小间隔时间costTime,得到当前请求本来预期的通过时间
//注意:
//当多个并发线程执行到此处时,由于latestPassedTime的原子性,每个线程都会获得不一样的oldTime
//接着根据oldTime - 当前时间,就可以得到每个线程需要睡眠等待的时间waitTime
//此时的waitTime都将会不一样,从而避免并发线程同时被唤醒的情况
//将latestPassedTime进行自增,其实相当于假设当前请求在不超过QPS阈值的情况下,被允许通过了
long oldTime = latestPassedTime.addAndGet(costTime);
try {
//然后计算当前请求需要等待多久 = 当前请求最早的预期通过时间 - 当前系统时间
waitTime = oldTime - TimeUtil.currentTimeMillis();
//如果等待时间大于流控规则设置的最大等待时间,则需要回滚刚才更新的最近通过请求的时间
//也就是将latestPassedTime减去costTime,然后返回false表示请求无法通过
if (waitTime > maxQueueingTimeMs) {
//如果发现新计算的等待时间 大于 最大等待时间,则需要回滚latestPassedTime
latestPassedTime.addAndGet(-costTime);
return false;
}
//in race condition waitTime may <= 0
if (waitTime > 0) {
//当前请求需要进行等待
Thread.sleep(waitTime);
}
return true;
} catch (InterruptedException e) {
}
}
}
return false;
}
}
(5)RateLimiterController实现的漏桶算法与普通漏桶算法的区别
区别一:普通漏桶算法使用的是真实队列
它有个单独的字段去记录当前桶内的水量,也就是请求量。每通过一个请求,则该字段值-1。反之,每新进一个请求,此字段值+1。
区别二:RateLimiterController实现的漏桶算法使用的是虚拟队列
它没有单独的字段去记录当前桶内的请求量,而是根据最近通过请求的时间得出当前请求最早的预期通过时间来实现。其本质就是先假设当前请求可以通过,然后再按照先后请求在QPS阈值下可以允许通过时的最大时间间隔,来计算出当前请求最早的预期通过时间,再对比是否和当前发生冲突。
区别三:普通漏桶算法使用的策略是直接拒绝
如果流入速度大于流出速度,则直接拒绝。
区别四:RateLimiterController实现的漏桶算法使用的策略是排队等待
如果超出了阈值,则不会直接拒绝请求,而是会等待一段时间,只要在这段时间内能处理到这个请求就不会拒绝掉。
(6)RateLimiterController流控存在的问题
问题一:在costTime时间内出现流量激增才能限流
如果在costTime时间外,即最后一次请求通过的时间已经好久了,突然流量激增以及都并发进入系统,那么此时是无法限制住的。
问题二:Sentinel排队等待流控效果支持的QPS阈值不能超过1000
如果超过1000,且costTime计大于等于0.5,则会认为间隔时间都是1ms。如果costTime小于0.5,则认为配置失效,相当于没有配置此条流控规则。
long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
long costTime = Math.round(1.0 * (1) / 1100 * 1000)约等于0.9ms;
默认情况下,acquireCount的值是1,那么:如果QPS阈值count在1000~2000,此时costTime = 1,限流不受阈值影响。如果QPS阈值count大于2000,此时costTime = 0,限流配置失效。
4.FlowSlot实现流控规则中Warm Up效果的原理
(1)令牌桶算法介绍
(2)WarmUpController使用的预热模型
(3)WarmUpController的初始化
(4)WarmUpController的限流原理
(1)令牌桶算法介绍
一.令牌桶算法的原理
二.令牌桶算法的具体流程
三.漏桶算法和令牌桶算法的对比
四.令牌桶算法实现流控的例子
五.令牌桶算法的特点
六.令牌桶算法和漏桶算法的核心区别
七.令牌桶算法的代码实现
一.令牌桶算法的原理
通过控制令牌的生成速率和消耗速率来实现平滑地限制请求的处理速率,同时允许一定程度的突发流量。
首先,令牌桶中预先存放一定数量的令牌,每个令牌代表一个请求单位。然后,令牌会以固定的速率持续生成,并放入令牌桶中。当令牌桶满时,多余的令牌将会被丢弃。最后,每当有新请求到来时,需要从令牌桶中取出一个令牌。如果令牌桶中有足够的令牌,请求被允许通过。如果令牌桶中没有足够的令牌,请求将被限流。通过这种方式,令牌桶算法实现了平滑地限制请求的处理速率。
二.令牌桶算法的具体流程
步骤一:初始化令牌桶,设置其容量和生成速率。
步骤二:当有新请求到来时,检查令牌桶中是否有足够的令牌。如果有足够的令牌,则允许请求通过,并从令牌桶中扣除相应数量令牌。如果没有足够的令牌,则拒绝请求。
步骤三:系统会以固定的速度添加令牌,直到达到令牌桶容量,比如开启一个后台线程以固定的速度向令牌桶中添加令牌。
三.漏桶算法和令牌桶算法的对比
漏桶算法是突然往桶里注水,但是漏水的窟窿是固定大小的,因此流出水的速度是固定的,也就是"生产不限速,消费限速"。
令牌桶算法是突然从桶中抽水,也就是固定大小的窟窿变成了入水口,而没桶盖的桶口变成了出水口。相当于入水速度变得固定了,而出水速度不做限制了,也就是"生产限速,消费不限速"。
四.令牌桶算法实现流控的例子
假设有一个 API,需要限制其每秒请求数量为100,那么也可以使用令牌桶算法来实现这个限制。首先初始化一个令牌桶,其容量为100,令牌的生成速率为每秒100个。这样当有新请求到来时,就可以从令牌桶中获取令牌,以控制请求速率。
如果在某个时间点有120个请求同时到达,由于令牌桶中只有100个令牌,因此只有前100个请求能获取令牌通过,而剩下20个请求将被限流拒绝。当然也可以选择不直接拒绝,而是等待一段时间。随着时间推移,令牌桶会以固定速率(每秒100个令牌)重新填充。因此,被限流的请求在之后的时间窗口内就有机会被允许通过。
五.令牌桶算法的特点
特点一:支持突发流量
令牌桶算法允许在限流内应对突发流量,有助于提高系统的响应能力。
特点二:平滑处理速率
和漏桶算法一样,令牌桶算法也可以平滑处理流量,避免处理速率突变。
六.令牌桶算法和漏桶算法的核心区别
突发流量指的是:假设令牌桶容量为10个令牌,每秒生成2个令牌。当用户开始请求时,令牌桶可能已积累了一定数量的令牌,例如5个。如果用户突然发起大量请求,这时可以消耗令牌桶中的所有令牌。由于令牌桶此时有5个令牌,所以用户可以立即发起5个请求而不受限制。随后用户的请求速度将受到令牌生成速率的限制,每秒最多发起2个请求。可能用户1分钟内没有请求,那么令牌桶随着时间推移会放满10个令牌。当1分钟后突然来一波请求,那么此时的瞬时流量可以获取到10个令牌。
令牌桶算法允许用户在短时间内发起更多请求,从而支持突发流量。漏桶算法只能支持每秒固定处理一定数量的请求,从而不支持突发流量。这就是令牌桶和漏桶的核心区别。
七.令牌桶算法的代码实现
核心代码是:用于填充令牌的refill()方法,以及获取令牌数的tryAcquire()方法。
首先,利用当前系统时间减去上一次生成令牌时间,计算出上一次生成令牌距现在已间隔了多少毫秒。
然后,根据这个间隔毫秒数和平均每秒生成的令牌数,计算出上一次令牌生成时间到现在这段时间内,应该生成多少令牌。
接着,如果计算出来的令牌数大于0,则执行更新操作。更新时会使用Math.min()方法,确保令牌桶内不会超出容量阈值。
最后,更新最后一次生成令牌的时间为当前时间,以便下一次填充令牌时可以正确计算时间间隔。
public class TokenBucket {
//令牌桶的容量
private final int capacity;
//令牌生成速度,也就是每秒产生多少个令牌
private final int tokensPerSecond;
//当前桶内的令牌数量
private final AtomicInteger currentTokens;
//最后一次令牌生成时间
private final AtomicLong lastRefillTime;
//初始化
public TokenBucket(int capacity, int tokensPerSecond) {
this.capacity = capacity;
this.tokensPerSecond = tokensPerSecond;
this.currentTokens = new AtomicInteger(capacity);
this.lastRefillTime = new AtomicLong(System.currentTimeMillis());
}
//true:放行;false:限流
public synchronized boolean tryAcquire(int tokens) {
//填充令牌
refill();
//规则判断
return currentTokens.addAndGet(-tokens) >= 0;
}
//填充令牌
private void refill() {
//获取当前系统时间
long currentTime = System.currentTimeMillis();
//用当前系统时间 - 上一次令牌生成时间 得出两次生成令牌需要间隔多久ms
long timeSinceLastRefill = currentTime - lastRefillTime.get();
//得出上一次令牌生成时间到现在这段时间内,应该生成多少令牌
int tokensToAdd = (int) (timeSinceLastRefill * tokensPerSecond / 1000);
if (tokensToAdd > 0) {
//更新当前令牌数
int newTokenCount = Math.min(capacity, currentTokens.get() + tokensToAdd);
currentTokens.set(newTokenCount);
//更新上一次令牌生成时间为当前系统时间
lastRefillTime.set(currentTime);
}
}
}
(2)WarmUpController使用的预热模型
Sentinel中的令牌桶算法,参考了Guava的RateLimiter。
//The principle idea comes from Guava.
//However, the calculation of Guava is rate-based, which means that we need to translate rate to QPS.
//这个原理来自于Guava;
//然而,Guava的计算是基于速率的,这意味着我们需要将速率转换为QPS;
//Requests arriving at the pulse may drag down long idle systems even though it has a much larger handling capability in stable period.
//It usually happens in scenarios that require extra time for initialization,
//e.g. DB establishes a connection, connects to a remote service, and so on.
//That's why we need "warm up".
//突发式的流量可能会拖累一个长期空闲的系统,即使这个系统在稳定阶段具有更大的流量处理能力;
//这通常发生在需要额外时间进行初始化的场景中,比如DB建立连接、连接到远程服务等;
//这就是为什么我们需要对系统进行"预热";
//Sentinel's "warm-up" implementation is based on the Guava's algorithm.
//However, Guava’s implementation focuses on adjusting the request interval, which is similar to leaky bucket.
//Sentinel pays more attention to controlling the count of incoming requests per second without calculating its interval,
//which resembles token bucket algorithm.
//Sentinel的"预热"实现是基于Guava的算法的;
//然而,Guava的实现侧重于调整请求间隔,这类似于漏桶;
//而Sentinel更注重控制每秒传入请求的数量,而不计算其间隔,这类似于令牌桶算法;
//The remaining tokens in the bucket is used to measure the system utility.
//Suppose a system can handle b requests per second.
//Every second b tokens will be added into the bucket until the bucket is full.
//And when system processes a request, it takes a token from the bucket.
//The more tokens left in the bucket, the lower the utilization of the system;
//when the token in the token bucket is above a certain threshold,
//we call it in a "saturation" state.
//桶中存储的令牌是用来测量系统的实用程序的;
//假设一个系统每秒可以处理b个请求;
//那么每秒就有b个令牌被添加到桶中,直到桶满为止;
//当系统处理一个请求时,就会从桶中获取一个令牌;
//桶中存储的令牌剩余得越多,那么就说明系统的利用率就越低;
//当令牌桶中的令牌数高于某个阈值时,我们称之为"饱和"状态;
//Base on Guava’s theory, there is a linear equation we can write this in the form
//y = m * x + b where y (a.k.a y(x)), or qps(q)),
//is our expected QPS given a saturated period (e.g. 3 minutes in),
//m is the rate of change from our cold (minimum) rate to our stable (maximum) rate,
//x (or q) is the occupied token.
//根据Guava的理论,有一个线性方程,我们可以把它写成y = m * x + b;
//这是在给定饱和周期(例如3分钟)的情况下预期的QPS;
//m是从我们的冷(最小)速率到我们的稳定(最大)速率的变化率;
//x(或q)就是需要被占用的令牌数;
public class WarmUpController implements TrafficShapingController {
...
...
}
WarmUpController基于的预热模型如下:
x轴表示当前令牌桶已存储的令牌数,y轴表示生成令牌的时间间隔,令牌的消费是从右往左的。
变量一:stableInterval
表示系统预热完成后,生成令牌的时间间隔。若QPS限制为100,则说明每10ms生成一个令牌。
变量二:coldInterval
表示系统水位最低时,生成令牌的时间间隔,与coldFactor有关。
变量三:coldFactor
冷却因子,表示倍数,即coldInterval是stableInterval的多少倍。
变量四:thresholdPermits
表示进入预热阶段的临界令牌数。当令牌桶中的令牌数量减少到临界值时,系统预热结束。当令牌桶中的令牌数量大于临界值时,系统进入冷启动模式。
变量五:maxPermits
表示令牌桶的容量。当令牌桶中的令牌数达到最大容量时,生成的令牌将被抛弃。
变量六:slope
表示斜率。用于计算当前令牌生成时的时间间隔,从而计算当前每秒能生成多少令牌。
变量七:warmUpPeriodSec
表示系统预热时间,也就是梯形的面积:
warmUpPeriodSec = 0.5 * (stableInterval + coldInterval) * (maxPermits - thresholdPermits)
在预热模型图中,梯形的面积 = (coldFactor - 1) * 长方形面积。
Guava中的预热是通过控制令牌的生成时间来实现的,Sentinel中的预热则是通过控制每秒通过的请求数来实现的。在Guava中,冷却因子coldFactor固定为3,已被写死。在Sentinel中,冷却因子coldFactor默认为3,可通过参数修改。
(3)WarmUpController的初始化
WarmUpController的初始化会调用WarmUpController的construct()方法,该方法会重点初始化三个属性的值:warningToken、maxToken、slope。
属性一:warningToken
告警值,大于告警值系统就进入预热阶段,小于告警值系统进入稳定阶段。计算公式如下:
warningToken
= warmUpPeriodSec / (coldFactor - 1) / stableInterval
= (warmUpPeriodSec * count) / (coldFactor - 1)
其中warmUpPeriodSec是构造方法中传入的预热时间,count是FlowRule中设定的QPS阈值,coldFactor默认是3。
属性二:maxToken
系统最冷时桶内存储的令牌数,即令牌桶可以存储的最大令牌数。计算公式如下:
maxToken
= warningToken + 2 * warmUpPeriodSec / (stableInterval + coldInterval)
= warningToken + 2 * warmUpPeriodSec * count / (coldFactor + 1)
属性三:slope
预热阶段令牌生成速率的增速。用于计算当前令牌生成时的时间间隔,从而计算当前每秒能生成多少令牌。计算公式如下:
slope
= (coldInterval - stableInterval) / (maxToken - warningToken)
= (coldFactor - 1) / count / (maxToken - warningToken)
注意:预热阶段生成令牌的速率会越来越慢,也就是生成令牌的间隔越来越大。当桶内已存储的令牌超过告警值后,令牌越多,那1秒可允许的QPS越小。
public class WarmUpController implements TrafficShapingController {
//count是QPS阈值,即FlowRule中设定的阈值,表示系统在稳定阶段下允许的最大QPS
//在预热阶段,系统允许的QPS不会直接到达count值,而是会逐渐增加(对应预热模型图从右向左),直到达到这个count值为止
//这样就能实现让系统接收到的流量是一个平滑上升的状态,而不是让系统瞬间被打满
protected double count;
//coldFactor是冷却因子,表示系统在最冷时(预热阶段刚开始时)允许的QPS阈值与稳定阶段下允许的QPS阈值之比
//此参数直接影响预热阶段允许的QPS递增值,冷却因子越大,预热阶段允许的QPS递增值越低,默认为3
private int coldFactor;
//告警值,大于告警值系统就进入预热阶段,小于告警值系统进入稳定阶段
protected int warningToken = 0;
//令牌桶可以存储的最大令牌数
private int maxToken;
//斜率,预热阶段令牌生成速率的增速
protected double slope;
//令牌桶中已存储的令牌数
protected AtomicLong storedTokens = new AtomicLong(0);
//最后一次添加令牌的时间戳
protected AtomicLong lastFilledTime = new AtomicLong(0);
public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) {
construct(count, warmUpPeriodInSec, coldFactor);
}
public WarmUpController(double count, int warmUpPeriodInSec) {
//warmUpPeriodInSec是预热时长,表示系统需要多长时间从预热阶段到稳定阶段
//比如限制QPS为100,设置预热时长为10s,那么在预热阶段,令牌生成的速率会越来越快
//可能第1s只允许10个请求通过,第2s可能允许15个请求通过,这样逐步递增,直至递增到100为止
construct(count, warmUpPeriodInSec, 3);
}
private void construct(double count, int warmUpPeriodInSec, int coldFactor) {
if (coldFactor <= 1) {
throw new IllegalArgumentException("Cold factor should be larger than 1");
}
this.count = count;
this.coldFactor = coldFactor;
//thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;
//1.告警值,大于告警值系统就进入预热阶段;例如预热时长为5s,QPS为100,那么warningToken就为250
warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);
//maxPermits = thresholdPermits + 2 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
//2.系统最冷时桶内存储的令牌数,例如预热时长为5s,QPS为100,那么maxToken为500
maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));
//slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
//3.slope斜率,例如预热时长为5s,QPS为100,那么slope为0.00008
slope = (coldFactor - 1.0) / count / (maxToken - warningToken);
}
...
}
假如FlowRule设定的QPS阈值count为100(1s内QPS阈值为100),设置的预热时间warmUpPeriodSec为5s,冷却因子coldFactor为3,那么WarmUpController初始化后的属性如下:
stableInterval = 1 / count = 10ms
coldInterval = coldFactor * stableInterval = 30ms
warningToken
= warmUpPeriodSec / (coldFactor - 1) / stableInterval
= (warmUpPeriodSec * count) / (coldFactor - 1) = 250
maxToken
= warningToken + 2 * warmUpPeriodSec / (stableInterval + coldInterval)
= warningToken + 2 * warmUpPeriodSec * count / (coldFactor + 1) = 500
slope
= (coldInterval - stableInterval) / (maxToken - warningToken)
= (coldFactor - 1) / count / (maxToken - warningToken) = 0.00008
(4)WarmUpController的限流原理
一.WarmUpController.canPass()方法
二.WarmUpController.syncToken()方法
三.WarmUpController.coolDownTokens()方法
一.WarmUpController.canPass()方法
步骤一:调用WarmUpController的syncToken()方法生成令牌并同步到令牌桶内
步骤二:判断令牌桶内剩余令牌数是否大于告警值
情况一:如果剩余令牌数大于警戒值,说明系统处于预热阶段,此时需要进一步比较令牌的生产速率与令牌的消耗速率。若消耗速率大,则限流,否则请求正常通行。
情况二:如果剩余令牌数小于警戒值,说明系统处于稳定阶段。此时就直接判断当前请求的QPS与阈值大小,超过阈值则限流。
二.WarmUpController.syncToken()方法
该方法会生成令牌并同步到令牌桶内。其中入参passQps是前一个时间窗口的QPS,即上一秒通过的QPS数。首先验证当前时间与最后更新时间,避免在同一时间窗口重复添加令牌。其次通过WarmUpController的coolDownTokens()方法获取最新的令牌数,接着利用CAS来保证更新令牌桶的线程安全性,最后通过减去上一秒通过的QPS数得到目前令牌桶剩余的令牌数来进行更新。
三.WarmUpController.coolDownTokens()方法
该方法会根据当前时间和上一个时间窗口通过的QPS计算更新后的令牌数。具体来说就是,首先获取当前令牌桶已存储的令牌数,然后判断桶内令牌数和告警值的大小。
情况一:如果令牌桶中已存储的令牌数小于告警值
说明系统已结束冷启动,即退出预热阶段进入了稳定阶段。也就是桶内已存储的令牌数没达到进入预热阶段的阈值,此时需要较快地向令牌桶中添加令牌。
情况二:如果令牌桶中已存储的令牌数大于告警值
说明系统处于预热阶段,还在进行冷启动。此时如果上一个时间窗口通过的QPS,小于系统最冷时允许通过的QPS。那么就说明当前系统的负载比较低,可以向令牌桶中添加令牌。系统最冷时允许通过的QPS = (1 / (1 / count * coldFactor))。
其中,向令牌桶中添加令牌的处理,就是在当前令牌数量的基础上,加上从上次添加令牌到现在经过的时间乘以QPS阈值。
注意:Guava中的预热是通过控制令牌的生成时间来实现的,Sentinel中的预热是通过控制每秒通过的请求数来实现的。
Guava的实现侧重于调整请求间隔,这类似于漏桶算法。而Sentinel更注重控制每秒传入请求的数量,而不计算其间隔,这类似于令牌桶算法。
//The principle idea comes from Guava.
//However, the calculation of Guava is rate-based, which means that we need to translate rate to QPS.
//这个原理来自于Guava;
//然而,Guava的计算是基于速率的,这意味着我们需要将速率转换为QPS;
//Requests arriving at the pulse may drag down long idle systems even though it has a much larger handling capability in stable period.
//It usually happens in scenarios that require extra time for initialization,
//e.g. DB establishes a connection, connects to a remote service, and so on.
//That's why we need "warm up".
//突发式的流量可能会拖累一个长期空闲的系统,即使这个系统在稳定阶段具有更大的流量处理能力;
//这通常发生在需要额外时间进行初始化的场景中,比如DB建立连接、连接到远程服务等;
//这就是为什么我们需要对系统进行"预热";
//Sentinel's "warm-up" implementation is based on the Guava's algorithm.
//However, Guava’s implementation focuses on adjusting the request interval, which is similar to leaky bucket.
//Sentinel pays more attention to controlling the count of incoming requests per second without calculating its interval,
//which resembles token bucket algorithm.
//Sentinel的"预热"实现是基于Guava的算法的;
//然而,Guava的实现侧重于调整请求间隔,这类似于漏桶;
//而Sentinel更注重控制每秒传入请求的数量,而不计算其间隔,这类似于令牌桶算法;
//The remaining tokens in the bucket is used to measure the system utility.
//Suppose a system can handle b requests per second.
//Every second b tokens will be added into the bucket until the bucket is full.
//And when system processes a request, it takes a token from the bucket.
//The more tokens left in the bucket, the lower the utilization of the system;
//when the token in the token bucket is above a certain threshold,
//we call it in a "saturation" state.
//桶中存储的令牌是用来测量系统的实用程序的;
//假设一个系统每秒可以处理b个请求;
//那么每秒就有b个令牌被添加到桶中,直到桶满为止;
//当系统处理一个请求时,就会从桶中获取一个令牌;
//桶中存储的令牌剩余得越多,那么就说明系统的利用率就越低;
//当令牌桶中的令牌数高于某个阈值时,我们称之为"饱和"状态;
//Base on Guava’s theory, there is a linear equation we can write this in the form
//y = m * x + b where y (a.k.a y(x)), or qps(q)),
//is our expected QPS given a saturated period (e.g. 3 minutes in),
//m is the rate of change from our cold (minimum) rate to our stable (maximum) rate,
//x (or q) is the occupied token.
//根据Guava的理论,有一个线性方程,我们可以把它写成y = m * x + b;
//这是在给定饱和周期(例如3分钟)的情况下预期的QPS;
//m是从我们的冷(最小)速率到我们的稳定(最大)速率的变化率;
//x(或q)就是需要被占用的令牌数;
public class WarmUpController implements TrafficShapingController {
...
@Override
public boolean canPass(Node node, int acquireCount) {
return canPass(node, acquireCount, false);
}
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
//获取当前1s的QPS
long passQps = (long) node.passQps();
//获取上一窗口通过的QPS
long previousQps = (long) node.previousPassQps();
//1.生成令牌并同步到令牌桶内
syncToken(previousQps);
//获取令牌桶内剩余的令牌数
long restToken = storedTokens.get();
//2.如果令牌桶中的令牌数量大于告警值,说明还处于预热阶段,此时需要判断令牌的生成速度和消费速度
if (restToken >= warningToken) {
//获取桶内剩余令牌数超过告警值的令牌个数
long aboveToken = restToken - warningToken;
//当前令牌的生成间隔 = 稳定阶段的生成间隔 + 桶内超出告警值部分的已存储令牌数 * slope
//其中,稳定阶段的生成间隔是1/count,桶内超出告警值部分的已存储令牌数是aboveToken
//注意:预热阶段生成令牌的速率会越来越慢,也就是生成令牌的间隔越来越大;
//当桶内已存储的令牌超过告警值后,令牌越多,那1秒可允许的QPS越小;
//下面代码计算的是:
//当前1s内的时间窗口能够生成的令牌数量,即当前时间窗口生成的令牌可满足的QPS = 1 / 当前令牌的生成间隔
double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
//如果当前消费令牌的速度(passQps + acquireCount) <= 当前生成令牌的速度(warningQps),则允许通过
//如果当前时间窗口通过的QPS + 客户端申请的令牌数 小于等于 当前预热阶段的告警QPS,则代表允许通过
if (passQps + acquireCount <= warningQps) {
return true;
}
}
//3.如果令牌桶中的令牌数量小于告警值,说明预热结束,进入稳定阶段
else {
//如果当前消费令牌的速度(passQps + acquireCount) <= 当前生成令牌的速度(count),则允许通过
if (passQps + acquireCount <= count) {
return true;
}
}
return false;
}
//生成令牌并同步到令牌桶内
//入参passQps是前一个时间窗口的QPS,也就是上一秒通过的QPS数
//syncToken()方法的逻辑是:
//1.首先验证当前时间与最后更新令牌桶的时间,避免在同一个时间窗口重复添加令牌;
//2.其次通过WarmUpController.coolDownTokens()方法获取最新的令牌数;
//3.接着利用CAS来保证更新令牌桶的线程安全性;
//4.最后将桶内已存储的令牌数,减去上一秒通过的QPS数,得到目前令牌桶剩余的令牌数;
protected void syncToken(long passQps) {
//获取当前时间ms
long currentTime = TimeUtil.currentTimeMillis();
//将当前时间ms转换为s
currentTime = currentTime - currentTime 00;
//获取上一次更新令牌桶已存储的令牌数量的时间
long oldLastFillTime = lastFilledTime.get();
//如果上一次更新令牌桶已存储的令牌数量的时间和当前时间一样,或发生了时钟回拨等情况导致比当前时间还小
//那么就无需更新,直接return即可
if (currentTime <= oldLastFillTime) {
return;
}
//先获取目前令牌桶已存储的令牌数
long oldValue = storedTokens.get();
//调用WarmUpController.coolDownTokens()方法得到最新的令牌数
long newValue = coolDownTokens(currentTime, passQps);
//通过CAS更新令牌桶已存储的令牌数
//注意:系统初始化完毕,第一个请求进来调用WarmUpController.canPass()方法时,storedTokens = maxToken
if (storedTokens.compareAndSet(oldValue, newValue)) {
//设置令牌桶内已存储的最新令牌数 = 当前令牌数 - 上一个时间窗口通过的请求数
long currentValue = storedTokens.addAndGet(0 - passQps);
if (currentValue < 0 xss=removed xss=removed xss=removed> warningToken) {
//如果上一个时间窗口通过的QPS,小于系统最冷时允许通过的QPS(1 / (1 / count * coldFactor))
//那么就说明当前系统的负载比较低,可以向令牌桶中添加令牌
if (passQps < (int)count / coldFactor) {
//在当前令牌数量的基础上,加上从上次添加令牌到现在经过的时间(以秒为单位)乘以令牌生成速率(QPS阈值count)
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
}
}
//确保令牌桶更新后的令牌数不超过最大令牌数(maxToken)
//系统初始化完毕,第一个请求进来调用WarmUpController.canPass()方法时,
//oldValue = 0,lastFilledTime = 0,此时返回maxToken
return Math.min(newValue, maxToken);
}
}
(5)总结
WarmUpController的核心原理是:首先根据当前时间和上一个时间窗口通过的QPS同步令牌桶内的令牌数,然后比较桶内令牌数和告警值来计算当前时间窗口允许通过的告警QPS,最后比较当前请求下的QPS是否大于允许通过的告警QPS来决定限流。
注意:系统在预热阶段会逐渐提高令牌的生成速度,从而平滑过渡到稳定阶段。当系统启动时,桶内令牌数最大,令牌生成速率最低,允许的QPS最低。随着桶内令牌数减少,令牌生成速度逐渐提高,允许的QPS也逐渐提高。最后到达稳定阶段,此时允许的QPS便是FlowRule中设置的QPS阈值。
所以根据稳定阶段令牌的生成速率是1/count,默认冷却因子为3,得出系统最冷时令牌的生成速率是3/count。因此预热阶段一开始允许的QPS为count/3,预热完毕的QPS就是count。