一、背景
Elasticsearch是最近幾年非常熱門的分散式搜尋和資料分析引擎,攜程內部不僅使用ES實現了大規模的日誌平臺,也廣泛使用ES實現了各個業務場景的搜尋、推薦等功能。
二、現狀調研
我們的需求大致包括全量、增量地從Hive、MySQL、Soa服務、Mq等不同型別的資料來源獲取資料,部分資料還需要進行一定的計算或者轉換,然後近實時地同步到ES中,以被使用者搜尋到。
索引內容為文章,主要的資訊儲存在article表裡;每個文章關聯了tag,儲存在article_tag表裡;tag表裡的tagName也需要進入ES索引,以便使用標籤名字搜尋文章。在以前同步這樣的資料進入ES,單條文章的資料組裝虛擬碼如下:
List<Long> tagIds = articleTagDao.query("select tagId from article_tags where articleId=?", articleId);
List<TagPojo> tags =tagDao.query("select id, name from tags whereid in (?)");
ArticleInEs articleInEs = new ArticleInEs();
articleInEs.setTagIds(tagIds);
articleInEs.setTagNames(tags.stream().filter(tag-> tagIds.contains(tag.getId())).map(TagPojo::getName).collect(Collectors.toList()));
只是一個標籤的資訊的組裝程式碼就如此繁瑣,而實際的情況要複雜得多。比如可能會有十幾個乃至幾十個ES索引欄位,或者還要考慮程式碼、SQL效能以及業務邏輯,組裝資料這個工作本身就已經讓人頭禿了,更別說可能還有業務的邏輯要處理了。
由於攜程使用ES的業務非常多,迫切需要一個簡單易用的框架或者工具完成這個工作,以便大家從繁瑣重複的程式碼裡脫身,專注在完成業務本身上。
開源社群有很多類似的實現,類似elasticsearch-jdbc、go-mysql-elasticsearch、Logstash等,經過評估我們認為存在以下問題,無法落地:
常見的工具都是基於配置的,固然非常方便,但是由於安全的原因,我們拿不到生產環境的DB的明文連線串,無法配置資料來源;從DB得到的資料有時候需要經過處理以後才能推送給ES,簡單的基於配置的方式無法滿足;ES的使用者名稱、密碼等,我們不希望出現在配置中,需要有統一的地方對連線資訊進行管理,以保證安全及便於維護;資料的組裝有的場景比較複雜,目前這些工具的配置寫起來可能比程式碼還繁瑣;增量的資料來源,有時候是MQ,無法使用配置進行更新;有的工具是單獨的命令列,無法和我們的JOB結合(基於JAVA)。評估下來,發現這幾個工具更適用於簡單的DB資料,或者已經有了DB平表的場景。一則我們場景有比較複雜的,一則有平表的話同步到ES已經不是多麻煩的事情了。我們需要造一個車輪才能解決全部需求,除了滿足以上提到的開源工具不支援的場景,還要保留他們的基於配置的機制,達到只完成必要的SQL、ES Mapping,和增加必要的資料來源即可完成索引的建立工作。
三、實現思路
整體同步模組的結構如下:
我們從不同的維度來介紹元件的設計思路:
1、根據索引方式
1)第一種是全量同步,全量由於是對整個索引的變更,所以既要保證穩定,又要確保不會建立有問題的索引。
全量會從零建立一個全新的索引。同步開始前,會修改索引mapping的number_of_replicas為0、refresh_interval為-1等引數,以提高索引速度。同時增加了一個_indexTime的欄位,表示本資料的更新時間,用於後續的查錯、比對等;索引全部完成以後,增加了force merge、恢復mapping修改、_refresh等操作,保證建立的索引是儘可能緊湊的;透過定時確保索引的health索引是綠色的,以免未完全準備好切換後耗時不穩定;最後檢查本次索引最終的有效文件數和線上有效索引的文件數的差值是否在配置的可接受範圍內,如果不在則認為本次索引出錯,刪除本次索引內容。
2)第二種是MQ增量,攜程內部使用QMQ接收了來自Otter的MySql變更,MQ裡會有變化的資料庫表、欄位的資訊,因此可以針對性地實現對MQ的解析的規則,從而可以輕而易舉地達到透過MQ對ES裡的文件進行全部、部分的索引。由於大部分情況MQ裡的資訊都比較少,因此更多的情況下,建議收到MQ以後,採用第三種方法進行增量。
4)最後一種是時間增量,元件會維護每個索引的更新時間,以確保增量JOB滾動執行的時候,新的資料總是能儘快地進入ES。由於該方案會要求定期查指定表裡的最新資料,因此對DB不是很友好,大部分情況下我們並不是很建議使用該方法維護索引。
以上幾種場景,全量、Id和時間增量,都要求配置能拼裝出完整的文件,確保每次傳送給ES的都是完整的文件。實現上需要確保建立全量的過程裡發生的增量,在新建的索引切換到線上使用之前,能夠同步到增量,這一步通常是在MQ裡進行處理。
1)基於SQL的配置,完成類似文章標籤的關聯,這種組裝場景非常簡單,有時候也是為了簡化SQL或者最佳化SQL效能,而單獨拎出來的查詢;由於邏輯的通用型,因此內建了程式碼外掛實現該型別資料的讀取和組裝。
2)基於程式碼的處理,適用於類似SQL不方便完成、需要從SOA服務查詢資料,或者資料需要進行復雜處理的場景;這種需要使用自己根據元件的要求實現自定義外掛,以提供資料給元件進行統一處理。
3、根據模組
1)Runner,是元件的呼叫總入口,負責引數的解析、Executor的生成、Rule等模組的初始化等。藉助封裝的友好型,Runner可以配合分散式JOB完成同一索引的並行建立,以加快整個索引的建立速度,這種並行方式在內部已經廣泛使用。
2)Query,是整個內部流程的控制中心,負責根據Runner傳入的引數,進行SQL的拼裝、DB的讀取、Executor和Plugin的排程等。不同的索引方式,需要對SQL進行不同的預處理,類似時間增量需要維護增量的時間等,也在該模組內完成。為了簡化開發成本,Query裡也實現了執行配置裡指定的Groovy指令碼,在資料進入Executor前可以在指令碼中進行處理,某些簡單的場景裡可以非常輕便地實現資料過濾和處理。
3)PluginManager,負責外掛配置的解析、外掛例項的生成、外掛的呼叫管理等。我們歸納了常見的資料組裝的方式,提供了幾個內建的外掛,基本上就能完成大部分基於DB的資料獲取和組裝。比如Assoc Plugin可以完成類似文章標籤這種聚合場景,Map Plugin可以完成類似Map的對映場景,而Filter則支援對每一條資料進行簡單的過濾處理,類似去Html、去重等。為了減少對DB的壓力,內建的外掛都支援設定資料的快取時間,有效時間內,優先取記憶體裡儲存的資料。
4)Executor,用於接收來自Query的資料,完成真正落地動作。內建了兩種Executor,使用方可以按照具體情況選擇:
IndexExecutor,將資料透過Bulk方式提交資料到ES,從而更新ES索引;同時也要完成新索引的建立、索引狀態的更新、別名切換等動作;PersistExecutor,會把來自Query的資料,寫入指定的資料庫裡的平表。這也能看出有平表的話,同步到ES已經很簡單了。5)RuleManager/Rule Loader,用於完成規則的載入和管理,支援從公司統一的QConfig或者當前工程的資原始檔夾讀取配置。外掛實現了像ES一樣的檢測,對不符規範的配置會提供相應的報錯,以減少因配置問題造成的資料錯誤。
四、小結
目前已經有數個業務幾十個索引使用了該元件維護索引,讓研發人員最大程度的關注在業務邏輯上,而不被繁瑣的重複程式碼所幹擾。透過該元件,可以將不同資料來源的資料,透過組裝匯出到ES索引中,也能匯出到DB平表中,因此在部分資料同步的場景裡也可以使用。
作者丨dot,攜程技術經理,專注高併發、高效能領域