前文請看:
本篇講基於Resilience4j的限流RateLimiter和艙壁Bulkhead。
RateLimiterpom配置請看《實踐三》。
配置:
#拿不到透過許可權,等多久resilience4j.ratelimiter.configs.default.timeout-duration=50ms#單位時間是多久resilience4j.ratelimiter.configs.default.limit-refresh-period=10s#單位時間允許透過多少請求resilience4j.ratelimiter.configs.default.limit-for-period=1
說明:
1、注意其中的default,這是自定義的RateLimiter的策略名,可以針對不同的要求,配置不同的限流策略,在後面的程式碼的註解中使用即可。
程式碼實現:
因為是限制請求的,所以要與OpenFeign配搭使用,以註解的方式,放在介面AServerClient裡。
@FeignClient(value = "a-server")public interface AServerClient { @GetMapping("/getError") @RateLimiter(name = "default", fallbackMethod = "getUserErrorRateLimiterFallback") public UserEntity getError(); default UserEntity getUserErrorRateLimiterFallback(RequestNotPermitted e) { System.out.println("getUserError限流降級:" + e.getLocalizedMessage()); return new UserEntity(888, "getUserError限流降級"); }}
注意:
1、RateLimiter的name是default,就是配置檔案裡設定的default,可以設定為其他名字。
2、如果達到最大請求數,會丟擲RequestNotPermitted異常:
getUserError限流降級:RateLimiter 'default' does not permit further calls
RateLimiter通常與CircuitBreaker配搭使用,RateLimiter處理所有的請求數,CircuitBreaker處理異常的請求數。
但因為RateLimiter的處理順便先與CircuitBreaker,所以如果OpenFeign丟擲的異常由RateLimiter處理了,則CircuitBreaker就不會處理。
BulkheadBulkhead的實現方式有兩種,1是執行緒池,2是訊號量,配置如下:
resilience4j.bulkhead.configs.default.max-concurrent-calls=3resilience4j.bulkhead.configs.default.max-wait-duration=0s#resilience4j.thread-pool-bulkhead.instances.default.max-thread-pool-size=1#resilience4j.thread-pool-bulkhead.instances.default.core-thread-pool-size=1#resilience4j.thread-pool-bulkhead.instances.default.queue-capacity=1
注意:
1、同樣要注意default。
2、前兩個是配置訊號量,後面註釋掉的3個是執行緒池。
程式碼實現:
以註解的方式,放在Service實現類裡:
@Bulkhead(name = "default", fallbackMethod = "getUserErrorFallback", type = Bulkhead.Type.SEMAPHORE) public UserEntity getUserError() { return aServerClient.getError(); } public UserEntity getUserErrorFallback(BulkheadFullException e) { System.out.println("getUserError執行緒已滿: " + e.getLocalizedMessage()); return new UserEntity(777, "getUserError執行緒已滿"); }
注意:
1、比前文的程式碼多了一個降級方法,專門處理BulkheadFullException。
2、註解上的type設定的是Bulkhead.Type.SEMAPHORE訊號量,還有一個是Bulkhead.Type.THREADPOOL執行緒池,與配置檔案要對應。
Bulkhead是處理併發量的,所以Controller也要做一些改動:
@GetMapping("/getUserError") @ResponseBody public List<UserEntity> getUserError() { List<UserEntity> users = Lists.newArrayList(); final CountDownLatch countDownLatch = new CountDownLatch(5); for (int i = 0; i < 5; i++) { new Thread(new Runnable() { @Override public void run() { users.add(helloService.getUserError()); countDownLatch.countDown(); } }).start(); } try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); } return users; }
在Controller裡,建立了5個執行緒去呼叫Service的方法。
假設Bulkhead與RateLimiter配搭使用,如果併發量設定為3,限流數為4,則5個請求中,3個正常呼叫A-Server的介面,剩下2個則由Bulkhead進行降級處理:
getUserError執行緒已滿: Bulkhead 'default' is full and does not permit further callsgetUserError執行緒已滿: Bulkhead 'default' is full and does not permit further callsgetUserError降級:Read timed out executing GET http://a-server/getErrorgetUserError降級:Read timed out executing GET http://a-server/getErrorgetUserError降級:Read timed out executing GET http://a-server/getError
返回結果:
[ { "id": 777, "userName": "getUserError執行緒已滿" }, { "id": 777, "userName": "getUserError執行緒已滿" }, { "id": 999, "userName": "getUserError降級" }, { "id": 999, "userName": "getUserError降級" }, { "id": 999, "userName": "getUserError降級" }]
如果設定併發量為4,限流數為3,則5個請求中,3個正常呼叫A-Server介面,剩下的2個請求,一個由RateLimiter處理,一個由Bulkhead處理:
getUserError執行緒已滿: Bulkhead 'default' is full and does not permit further callsgetUserError限流降級:RateLimiter 'default' does not permit further callsgetUserError降級Read timed out executing GET http://a-server/getErrorgetUserError降級Read timed out executing GET http://a-server/getErrorgetUserError降級Read timed out executing GET http://a-server/getError
返回結果:
[ { "id": 777, "userName": "getUserError執行緒已滿" }, { "id": 888, "userName": "getUserError限流降級" }, { "id": 999, "userName": "getUserError降級" }, { "id": 999, "userName": "getUserError降級" }, { "id": 999, "userName": "getUserError降級" }]
原因還是之前說的,Resilience4j這幾個模組的處理順序是:
Retry(CircuitBreaker(RateLimiter(TimeLimiter(Bulkhead(Function)))))
控制併發的Bulkhead一定是第一個執行的,如果想改變順序,可以作如下配置:
resilience4j.circuitbreaker.circuit-breaker-aspect-order=1resilience4j.ratelimiter.rate-limiter-aspect-order=2resilience4j.retry.retry-aspect-order=3resilience4j.timelimiter.time-limiter-aspect-order=3
不過Bulkhead的順序無法改變,是寫死的Integer.MAX_VALUE。可以參看BulkheadConfigurationProperties類。
多嘴一句,雖然都是控制數量的,Bulkhead是控制併發量,RateLimiter是控制請求數。
好比一個門店,RateLimiter是控制能進來多少人,Bulkhead是控制一個人能買多少件東西。
整個鼠年我是從年頭衰到年尾最後一天,希望牛年能轉轉運吧。
這是牛年的第一篇,也是Spring Cloud 2020新元件替換Netflix套件的最後一篇,還有其他的元件,有機會了再寫吧。