Hi everyone, I made a fool of myself again. The lesson this time: the corners you cut while writing code will have to be paid back sooner or later.
Symptoms and Background
Last night our gateway suffered a cascading failure (avalanche) for a while. The symptoms were:
1. Various microservices kept reporting exceptions: the connection was already closed while the HTTP response was being written:
reactor.netty.http.client.PrematureCloseException: Connection prematurely closed BEFORE response
2. At the same time, exceptions that the connection was closed before the request had been fully read:
org.springframework.http.converter.HttpMessageNotReadableException: I/O error while reading input message; nested exception is java.io.IOException: UT000128: Remote peer closed connection before all data could be read
3. The front end kept alerting on request timeouts: 504 Gateway Time-out.
4. The gateway processes kept failing health checks and getting restarted.
5. A freshly restarted gateway instance was immediately hit by a surge of requests, peaking at 2000 qps per instance (normally about 500 qps per instance off-peak, and kept under 1000 qps per instance at peak thanks to scale-out). Its health-check endpoint then went unresponsive for a long time, so instances kept getting restarted.
Symptoms 1 and 2 were most likely caused by the gateway restarting over and over: for some reason graceful shutdown failed and the processes were killed forcibly, which tore down the connections and produced the exceptions in 1 and 2.
Our gateway is built on Spring Cloud Gateway and scales out automatically based on CPU load. Oddly, while request volume was spiking, CPU utilization did not rise much; it stayed around 60%, below the scale-out threshold, so no automatic scaling ever happened. To mitigate quickly, we manually added several gateway instances to bring each instance's load under 1000 qps, which resolved the incident for the moment.
Problem Analysis
To fix the problem for good, we turned to JFR analysis, starting from the clues we already had:
Spring Cloud Gateway is an asynchronous, reactive gateway built on Spring WebFlux, so its HTTP worker threads are limited in number (by default 2 × the number of available CPUs; 4 in our case). The gateway processes kept failing health checks; the health check calls the /actuator/health endpoint, and that endpoint kept timing out. A health-check endpoint generally times out for one of two reasons:
1. The health check blocks while probing some component. For example, if the database hangs, the database health indicator may never return.
2. The HTTP thread pool never gets around to the health-check request, and it times out before being processed.
We can start with the periodic thread dumps in the JFR recording to see whether any HTTP thread is stuck in the health check. Looking at the thread dumps taken after the problem started, and focusing on those 4 HTTP threads, we found that all 4 had essentially the same stack: they were all executing Redis commands:
"reactor-http-nio-1" #68 daemon prio=5 os_prio=0 cpu=70832.99ms elapsed=199.98s tid=0x0000ffffb2f8a740 nid=0x69 waiting on condition [0x0000fffe8adfc000]
   java.lang.Thread.State: TIMED_WAITING (parking)
	at jdk.internal.misc.Unsafe.park(java.base/Native Method)
	- parking to wait for <0x00000007d50eddf8> (a java.util.concurrent.CompletableFuture$Signaller)
	at java.util.concurrent.locks.LockSupport.parkNanos(java.base/LockSupport.java:234)
	at java.util.concurrent.CompletableFuture$Signaller.block(java.base/CompletableFuture.java:1798)
	at java.util.concurrent.ForkJoinPool.managedBlock(java.base/ForkJoinPool.java:3128)
	at java.util.concurrent.CompletableFuture.timedGet(java.base/CompletableFuture.java:1868)
	at java.util.concurrent.CompletableFuture.get(java.base/CompletableFuture.java:2021)
	at io.lettuce.core.protocol.AsyncCommand.await(AsyncCommand.java:83)
	at io.lettuce.core.internal.Futures.awaitOrCancel(Futures.java:244)
	at io.lettuce.core.FutureSyncInvocationHandler.handleInvocation(FutureSyncInvocationHandler.java:75)
	at io.lettuce.core.internal.AbstractInvocationHandler.invoke(AbstractInvocationHandler.java:80)
	at com.sun.proxy.$Proxy245.get(Unknown Source)
	at org.springframework.data.redis.connection.lettuce.LettuceStringCommands.get(LettuceStringCommands.java:68)
	at org.springframework.data.redis.connection.DefaultedRedisConnection.get(DefaultedRedisConnection.java:267)
	at org.springframework.data.redis.connection.DefaultStringRedisConnection.get(DefaultStringRedisConnection.java:406)
	at org.springframework.data.redis.core.DefaultValueOperations$1.inRedis(DefaultValueOperations.java:57)
	at org.springframework.data.redis.core.AbstractOperations$ValueDeserializingRedisCallback.doInRedis(AbstractOperations.java:60)
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:222)
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:189)
	at org.springframework.data.redis.core.AbstractOperations.execute(AbstractOperations.java:96)
	at org.springframework.data.redis.core.DefaultValueOperations.get(DefaultValueOperations.java:53)
	at com.jojotech.apigateway.filter.AccessCheckFilter.traced(AccessCheckFilter.java:196)
	at com.jojotech.apigateway.filter.AbstractTracedFilter.filter(AbstractTracedFilter.java:39)
	at org.springframework.cloud.gateway.handler.FilteringWebHandler$GatewayFilterAdapter.filter(FilteringWebHandler.java:137)
	at org.springframework.cloud.gateway.filter.OrderedGatewayFilter.filter(OrderedGatewayFilter.java:44)
	at org.springframework.cloud.gateway.handler.FilteringWebHandler$DefaultGatewayFilterChain.lambda$filter$0(FilteringWebHandler.java:117)
	at org.springframework.cloud.gateway.handler.FilteringWebHandler$DefaultGatewayFilterChain$$Lambda$1478/0x0000000800b84c40.get(Unknown Source)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:44)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
	at com.jojotech.apigateway.common.TracedMono.subscribe(TracedMono.java:24)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
	at com.jojotech.apigateway.common.TracedMono.subscribe(TracedMono.java:24)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
	at com.jojotech.apigateway.common.TracedMono.subscribe(TracedMono.java:24)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
	at com.jojotech.apigateway.common.TracedMono.subscribe(TracedMono.java:24)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
	at com.jojotech.apigateway.common.TracedMono.subscribe(TracedMono.java:24)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
	at com.jojotech.apigateway.common.TracedMono.subscribe(TracedMono.java:24)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:255)
	at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
	at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:73)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:281)
	at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:860)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
	at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:73)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
	at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
	at reactor.core.publisher.MonoNext$NextSubscriber.onNext(MonoNext.java:82)
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:281)
	at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:860)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)
	at reactor.core.publisher.MonoFilterWhen$MonoFilterWhenMain.onNext(MonoFilterWhen.java:149)
	at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2397)
	at reactor.core.publisher.MonoFilterWhen$MonoFilterWhenMain.onSubscribe(MonoFilterWhen.java:112)
	at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:54)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:448)
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onNext(FluxConcatMap.java:250)
	at reactor.core.publisher.FluxDematerialize$DematerializeSubscriber.onNext(FluxDematerialize.java:98)
	at reactor.core.publisher.FluxDematerialize$DematerializeSubscriber.onNext(FluxDematerialize.java:44)
	at reactor.core.publisher.FluxIterable$IterableSubscription.slowPath(FluxIterable.java:270)
	at reactor.core.publisher.FluxIterable$IterableSubscription.request(FluxIterable.java:228)
	at reactor.core.publisher.FluxDematerialize$DematerializeSubscriber.request(FluxDematerialize.java:127)
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:235)
	at reactor.core.publisher.FluxDematerialize$DematerializeSubscriber.onSubscribe(FluxDematerialize.java:77)
	at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:164)
	at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:86)
	at reactor.core.publisher.InternalFluxOperator.subscribe(InternalFluxOperator.java:62)
	at reactor.core.publisher.FluxDefer.subscribe(FluxDefer.java:54)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:448)
	at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.onSubscribe(FluxConcatMap.java:218)
	at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:164)
	at reactor.core.publisher.FluxIterable.subscribe(FluxIterable.java:86)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at org.springframework.cloud.sleuth.instrument.web.TraceWebFilter$MonoWebFilterTrace.subscribe(TraceWebFilter.java:184)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52)
	at reactor.core.publisher.Mono.subscribe(Mono.java:4150)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:255)
	at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.netty.http.server.HttpServer$HttpServerHandle.onStateChange(HttpServer.java:915)
	at reactor.netty.ReactorNetty$CompositeConnectionObserver.onStateChange(ReactorNetty.java:654)
	at reactor.netty.transport.ServerTransport$ChildObserver.onStateChange(ServerTransport.java:478)
	at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:526)
	at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:94)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:209)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at reactor.netty.http.server.logging.AccessLogHandlerH1.channelRead(AccessLogHandlerH1.java:59)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:324)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:296)
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:719)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:655)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:581)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(java.base/Thread.java:834)
So the HTTP threads were not stuck in the health check, and no other thread had any health-check-related frames either (in an asynchronous setup the health check is asynchronous too, and parts of it may be handed off to other threads). The health-check requests must therefore have timed out and been cancelled before they were ever executed.
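This failure mode, where a queued health check misses its deadline because every worker thread is occupied by a blocking call, can be reproduced with a toy simulation. A minimal sketch using only the JDK: a fixed pool of 4 threads stands in for the reactor-http-nio event loops, and the 2-second sleep and 500 ms prober deadline are made-up numbers, not measurements from our system.

```java
import java.util.concurrent.*;

public class EventLoopStarvationDemo {
    // Returns true when the queued health check misses its deadline
    static boolean healthCheckTimesOut() throws Exception {
        // 4 workers stand in for the 4 HTTP event-loop threads
        ExecutorService httpThreads = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 4; i++) {
            // Each thread blocks on a synchronous "Redis call" (simulated by sleep)
            httpThreads.submit(() -> {
                try { Thread.sleep(2_000); } catch (InterruptedException ignored) { }
            });
        }
        // The health-check request arrives, but no thread is free to run it
        Future<String> healthCheck = httpThreads.submit(() -> "UP");
        try {
            healthCheck.get(500, TimeUnit.MILLISECONDS); // prober deadline: 500 ms
            return false;
        } catch (TimeoutException e) {
            return true; // the check was never executed before the deadline
        } finally {
            httpThreads.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("health check timed out: " + healthCheckTimesOut());
    }
}
```

The health-check task itself is trivially cheap; it times out purely because it never gets a thread, which matches what the thread dumps showed.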
Why would that happen? Meanwhile, I noticed the code uses RedisTemplate, the synchronous Redis API of spring-data-redis. It suddenly came back to me that when I wrote this code, since it only checks whether a key exists and refreshes the key's TTL, I was lazy and skipped the asynchronous API. Could the synchronous API be blocking the HTTP threads and causing the avalanche?
Let's verify this conjecture. Our project accesses Redis through spring-data-redis plus a Lettuce connection pool, and we enabled and extended JFR monitoring of Lettuce commands; see my earlier article on this new JFR-based monitoring for the Lettuce connection pool. As of this writing my pull request has been merged, and the feature will ship in the 6.2.x release. Here is the Redis command sampling around the time of the incident, as shown in the figure below:
Let's roughly estimate how much blocking the Redis commands caused (sampling is every 10 s; count is the number of commands; times are in microseconds): multiply each command count by its 50th-percentile (median) latency, sum them, then divide by 10 (for the 10-second window) to get the blocked time per second:
32 × 152 = 4864
1 × 860 = 860
5 × 163 = 815
32 × 176 = 5632
1 × 178 = 178
16959 × 168 = 2849112
774 × 176 = 136224
3144 × 166 = 521904
17343 × 179 = 3104397
702 × 166 = 116532
Total: 6740518
6740518 / 10 = 674051.8 us ≈ 0.67 s
This is only the blocked time computed from the medians; the distribution in the chart suggests the true value is larger. So the time that needs to be spent blocked in synchronous Redis calls each second may well exceed one second. Requests kept arriving at an undiminished rate, so they piled up faster than they could be served, and the gateway eventually collapsed.
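The arithmetic above can be double-checked mechanically; a minimal sketch, with the counts and median latencies transcribed from the sample:

```java
public class RedisBlockTime {
    // {command count, median latency in µs} per command type, from the 10 s JFR sample
    static final long[][] SAMPLES = {
        {32, 152}, {1, 860}, {5, 163}, {32, 176}, {1, 178},
        {16959, 168}, {774, 176}, {3144, 166}, {17343, 179}, {702, 166}
    };

    static long totalBlockedMicros() {
        long total = 0;
        for (long[] s : SAMPLES) {
            total += s[0] * s[1]; // commands × median latency
        }
        return total;
    }

    public static void main(String[] args) {
        long total = totalBlockedMicros();             // 6740518 µs over the 10 s window
        double perSecond = total / 10.0 / 1_000_000.0; // blocked seconds per wall-clock second
        System.out.printf("total = %d us, per second = %.2f s%n", total, perSecond);
    }
}
```

With 4 HTTP threads in total, roughly 0.67 s of blocking per wall-clock second at the median is already a substantial fraction of the available thread time, before accounting for the latency tail.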
Moreover, because the API is blocking, the threads spend most of their time waiting on I/O, so CPU usage stays low and the CPU-based auto-scaling never triggers. During known traffic peaks we pre-scale, so a single gateway instance never reached the problematic load then, and the issue stayed hidden.
Fixing the Problem
Let's rewrite the original code, which used the synchronous spring-data-redis API. The original (in fact the body of public Mono<Void> traced(ServerWebExchange exchange, GatewayFilterChain chain), the core method of a spring-cloud-gateway Filter) was:
if (StringUtils.isBlank(token)) {
    // If the token is absent, either continue the request or return a login-required
    // status code, depending on the path
    return continueOrUnauthorized(path, exchange, chain, headers);
} else {
    try {
        String accessTokenValue = redisTemplate.opsForValue().get(token);
        if (StringUtils.isNotBlank(accessTokenValue)) {
            // If accessTokenValue is non-empty, renew it for 4 hours, so that a logged-in
            // user's token never expires as long as they keep making requests
            Long expire = redisTemplate.getExpire(token);
            log.info("accessTokenValue = {}, expire = {}", accessTokenValue, expire);
            if (expire != null && expire < 4 * 60 * 60) {
                redisTemplate.expire(token, 4, TimeUnit.HOURS);
            }
            // Parse out the userId
            JSONObject accessToken = JSON.parseObject(accessTokenValue);
            String userId = accessToken.getString("userId");
            // The token is valid only if userId is non-empty
            if (StringUtils.isNotBlank(userId)) {
                // Parse the token
                HttpHeaders newHeaders = parse(accessToken);
                // Continue the request
                return FilterUtil.changeRequestHeader(exchange, chain, newHeaders);
            }
        }
    } catch (Exception e) {
        log.error("read accessToken error: {}", e.getMessage(), e);
    }
    // If the token is invalid, either continue the request or return a login-required
    // status code, depending on the path
    return continueOrUnauthorized(path, exchange, chain, headers);
}
Rewritten with the asynchronous API:
if (StringUtils.isBlank(token)) {
    return continueOrUnauthorized(path, exchange, chain, headers);
} else {
    HttpHeaders finalHeaders = headers;
    // Must be wrapped with tracedPublisherFactory, otherwise the trace context is lost;
    // see my other article: "Spring Cloud Gateway lost its trace info and I was dumbfounded"
    return tracedPublisherFactory.getTracedMono(
            redisTemplate.opsForValue().get(token)
                    // Must switch threads here; otherwise downstream work keeps running on the
                    // Lettuce event-loop thread. If that work is slow it affects all other Redis
                    // traffic, and its time also counts against the Redis command timeout.
                    .publishOn(Schedulers.parallel()),
            exchange
    ).doOnSuccess(accessTokenValue -> {
        if (accessTokenValue != null) {
            // Renew the accessToken for 4 hours
            tracedPublisherFactory.getTracedMono(
                    redisTemplate.getExpire(token).publishOn(Schedulers.parallel()),
                    exchange
            ).doOnSuccess(expire -> {
                log.info("accessTokenValue = {}, expire = {}", accessTokenValue, expire);
                if (expire != null && expire.toHours() < 4) {
                    redisTemplate.expire(token, Duration.ofHours(4)).subscribe();
                }
            }).subscribe();
        }
    })
    // Must map empty to a non-null value, otherwise flatMap never runs. We also cannot use
    // switchIfEmpty at the end: the overall return type is Mono<Void>, which by definition
    // carries nothing, so that would cause every request to be sent twice.
    .defaultIfEmpty("")
    .flatMap(accessTokenValue -> {
        try {
            if (StringUtils.isNotBlank(accessTokenValue)) {
                JSONObject accessToken = JSON.parseObject(accessTokenValue);
                String userId = accessToken.getString("userId");
                if (StringUtils.isNotBlank(userId)) {
                    // Parse the token
                    HttpHeaders newHeaders = parse(accessToken);
                    // Continue the request
                    return FilterUtil.changeRequestHeader(exchange, chain, newHeaders);
                }
            }
            return continueOrUnauthorized(path, exchange, chain, finalHeaders);
        } catch (Exception e) {
            log.error("read accessToken error: {}", e.getMessage(), e);
            return continueOrUnauthorized(path, exchange, chain, finalHeaders);
        }
    });
}
A few points to note here:
1. Spring Cloud Sleuth's trace instrumentation for Spring WebFlux: if we create a new Flux or Mono inside a Filter, it carries no trace context, and we have to attach it ourselves. See my other article: "Spring Cloud Gateway lost its trace info and I was dumbfounded".
2. With the spring-data-redis + Lettuce connection pool combination, when using the asynchronous API it is best to switch to another scheduler once the response arrives; otherwise downstream work keeps running on the Lettuce event-loop thread, where slow work affects all other Redis traffic and its time also counts against the Redis command timeout.
3. In Project Reactor, if an intermediate value is null/empty, downstream operators such as flatMap and map simply never run. If the chain terminates there, the client receives a broken response, so we must consider the empty case at every step.
4. The core method of spring-cloud-gateway's GatewayFilter interface returns Mono<Void>. Mono<Void> by definition carries nothing, so we cannot use a trailing switchIfEmpty to paper over intermediate empties; doing so would cause every request to be sent twice.
After this change we load-tested the gateway again: at 20,000 qps on a single instance the problem no longer appeared.
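The empty-value pitfall in point 3 is not specific to Reactor. java.util.Optional behaves analogously, which makes for a dependency-free illustration; here Optional stands in for Mono purely as an analogy, and Optional.or plays the role of defaultIfEmpty:

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;

public class EmptyPropagationDemo {
    // Shows that a mapping stage never runs on an empty container
    static boolean mapRunsOnEmpty() {
        AtomicBoolean ran = new AtomicBoolean(false);
        Optional.<String>empty().map(v -> { ran.set(true); return v; });
        return ran.get(); // stays false: the lambda was skipped entirely
    }

    // Supplying a default first (like Reactor's defaultIfEmpty("")) lets the next stage run
    static String withDefault() {
        return Optional.<String>empty()
                .or(() -> Optional.of(""))
                .map(v -> v.isBlank() ? "anonymous" : v)
                .get();
    }

    public static void main(String[] args) {
        System.out.println("map ran on empty: " + mapRunsOnEmpty()); // false
        System.out.println("with default: " + withDefault());        // anonymous
    }
}
```

The same reasoning explains why the rewritten filter inserts defaultIfEmpty("") before flatMap: without it, a missing token value would silently skip the whole authorization branch.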