首頁>技術>

前文請看:

本篇講基於Resilience4j的限流RateLimiter和艙壁Bulkhead。

RateLimiter

pom配置請看《實踐三》。

配置:

#拿不到透過許可權,等多久resilience4j.ratelimiter.configs.default.timeout-duration=50ms#單位時間是多久resilience4j.ratelimiter.configs.default.limit-refresh-period=10s#單位時間允許透過多少請求resilience4j.ratelimiter.configs.default.limit-for-period=1

說明:

1、注意其中的default,這是自定義的RateLimiter的策略名,可以針對不同的要求,配置不同的限流策略,在後面的程式碼的註解中使用即可。

程式碼實現:

因為是限制請求的,所以要與OpenFeign配搭使用,以註解的方式,放在介面AServerClient裡。

@FeignClient(value = "a-server")public interface AServerClient {    @GetMapping("/getError")    @RateLimiter(name = "default", fallbackMethod = "getUserErrorRateLimiterFallback")    public UserEntity getError();    default UserEntity getUserErrorRateLimiterFallback(RequestNotPermitted e) {        System.out.println("getUserError限流降級:" + e.getLocalizedMessage());        return new UserEntity(888, "getUserError限流降級");    }}

注意:

1、RateLimiter的name是default,就是配置檔案裡設定的default,可以設定為其他名字。

2、如果達到最大請求數,會丟擲RequestNotPermitted異常:

getUserError限流降級:RateLimiter 'default' does not permit further calls

RateLimiter通常與CircuitBreaker配搭使用,RateLimiter處理所有的請求數,CircuitBreaker處理異常的請求數。

但因為RateLimiter的處理順便先與CircuitBreaker,所以如果OpenFeign丟擲的異常由RateLimiter處理了,​則CircuitBreaker就不會處理。​

Bulkhead

Bulkhead的實現方式有兩種,1是執行緒池,2是訊號量,配置如下:

resilience4j.bulkhead.configs.default.max-concurrent-calls=3resilience4j.bulkhead.configs.default.max-wait-duration=0s#resilience4j.thread-pool-bulkhead.instances.default.max-thread-pool-size=1#resilience4j.thread-pool-bulkhead.instances.default.core-thread-pool-size=1#resilience4j.thread-pool-bulkhead.instances.default.queue-capacity=1

注意:

1、同樣要注意default

2、前兩個是配置訊號量,後面註釋掉的3個是執行緒池。

程式碼實現:

以註解的方式,放在Service實現類裡:

    @Bulkhead(name = "default", fallbackMethod = "getUserErrorFallback", type = Bulkhead.Type.SEMAPHORE)    public UserEntity getUserError() {        return aServerClient.getError();    }    public UserEntity getUserErrorFallback(BulkheadFullException e) {        System.out.println("getUserError執行緒已滿: " + e.getLocalizedMessage());        return new UserEntity(777, "getUserError執行緒已滿");    }

注意:

1、比前文的程式碼多了一個降級方法,專門處理BulkheadFullException。

2、註解上的type設定的是Bulkhead.Type.SEMAPHORE訊號量,還有一個是Bulkhead.Type.THREADPOOL執行緒池,與配置檔案​要對應。

Bulkhead是處理併發量的,所以​Controller也要做一些改動:

    @GetMapping("/getUserError")    @ResponseBody    public List<UserEntity> getUserError() {        List<UserEntity> users = Lists.newArrayList();        final CountDownLatch countDownLatch = new CountDownLatch(5);        for (int i = 0; i < 5; i++) {            new Thread(new Runnable() {                @Override                public void run() {                    users.add(helloService.getUserError());                    countDownLatch.countDown();                }            }).start();        }        try {            countDownLatch.await();        } catch (InterruptedException e) {            e.printStackTrace();        }        return users;    }

在Controller裡,建立了5個執行緒去呼叫Service的方法。

假設Bulkhead與RateLimiter配搭使用,如果併發量設定為3,限流數為4,則5個請求中,3個正常呼叫A-Server的介面,剩下2個則由Bulkhead進行降級處理:

getUserError執行緒已滿: Bulkhead 'default' is full and does not permit further callsgetUserError執行緒已滿: Bulkhead 'default' is full and does not permit further callsgetUserError降級:Read timed out executing GET http://a-server/getErrorgetUserError降級:Read timed out executing GET http://a-server/getErrorgetUserError降級:Read timed out executing GET http://a-server/getError

返回結果:

[    {        "id": 777,        "userName": "getUserError執行緒已滿"    },    {        "id": 777,        "userName": "getUserError執行緒已滿"    },    {        "id": 999,        "userName": "getUserError降級"    },    {        "id": 999,        "userName": "getUserError降級"    },    {        "id": 999,        "userName": "getUserError降級"    }]

如果設定併發量為4,限流數為3,則5個請求中,3個正常呼叫A-Server介面,剩下的2個請求,一個由RateLimiter處理,一個由Bulkhead處理:

getUserError執行緒已滿: Bulkhead 'default' is full and does not permit further callsgetUserError限流降級:RateLimiter 'default' does not permit further callsgetUserError降級Read timed out executing GET http://a-server/getErrorgetUserError降級Read timed out executing GET http://a-server/getErrorgetUserError降級Read timed out executing GET http://a-server/getError

返回結果:

[    {        "id": 777,        "userName": "getUserError執行緒已滿"    },    {        "id": 888,        "userName": "getUserError限流降級"    },    {        "id": 999,        "userName": "getUserError降級"    },    {        "id": 999,        "userName": "getUserError降級"    },    {        "id": 999,        "userName": "getUserError降級"    }]

原因還是之前說的,Resilience4j這幾個模組的處理順序是:

Retry(CircuitBreaker(RateLimiter(TimeLimiter(Bulkhead(Function)))))

控制併發的Bulkhead一定是第一個執行的,如果想改變順序,可以作如下配置:

resilience4j.circuitbreaker.circuit-breaker-aspect-order=1resilience4j.ratelimiter.rate-limiter-aspect-order=2resilience4j.retry.retry-aspect-order=3resilience4j.timelimiter.time-limiter-aspect-order=3

不過Bulkhead的順序無法改變,是寫死的Integer.MAX_VALUE。可以參看BulkheadConfigurationProperties類。

多嘴一句,雖然都是控制數量的,Bulkhead是控制併發量,RateLimiter是控制請求數。

好比一個門店,RateLimiter是控制能進來多少人,Bulkhead是控制一個人能買多少件東西。

整個鼠年我是從年頭衰到年尾最後一天,希望牛年能轉轉運吧。

這是牛年的第一篇,也是Spring Cloud 2020新元件替換Netflix套件的最後一篇,還有其他的元件,有機會了再寫吧。

13
最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • React with Typescript入門