1.es簡介 1.1 起源
https://www.elastic.co/cn/what-is/elasticsearch,es的起源,是因為程式設計師Shay Banon在使用Apache Lucene發現不太好用,然後手動改造升級的過程中發展起來的。(程式設計師就是需要有這種動力~)實際上es也是一個java應用,跑在jvm裡面的
1.2 與關係型資料庫的區別 1.3 為什麼這麼快索引方式的區別,es主要是利用倒排索引(inverted index),這個翻譯可能會讓初次接觸的人產生誤解,誤以為是倒著排序?其實不是這樣,一般關係型資料庫索引是把某個欄位建立起一張索引表,傳入這個欄位的某個值,再去索引中判斷是否有這個值,從而找到這個值所在資料(id)的位置。而倒排索引則是把這個值所在的文件id記錄下來,當輸入這個值的時候,直接查詢出這個值所匹配的文件id,再取出id。所以我們在建立es索引的時候,有分詞的概念,相當於可以把filed欄位內容拆分,然後索引記錄下來。例如“我愛中國”,可以拆分成“我”,“愛”,“中國”,“我愛中國”這五個詞,同時記錄下來這幾個關鍵詞的對應文件資料id是1,當我們查詢“我”,“中國”時,都能查出這條資料。而如果使用關係型資料庫去查包含“中國”這個關鍵字的資料的時候,則需要like前後通配全表掃描,無法快速找到關鍵詞所在的資料行。
1.4 下載安裝https://www.elastic.co/cn/start 在這個地址裡面下載最新版本,目前是7.10.2(拖了一個月寫完,我下載的時候是7.9.3- -!)
Windows版是一個壓縮包檔案,解壓後(進入bin點開bat)即可使用。Linux版由於是直接在k8s里拉的映象,這裡就不做贅述。
啟動完成之後訪問:http://127.0.0.1:9200/,看見如下頁面:You Know, for Search,就算啟動成功啦。
1.5 安裝視覺化軟體像資料庫一樣,視覺化介面有Navicat,SQLyog,MySql自帶的Workbench。es也是需要一個視覺化ui介面來方便我們操作的。這裡選擇的也是官方的的kibana:
https://www.elastic.co/cn/downloads/kibana :
請注意需要選擇與es匹配的版本,如果版本不匹配,則會提示你:
或者是其他類似版本不匹配的錯誤。
安裝完成後就可以開啟kibana玩耍啦,由於我本地沒有資料,拿的是7.6.2版本搭建的elk中kibana介面:
如果需要連線環境上的es,則可以在這裡配置使用者名稱和密碼:
這個工具的搜尋很方便,不需要指定查哪個欄位的哪個值,直接在輸入框搜尋想要查詢的欄位即可。如果想看他對應的查詢語句,點開F12開啟控制檯即可研究:
es的查詢條件還是比較複雜的,但是在業務查詢當中,一些比較簡單的查詢就可以滿足大多數的通用分頁查詢了,除非是要開發報表查詢,會複雜一些。
1.6 機器要求本地跑demo的話還是很容易的,這兩個應用預設佔用記憶體都不大,有需求可以自行調小一點:
2.Java中使用Elasticsearch 2.1 使用spring-data提供的封裝 2.1.1 maven依賴 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-elasticsearch</artifactId> </dependency>
2.1.2 yml引數 2.1.3 程式碼中對映索引實體
其中“omsElasticsearchSettings”這一段的意思是像mybatis那樣解析表示式,找到omsElasticsearchSettings這個bean的getSuffix方法獲取前後綴。這樣就可以實現動態的根據環境生成對映對應的索引
1 @Configuration 2 @AllArgsConstructor 3 public class ElasticsearchConfig { 5 private final Environment env; 7 @Bean 8 public ElasticsearchSettings omsElasticsearchSettings(){ 9 return new ElasticsearchSettings().setSuffix(env.getActiveProfiles()[0]);10 }12 }13 16 @Data17 @Accessors(chain = true)18 public class ElasticsearchSettings {20 public String suffix;22 }
2.1.4 索引mapping生成在啟動專案的時候,SpringData會檢測配置中的es裡是否存在對應索引,如不存在,則會根據@Document實體中配置的@Field欄位來生成mapping檔案:
生成的Mapping Demo如下:
PUT om_package_dev/?pretty{ "settings": { "number_of_shards" :1, "number_of_replicas" : 1 }, "mappings": { "properties": { "_class": { "type": "text", "fields": { "keyword": { "type": "keyword", "ignore_above": 256 } } }, "actualFreightCost": { "type": "double" }, "actualPackageCost": { "type": "double" } } }}
2.1.5 增刪改建立一個@Repository像mybatis一樣來做增刪改查的對映封裝:
底層是SpringData提供封裝的統一方法:
儲存資料的時候直接呼叫即可:
2.1.6 查查是Es的重頭戲,我們開啟org.elasticsearch.index.query.AbstractQueryBuilder檢視實現類可以發現,繼承這個抽象類的各種查詢類有四五十個之多,不得不讓人感嘆es的查詢強大,(與反人類,學習成本太高了)。
好訊息是,如果業務場景不復雜,僅僅是想在分頁查詢上提高速度,那麼只需要掌握一下幾個類的用法即可:
我們封裝了兩個查詢列舉,一個用來定義該實體是es查詢條件實體@interface QueryEntity:
@Retention(RetentionPolicy.RUNTIME)@Target(ElementType.TYPE)public @interface QueryEntity { String[] dbOrders() default {}; String[] esOrders() default {}; String dbLogicField() default ""; String esLogicField() default "";}
另外一個是用來定義欄位,即使用es的哪個條件去查詢@interface QueryField:
@Target({ElementType.FIELD})@Retention(RetentionPolicy.RUNTIME)public @interface QueryField { String esField() default ""; String dbField() default ""; boolean like() default false; boolean range() default false; boolean require() default false; boolean match() default false; boolean commaSupported() default false; boolean isBigDecimal() default false; Class<?> searchTypeEnum() default void.class; Class<?> sortTypeEnum() default void.class;}
對應到實體上的用法demo就是:
這樣可以支援區間查詢,欄位型別,對應es欄位,從設計上規避了根據每個欄位,呼叫每個拼接語句的上百個if/else噩夢。透過一個通用的查詢工具類,來封裝拼接這些查詢條件QueryUtils:
@Slf4jpublic class QueryUtils { private static ConcurrentHashMap<Class<?>, HashMap<String, Field>> classFieldMap = new ConcurrentHashMap<>(); /** * 構建查詢 * * @param obj * @return 若為 null 說明該查詢必定不會返回結果,無需查詢 ES */ public static BoolQueryBuilder boolQuery(Object obj) { if (obj == null) { return null; } BoolQueryBuilder root = QueryBuilders.boolQuery(); if (!classFieldMap.containsKey(obj.getClass())) { HashMap<String, Field> filedNameMap = new HashMap<>(obj.getClass().getDeclaredFields().length); for (Field field : obj.getClass().getDeclaredFields()) { filedNameMap.put(field.getName(), field); } classFieldMap.put(obj.getClass(), filedNameMap); } HashMap<String, Field> filedNameMap = classFieldMap.get(obj.getClass()); QueryEntity entitySetting = obj.getClass().getAnnotation(QueryEntity.class); for (Field field : filedNameMap.values()) { QueryField fieldSetting; if ((fieldSetting = field.getAnnotation(QueryField.class)) == null) { continue; } Object value = ReflectionUtil.getValue(field, obj); if (isNullOrEmpty(value)) { if (!fieldSetting.require()) { continue; } return null; } String fieldName = getEsQueryFieldName(field, fieldSetting); if (fieldSetting.range()) { BoolQueryBuilder bool = QueryBuilders.boolQuery(); String[] arr = (String[]) value; RangeQueryBuilder range = QueryBuilders.rangeQuery(fieldName); if (arr.length != 2 || (StringUtils.isEmpty(arr[0]) && StringUtils.isEmpty(arr[1]))) { continue; } if (!StringUtils.isEmpty(arr[0]) && StringUtils.isEmpty(arr[1])) { bool.must(range.from( fieldSetting.isBigDecimal() ? new BigDecimal(arr[0]) : DateUtil.parseAndGetTimestamp(arr[0]))); } else if (StringUtils.isEmpty(arr[0]) && !StringUtils.isEmpty(arr[1])) { bool.must(range.to(fieldSetting.isBigDecimal() ? new BigDecimal(arr[1]) : DateUtil.parseAndGetTimestamp(arr[1]))); } else { bool.must(range.from(fieldSetting.isBigDecimal() ? new BigDecimal(arr[0]) : DateUtil.parseAndGetTimestamp(arr[0])). to(fieldSetting.isBigDecimal() ? new BigDecimal(arr[1]) : DateUtil.parseAndGetTimestamp(arr[1]))); } root.must(bool); } else if (field.getType() == List.class) { assert value instanceof List<?>; List<?> list = (List<?>) value; if (CollectionUtils.isEmpty(list)) { if (fieldSetting.require()) { return null; } continue; } if (list.get(0) instanceof StoreListBO) { BoolQueryBuilder bool1 = QueryBuilders.boolQuery(); for (Object store : list) { StoreListBO bo = (StoreListBO) store; BoolQueryBuilder bool2 = QueryBuilders.boolQuery(); if (!bo.getFlagAll()) { bool2.must(QueryBuilders.termQuery("platformCode", bo.getPlatformCode())); bool2.must(QueryBuilders.termsQuery("storeCode", bo.getStoreCodeList())); } bool1.should(bool2); } root.must(bool1); } else { root.must(QueryBuilders.termsQuery(fieldName, (List<?>) value)); } } else if (fieldSetting.like()) { root.must(QueryBuilders.wildcardQuery(fieldName, String.format("*%s*", value))); } else if (fieldSetting.commaSupported()) { root.must(QueryBuilders.termsQuery(fieldName, StringUtility.splitCommaString((String) value))); } else if (fieldSetting.match()) { if (fieldSetting.commaSupported()) { root.must(QueryBuilders.multiMatchQuery(fieldName, StringUtility.splitCommaString((String) value))); } else { root.must(QueryBuilders.matchQuery(fieldName, value)); } } else if (fieldSetting.searchTypeEnum().isEnum()) { try { Object[] objects = fieldSetting.searchTypeEnum().getEnumConstants(); if (objects[0] instanceof IEsSearchTypeEnum) { IEsSearchTypeEnum searchTypeEnum = (IEsSearchTypeEnum) objects[0]; fieldName = searchTypeEnum.getFiledName((Integer) value); Field filed = filedNameMap.get(IEsSearchTypeEnum.searchContent); filed.setAccessible(true); String searchContent = (String) ReflectUtil.getField(filed, obj); if (!StringUtils.isEmpty(fieldName) && !StringUtils.isEmpty(searchContent)) { root.must(QueryBuilders.termsQuery(fieldName, searchContent.split(","))); } } } catch (Exception e) { e.printStackTrace(); log.error("拼接搜尋型別有誤:", e.getMessage()); } } else if (fieldSetting.sortTypeEnum().isEnum()) { continue; } else { root.must(QueryBuilders.termQuery(fieldName, value)); } } if (entitySetting != null) { if (!StringUtils.isEmpty(entitySetting.esLogicField())) { root.must(QueryBuilders.termQuery(entitySetting.esLogicField(), LogicValueConstants.NORMAL)); } } root.must(QueryBuilders.termQuery("tenantId", AuthUtil.getTenantId())); log.info("query : {}", Strings.toString(root)); return root; } private static boolean isNullOrEmpty(Object value) { return Objects.isNull(value) || isEmptyString(value) || isEmptyCollection(value); } private static String getEsQueryFieldName(Field field, QueryField fieldSetting) { return StringUtils.isEmpty(fieldSetting.esField()) ? field.getName() : fieldSetting.esField(); } private static boolean isEmptyCollection(Object value) { return (value instanceof Collection) && CollectionUtils.isEmpty((Collection<?>) value); } private static boolean isEmptyString(Object value) { return (value instanceof String) && StringUtils.isEmpty(value); } public static void handlePageable(Object obj, NativeSearchQueryBuilder builder) { if (obj instanceof PageDTO) { PageDTO pageDTO = (PageDTO) obj; builder.withPageable(PageRequest.of(pageDTO.currForEsPaging(), pageDTO.size())); } } public static void dealSort(Object obj, NativeSearchQueryBuilder builder) { // 預設按最後更新時間倒序 String fieldName = null; Boolean isAsc = false; Boolean asc2Desc; try { HashMap<String, Field> fieldNameMap = classFieldMap.get(obj.getClass()); Field sortTypeField = fieldNameMap.get(IEsSortTypeEnum.SORT_TYPE); if (sortTypeField != null) { QueryField fieldSetting = sortTypeField.getAnnotation(QueryField.class); if (fieldSetting != null && fieldSetting.sortTypeEnum().isEnum()) { Object[] objects = fieldSetting.sortTypeEnum().getEnumConstants(); if (objects[0] instanceof IEsSortTypeEnum) { IEsSortTypeEnum sortTypeEnum = (IEsSortTypeEnum) objects[0]; fieldName = sortTypeEnum.getFiledName((Integer) sortTypeField.get(obj)); asc2Desc = sortTypeEnum.getAsc2Desc((Integer) sortTypeField.get(obj)); Field filed = ReflectUtil.getField(obj.getClass(), IEsSortTypeEnum.SORT_ASC); filed.setAccessible(true); isAsc = (Boolean) ReflectUtil.getField(filed, obj); if (isAsc != null) { isAsc = asc2Desc ? !isAsc : isAsc; } } } } } catch (Exception e) { e.printStackTrace(); log.error("拼接排序型別有誤:", e); } builder.withSort(SortBuilders.fieldSort(fieldName == null ? IEsSortTypeEnum.DEFAULT_SORT_FILED : fieldName).order(isAsc == null ? SortOrder.DESC : isAsc ? SortOrder.ASC : SortOrder.DESC)); log.info("es 排序引數" + Strings.toString(builder.build().getElasticsearchSorts().get(0))); }
支援排序拼接、count統計型別拼接、時間區間拼接,金額拼接、list集合查詢拼接,輸入多個單號的時候,透過分隔符分隔拼接
實體搜尋型別
(PS:1.8新增了列舉類可以實現介面,這樣列舉用起來也很舒服了)
這兩個列舉類的作用主要是適配按照不同的搜尋條件以及排序條件排序
3.千萬級資料測試 3.1資料準備標題寫的那麼誇張,千萬級資料,哈哈,其實就是老套路搞了個儲存過程往資料庫塞一千萬資料,然後同步到es測試啦
DROP PROCEDURE IF EXISTS test;DELIMITER $$CREATE PROCEDURE test()BEGIN DECLARE v_i INT UNSIGNED DEFAULT 10000001; WHILE v_i < 10000894 DO INSERT INTO ‘test’ VALUES('v_i') SET v_i = v_i+1; END WHILE;END $$DELIMITER ;CALL test();
言歸正傳,測試的目的有兩個,一個是後臺任務的同步程式碼情況,驗證資料庫與es的資料一致性,同時估算效能,做到上線時遷移資料心中有數。二是模擬es在大資料量的情況下會不會有什麼影響。
最終es結果如下:
後臺任務分頁查資料庫,每頁五千條,加上其他關聯表查詢,5000條差不多1~2秒。機器效能i7 9700 32g,10000000 / 5000 * 2s / 3600s = 大概一個小時左右同步完成。一千萬資料大概佔用1.3gb空間,要根據mapping欄位多少來看。僅供參考分頁這邊需要調整引數:要不然es預設只能查出最大10000條。同時也需要調整es的引數:
PUT om_package_dev/_settings{ "index" : {"max_result_window" : 10000000}}
其實這個地方可以從業務角度思考一下,es預設10000也不是沒有道理。對於大資料量,精準點選第666666頁的人都是像我這樣吃飽了沒事幹的。點到那頁去幹嘛?點之前你也不知道那頁有啥呀。。。並且es分頁效率也很低,選最後一頁很慢。大資料量如果需要查詢,一般根據條件精準查詢。目前這點資料量查詢還是非常快的。
4.小結 4.1 資料一致性目前我們的方案主要是靠程式碼層面實現。當資料有變動時,傳送一條訊息給mq,由mq非同步去同步es。同時,有一個後臺任務一直在跑三分鐘(根據資料量決定)以內的資料,以防mq失效有一個兜底任務。當然還有其他方案,比如透過MySQL的binlog寫到es裡面去,這種方案對效能要求高,同時需要引入第三方元件。最終我們選擇了程式碼層面自己比較可控的一種方案。
4.2 elasticsearch-sql從開始看見拼es查詢條件,就在想如果能直接把sql轉化成es就好了。後來搜了一下,果然有這種好東西,是中國自然語言處理開源組織提供的外掛。但是已經寫完了通用查詢,就沒有去研究這個外掛怎麼用。有興趣的小夥伴可以試試。https://github.com/NLPchina/elasticsearch-sql 另外:kibana的工具控制檯也可以直接傳送sql請求
POST _sql/translate{ "query": """ SELECT doc.message FROM "filebeat-7.6.2-2021.01.30" """, "fetch_size": 100}
4.3 Connection Rest By Peer
出現Connection Rest By Peer的問題,一般是一端關閉了連線,而另一端還以為對方在呢,然後傻乎乎的發請求過去,發現對方已經不跟它玩了。查看了es所在機器的k8s keepalive設定:
可以看見預設的keepalive連線超時時間是7200s,也就是兩小時。吻合了測試發現的報錯時間點,也符合日誌中記錄的時間。而es客戶端這邊如果不指定keepalive的話,預設取的是ConnectionKeepAliveStrategy裡面的-1。所以java客戶端這邊-1不會斷開連線,而Linux那邊兩小時就會斷開,從而造成了Connection Rest By Peer。研究了一下SpringData配置es引數的地方,發現Spring配置除了獲取配置中的url和密碼之外,沒有可以配置keepalive的地方。只有重寫RestClientBuilder的構建邏輯,實際上SpringData底層也是用的es提供的客戶端,只不過在上層再封裝了一下:
package com.zhkj.oms.config;import org.apache.http.HeaderElement;import org.apache.http.HeaderElementIterator;import org.apache.http.HttpHost;import org.apache.http.HttpResponse;import org.apache.http.auth.AuthScope;import org.apache.http.auth.UsernamePasswordCredentials;import org.apache.http.client.CredentialsProvider;import org.apache.http.conn.ConnectionKeepAliveStrategy;import org.apache.http.impl.client.BasicCredentialsProvider;import org.apache.http.message.BasicHeaderElementIterator;import org.apache.http.protocol.HTTP;import org.apache.http.protocol.HttpContext;import org.apache.http.util.Args;import org.elasticsearch.client.RestClient;import org.elasticsearch.client.RestClientBuilder;import org.springframework.beans.factory.ObjectProvider;import org.springframework.boot.autoconfigure.elasticsearch.ElasticsearchRestClientProperties;import org.springframework.boot.autoconfigure.elasticsearch.RestClientBuilderCustomizer;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.util.StringUtils;import java.net.URI;import java.net.URISyntaxException;/** * @author Xxx * @since 2021/1/19/0019 11:04 * * 0.運維那邊沒有改keepalive,linux預設7200s 我們這邊預設-1無限制 * 1.SpringData裡面沒有設定keepalive的地方,只有重寫RestClientBuilder的構建 * 2.再重新實現HttpAsyncClientBuilder裡面的ConnectionKeepAliveStrategy獲取keepalive的方法 */@Configurationpublic class EsRestClientBuilderConfig { @Bean RestClientBuilder elasticsearchRestClientBuilder(ElasticsearchRestClientProperties properties, ObjectProvider<RestClientBuilderCustomizer> builderCustomizers) { HttpHost[] hosts = properties.getUris().stream().map(this::createHttpHost).toArray(HttpHost[]::new); RestClientBuilder builder = RestClient.builder(hosts); final CredentialsProvider credentialsProvider = new BasicCredentialsProvider(); credentialsProvider.setCredentials(AuthScope.ANY, new UsernamePasswordCredentials(properties.getUsername(), properties.getPassword())); builder.setHttpClientConfigCallback((httpClientBuilder) -> { builderCustomizers.orderedStream().forEach((customizer) -> customizer.customize(httpClientBuilder)); httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider); httpClientBuilder.setKeepAliveStrategy(new ConnectionKeepAliveStrategy() { @Override public long getKeepAliveDuration(HttpResponse response, HttpContext context) { Args.notNull(response, "HTTP response"); final HeaderElementIterator it = new BasicHeaderElementIterator( response.headerIterator(HTTP.CONN_KEEP_ALIVE)); while (it.hasNext()) { final HeaderElement he = it.nextElement(); final String param = he.getName(); final String value = he.getValue(); if (value != null && param.equalsIgnoreCase("timeout")) { try { return Long.parseLong(value) * 1000; } catch (final NumberFormatException ignore) { } } } // 三分鐘 return 1 * 60 * 1 * 1000; }}); return httpClientBuilder; }); builder.setRequestConfigCallback((requestConfigBuilder) -> { builderCustomizers.orderedStream().forEach((customizer) -> customizer.customize(requestConfigBuilder)); return requestConfigBuilder; }); builderCustomizers.orderedStream().forEach((customizer) -> customizer.customize(builder)); return builder; } private HttpHost createHttpHost(String uri) { try { return createHttpHost(URI.create(uri)); } catch (IllegalArgumentException ex) { return HttpHost.create(uri); } } private HttpHost createHttpHost(URI uri) { if (!StringUtils.hasLength(uri.getUserInfo())) { return HttpHost.create(uri.toString()); } try { return HttpHost.create(new URI(uri.getScheme(), null, uri.getHost(), uri.getPort(), uri.getPath(), uri.getQuery(), uri.getFragment()).toString()); } catch (URISyntaxException ex) { throw new IllegalStateException(ex); } }}
看完三件事❤️如果你覺得這篇內容對你還蠻有幫助,我想邀請你幫我三個小忙: