首頁>技術>

平臺主要包括分散式資料訪問中介軟體(SDK、Proxy)、平滑擴容、資料整合、管控平臺等四部分。

一、分散式資料訪問中介軟體

資料拆分後,分散在多個庫與表中,但應用開發時怎樣才能準確訪問資料庫,換言之,如何才能拿到準確的資料庫連線,拼接出正確的sql(主要是實際表名),然後執行返回結果集呢?

為了儘可能減少業務侵入性,應用少做改造,往往都會抽象出一個數據訪問層負責上述功能。資料訪問層按實現方式不同,可分為應用自定義、資料中介軟體、分散式資料庫三種方式,在我們專案中採用的是中介軟體方式,其技術架構如下:

分散式資料訪問層

按照接入方式不同,資料訪問中介軟體可以分為SDK、Proxy(雲原生架構下可能還會有sidecar方式)。

一個典型的分庫分表中介軟體由JDBC介面實現(SDK模式)、MySQL報文解析(Proxy、Sider模式)、SQL解析器,路由計算、SQL重寫, SQL執行、聚合處理、結果集合並、資料來源管理、配置管理等部分構成。

1、JDBC介面實現

JDBC介面實現起來並不太難,資料庫連線池都是基於此實現,本質上就是一種裝飾器模式,主要就是java.sql與javax.sql包下DataSource、Connection、Statement,PreparedStatement,ResultSet、DatabaseMetaData、ResultSetMetaData等介面。這些介面也並不是都需要實現,不常用的介面可在整合一些框架時根據需要再實現。

2、MySQL報文解析

MySQL報文解析比JDBC介面複雜些,它包含了很多MySQL的命令,需要對照MySQL報文規範分別進行解析,另外由於proxy還要支援常見DBA工具接入,比如MySQL CLI、Navicat、Dbvisualizer、MySQL workbench等,這些工具甚至不同版本使用的MySQL報文都不完全一樣,這塊的相容性也是一個繁瑣的工作,考驗對Mysql報文的支援的完整度。這部分像Sharding-Proxy、Mycat等都有實現,如果要自行研發或者擴充套件優化,可參考其實現細節。

MySQL報文規範:https://dev.mysql.com/doc/internals/en/client-server-protocol.html

3、SQL解析

SQL解析是個繁瑣複雜的活兒,對應就是詞法Lexer與語法分析Parser,因為要最大程度相容各資料庫廠商SQL,這塊是需要不斷的迭代增強的。開源的手寫解析器有阿里開源的druid,也可以使用javacc、antlr等進行實現,相比手寫解析器速度要慢些,但擴充套件定製化能力更好。這類解析器在使用方式上,多采用vistor設計模式,如果需要可以編寫自己的vistor從而獲取所需AST(Abstract Syntax Tree)中的各類值。

4、路由計算

路由計算是根據SQL解析後AST,提取分庫分表列值(提取規則是預先配置好的),然後根據應用指定的運算表示式或者函式進行計算,分別得到資料庫與表對應的序號(一般就是一個整型數值,類似一個數組下標)或者是真正的物理表名。讀寫分離模式下,只涉及庫路由,會根據一個負載均衡演算法選取一個合適的物理庫,如果寫SQL則會選擇主庫,如果是讀則會按照隨機、輪詢或者權重等演算法選擇一個從庫。

5、SQL重寫

SQL重寫主要為表名新增字尾(應用寫SQL時是邏輯表名,實際表名往往是邏輯表名+序號),根據路由計算環節得到的物理表名,替換原SQL中的邏輯表名。另外SQL中有聚合函式、多庫表分頁等操作時,也會涉及到對SQL的改寫,這部分有的開源中介軟體裡也叫做SQL優化。注意這裡最好不要簡單的用字串匹配去替換表名,例如當存在列名與表名一樣的情況下會出現問題。

6、SQL執行

SQL執行負責SQL的真正執行,對應的就是執行連線池或資料庫驅動中Statement的execute、executeQuery、executeUpdate、executeBatch等方法。當然如果是涉及到多庫多表的SQL,例如where條件不包含分庫分表鍵,這時會涉及到庫表掃描,則需要考慮是連線優先還是記憶體優先,即採用多少個併發資料庫連線執行,連線數太大則會可能耗盡連線池,給記憶體以及資料庫帶來很大壓力;但連線數太小則會拉長SQL執行時間,很有可能帶來超時問題,所以一個強大的SQL執行器還會根據SQL型別、資料分佈、連線數等因素生成一個到合適的執行計劃。

7、資料來源管理

資料來源管理負責維護各資料庫的連線,這塊實現起來比較簡單,一般維護一個數據庫連線池DataSource物件的Map就可以,只要根據資料來源下標或者名稱可以拿到對應的資料庫連線即可。

8、聚合處理

聚合處理負責對聚合類函式的處理,因為分庫分表後,實際執行的SQL都是面向單庫的,而對於max、min、sum、count、avg等聚合操作,需要將各單庫返回的結果進行二次處理才能計算出準確的值,例如max、min、sum、count需要遍歷個各庫結果,然後分別取最大、最小、累加,對於avg操作,還需要將原SQL修改為select sum,count,然後分別累加,最後用累積後的sum除以累加後count才能得到準確值。另外對於多庫表的分頁操作,例如limit 1,10,則將單庫SQL的起始頁都修改為第一頁即limit 0,10,然後再整體排序取出前10個才是正確的資料。

9、結果集合並

結果集合並負責將多個SQL執行單元返回的資料集進行合併,然後返回給呼叫客戶端。一般當進行庫表遍歷、或者涉及多個庫SQL(例如使用in時)會需要進行合併。當然並不一定需要把資料全部讀到記憶體再合併,有時基於資料庫驅動實現的ResultSet.next()函式,逐條從資料庫獲取資料即可滿足要求。關於結果集合並,sharding-jdbc對此有一個更豐富的抽象與分類,支援流式歸併、記憶體歸併、分組歸併等,具體可參見歸併引擎。

歸併引擎:https://shardingsphere.apache.org/document/current/cn/features/sharding/principle/merge/

10、配置管理

配置管理負責分庫分表的規則以及資料來源的定義,這塊是面向應用開發者的,在使用體驗上應當簡單、易用、靈活。其中會涉及到物理資料來源(引數跟連線池類似)、邏輯表、路由規則(庫路由、表路由,庫表分佈,支援指定java函式或者groovy表示式),邏輯表->路由規則的對映關係。另外我們在實踐時還包括了一些元資料資訊,包括shardID->庫表序號,這樣做有個好處,業務在配置路由規則時只需要關注業務物件->shardID即可。配置管理在具體形式方面,可以支援xml、yaml、也支援在管控平臺上線上進行配置,後者會通過將配置同步到配置中心,進而支援資料訪問層進行編排(orchestration),例如線上擴容時需要動態增加資料來源、修改路由規則、元資料資訊等。

一個完整的分散式資料訪問中介軟體,在架構上和資料庫的計算層很像,尤其如果涉及到DB協議報文與SQL的解析,還是一個複雜和工作量較大的工程,因此一般應用團隊建議還是採用開源成熟的方案,基於此做定製優化即可,沒必要重複造輪子。

SDK和Proxy方式各有優缺點,在我們專案中分別用在不同的場景,簡單總結如下:

聯機交易 高頻、高併發,查詢帶拆分鍵,資料量小,sdk方式;運維 低頻、查詢條件靈活,資料量大,以查詢為主 proxy方式;批量 不攜帶分庫分表列,資料量大,查詢、更新、插入、刪除都有,通過API指定庫表方式。

接下來介紹下我們在開源中介軟體方面的實踐,分為三個階段:

第一階段

早些年這類開源中介軟體還挺多,但其實都沒有一個穩定的社群支援。2015年時我們基於一個類似TDDL的元件,對其事務、資料連線池、SQL解析等方面進行了優化,修復了數十個開發遇到的bug,實現SDK版本的資料訪問中介軟體,暫就叫做DAL。

第二階段

2017年,系統上線後發現,開發測試以及運維還需要一個執行分庫分表SQL的平臺,於是我們調研了Mycat,但當時1.6版本只支援單維度拆分(單庫內分表或者只分庫),因此我們重寫了其後端SQL路由模組,結合原SDK版本資料元件,利用Mycat的報文解析實現了Proxy的資料訪問層。

Proxy模式的資料訪問層上線後,可以很好的應對帶分庫分表鍵的SQL操作,但在涉及到庫表遍歷時,由於併發連線太多,經常會導致連線數不夠,但如果序列執行則經常導致執行時間太長,最後超時報錯。針對這個問題,我們做了個新的優化:

在將這類庫表遍歷的查詢在生成執行計劃時,通過union all進行了改寫,類似map-reduce,同一庫上的不同表的sql通過union all合併,然後發到資料庫執行,這樣連線數=物理資料庫總數,同時儘可能的利用了資料庫的計算能力,在損耗較少連線數的前提下,大大提升了這類SQL的執行效率。(注意order by 和limit需要加在union all的最後,為了不影響主庫,可以將這類查詢在從庫執行)。

例如user表拆分成1024表,分佈在4個庫,SQL拆分與合併示意圖如下:

通過union all實現庫表遍歷

第三階段

這兩個中介軟體在執行3年左右後,也暴露出來了很多問題,例如SQL限制太多,相容性太差,開源社群不活躍,部分核心程式碼設計結構不夠清晰等,這給後續更復雜場景的使用帶來了很多桎梏。因此在19年,我們決定對資料訪問層進行升級重構,將底層分庫分表元件與上層配置、編排進行剝離,改成插拔式設計,增加更加多元的分庫分表元件。在那時開源社群已經湧現了一些優秀的分庫分表專案,目前來看做的最好的就是shardingshpere(後面簡稱ss)了,ss的設計與使用手冊其官網都有詳細介紹,這裡主要簡單介紹下我們整合ss的一些實踐。

shardingsphere整體設計架構清晰,核心各個引擎設計職責明確,jdbc 與proxy版本共享核心,接入端支援的多種實現方式。治理、事務、SQL解析器分別單獨抽象出來,都可以hook方式進行整合,通過SPI進行擴充套件。這種靈活的設計也為我們定製帶來了很大的方便,程式碼實現上比較優雅。我們在整合時開始是3.0.0版本,後來升級到4.0.0-RC1版本,目前ss已釋出4.0.0的release版本。

1)配置相容

因為要在上層應用無感知的情況下更換底層分庫分表引擎,所以改造的第一個問題就是相容以前的配置。基於此,也就無法直接使用sharding-jdbc的spring或者yaml配置方式,而改用API方式,將原配置都轉換為sharding-jdbc的配置物件。這塊工作量時改造裡最大的,但如果專案之前並沒有分庫分表配置,則直接在sharding-jdbc提供的方式中選擇一種即可。由於我們專案中需要支援規則鏈、讀權重等ss不支援功能,所以我們是基於ComplexKeysShardingAlgorithm介面進行的實現。

更簡潔的yaml配置形式:

ds:  master_0:    blockingTimeoutMillis: 5000    borrowConnectionTimeout: 30    connectionProperties: {}    idleTimeoutMinutes: 30    jdbcUrl: jdbc:mysql://localhost:3306/shard_0    logAbandoned: false    maintenanceInterval: 60    maxConn: 10    maxIdleTime: 61    minConn: 1    userName: root    password: 123456    queryTimeout: 30    testOnBorrow: false    testOnReturn: false    testQuery: null    testWhileIdle: true    timeBetweenEvictionRunsMillis: 60000  master_1:    jdbcUrl: jdbc:mysql://localhost:3306/shard_1    parent: master_0groupRule: nullshardRule:  bindingTables:   - user,name   rules:    userTableRule:      dbIndexs: master_0,master_1      dataNodes: master_0.user_${['00','01']},master_1.user_${['02','03']}      dbRules:       - return test.dal.jdbc.shardingjdbc.YamlShardRuleParser.parserDbIndex(#user_id#)       - return test.dal.jdbc.shardingjdbc.YamlShardRuleParser.parserDbIndexByName(#name#,#address#)      tbRules:       - return test.dal.jdbc.shardingjdbc.YamlShardRuleParser.parserTbIndex(#user_id#)       - return test.dal.jdbc.shardingjdbc.YamlShardRuleParser.parserTbIndexByName(#name#,#address#)         tableRuleMap: {name: nameTableRule, user: userTableRule}

2)事務級別

sharding-jdbc的預設事務是local,即最大努力一階段提交,或者叫鏈式提交,這種方式的好處是對應用透明,效能也還不錯,網際網路中使用較多。但這種方式可能會由於網路等原因導致部分提交成功,部分失敗。雖然這種概率可能並不高,但一旦出現則會產生事務不一致的問題,這在金融關鍵場景下風險是很高的。所以我們在聯機交易場景下禁止使用這種方式,而是要求必須嚴格單庫事務,我們在先前SDK版本的資料訪問中介軟體增加了校驗,一旦跨庫就直接拋異常。因此切換到sharding-jdbc,這種事務級別也要繼續支援。實現程式碼片段:

/** * Single DB Transaction Manager *  SPI:  org.apache.shardingsphere.transaction.spi.ShardingTransactionManager */@NoArgsConstructorpublic class SingleDBTransactionManager implements ShardingTransactionManager {  private Map<String, DataSource> dataSources = new HashMap<String, DataSource>();  private ThreadLocal<String> targetDataSourceName = new ThreadLocal<String>() {    protected String initialValue() {      return null;    }  };  private ThreadLocal<Connection> connection = new ThreadLocal<Connection>() {    protected Connection initialValue() {      return null;    }  };  private ThreadLocal<Boolean> autoCommitted = new ThreadLocal<Boolean>() {    protected Boolean initialValue() {      return true;    }  };  @Override  public void close() throws Exception {    if (connection.get() != null) {      connection.get().close();    }  }  @Override  public void init(DatabaseType databaseType, Collection<ResourceDataSource> resourceDataSources) {    for (ResourceDataSource res : resourceDataSources) {      dataSources.put(res.getOriginalName(), res.getDataSource());    }  }  @Override  public TransactionType getTransactionType() {    return TransactionType.SINGLEDB;  }  @Override  public Connection getConnection(String dataSourceName) throws SQLException {    if (!ConditionChecker.getInstance().isMultiDbTxAllowed() && targetDataSourceName.get() != null        && !targetDataSourceName.get().equals(dataSourceName)) {      throw new TransactionException(          "Don't allow multi-db transaction currently.previous dataSource key="              + targetDataSourceName.get() + ", new dataSource key=" + dataSourceName);    }    targetDataSourceName.set(dataSourceName);    if (connection.get() == null) {      connection.set(dataSources.get(dataSourceName).getConnection());    }    return connection.get();  }…}

3)讀庫權重

雖然多個從庫(一個主一般都要掛兩個或者三個從,從庫的數量由RPO、多活甚至監管要求等因素決定)可以提供讀功能,但細分的話,這些從庫其實是有“差別”的,這種差異性有可能是由於機器硬體配置,也可能是由於所在機房、網路原因導致,這種時候就會需要支援讀許可權的權重配置,例如我們專案中有單元化的設計,需要根據當前所在單元及權重配置路由到當前機房的從庫。另外也可以通過調整權重,支援線上對資料庫進行維護或者升級等運維操作。實現程式碼片段:

/** * Weight based slave database load-balance algorithm. * SPI: org.apache.shardingsphere.spi.masterslave.MasterSlaveLoadBalanceAlgorithm */public final class WeightMasterSlaveLoadBalanceAlgorithm implements MasterSlaveLoadBalanceAlgorithm {  public final static String TYPE = "WEIGHT";      protected DataSource dataSource;  public WeightMasterSlaveLoadBalanceAlgorithm(DataSource ds) {    this.dataSource = ds;  }    public  WeightMasterSlaveLoadBalanceAlgorithm(){      }    @Override    public String getDataSource(final String name, final String masterDataSourceName, final List<String> slaveDataSourceNames) {      String selectReadDb = dataSource.getTableRuleContext().getGroupRule(name).selectReadDb();      return slaveDataSourceNames.contains(selectReadDb) ? selectReadDb : null;    }  @Override  public String getType() {    return TYPE;  }

4)SQL開關

SDK模式的資料訪問中介軟體,主要用在聯機交易中,在這類場景下,是沒有DDL操作需求的,也是不允許的,但shading-jdbc作為一個通用的資料分片中介軟體。對此並沒有相應的開關配置,因此我們增加開關功能,應用在預設情況下,對DDL、DCL等語句進行了校驗,不允許執行該類SQL,在技術層面杜絕了應用的誤用。實現程式碼片段:

//SPI: org.apache.shardingsphere.core.parse.hook.ParsingHookpublic class AccessPrevilegeCheckHook implements ParsingHook {  @Override  public void start(String sql) {  }  @Override  public void finishSuccess(SQLStatement sqlStatement, ShardingTableMetaData shardingTableMetaData) {    ConditionChecker.getInstance().checkDdlAndDcl(sqlStatement);  }…}//SPI:org.apache.shardingsphere.core.rewrite.hook.RewriteHook@NoArgsConstructorpublic class TableScanCheckHook implements RewriteHook {    private List<TableUnit> tableUnits = new LinkedList<TableUnit>();  @Override  public void start(TableUnit tableUnit) {    if(tableUnits.size() > 0 && !ConditionChecker.getInstance().isTableScanAllowed()){      throw new RouteException("Don't allow table scan.");    }    tableUnits.add(tableUnit);    }…}public class ConditionChecker {  private static ThreadLocal<SQLType> sqlTypeSnapshot = new ThreadLocal<SQLType>();  private boolean defalutTableScanAllowed = true;  private boolean defalutMultiDbTxAllowed = true;  private boolean defalutDdlAndDclAllowed = true;  private static ConditionChecker checker = new ConditionChecker();  public static ConditionChecker getInstance() {    return checker;  }  private ConditionChecker() {  }  private ThreadLocal<Boolean> tableScanAllowed = new ThreadLocal<Boolean>() {    protected Boolean initialValue() {      return defalutTableScanAllowed;    }  };  private ThreadLocal<Boolean> multiDbTxAllowed = new ThreadLocal<Boolean>() {    protected Boolean initialValue() {      return defalutMultiDbTxAllowed;    }  };  private ThreadLocal<Boolean> ddlAndDclAllowed = new ThreadLocal<Boolean>() {    protected Boolean initialValue() {      return defalutDdlAndDclAllowed;    }  };  public void setDefaultCondtion(boolean tableScanAllowed, boolean multiDbTxAllowed, boolean ddlAndDclAllowed) {    defalutTableScanAllowed = tableScanAllowed;    defalutMultiDbTxAllowed = multiDbTxAllowed;    defalutDdlAndDclAllowed = ddlAndDclAllowed;  }  public boolean isTableScanAllowed() {    return tableScanAllowed.get();  }  public void setTableScanAllowed(boolean tableScanAllowed) {    this.tableScanAllowed.set(tableScanAllowed);  }  public boolean isMultiDbTxAllowed() {    return multiDbTxAllowed.get();  }  public void setMultiDbTxAllowed(boolean multiDbTxAllowed) {    this.multiDbTxAllowed.set(multiDbTxAllowed);  }  public boolean isDdlAndDclAllowed() {    return ddlAndDclAllowed.get();  }  public void setDdlAndDclAllowed(boolean ddlAllowed) {    this.ddlAndDclAllowed.set(ddlAllowed);  }  public SQLType getSqlTypeSnapshot() {    return sqlTypeSnapshot.get();  }  public void checkTableScan(boolean isTableScan) {    if (!isTableScanAllowed())      throw new ConditionCheckException("Don't allow table scan.");  }  public void checkDdlAndDcl(SQLStatement sqlStatement) {    sqlTypeSnapshot.set(sqlStatement.getType());    if (!isDdlAndDclAllowed()        && (sqlStatement.getType().equals(SQLType.DDL) || sqlStatement.getType().equals(SQLType.DCL))) {      throw new ConditionCheckException("Don't allow DDL or DCL.");    }  }  public void checkMultiDbTx(Map<String, Connection> cachedConnections, String newDataSource) {    if (!isMultiDbTxAllowed() && cachedConnections.size() > 0 && !cachedConnections.containsKey(newDataSource)) {      throw new ConditionCheckException("Don't allow multi-db transaction currently.old connection key="          + cachedConnections.keySet() + "new connection key=" + newDataSource);    }  }}

5)路由規則鏈

在我們專案中,對於一張表,在不同場景下可能會使用不同的分庫分表列,例如有的是賬號、有的是客戶號(這兩列都可路由到同一庫表中),這時候就需要路由模組可以依次匹配搭配多個規則,例如SQL中有賬號則用account-rule,有客戶號則用customer-rule,因此我們支援了規則鏈配置功能,但sharding-jdbc只支援配置一個路由規則,因此在自定義路由演算法函式中,我們增加了對規則鏈的支援。實現程式碼片段:

public abstract class ChainedRuleShardingAlgorithm implements ComplexKeysShardingAlgorithm {  protected final DataSource dataSource;  public ChainedRuleShardingAlgorithm(DataSource ds) {    this.dataSource = ds;  }  @Override  public Collection<String> doSharding(Collection availableTargetNames, ComplexKeysShardingValue shardingValue) {    List<String> targets = new ArrayList<String>();    Set<String> actualNames = HintManager.isDatabaseShardingOnly() ? getHintActualName(shardingValue)        : calculateActualNames(shardingValue);    for (String each : actualNames) {      if (availableTargetNames.contains(each)) {        targets.add(each);      }    }    clear();    return targets;  }  @SuppressWarnings({ "serial", "unchecked" })  protected Set<String> calculateActualNames(ComplexKeysShardingValue shardingValue) {    Set<String> target = new HashSet<String>();    Map<String/* table */, Map<String/* column */, Collection/* value */>> shardingMap = new HashMap<String, Map<String, Collection>>();    String logicalTableName = shardingValue.getLogicTableName();    Map<String, Collection> shardingValuesMap = shardingValue.getColumnNameAndShardingValuesMap();    for (final Entry<String, Collection> entry : shardingValuesMap.entrySet()) {      if (shardingMap.containsKey(logicalTableName)) {        shardingMap.get(logicalTableName).put(entry.getKey(), entry.getValue());      } else {        shardingMap.put(logicalTableName, new HashMap<String, Collection>() {          {            put(entry.getKey(), entry.getValue());          }        });      }    }    // 遍歷規則鏈,查詢匹配規則    for (String tableName : shardingMap.keySet()) {      RuleChain ruleChain = dataSource.getTableRuleContext().getRuleChain(tableName);      for (GroovyListRuleEngine engine : getRuleEngine(ruleChain)) {        Set<String> parameters = engine.getParameters();        Map<String, Collection> columnValues = shardingMap.get(tableName);        Set<String> eval = eval(columnValues, parameters, engine, ruleChain);        if (eval.size() > 0) {// 匹配即中止          target.addAll(eval);          return target;        }      }    }    return target;  }  @SuppressWarnings("unchecked")  protected Set<String> eval(final Map<String, Collection> columnValues, Set<String> parameters,      GroovyListRuleEngine engine, RuleChain ruleChain) {    Set<String> targetNames = new HashSet<String>();    if (columnValues.keySet().containsAll(parameters)) {// 匹配      List<Set<Object>> list = new LinkedList<Set<Object>>();// 引數集合      List<String> columns = new LinkedList<String>();// 列名集合      for (final String requireParam : parameters) {        list.add(convertToSet(columnValues.get(requireParam)));        columns.add(requireParam);      }      Set<List<Object>> cartesianProduct = Sets.cartesianProduct(list);      for (List<Object> values : cartesianProduct) {        Map<String, Object> arugmentMap = createArugmentMap(values, columns);        int index = engine.evaluate(arugmentMap);        targetNames.add(getActualName(ruleChain, index));      }    }    return targetNames;  }  private Set<Object> convertToSet(final Collection<Object> values) {    return Sets.newLinkedHashSet(values);  }  private Map<String, Object> createArugmentMap(List<Object> values, List<String> columns) {    HashMap<String, Object> map = new HashMap<String, Object>();    for (int i = 0; i < columns.size(); i++) {      map.put(columns.get(i).toLowerCase(), values.get(i));    }    return map;  }  protected abstract List<GroovyListRuleEngine> getRuleEngine(RuleChain ruleChain);  protected abstract String getActualName(RuleChain ruleChain, int index);}/** * 庫路由演算法 */public class ChainedRuleDbShardingAlgorithm extends ChainedRuleShardingAlgorithm {    public ChainedRuleDbShardingAlgorithm(DataSource ds) {    super(ds);  }  @Override  protected List<GroovyListRuleEngine> getRuleEngine(RuleChain ruleChain) {    return ruleChain.getDbRuleList();  }  @Override  protected String getActualName(RuleChain ruleChain,int index) {    //add mapping from shard metadata    String dbIndex = dataSource.getHintSupport().getShardingDb(String.valueOf(index));    if(StringUtils.isEmpty(dbIndex)){      return ruleChain.getTableRule().getDbIndexArray()[index];    }else{      return ruleChain.getTableRule().getDbIndexArray()[Integer.valueOf(dbIndex)];    }      }}/** * 表路由演算法 */public class ChainedRuleTableShardingAlgorithm extends ChainedRuleShardingAlgorithm {  public ChainedRuleTableShardingAlgorithm(DataSource ds) {    super(ds);  }  @Override  protected List<GroovyListRuleEngine> getRuleEngine(RuleChain ruleChain) {    return ruleChain.getTableRuleList();  }  @Override  protected String getActualName(RuleChain ruleChain, int index) {    //add mapping from shard metadata    String tbShardIndex = dataSource.getHintSupport().getShardingTable(String.valueOf(index));    int tbIndex = index;    if(!StringUtils.isEmpty(tbShardIndex)){      tbIndex = Integer.valueOf(tbShardIndex);    }    SuffixManager suffixManager = ruleChain.getTableRule().getSuffixManager();    if(suffixManager.isInlineExpression()){      return ruleChain.getTbIndexs()[tbIndex];    }else{      Suffix suffix = suffixManager.getSuffix(0);      return String.format("%s%s%0"+suffix.getTbSuffixWidth() +"d", ruleChain.getLogicTable(),suffix.getTbSuffixPadding(), suffix.getTbSuffixFrom() + tbIndex);    }  }

6)管控平臺對接

我們提供了一個管控平臺,支援分散式資料相關元件線上配置,這些通過配置中心統一下發到各應用,而且支援動態變更。不管是SDK模式還是Proxy模式的資料訪問中介軟體都使用的是同一份分庫分表配置,只是接入方式不同而已。因此在整合ss的時候,還需要增加從配置中心獲取配置的功能,這塊主要涉及的呼叫配置中心API獲取配置,這裡就不貼具體程式碼了。

資料訪問中介軟體的發展演進方向,未來其將會是多種形態的混合存在。

二、平滑擴容

在設計篇中已經介紹了擴容的機制,簡單的說,平滑擴容就是通過非同步複製,等資料接近追平後禁寫,修改路由,然後恢復業務。主要目的是自動化、以及儘可能縮短停機視窗,目前一些雲產品比如阿里雲DRDS、騰訊雲TDSQL等的一鍵線上擴容本質上都是基於此機制。

但實踐中這個過程需要多個步驟,資料庫數量越多,操作風險越大,而且需要停機完成。為此我們與資料庫團隊一起設計與開發了平滑擴容功能。

我們將整個擴容環節,分為配置、遷移、校驗、切換、清理五個大的步驟,每個步驟裡又由多個任務構成。擴容任務在管控平臺上建立,平滑擴容模組自動依次觸發各個任務。

平滑擴容

配置環節,主要是應用系統方定義擴容後的分庫分表配置;遷移環節,依次自動完成從複製,同時進行資料校驗;切換環節,首先進行禁寫,斷開主從只從,然後修改路由規則,最後再解除擴容庫禁寫。整個過程應用無需停機,僅僅會有一段時間禁寫,這個時間一般來說也就十來秒;清理環節,清理環節是在後臺非同步處理,即清理資料庫冗餘表。

在分散式資料服務管控平臺定義好擴容前後分庫分表配置後,即可啟動一鍵線上擴容。在擴容過程中可實時監控擴容進度,同時支援擴容中斷恢復以及回滾。

三、資料整合

微服務架構下,有大量需要資料整合的場景:

業務系統之間,例如下訂單後需要通知庫存、商家,然後還要推送到大資料等下游系統;分庫分表後,為了應對其它維度查詢,會需要建立異構索引,這樣就需要資料傳輸到另外一套資料庫中;系統內應用與中介軟體之間,例如如果使用redis等快取,在操作完資料庫還要更新快取,類似這類資料整合需求,最樸素的解決方式就是雙寫,但雙寫一個問題是增加了應用複雜性,另外當發生不一致的情況是難以處理。

這類問題本質上也屬於分散式事務場景,一種簡單的方式就是基於MQ可靠訊息,即在應用端寫訊息表,然後通過MQ消費訊息進行資料整合處理。但這導致應用程式碼耦合大量雙寫邏輯,給應用開發帶來很多複雜度。

針對雙寫問題,業界一種更優雅、先進的設計是基於日誌的整合架構,在OLTP場景下,可以通過解析資料庫日誌類似CDC,這種方式的好處是資料整合工作從應用程式碼中進行了剝離。

關於雙寫以及基於日誌整合架構可參考Using logs to build a solid data infrastructure (or: why dual writes are a bad idea

雙寫以及基於日誌整合架構參考:https://www.confluent.io/blog/using-logs-to-build-a-solid-data-infrastructure-or-why-dual-writes-are-a-bad-idea/

這類CDC的開源軟體,java類的有shyiko、canal、debezium等。這類專案的實現原理是主要模擬從庫從主庫非同步獲取日誌事件,然後經過ETL傳送到MQ或者其它下游系統。

如何模擬一個從庫可參照MySQL複製協議:https://dev.mysql.com/doc/internals/en/replication-protocol.html

考慮功能完整性和社群活躍度,我們選擇了基於canal構建資料整合中介軟體,具體工作原理這裡就不介紹了,可參見canal github。這裡主要介紹下我們對其做的一些定製和優化。

canal github:https://github.com/alibaba/canal

1、線上配置

canal的配置非常繁雜,很容易配錯,所以最先開始做的就是提供了一個更簡單、易用的線上配置定義功能,使用者是隻需進行一些核心關鍵的配置,例如資料庫的IP、使用者、密碼、訂閱表、MQ地址,其它不常修改的配置通過模板形式提供,大大降低了配置複雜度和工作量,當然如果需要也完全支援自定義。

2、效能優化

資料庫binlog是有序的,但如果寫MQ或者目標庫,仍完全保證該順序,那麼則無法進行併發,這樣同步的TPS是肯定上不去的,因此如何在保證一定順序的前提下最大程度提高併發效能是一個需要結合業務場景解決的問題。

我們當時用的是canal1.1.3的版本,經過我們效能測試,資料寫入kafka的TPS也就5000+,這對於結息等大量資料變更的場景是不能滿足要求的。

另外canal寫入MQ的併發維度是表的主鍵,但我們專案中表的主鍵都是自增列(這個是我們專案中資料庫開發規範,主要目的是保證MySQL寫效能),如果根據此列進行併發控制,那麼則無法保證MQ寫入時的業務順序性。例如支付流水錶,如根據主鍵(自增列)則無法保證同一賬戶流水的順序性。

針對此問題,我們對canal進行了改造,將原來只支援根據主鍵進行併發控制,修改為支援應用指定,例如我們專案採用業務唯一鍵;原來流程是順序從canal server端讀取binlog->寫MQ->再確認->再讀取下一binlog事件;調整後改為併發讀取binlog,一旦在執行事件集中沒有當前業務唯一鍵,就可直接寫入MQ,後臺開啟一個執行緒,按照batchID依次進行ack,通過並行拉取binlog事件、分階段無阻塞處理,單庫資料同步kafak的TPS可以達到1.2W+,已可以滿足結息等場景。

3、Serverless化

如果通過安裝包部署,在使用者配置完資料整合相關引數後,需要手工將canal server以及adapter包上傳至伺服器上。考慮到高可用,還得在備機上進行部署,在分庫分表下,資料庫拆分成多個,需要部署多個實列到多個伺服器上(canal支援同個例項部署在統一server節點,但效能會受影響)。因此我們將canal server與adapter進行了容器化改造,然後部署到了統一的k8s叢集中,這樣使用者在配置完後,點選“啟動實列”按鈕,即可在k8s環境中自動部署高可用的canal叢集,從而實現了資料整合功能的serverless化。

通過資料整合中介軟體,可以在應用無侵入下解決分庫分表後一個很典型的問題:多維度拆分與多庫查詢。例如將分庫分表的資料再集中到一個彙總庫,然後一些複雜的查詢統計就可以放在彙總庫上;還有一些多維度拆分場景,類似電商裡的賣家庫、商家庫,需要建立“二級索引”,也可以通過資料中介軟體自動建立;另外也可以方便實現諸如小表廣播等需要保證資料一致性的功能。

四、管控平臺

前面提到的各種中介軟體,涉及到大量配置定義、例項管理、監控等功能,這些功能分散在各元件內部,缺少一個統一的檢視,而且應用開發人員需要重複定義。因此我們設計開發了資料服務管控平臺,將分庫分表配置定義、線上擴容、運維、監控等功能統一整合,最大程度降低開發以及運維人員對資料拆分帶來的複雜度。同時提供開放API,可以對接目前公司已有資料庫以及雲管理系統。

配置資訊統一存放在配置中心,各中介軟體直接從配置中心拉取配置,在管控平臺修改配置後,也可以實時通知各應用進行動態載入。管控平臺相當於一個數據服務雲管理平臺,提供多租戶,各應用無需自行部署,直接接入使用即可。在技術架構方面管控平臺是個前後端分離架構,前端基於vue.js,後端按照功能模組拆分成微服務,都部署在k8s叢集中。

五、 感受

上面介紹了我們在資料服務平臺建設中各技術元件的設計原理和實踐,限於篇幅,更多實現細節就不展開介紹。

在企業軟體這塊,有兩個不同的思路,一個是購買商業產品,一種是基於開源軟體自行構建。在金融領域,早些年以前者為主,近些年後者則變成了趨勢。開源軟體有點是開放,因為有原始碼所以有自主掌控的可能與條件,缺點是不像商業產品功能完整,往往需要自行定製、優化、擴充套件。作為軟體開發人員,我們更喜歡使用白盒而不是黑盒。

當然開源並不代表免費,有可能付出的成本比商業軟體更高。一方面需要投入精力學習開源專案,只有熟悉原始碼後才可能具備修改定製能力;另一方面要積極關注開源界技術的發展,與時俱進,要有開放的心態,吸取開源先進的設計的東西,大膽驗證,謹慎使用。

作者介紹:溫衛斌,就職於中國民生銀行資訊科技部,目前負責分散式技術平臺設計與研發,主要關注分散式資料相關領域。

相關文章

《All in Cloud 時代,下一代雲原生資料庫技術與趨勢》阿里巴巴集團副Quattroporte/達摩院首席資料庫科學家 李飛飛(飛刀)《AI和雲原生時代的資料庫進化之路》騰訊資料庫產品中心總經理 林曉斌(丁奇)《ICBC的MySQL探索之路》工商銀行軟體開發中心 魏亞東《金融行業MySQL高可用實踐》愛可生技術總監 明溪源《民生銀行在SQL稽核方面的探索和實踐》民生銀行 資深資料庫專家 李寧寧《OceanBase分散式資料庫在西安銀行的落地和實踐》螞蟻金服P9資深專家/OceanBase核心負責人 蔣志勇

讓我們9月11日在北京共同眺望資料庫發展變革更長遠的未來!

最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • 手把手從頭開始教你,徹底理解服務端渲染原理