AI全套:Python3+TensorFlow打造人脸识别智能小程序
最新人工智能资料-Google工程师亲授 Tensorflow-入门到进阶
黑马头条项目 - Java Springboot2.0(视频、资料、代码和讲义)14天完整版
作者:aneasystone 来源:aneasystone.com/archives/2020/08/spring-cloud-gateway-current-limiting.html
一、常见的限流场景 二、常见的限流算法 2.1 固定窗口算法(Fixed Window) 2.2 滑动窗口算法(Rolling Window 或 Sliding Window) 2.3 漏桶算法(Leaky Bucket) 2.4 令牌桶算法(Token Bucket) 三、一些开源项目 四、在网关中实现限流 总结 参考
Built on Spring Framework 5, Project Reactor and Spring Boot 2.0 Able to match routes on any request attribute Predicates and filters are specific to routes Hystrix Circuit Breaker integration Spring Cloud DiscoveryClient integration Easy to write Predicates and Filters Request Rate Limiting Path Rewriting
一、常见的限流场景
1.1 限流的对象
通过上面的介绍,我们对限流的概念可能感觉还是比较模糊,到底限流限的是什么?顾名思义,限流就是限制流量,但这里的流量是一个比较笼统的概念。如果考虑各种不同的场景,限流是非常复杂的,而且和具体的业务规则密切相关,可以考虑如下几种常见的场景:
从上面的例子可以看出,根据不同的请求者和请求资源,可以组合出不同的限流规则。可以根据请求者的 IP 来进行限流,或者根据请求对应的用户来限流,又或者根据某个特定的请求参数来限流。而限流的对象可以是请求的频率,传输的速率,或者并发量等,其中最常见的两个限流对象是请求频率和并发量,他们对应的限流被称为 请求频率限流(Request rate limiting)和 并发量限流(Concurrent requests limiting)。传输速率限流 在下载场景下比较常用,比如一些资源下载站会限制普通用户的下载速度,只有购买会员才能提速,这种限流的做法实际上和请求频率限流类似,只不过一个限制的是请求量的多少,一个限制的是请求数据报文的大小。这篇文章主要介绍请求频率限流和并发量限流。
1.2 限流的处理方式
拒绝服务 排队等待 服务降级
1.3 限流的架构
针对不同的系统架构,需要使用不同的限流方案。如下图所示,服务部署的方式一般可以分为单机模式和集群模式:
二、常见的限流算法
2.1 固定窗口算法(Fixed Window)
不过这个算法的缺陷也比较明显,那就是存在严重的临界问题。由于每过一个时间窗口,计数器就会清零,这使得限流效果不够平滑,恶意用户可以利用这个特点绕过我们的限流规则。如下图所示,我们的限流条件本来是每分钟 5 次,但是恶意用户在 11:00:00 ~ 11:00:59 这个时间窗口的后半分钟发起 5 次请求,接下来又在 11:01:00 ~ 11:01:59 这个时间窗口的前半分钟发起 5 次请求,这样我们的系统就在 1 分钟内承受了 10 次请求。
2.2 滑动窗口算法(Rolling Window 或 Sliding Window)
2.3 漏桶算法(Leaky Bucket)
当请求到达时,不直接处理请求,而是将其放入一个队列,然后另一个线程以固定的速率从队列中读取请求并处理,从而达到限流的目的。注意的是这个队列可以有不同的实现方式,比如设置请求的存活时间,或将队列改造成 PriorityQueue,根据请求的优先级排序而不是先进先出。当然队列也有满的时候,如果队列已经满了,那么请求只能被丢弃了。漏桶算法有一个缺陷,在处理突发流量时效率很低,于是人们又想出了下面的令牌桶算法。
2.4 令牌桶算法(Token Bucket)
令牌桶算法(Token Bucket)是目前应用最广泛的一种限流算法,它的基本思想由两部分组成:生成令牌 和 消费令牌。
令牌桶算法的图示如下:
public class TokenBucket {
private final long capacity;
private final double refillTokensPerOneMillis;
private double availableTokens;
private long lastRefillTimestamp;
public TokenBucket(long capacity, long refillTokens, long refillPeriodMillis) {
this.capacity = capacity;
this.refillTokensPerOneMillis = (double) refillTokens / (double) refillPeriodMillis;
this.availableTokens = capacity;
this.lastRefillTimestamp = System.currentTimeMillis();
}
synchronized public boolean tryConsume(int numberTokens) {
refill();
if (availableTokens < numberTokens) {
return false;
} else {
availableTokens -= numberTokens;
return true;
}
}
private void refill() {
long currentTimeMillis = System.currentTimeMillis();
if (currentTimeMillis > lastRefillTimestamp) {
long millisSinceLastRefill = currentTimeMillis - lastRefillTimestamp;
double refill = millisSinceLastRefill * refillTokensPerOneMillis;
this.availableTokens = Math.min(capacity, availableTokens + refill);
this.lastRefillTimestamp = currentTimeMillis;
}
}
}
可以像下面这样创建一个令牌桶(桶大小为 100,且每秒生成 100 个令牌):
TokenBucket limiter = new TokenBucket(100, 100, 1000);
三、一些开源项目
3.1 Guava 的 RateLimiter
RateLimiter limiter = RateLimiter.create(5);
System.out.println(limiter.acquire());
System.out.println(limiter.acquire());
System.out.println(limiter.acquire());
System.out.println(limiter.acquire());
输出结果如下:
0.0
0.198239
0.196083
0.200609
RateLimiter limiter = RateLimiter.create(5);
System.out.println(limiter.acquire(10));
System.out.println(limiter.acquire(1));
System.out.println(limiter.acquire(1));
0.0
1.997428
0.192273
0.200616
RateLimiter limiter = RateLimiter.create(2, 3, TimeUnit.SECONDS);
System.out.println(limiter.acquire(1));
System.out.println(limiter.acquire(1));
System.out.println(limiter.acquire(1));
System.out.println(limiter.acquire(1));
System.out.println(limiter.acquire(1));
0.0
1.329289
0.994375
0.662888
0.501287
3.2 Bucket4j
Bucket4j是一个基于令牌桶算法实现的强大的限流库,它不仅支持单机限流,还支持通过诸如 Hazelcast、Ignite、Coherence、Infinispan 或其他兼容 JCache API (JSR 107) 规范的分布式缓存实现分布式限流。
在使用 Bucket4j 之前,我们有必要先了解 Bucket4j 中的几个核心概念:
Bucket Bandwidth Refill
Bucket bucket = Bucket4j.builder().addLimit(limit).build();
if(bucket.tryConsume(1)) {
System.out.println("ok");
} else {
System.out.println("error");
}
Bandwidth limit = Bandwidth.simple(10, Duration.ofMinutes(1));
simple方式桶大小和填充速度是一样的,classic 方式更灵活一点,可以自定义填充速度,下面的例子表示桶大小为 10,填充速度为每分钟 5 个令牌:
Refill filler = Refill.greedy(5, Duration.ofMinutes(1));
Bandwidth limit = Bandwidth.classic(10, filler);
Refill filler = Refill.intervally(5, Duration.ofMinutes(1));
所谓间隔策略指的是每隔一段时间,一次性的填充所有令牌,比如上面的例子,会每隔一分钟,填充 5 个令牌,如下所示:
而贪婪策略会尽可能贪婪的填充令牌,同样是上面的例子,会将一分钟划分成 5 个更小的时间单元,每隔 12 秒,填充 1 个令牌,如下所示:
3.3 Resilience4j
// 创建一个 Bulkhead,最大并发量为 150
BulkheadConfig bulkheadConfig = BulkheadConfig.custom()
.maxConcurrentCalls(150)
.maxWaitTime(100)
.build();
Bulkhead bulkhead = Bulkhead.of("backendName", bulkheadConfig);
// 创建一个 RateLimiter,每秒允许一次请求
RateLimiterConfig rateLimiterConfig = RateLimiterConfig.custom()
.timeoutDuration(Duration.ofMillis(100))
.limitRefreshPeriod(Duration.ofSeconds(1))
.limitForPeriod(1)
.build();
RateLimiter rateLimiter = RateLimiter.of("backendName", rateLimiterConfig);
// 使用 Bulkhead 和 RateLimiter 装饰业务逻辑
Supplier<String> supplier = () -> backendService.doSomething();
Supplier<String> decoratedSupplier = Decorators.ofSupplier(supplier)
.withBulkhead(bulkhead)
.withRateLimiter(rateLimiter)
.decorate();
// 调用业务逻辑
Try<String> try = Try.ofSupplier(decoratedSupplier);
assertThat(try.isSuccess()).isTrue();
3.4 其他
https://github.com/mokies/ratelimitj https://github.com/wangzheng0822/ratelimiter4j https://github.com/wukq/rate-limiter https://github.com/marcosbarbero/spring-cloud-zuul-ratelimit https://github.com/onblog/SnowJena https://gitee.com/zhanghaiyang/spring-boot-starter-current-limiting https://github.com/Netflix/concurrency-limits
四、在网关中实现限流
在文章一开始介绍 Spring Cloud Gateway 的特性时,我们注意到其中有一条 Request Rate Limiting,说明网关自带了限流的功能,但是 Spring Cloud Gateway 自带的限流有很多限制,譬如不支持单机限流,不支持并发量限流,而且它的请求频率限流也是不尽人意,这些都需要我们自己动手来解决。
4.1 实现单机请求频率限流
Spring Cloud Gateway 中定义了关于限流的一个接口 RateLimiter,如下:
public interface RateLimiter<C> extends StatefulConfigurable<C> {
Mono<RateLimiter.Response> isAllowed(String routeId, String id);
}
这个接口就一个方法 isAllowed,第一个参数 routeId 表示请求路由的 ID,根据 routeId 可以获取限流相关的配置,第二个参数 id 表示要限流的对象的唯一标识,可以是用户名,也可以是 IP,或者其他的可以从 ServerWebExchange 中得到的信息。我们看下 RequestRateLimiterGatewayFilterFactory 中对 isAllowed 的调用逻辑:
@Override
public GatewayFilter apply(Config config) {
// 从配置中得到 KeyResolver
KeyResolver resolver = getOrDefault(config.keyResolver, defaultKeyResolver);
// 从配置中得到 RateLimiter
RateLimiter<Object> limiter = getOrDefault(config.rateLimiter,
defaultRateLimiter);
boolean denyEmpty = getOrDefault(config.denyEmptyKey, this.denyEmptyKey);
HttpStatusHolder emptyKeyStatus = HttpStatusHolder
.parse(getOrDefault(config.emptyKeyStatus, this.emptyKeyStatusCode));
return (exchange, chain) -> resolver.resolve(exchange).defaultIfEmpty(EMPTY_KEY)
.flatMap(key -> {
// 通过 KeyResolver 得到 key,作为唯一标识 id 传入 isAllowed() 方法
if (EMPTY_KEY.equals(key)) {
if (denyEmpty) {
setResponseStatus(exchange, emptyKeyStatus);
return exchange.getResponse().setComplete();
}
return chain.filter(exchange);
}
// 获取当前路由 ID,作为 routeId 参数传入 isAllowed() 方法
String routeId = config.getRouteId();
if (routeId == null) {
Route route = exchange
.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
routeId = route.getId();
}
return limiter.isAllowed(routeId, key).flatMap(response -> {
for (Map.Entry<String, String> header : response.getHeaders()
.entrySet()) {
exchange.getResponse().getHeaders().add(header.getKey(),
header.getValue());
}
// 请求允许,直接走到下一个 filter
if (response.isAllowed()) {
return chain.filter(exchange);
}
// 请求被限流,返回设置的 HTTP 状态码(默认是 429)
setResponseStatus(exchange, config.getStatusCode());
return exchange.getResponse().setComplete();
});
});
}
从上面的的逻辑可以看出,通过实现 KeyResolver 接口的 resolve 方法就可以自定义要限流的对象了。
public interface KeyResolver {
Mono<String> resolve(ServerWebExchange exchange);
}
比如下面的 HostAddrKeyResolver 可以根据 IP 来限流:
public interface KeyResolver {
Mono<String> resolve(ServerWebExchange exchange);
}
比如下面的 HostAddrKeyResolver 可以根据 IP 来限流:
public class HostAddrKeyResolver implements KeyResolver {
@Override
public Mono<String> resolve(ServerWebExchange exchange) {
return Mono.just(exchange.getRequest().getRemoteAddress().getAddress().getHostAddress());
}
}
我们继续看 Spring Cloud Gateway 的代码发现,RateLimiter 接口只提供了一个实现类 RedisRateLimiter:
很显然是基于 Redis 实现的限流,虽说通过 Redis 也可以实现单机限流,但是总感觉有些大材小用,而且对于那些没有 Redis 的环境很不友好。所以,我们要实现真正的本地限流。
public Mono<Response> isAllowed(String routeId, String id) {
Config routeConfig = loadConfiguration(routeId);
// How many requests per second do you want a user to be allowed to do?
int replenishRate = routeConfig.getReplenishRate();
// How many seconds for a token refresh?
int refreshPeriod = routeConfig.getRefreshPeriod();
// How many tokens are requested per request?
int requestedTokens = routeConfig.getRequestedTokens();
final io.github.resilience4j.ratelimiter.RateLimiter rateLimiter = RateLimiterRegistry
.ofDefaults()
.rateLimiter(id, createRateLimiterConfig(refreshPeriod, replenishRate));
final boolean allowed = rateLimiter.acquirePermission(requestedTokens);
final Long tokensLeft = (long) rateLimiter.getMetrics().getAvailablePermissions();
Response response = new Response(allowed, getHeaders(routeConfig, tokensLeft));
return Mono.just(response);
}
有意思的是,这个类 还有一个早期版本,是基于 Bucket4j 实现的:
public Mono<Response> isAllowed(String routeId, String id) {
Config routeConfig = loadConfiguration(routeId);
// How many requests per second do you want a user to be allowed to do?
int replenishRate = routeConfig.getReplenishRate();
// How much bursting do you want to allow?
int burstCapacity = routeConfig.getBurstCapacity();
// How many tokens are requested per request?
int requestedTokens = routeConfig.getRequestedTokens();
final Bucket bucket = bucketMap.computeIfAbsent(id,
(key) -> createBucket(replenishRate, burstCapacity));
final boolean allowed = bucket.tryConsume(requestedTokens);
Response response = new Response(allowed,
getHeaders(routeConfig, bucket.getAvailableTokens()));
return Mono.just(response);
}
实现方式都是类似的,在上面对 Bucket4j 和 Resilience4j 已经作了比较详细的介绍,这里不再赘述。不过从这里也可以看出 Spring 生态圈对 Resilience4j 是比较看好的,我们也可以将其引入到我们的项目中。
4.2 实现分布式请求频率限流
上面介绍了如何实现单机请求频率限流,接下来再看下分布式请求频率限流。这个就比较简单了,因为上面说了,Spring Cloud Gateway 自带了一个限流实现,就是 RedisRateLimiter,可以用于分布式限流。它的实现原理依然是基于令牌桶算法的,不过实现逻辑是放在一段 lua 脚本中的,我们可以在 src/main/resources/META-INF/scripts 目录下找到该脚本文件 request_rate_limiter.lua:
local tokens_key = KEYS[1]
local timestamp_key = KEYS[2]
local rate = tonumber(ARGV[1])
local capacity = tonumber(ARGV[2])
local now = tonumber(ARGV[3])
local requested = tonumber(ARGV[4])
local fill_time = capacity/rate
local ttl = math.floor(fill_time*2)
local last_tokens = tonumber(redis.call("get", tokens_key))
if last_tokens == nil then
last_tokens = capacity
end
local last_refreshed = tonumber(redis.call("get", timestamp_key))
if last_refreshed == nil then
last_refreshed = 0
end
local delta = math.max(0, now-last_refreshed)
local filled_tokens = math.min(capacity, last_tokens+(delta*rate))
local allowed = filled_tokens >= requested
local new_tokens = filled_tokens
local allowed_num = 0
if allowed then
new_tokens = filled_tokens - requested
allowed_num = 1
end
if ttl > 0 then
redis.call("setex", tokens_key, ttl, new_tokens)
redis.call("setex", timestamp_key, ttl, now)
end
return { allowed_num, new_tokens }
有两种方式来配置 Spring Cloud Gateway 自带的限流。第一种方式是通过配置文件,比如下面所示的代码,可以对某个 route 进行限流:
spring:
cloud:
gateway:
routes:
- id: test
uri: http://httpbin.org:80/get
filters:
- name: RequestRateLimiter
args:
key-resolver: '#{@hostAddrKeyResolver}'
redis-rate-limiter.replenishRate: 1
redis-rate-limiter.burstCapacity: 3
@Bean
public RouteLocator myRoutes(RouteLocatorBuilder builder) {
return builder.routes()
.route(p -> p
.path("/get")
.filters(filter -> filter.requestRateLimiter()
.rateLimiter(RedisRateLimiter.class, rl -> rl.setBurstCapacity(3).setReplenishRate(1)).and())
.uri("http://httpbin.org:80"))
.build();
}
4.3 实现单机并发量限流
public class SemaphoreTest {
private static ExecutorService threadPool = Executors.newFixedThreadPool(100);
private static Semaphore semaphore = new Semaphore(10);
public static void main(String[] args) {
for (int i = 0; i < 100; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
semaphore.acquire();
System.out.println("Request processing ...");
semaphore.release();
} catch (InterruptedException e) {
e.printStack();
}
}
});
}
threadPool.shutdown();
}
}
public class AtomicLongTest {
private static ExecutorService threadPool = Executors.newFixedThreadPool(100);
private static AtomicLong atomic = new AtomicLong();
public static void main(String[] args) {
for (int i = 0; i < 100; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
if(atomic.incrementAndGet() > 10) {
System.out.println("Request rejected ...");
return;
}
System.out.println("Request processing ...");
atomic.decrementAndGet();
} catch (InterruptedException e) {
e.printStack();
}
}
});
}
threadPool.shutdown();
}
}
semaphore.acquire();
System.out.println("Request processing ...");
semaphore.release();
想象一下如果在处理请求时出现异常了会怎么样?很显然,信号量被该线程获取了,但是却永远不会释放,如果请求异常多了,这将导致信号量被占满,最后一个请求也进不来。在单机场景下,这个问题可以很容易解决,加一个 finally 就行了:
try {
semaphore.acquire();
System.out.println("Request processing ...");
} catch (InterruptedException e) {
e.printStack();
} finally {
semaphore.release();
}
请求结束时,直接在 Redis 中当前时间窗口减一即可,就算是负数也没关系。请求列表中的该请求不用急着删除,可以打上结束标记,在迁移线程中统一删除(当然,如果请求的开始时间和结束时间在同一个窗口,可以直接删除); 迁移的时间间隔要小于时间窗口,一般设置为 30s; Redis 中的 key 一定要设置 TTL,时间至少为 2 个时间窗口,一般设置为 3 分钟; 迁移过程涉及到“从上一个时间窗口减”和“在当前时间窗口加”两个操作,要注意操作的原子性; 获取当前并发量可以通过 MGET 一次性读取两个时间窗口的值,不用 GET 两次; 获取并发量和判断并发量是否超限,这个过程也要注意操作的原子性。
总结
参考
微服务网关实战——Spring Cloud Gateway 《亿级流量网站架构核心技术》张开涛 聊聊高并发系统之限流特技 架构师成长之路之限流 微服务接口限流的设计与思考 常用4种限流算法介绍及比较 来谈谈限流-从概念到实现 高并发下的限流分析 计数器算法 基于Redis的限流系统的设计 API 调用次数限制实现 Techniques to Improve QoS An alternative approach to rate limiting Scaling your API with rate limiters Brief overview of token-bucket algorithm Rate limiting Spring MVC endpoints with bucket4j Rate Limiter Internals in Resilience4j 高可用框架Resilience4j使用指南 阿里巴巴开源限流系统 Sentinel 全解析 spring cloud gateway 之限流篇 服务容错模式 你的API会自适应「弹性」限流吗?
全栈架构社区交流群
「全栈架构社区」建立了读者架构师交流群,大家可以添加小编微信进行加群。欢迎有想法、乐于分享的朋友们一起交流学习。
看完本文有收获?请转发分享给更多人
往期资源: