實戰(zhàn)?Spring?Cloud?Gateway?之限流篇
來源:https://www.aneasystone.com/archives/2020/08/spring-cloud-gateway-current-limiting.html
話說在 Spring Cloud Gateway 問世之前,Spring Cloud 的微服務(wù)世界里,網(wǎng)關(guān)一定非 Netflix Zuul 莫屬。但是由于 Zuul 1.x 存在的一些問題,比如阻塞式的 API,不支持 WebSocket 等,一直被人所詬病,而且 Zuul 升級新版本依賴于 Netflix 公司,經(jīng)過幾次跳票之后,Spring 開源社區(qū)決定推出自己的網(wǎng)關(guān)組件,替代 Netflix Zuul。從 18 年 6 月 Spring Cloud 發(fā)布的 Finchley 版本開始,Spring Cloud Gateway 逐漸嶄露頭角,它基于 Spring 5.0、Spring Boot 2.0 和 Project Reactor 等技術(shù)開發(fā),不僅支持響應(yīng)式和無阻塞式的 API,而且支持 WebSocket,和 Spring 框架緊密集成。盡管 Zuul 后來也推出了 2.x 版本,在底層使用了異步無阻塞式的 API,大大改善了其性能,但是目前看來 Spring 并沒有打算繼續(xù)集成它的計劃。根據(jù)官網(wǎng)的描述,Spring Cloud Gateway 的主要特性如下:
- Built on Spring Framework 5, Project Reactor and Spring Boot 2.0
- Able to match routes on any request attribute
- Predicates and filters are specific to routes
- Hystrix Circuit Breaker integration
- Spring Cloud DiscoveryClient integration
- Easy to write Predicates and Filters
- Request Rate Limiting
- Path Rewriting
一、常見的限流場景
緩存、降級?和?限流?被稱為高并發(fā)、分布式系統(tǒng)的三駕馬車,網(wǎng)關(guān)作為整個分布式系統(tǒng)中的第一道關(guān)卡,限流功能自然必不可少。通過限流,可以控制服務(wù)請求的速率,從而提高系統(tǒng)應(yīng)對突發(fā)大流量的能力,讓系統(tǒng)更具彈性。限流有著很多實際的應(yīng)用場景,比如雙十一的秒殺活動, 12306 的搶票等。1.1 限流的對象
通過上面的介紹,我們對限流的概念可能感覺還是比較模糊,到底限流限的是什么?顧名思義,限流就是限制流量,但這里的流量是一個比較籠統(tǒng)的概念。如果考慮各種不同的場景,限流是非常復(fù)雜的,而且和具體的業(yè)務(wù)規(guī)則密切相關(guān),可以考慮如下幾種常見的場景:- 限制某個接口一分鐘內(nèi)最多請求 100 次
- 限制某個用戶的下載速度最多 100KB/S
- 限制某個用戶同時只能對某個接口發(fā)起 5 路請求
- 限制某個 IP 來源禁止訪問任何請求
1.2 限流的處理方式
在系統(tǒng)中設(shè)計限流方案時,有一個問題值得設(shè)計者去仔細(xì)考慮,當(dāng)請求者被限流規(guī)則攔截之后,我們該如何返回結(jié)果。一般我們有下面三種限流的處理方式:- 拒絕服務(wù)
- 排隊等待
- 服務(wù)降級
1.3 限流的架構(gòu)
針對不同的系統(tǒng)架構(gòu),需要使用不同的限流方案。如下圖所示,服務(wù)部署的方式一般可以分為單機模式和集群模式:


二、常見的限流算法
通過上面的學(xué)習(xí),我們知道限流可以分為請求頻率限流和并發(fā)量限流,根據(jù)系統(tǒng)架構(gòu)的不同,又可以分為網(wǎng)關(guān)層限流和分布式限流。在不同的應(yīng)用場景下,我們需要采用不同的限流算法。這一節(jié)將介紹一些主流的限流算法。有一點要注意的是,利用池化技術(shù)也可以達(dá)到限流的目的,比如線程池或連接池,但這不是本文的重點。2.1 固定窗口算法(Fixed Window)
固定窗口算法是一種最簡單的限流算法,它根據(jù)限流的條件,將請求時間映射到一個時間窗口,再使用計數(shù)器累加訪問次數(shù)。譬如限流條件為每分鐘 5 次,那么就按照分鐘為單位映射時間窗口,假設(shè)一個請求時間為 11:00:45,時間窗口就是 11:00:00 ~ 11:00:59,在這個時間窗口內(nèi)設(shè)定一個計數(shù)器,每來一個請求計數(shù)器加一,當(dāng)這個時間窗口的計數(shù)器超過 5 時,就觸發(fā)限流條件。當(dāng)請求時間落在下一個時間窗口內(nèi)時(11:01:00 ~ 11:01:59),上一個窗口的計數(shù)器失效,當(dāng)前的計數(shù)器清零,重新開始計數(shù)。計數(shù)器算法非常容易實現(xiàn),在單機場景下可以使用 AtomicLong、LongAdder 或 Semaphore 來實現(xiàn)計數(shù),而在分布式場景下可以通過 Redis 的 INCR 和 EXPIRE 等命令并結(jié)合 EVAL 或 lua 腳本來實現(xiàn),Redis 官網(wǎng)提供了幾種簡單的實現(xiàn)方式。無論是請求頻率限流還是并發(fā)量限流都可以使用這個算法。不過這個算法的缺陷也比較明顯,那就是存在嚴(yán)重的臨界問題。由于每過一個時間窗口,計數(shù)器就會清零,這使得限流效果不夠平滑,惡意用戶可以利用這個特點繞過我們的限流規(guī)則。如下圖所示,我們的限流條件本來是每分鐘 5 次,但是惡意用戶在 11:00:00 ~ 11:00:59 這個時間窗口的后半分鐘發(fā)起 5 次請求,接下來又在 11:01:00 ~ 11:01:59 這個時間窗口的前半分鐘發(fā)起 5 次請求,這樣我們的系統(tǒng)就在 1 分鐘內(nèi)承受了 10 次請求。
2.2 滑動窗口算法(Rolling Window 或 Sliding Window)
為了解決固定窗口算法的臨界問題,可以將時間窗口劃分成更小的時間窗口,然后隨著時間的滑動刪除相應(yīng)的小窗口,而不是直接滑過一個大窗口,這就是滑動窗口算法。我們?yōu)槊總€小時間窗口都設(shè)置一個計數(shù)器,大時間窗口的總請求次數(shù)就是每個小時間窗口的計數(shù)器的和。如下圖所示,我們的時間窗口是 5 秒,可以按秒進(jìn)行劃分,將其劃分成 5 個小窗口,時間每過一秒,時間窗口就滑過一秒:
2.3 漏桶算法(Leaky Bucket)
除了計數(shù)器算法,另一個很自然的限流思路是將所有的請求緩存到一個隊列中,然后按某個固定的速度慢慢處理,這其實就是漏桶算法(Leaky Bucket)。漏桶算法假設(shè)將請求裝到一個桶中,桶的容量為 M,當(dāng)桶滿時,請求被丟棄。在桶的底部有一個洞,桶中的請求像水一樣按固定的速度(每秒 r 個)漏出來。我們用下面這個形象的圖來表示漏桶算法:

2.4 令牌桶算法(Token Bucket)
令牌桶算法(Token Bucket)是目前應(yīng)用最廣泛的一種限流算法,它的基本思想由兩部分組成:生成令牌?和?消費令牌。- 生成令牌:假設(shè)有一個裝令牌的桶,最多能裝 M 個,然后按某個固定的速度(每秒 r 個)往桶中放入令牌,桶滿時不再放入;
- 消費令牌:我們的每次請求都需要從桶中拿一個令牌才能放行,當(dāng)桶中沒有令牌時即觸發(fā)限流,這時可以將請求放入一個緩沖隊列中排隊等待,或者直接拒絕;

2
3????private?final?long?capacity;
4????private?final?double?refillTokensPerOneMillis;
5????private?double?availableTokens;
6????private?long?lastRefillTimestamp;
7
8????public?TokenBucket(long?capacity,?long?refillTokens,?long?refillPeriodMillis)?{
9????????this.capacity?=?capacity;
10????????this.refillTokensPerOneMillis?=?(double)?refillTokens?/?(double)?refillPeriodMillis;
11????????this.availableTokens?=?capacity;
12????????this.lastRefillTimestamp?=?System.currentTimeMillis();
13????}
14
15????synchronized?public?boolean?tryConsume(int?numberTokens)?{
16????????refill();
17????????if?(availableTokens?18????????????return?false;
19????????}?else?{
20????????????availableTokens?-=?numberTokens;
21????????????return?true;
22????????}
23????}
24
25????private?void?refill()?{
26????????long?currentTimeMillis?=?System.currentTimeMillis();
27????????if?(currentTimeMillis?>?lastRefillTimestamp)?{
28????????????long?millisSinceLastRefill?=?currentTimeMillis?-?lastRefillTimestamp;
29????????????double?refill?=?millisSinceLastRefill?*?refillTokensPerOneMillis;
30????????????this.availableTokens?=?Math.min(capacity,?availableTokens? ?refill);
31????????????this.lastRefillTimestamp?=?currentTimeMillis;
32????????}
33????}
34}
可以像下面這樣創(chuàng)建一個令牌桶(桶大小為 100,且每秒生成 100 個令牌):1TokenBucket?limiter?=?new?TokenBucket(100,?100,?1000);
從上面的代碼片段可以看出,令牌桶算法的實現(xiàn)非常簡單也非常高效,僅僅通過幾個變量的運算就實現(xiàn)了完整的限流功能。核心邏輯在于 refill()?這個方法,在每次消費令牌時,計算當(dāng)前時間和上一次填充的時間差,并根據(jù)填充速度計算出應(yīng)該填充多少令牌。在重新填充令牌后,再判斷請求的令牌數(shù)是否足夠,如果不夠,返回 false,如果足夠,則減去令牌數(shù),并返回 true。在實際的應(yīng)用中,往往不會直接使用這種原始的令牌桶算法,一般會在它的基礎(chǔ)上作一些改進(jìn),比如,填充速率支持動態(tài)調(diào)整,令牌總數(shù)支持透支,基于 Redis 支持分布式限流等,不過總體來說還是符合令牌桶算法的整體框架,我們在后面學(xué)習(xí)一些開源項目時對此會有更深的體會。
三、一些開源項目
有很多開源項目中都實現(xiàn)了限流的功能,這一節(jié)通過一些開源項目的學(xué)習(xí),了解限流是如何實現(xiàn)的。3.1 Guava 的 RateLimiter
Google Guava 是一個強大的核心庫,包含了很多有用的工具類,例如:集合、緩存、并發(fā)庫、字符串處理、I/O 等等。其中在并發(fā)庫中,Guava 提供了兩個和限流相關(guān)的類:RateLimiter 和 SmoothRateLimiter。Guava 的 RateLimiter 基于令牌桶算法實現(xiàn),不過在傳統(tǒng)的令牌桶算法基礎(chǔ)上做了點改進(jìn),支持兩種不同的限流方式:平滑突發(fā)限流(SmoothBursty)?和?平滑預(yù)熱限流(SmoothWarmingUp)。下面的方法可以創(chuàng)建一個平滑突發(fā)限流器(SmoothBursty):1RateLimiter?limiter?=?RateLimiter.create(5);
RateLimiter.create(5)?表示這個限流器容量為 5,并且每秒生成 5 個令牌,也就是每隔 200 毫秒生成一個。我們可以使用 limiter.acquire()?消費令牌,如果桶中令牌足夠,返回 0,如果令牌不足,則阻塞等待,并返回等待的時間。我們連續(xù)請求幾次:1System.out.println(limiter.acquire());
2System.out.println(limiter.acquire());
3System.out.println(limiter.acquire());
4System.out.println(limiter.acquire());
輸出結(jié)果如下:10.0
20.198239
30.196083
40.200609
可以看出限流器創(chuàng)建之后,初始會有一個令牌,然后每隔 200 毫秒生成一個令牌,所以第一次請求直接返回 0,后面的請求都會阻塞大約 200 毫秒。另外,SmoothBursty 還具有應(yīng)對突發(fā)的能力,而且?還允許消費未來的令牌,比如下面的例子:1RateLimiter?limiter?=?RateLimiter.create(5);
2System.out.println(limiter.acquire(10));
3System.out.println(limiter.acquire(1));
4System.out.println(limiter.acquire(1));
會得到類似下面的輸出:10.0
21.997428
30.192273
40.200616
限流器創(chuàng)建之后,初始令牌只有一個,但是我們請求 10 個令牌竟然也通過了,只不過看后面請求發(fā)現(xiàn),第二次請求花了 2 秒左右的時間把前面的透支的令牌給補上了。Guava 支持的另一種限流方式是平滑預(yù)熱限流器(SmoothWarmingUp),可以通過下面的方法創(chuàng)建:
1RateLimiter?limiter?=?RateLimiter.create(2,?3,?TimeUnit.SECONDS);
2System.out.println(limiter.acquire(1));
3System.out.println(limiter.acquire(1));
4System.out.println(limiter.acquire(1));
5System.out.println(limiter.acquire(1));
6System.out.println(limiter.acquire(1));
第一個參數(shù)還是每秒創(chuàng)建的令牌數(shù)量,這里是每秒 2 個,也就是每 500 毫秒生成一個,后面的參數(shù)表示從冷啟動速率過渡到平均速率的時間間隔,也就是所謂的熱身時間間隔(warm up period)。我們看下輸出結(jié)果:10.0
21.329289
30.994375
40.662888
50.501287
第一個請求還是立即得到令牌,但是后面的請求和上面平滑突發(fā)限流就完全不一樣了,按理來說 500 毫秒就會生成一個令牌,但是我們發(fā)現(xiàn)第二個請求卻等了 1.3s,而不是 0.5s,后面第三個和第四個請求也等了一段時間。不過可以看出,等待時間在慢慢的接近 0.5s,直到第五個請求等待時間才開始變得正常。從第一個請求到第五個請求,這中間的時間間隔就是熱身階段,可以算出熱身的時間就是我們設(shè)置的 3 秒。3.2 Bucket4j
Bucket4j是一個基于令牌桶算法實現(xiàn)的強大的限流庫,它不僅支持單機限流,還支持通過諸如 Hazelcast、Ignite、Coherence、Infinispan 或其他兼容 JCache API (JSR 107)?規(guī)范的分布式緩存實現(xiàn)分布式限流。在使用 Bucket4j 之前,我們有必要先了解 Bucket4j 中的幾個核心概念:- Bucket
- Bandwidth
- Refill
1Bucket?bucket?=?Bucket4j.builder().addLimit(limit).build();
2if(bucket.tryConsume(1))?{
3????System.out.println("ok");
4}?else?{
5????System.out.println("error");
6}
Bandwidth 的意思是帶寬, 可以理解為限流的規(guī)則。Bucket4j 提供了兩種方法來創(chuàng)建 Bandwidth:simple 和 classic。下面是 simple 方式創(chuàng)建的 Bandwidth,表示桶大小為 10,填充速度為每分鐘 10 個令牌:1Bandwidth?limit?=?Bandwidth.simple(10,?Duration.ofMinutes(1));
simple方式桶大小和填充速度是一樣的,classic 方式更靈活一點,可以自定義填充速度,下面的例子表示桶大小為 10,填充速度為每分鐘 5 個令牌:1Refill?filler?=?Refill.greedy(5,?Duration.ofMinutes(1));
2Bandwidth?limit?=?Bandwidth.classic(10,?filler);
其中,Refill 用于填充令牌桶,可以通過它定義填充速度,Bucket4j 有兩種填充令牌的策略:間隔策略(intervally)?和?貪婪策略(greedy)。在上面的例子中我們使用的是貪婪策略,如果使用間隔策略可以像下面這樣創(chuàng)建 Refill:1Refill?filler?=?Refill.intervally(5,?Duration.ofMinutes(1));
所謂間隔策略指的是每隔一段時間,一次性的填充所有令牌,比如上面的例子,會每隔一分鐘,填充 5 個令牌,如下所示:

- 基于令牌桶算法
- 高性能,無鎖實現(xiàn)
- 不存在精度問題,所有計算都是基于整型的
- 支持通過符合 JCache API 規(guī)范的分布式緩存系統(tǒng)實現(xiàn)分布式限流
- 支持為每個 Bucket 設(shè)置多個 Bandwidth
- 支持同步和異步 API
- 支持可插拔的監(jiān)聽 API,用于集成監(jiān)控和日志
- 不僅可以用于限流,還可以用于簡單的調(diào)度
3.3 Resilience4j
Resilience4j 是一款輕量級、易使用的高可用框架。用過 Spring Cloud 早期版本的同學(xué)肯定都聽過 Netflix Hystrix,Resilience4j 的設(shè)計靈感就來自于它。自從 Hystrix 停止維護(hù)之后,官方也推薦大家使用 Resilience4j 來代替 Hystrix。
1//?創(chuàng)建一個?Bulkhead,最大并發(fā)量為?150
2BulkheadConfig?bulkheadConfig?=?BulkheadConfig.custom()
3????.maxConcurrentCalls(150)
4????.maxWaitTime(100)
5????.build();
6Bulkhead?bulkhead?=?Bulkhead.of("backendName",?bulkheadConfig);
7
8//?創(chuàng)建一個?RateLimiter,每秒允許一次請求
9RateLimiterConfig?rateLimiterConfig?=?RateLimiterConfig.custom()
10????.timeoutDuration(Duration.ofMillis(100))
11????.limitRefreshPeriod(Duration.ofSeconds(1))
12????.limitForPeriod(1)
13????.build();
14RateLimiter?rateLimiter?=?RateLimiter.of("backendName",?rateLimiterConfig);
15
16//?使用?Bulkhead?和?RateLimiter?裝飾業(yè)務(wù)邏輯
17Supplier?supplier?=?()?->?backendService.doSomething();
18Supplier?decoratedSupplier?=?Decorators.ofSupplier(supplier)
19??.withBulkhead(bulkhead)
20??.withRateLimiter(rateLimiter)
21??.decorate();
22
23//?調(diào)用業(yè)務(wù)邏輯
24Try?try?=?Try.ofSupplier(decoratedSupplier);
25assertThat(try.isSuccess()).isTrue();
Resilience4j 在功能特性上比 Bucket4j 強大不少,而且還支持并發(fā)量限流。不過最大的遺憾是,Resilience4j 不支持分布式限流。3.4 其他
網(wǎng)上還有很多限流相關(guān)的開源項目,不可能一一介紹,這里列出來的只是冰山之一角:- https://github.com/mokies/ratelimitj
- https://github.com/wangzheng0822/ratelimiter4j
- https://github.com/wukq/rate-limiter
- https://github.com/marcosbarbero/spring-cloud-zuul-ratelimit
- https://github.com/onblog/SnowJena
- https://gitee.com/zhanghaiyang/spring-boot-starter-current-limiting
- https://github.com/Netflix/concurrency-limits
四、在網(wǎng)關(guān)中實現(xiàn)限流
在文章一開始介紹 Spring Cloud Gateway 的特性時,我們注意到其中有一條 Request Rate Limiting,說明網(wǎng)關(guān)自帶了限流的功能,但是 Spring Cloud Gateway 自帶的限流有很多限制,譬如不支持單機限流,不支持并發(fā)量限流,而且它的請求頻率限流也是不盡人意,這些都需要我們自己動手來解決。4.1 實現(xiàn)單機請求頻率限流
Spring Cloud Gateway 中定義了關(guān)于限流的一個接口 RateLimiter,如下:1public?interface?RateLimiter<C>?extends?StatefulConfigurable<C>?{
2????Mono?isAllowed(String?routeId,?String?id);
3}
這個接口就一個方法 isAllowed,第一個參數(shù) routeId 表示請求路由的 ID,根據(jù) routeId 可以獲取限流相關(guān)的配置,第二個參數(shù) id 表示要限流的對象的唯一標(biāo)識,可以是用戶名,也可以是 IP,或者其他的可以從 ServerWebExchange 中得到的信息。我們看下 RequestRateLimiterGatewayFilterFactory 中對 isAllowed 的調(diào)用邏輯: 1@Override
2public?GatewayFilter?apply(Config?config)?{
3????//?從配置中得到?KeyResolver
4????KeyResolver?resolver?=?getOrDefault(config.keyResolver,?defaultKeyResolver);
5????//?從配置中得到?RateLimiter
6????RateLimiter
從上面的的邏輯可以看出,通過實現(xiàn) KeyResolver 接口的 resolve 方法就可以自定義要限流的對象了。1public?interface?KeyResolver?{
2????Mono?resolve(ServerWebExchange?exchange) ;
3}
比如下面的?HostAddrKeyResolver?可以根據(jù) IP 來限流: 1public?interface?KeyResolver?{
2????Mono?resolve(ServerWebExchange?exchange) ;
3}
4比如下面的 HostAddrKeyResolver 可以根據(jù) IP 來限流:
5public?class?HostAddrKeyResolver?implements?KeyResolver?{
6????@Override
7????public?Mono?resolve(ServerWebExchange?exchange)? {
8????????return?Mono.just(exchange.getRequest().getRemoteAddress().getAddress().getHostAddress());
9????}
10}
我們繼續(xù)看 Spring Cloud Gateway 的代碼發(fā)現(xiàn),RateLimiter 接口只提供了一個實現(xiàn)類 RedisRateLimiter:
1public?Mono?isAllowed(String?routeId,?String?id)? {
2????Config?routeConfig?=?loadConfiguration(routeId);
3
4????//?How?many?requests?per?second?do?you?want?a?user?to?be?allowed?to?do?
5????int?replenishRate?=?routeConfig.getReplenishRate();
6
7????//?How?many?seconds?for?a?token?refresh?
8????int?refreshPeriod?=?routeConfig.getRefreshPeriod();
9
10????//?How?many?tokens?are?requested?per?request?
11????int?requestedTokens?=?routeConfig.getRequestedTokens();
12
13????final?io.github.resilience4j.ratelimiter.RateLimiter?rateLimiter?=?RateLimiterRegistry
14????????????.ofDefaults()
15????????????.rateLimiter(id,?createRateLimiterConfig(refreshPeriod,?replenishRate));
16
17????final?boolean?allowed?=?rateLimiter.acquirePermission(requestedTokens);
18????final?Long?tokensLeft?=?(long)?rateLimiter.getMetrics().getAvailablePermissions();
19
20????Response?response?=?new?Response(allowed,?getHeaders(routeConfig,?tokensLeft));
21????return?Mono.just(response);
22}
有意思的是,這個類?還有一個早期版本,是基于 Bucket4j 實現(xiàn)的: 1public?Mono?isAllowed(String?routeId,?String?id)? {
2
3????Config?routeConfig?=?loadConfiguration(routeId);
4
5????//?How?many?requests?per?second?do?you?want?a?user?to?be?allowed?to?do?
6????int?replenishRate?=?routeConfig.getReplenishRate();
7
8????//?How?much?bursting?do?you?want?to?allow?
9????int?burstCapacity?=?routeConfig.getBurstCapacity();
10
11????//?How?many?tokens?are?requested?per?request?
12????int?requestedTokens?=?routeConfig.getRequestedTokens();
13
14????final?Bucket?bucket?=?bucketMap.computeIfAbsent(id,
15????????????(key)?->?createBucket(replenishRate,?burstCapacity));
16
17????final?boolean?allowed?=?bucket.tryConsume(requestedTokens);
18
19????Response?response?=?new?Response(allowed,
20????????????getHeaders(routeConfig,?bucket.getAvailableTokens()));
21????return?Mono.just(response);
22}
實現(xiàn)方式都是類似的,在上面對 Bucket4j 和 Resilience4j 已經(jīng)作了比較詳細(xì)的介紹,這里不再贅述。不過從這里也可以看出 Spring 生態(tài)圈對 Resilience4j 是比較看好的,我們也可以將其引入到我們的項目中。4.2 實現(xiàn)分布式請求頻率限流
上面介紹了如何實現(xiàn)單機請求頻率限流,接下來再看下分布式請求頻率限流。這個就比較簡單了,因為上面說了,Spring Cloud Gateway 自帶了一個限流實現(xiàn),就是 RedisRateLimiter,可以用于分布式限流。它的實現(xiàn)原理依然是基于令牌桶算法的,不過實現(xiàn)邏輯是放在一段 lua 腳本中的,我們可以在 src/main/resources/META-INF/scripts 目錄下找到該腳本文件 request_rate_limiter.lua: 1local?tokens_key?=?KEYS[1]
2local?timestamp_key?=?KEYS[2]
3
4local?rate?=?tonumber(ARGV[1])
5local?capacity?=?tonumber(ARGV[2])
6local?now?=?tonumber(ARGV[3])
7local?requested?=?tonumber(ARGV[4])
8
9local?fill_time?=?capacity/rate
10local?ttl?=?math.floor(fill_time*2)
11
12local?last_tokens?=?tonumber(redis.call("get",?tokens_key))
13if?last_tokens?==?nil?then
14??last_tokens?=?capacity
15end
16
17local?last_refreshed?=?tonumber(redis.call("get",?timestamp_key))
18if?last_refreshed?==?nil?then
19??last_refreshed?=?0
20end
21
22local?delta?=?math.max(0,?now-last_refreshed)
23local?filled_tokens?=?math.min(capacity,?last_tokens (delta*rate))
24local?allowed?=?filled_tokens?>=?requested
25local?new_tokens?=?filled_tokens
26local?allowed_num?=?0
27if?allowed?then
28??new_tokens?=?filled_tokens?-?requested
29??allowed_num?=?1
30end
31
32if?ttl?>?0?then
33??redis.call("setex",?tokens_key,?ttl,?new_tokens)
34??redis.call("setex",?timestamp_key,?ttl,?now)
35end
36
37return?{?allowed_num,?new_tokens?}
這段代碼和上面介紹令牌桶算法時用 Java 實現(xiàn)的那段經(jīng)典代碼幾乎是一樣的。這里使用 lua 腳本,主要是利用了 Redis 的單線程特性,以及執(zhí)行 lua 腳本的原子性,避免了并發(fā)訪問時可能出現(xiàn)請求量超出上限的現(xiàn)象。想象目前令牌桶中還剩 1 個令牌,此時有兩個請求同時到來,判斷令牌是否足夠也是同時的,兩個請求都認(rèn)為還剩 1 個令牌,于是兩個請求都被允許了。有兩種方式來配置 Spring Cloud Gateway 自帶的限流。第一種方式是通過配置文件,比如下面所示的代碼,可以對某個 route 進(jìn)行限流: 1spring:
2??cloud:
3????gateway:
4??????routes:
5??????-?id:?test
6????????uri:?http://httpbin.org:80/get
7????????filters:
8????????-?name:?RequestRateLimiter
9??????????args:
10????????????key-resolver:?'#{@hostAddrKeyResolver}'
11????????????redis-rate-limiter.replenishRate:?1
12????????????redis-rate-limiter.burstCapacity:?3
其中,key-resolver 使用 SpEL 表達(dá)式?#{@beanName}?從 Spring 容器中獲取 hostAddrKeyResolver 對象,burstCapacity 表示令牌桶的大小,replenishRate 表示每秒往桶中填充多少個令牌,也就是填充速度。第二種方式是通過下面的代碼來配置: 1@Bean
2public?RouteLocator?myRoutes(RouteLocatorBuilder?builder)?{
3??return?builder.routes()
4????.route(p?->?p
5??????.path("/get")
6??????.filters(filter?->?filter.requestRateLimiter()
7????????.rateLimiter(RedisRateLimiter.class,?rl?->?rl.setBurstCapacity(3).setReplenishRate(1)).and())
8??????.uri("http://httpbin.org:80"))
9????.build();
10}
這樣就可以對某個 route 進(jìn)行限流了。但是這里有一點要注意,Spring Cloud Gateway 自帶的限流器有一個很大的坑,replenishRate 不支持設(shè)置小數(shù),也就是說往桶中填充的 token 的速度最少為每秒 1 個,所以,如果我的限流規(guī)則是每分鐘 10 個請求(按理說應(yīng)該每 6 秒填充一次,或每秒填充 1/6 個 token),這種情況 Spring Cloud Gateway 就沒法正確的限流。網(wǎng)上也有人提了 issue,support greater than a second resolution for the rate limiter,但還沒有得到解決。4.3 實現(xiàn)單機并發(fā)量限流
上面學(xué)習(xí) Resilience4j 的時候,我們提到了 Resilience4j 的一個功能特性,叫?隔離(Bulkhead)。Bulkhead 這個單詞的意思是船的艙壁,利用艙壁可以將不同的船艙隔離起來,這樣如果一個船艙破損進(jìn)水,那么只損失這一個船艙,其它船艙可以不受影響。借鑒造船行業(yè)的經(jīng)驗,這種模式也被引入到軟件行業(yè),我們把它叫做?艙壁模式(Bulkhead pattern)。艙壁模式一般用于服務(wù)隔離,對于一些比較重要的系統(tǒng)資源,如 CPU、內(nèi)存、連接數(shù)等,可以為每個服務(wù)設(shè)置各自的資源限制,防止某個異常的服務(wù)把系統(tǒng)的所有資源都消耗掉。這種服務(wù)隔離的思想同樣可以用來做并發(fā)量限流。正如前文所述,Resilience4j 提供了兩種 Bulkhead 的實現(xiàn):SemaphoreBulkhead 和 ThreadPoolBulkhead,這也正是艙壁模式常見的兩種實現(xiàn)方案:一種是帶計數(shù)的信號量,一種是固定大小的線程池??紤]到多線程場景下的線程切換成本,默認(rèn)推薦使用信號量。在操作系統(tǒng)基礎(chǔ)課程中,我們學(xué)習(xí)過兩個名詞:互斥量(Mutex)?和?信號量(Semaphores)?;コ饬坑糜诰€程的互斥,它和臨界區(qū)有點相似,只有擁有互斥對象的線程才有訪問資源的權(quán)限,由于互斥對象只有一個,因此任何情況下只會有一個線程在訪問此共享資源,從而保證了多線程可以安全的訪問和操作共享資源。而信號量是用于線程的同步,這是由荷蘭科學(xué)家 E.W.Dijkstra 提出的概念,它和互斥量不同,信號允許多個線程同時使用共享資源,但是它同時設(shè)定了訪問共享資源的線程最大數(shù)目,從而可以進(jìn)行并發(fā)量控制。下面是使用信號量限制并發(fā)訪問的一個簡單例子: 1public?class?SemaphoreTest?{
2
3????private?static?ExecutorService?threadPool?=?Executors.newFixedThreadPool(100);
4????private?static?Semaphore?semaphore?=?new?Semaphore(10);
5
6????public?static?void?main(String[]?args)?{
7????????for?(int?i?=?0;?i?100;?i )?{
8????????????threadPool.execute(new?Runnable()?{
9????????????????@Override
10????????????????public?void?run()?{
11????????????????????try?{
12????????????????????????semaphore.acquire();
13????????????????????????System.out.println("Request?processing?...");
14????????????????????????semaphore.release();
15????????????????????}?catch?(InterruptedException?e)?{
16????????????????????????e.printStack();
17????????????????????}
18????????????????}
19????????????});
20????????}
21????????threadPool.shutdown();
22????}
23}
這里我們創(chuàng)建了 100 個線程同時執(zhí)行,但是由于信號量計數(shù)為 10,所以同時只能有 10 個線程在處理請求。說到計數(shù),實際上,在 Java 里除了 Semaphore 還有很多類也可以用作計數(shù),比如 AtomicLong 或 LongAdder,這在并發(fā)量限流中非常常見,只是無法提供像信號量那樣的阻塞能力: 1public?class?AtomicLongTest?{
2
3????private?static?ExecutorService?threadPool?=?Executors.newFixedThreadPool(100);
4????private?static?AtomicLong?atomic?=?new?AtomicLong();
5
6????public?static?void?main(String[]?args)?{
7????????for?(int?i?=?0;?i?100;?i )?{
8????????????threadPool.execute(new?Runnable()?{
9????????????????@Override
10????????????????public?void?run()?{
11????????????????????try?{
12????????????????????????if(atomic.incrementAndGet()?>?10)?{
13????????????????????????????System.out.println("Request?rejected?...");
14????????????????????????????return;
15????????????????????????}
16????????????????????????System.out.println("Request?processing?...");
17????????????????????????atomic.decrementAndGet();
18????????????????????}?catch?(InterruptedException?e)?{
19????????????????????????e.printStack();
20????????????????????}
21????????????????}
22????????????});
23????????}
24????????threadPool.shutdown();
25????}
26}
4.4 實現(xiàn)分布式并發(fā)量限流通過在單機實現(xiàn)并發(fā)量限流,我們掌握了幾種常用的手段:信號量、線程池、計數(shù)器,這些都是單機上的概念。那么稍微拓展下,如果能實現(xiàn)分布式信號量、分布式線程池、分布式計數(shù)器,那么實現(xiàn)分布式并發(fā)量限流不就易如反掌了嗎?關(guān)于分布式線程池,是我自己杜撰的詞,在網(wǎng)上并沒有找到類似的概念,比較接近的概念是資源調(diào)度和分發(fā),但是又感覺不像,這里直接忽略吧。關(guān)于分布式信號量,還真有這樣的東西,比如 Apache Ignite 就提供了 IgniteSemaphore 用于創(chuàng)建分布式信號量,它的使用方式和 Semaphore 非常類似。使用 Redis 的 ZSet 也可以實現(xiàn)分布式信號量,比如?這篇博客介紹的方法,還有《Redis in Action》這本電子書中也提到了這樣的例子,教你如何實現(xiàn) Counting semaphores。另外,Redisson 也實現(xiàn)了基于 Redis 的分布式信號量 RSemaphore,用法也和 Semaphore 類似。使用分布式信號量可以很容易實現(xiàn)分布式并發(fā)量限流,實現(xiàn)方式和上面的單機并發(fā)量限流幾乎是一樣的。最后,關(guān)于分布式計數(shù)器,實現(xiàn)方案也是多種多樣。比如使用 Redis 的 INCR 就很容易實現(xiàn),更有甚者,使用 MySQL 數(shù)據(jù)庫也可以實現(xiàn)。只不過使用計數(shù)器要注意操作的原子性,每次請求時都要經(jīng)過這三步操作:取計數(shù)器當(dāng)前的值、判斷是否超過閾值,超過則拒絕、將計數(shù)器的值自增。這其實和信號量的 P 操作是一樣的,而釋放就對應(yīng) V 操作。所以,利用分布式信號量和計數(shù)器就可以實現(xiàn)并發(fā)量限流了嗎?問題當(dāng)然沒有這么簡單。實際上,上面通過信號量和計數(shù)器實現(xiàn)單機并發(fā)量限流的代碼片段有一個嚴(yán)重 BUG:1semaphore.acquire();
2System.out.println("Request?processing?...");
3semaphore.release();
想象一下如果在處理請求時出現(xiàn)異常了會怎么樣?很顯然,信號量被該線程獲取了,但是卻永遠(yuǎn)不會釋放,如果請求異常多了,這將導(dǎo)致信號量被占滿,最后一個請求也進(jìn)不來。在單機場景下,這個問題可以很容易解決,加一個 finally 就行了:1try?{
2????semaphore.acquire();
3????System.out.println("Request?processing?...");
4}?catch?(InterruptedException?e)?{
5????e.printStack();
6}?finally?{
7????semaphore.release();
8}
由于無論出現(xiàn)何種異常,finally 中的代碼一定會執(zhí)行,這樣就保證了信號量一定會被釋放。但是在分布式系統(tǒng)中,就不是加一個 finally 這么簡單了。這是因為在分布式系統(tǒng)中可能存在的異常不一定是可被捕獲的代碼異常,還有可能是服務(wù)崩潰或者不可預(yù)知的系統(tǒng)宕機,就算是正常的服務(wù)重啟也可能導(dǎo)致分布式信號量無法釋放。對于這個問題,我和幾個同事連續(xù)討論了幾個晚上,想出了兩種解決方法:第一種方法是使用帶 TTL 的計數(shù)器,第二種方法是基于雙窗口滑動的一種比較 tricky 的算法。第一種方法比較容易理解,我們?yōu)槊總€請求賦予一個唯一 ID,并在 Redis 里寫入一個鍵值對,key 為 requests_xxx(xxx 為請求 ID),value 為 1,并給這個 key 設(shè)置一個 TTL(如果你的應(yīng)用中存在耗時非常長的請求,譬如對于一些 WebSockket 請求可能會持續(xù)幾個小時,還需要開一個線程定期去刷新這個 key 的 TTL)。然后在判斷并發(fā)量時,使用 KEYS 命令查詢 requests_*?開頭的 key 的個數(shù),就可以知道當(dāng)前一共有多少個請求,如果超過并發(fā)量上限則拒絕請求。這種方法可以很好的應(yīng)對服務(wù)崩潰或重啟的問題,由于每個 key 都設(shè)置了 TTL,所以經(jīng)過一段時間后,這些 key 就會自動消失,就不會出現(xiàn)信號量占滿不釋放的情況了。但是這里使用 KEYS 命令查詢請求個數(shù)是一個非常低效的做法,在請求量比較多的情況下,網(wǎng)關(guān)的性能會受到嚴(yán)重影響。我們可以把 KEYS 命令換成 SCAN,性能會得到些許提升,但總體來說效果還是很不理想的。針對第一種方法,我們可以進(jìn)一步優(yōu)化,不用為每個請求寫一個鍵值對,而是為每個分布式系統(tǒng)中的每個實例賦予一個唯一 ID,并在 Redis 里寫一個鍵值對,key 為 instances_xxx(xxx 為實例 ID),value 為這個實例當(dāng)前的并發(fā)量。同樣的,我們?yōu)檫@個 key 設(shè)置一個 TTL,并且開啟一個線程定期去刷新這個 TTL。每接受一個請求后,計數(shù)器加一,請求結(jié)束,計數(shù)器減一,這和單機場景下的處理方式一樣,只不過在判斷并發(fā)量時,還是需要使用 KEYS 或 SCAN 獲取所有的實例,并計算出并發(fā)量的總和。不過由于實例個數(shù)是有限的,性能比之前的做法有了明顯的提升。第二種方法我稱之為?雙窗口滑動算法,結(jié)合了 TTL 計數(shù)器和滑動窗口算法。我們按分鐘來設(shè)置一個時間窗口,在 Redis 里對應(yīng) 202009051130?這樣的一個 key,value 為計數(shù)器,表示請求的數(shù)量。當(dāng)接受一個請求后,在當(dāng)前的時間窗口中加一,當(dāng)請求結(jié)束,在當(dāng)前的時間窗口中減一,注意,接受請求和請求結(jié)束的時間窗口可能不是同一個。另外,我們還需要一個本地列表來記錄當(dāng)前實例正在處理的所有請求和請求對應(yīng)的時間窗口,并通過一個小于時間窗口的定時線程(如 30 秒)來遷移過期的請求,所謂過期,指的是請求的時間窗口和當(dāng)前時間窗口不一致。那么具體如何遷移呢?我們首先需要統(tǒng)計列表中一共有多少請求過期了,然后將列表中的過期請求時間更新為當(dāng)前時間窗口,并從 Redis 中上一個時間窗口移動相應(yīng)數(shù)量到當(dāng)前時間窗口,也就是上一個時間窗口減 X,當(dāng)前時間窗口加 X。由于遷移線程定期執(zhí)行,所以過期的請求總是會被移動到當(dāng)前窗口,最終 Redis 中只有當(dāng)前時間窗口和上個時間窗口這兩個時間窗口中有數(shù)據(jù),再早一點的窗口時間中的數(shù)據(jù)會被往后遷移,所以可以給這個 key 設(shè)置一個 3 分鐘或 5 分鐘的 TTL。判斷并發(fā)量時,由于只有兩個 key,只需要使用 MGET 獲取兩個值相加即可。下面的流程圖詳細(xì)描述了算法的運行過程:
- 請求結(jié)束時,直接在 Redis 中當(dāng)前時間窗口減一即可,就算是負(fù)數(shù)也沒關(guān)系。請求列表中的該請求不用急著刪除,可以打上結(jié)束標(biāo)記,在遷移線程中統(tǒng)一刪除(當(dāng)然,如果請求的開始時間和結(jié)束時間在同一個窗口,可以直接刪除);
- 遷移的時間間隔要小于時間窗口,一般設(shè)置為 30s;
- Redis 中的 key 一定要設(shè)置 TTL,時間至少為 2 個時間窗口,一般設(shè)置為 3 分鐘;
- 遷移過程涉及到“從上一個時間窗口減”和“在當(dāng)前時間窗口加”兩個操作,要注意操作的原子性;
- 獲取當(dāng)前并發(fā)量可以通過 MGET 一次性讀取兩個時間窗口的值,不用 GET 兩次;
- 獲取并發(fā)量和判斷并發(fā)量是否超限,這個過程也要注意操作的原子性。
總結(jié)
網(wǎng)關(guān)作為微服務(wù)架構(gòu)中的重要一環(huán),充當(dāng)著一夫當(dāng)關(guān)萬夫莫開的角色,所以對網(wǎng)關(guān)服務(wù)的穩(wěn)定性要求和性能要求都非常高。為保證網(wǎng)關(guān)服務(wù)的穩(wěn)定性,一代又一代的程序員們前仆后繼,想出了十八般武藝:限流、熔斷、隔離、緩存、降級、等等等等。這篇文章從限流入手,詳細(xì)介紹了限流的場景和算法,以及源碼實現(xiàn)和可能踩到的坑。盡管限流只是網(wǎng)關(guān)的一個非常小的功能,但卻影響到網(wǎng)關(guān)的方方面面,在系統(tǒng)架構(gòu)的設(shè)計中至關(guān)重要。雖然我試著從不同的角度希望把限流介紹的更完全,但終究是管中窺豹,只見一斑,還有很多的內(nèi)容沒有介紹到,比如阿里開源的 Sentinel 組件也可以用于限流,因為篇幅有限未能展開。另外前文提到的 Netflix 不再維護(hù) Hystrix 項目,這是因為他們把精力放到另一個限流項目 concurrency-limits 上了,這個項目的目標(biāo)是打造一款自適應(yīng)的,極具彈性的限流組件,它借鑒了 TCP 擁塞控制的算法(TCP congestion control algorithm),實現(xiàn)系統(tǒng)的自動限流,感興趣的同學(xué)可以去它的項目主頁了解更多內(nèi)容。本文篇幅較長,難免疏漏,如有問題,還望不吝賜教。參考
- 微服務(wù)網(wǎng)關(guān)實戰(zhàn)——Spring Cloud Gateway
- 《億級流量網(wǎng)站架構(gòu)核心技術(shù)》張開濤
- 聊聊高并發(fā)系統(tǒng)之限流特技
- 架構(gòu)師成長之路之限流
- 微服務(wù)接口限流的設(shè)計與思考
- 常用4種限流算法介紹及比較
- 來談?wù)勏蘖?從概念到實現(xiàn)
- 高并發(fā)下的限流分析
- 計數(shù)器算法
- 基于Redis的限流系統(tǒng)的設(shè)計
- API 調(diào)用次數(shù)限制實現(xiàn)
- Techniques to Improve QoS
- An alternative approach to rate limiting
- Scaling your API with rate limiters
- Brief overview of token-bucket algorithm
- Rate limiting Spring MVC endpoints with bucket4j
- Rate Limiter Internals in Resilience4j
- 高可用框架Resilience4j使用指南
- 阿里巴巴開源限流系統(tǒng) Sentinel 全解析
- spring cloud gateway 之限流篇
- 服務(wù)容錯模式
- 你的API會自適應(yīng)「彈性」限流嗎???