一 CDC概述
CDC的全稱是Change Data Capture,翻譯過來就是“變動資料捕獲”。它的核心思想就是,檢測並捕獲資料庫的變動(包括資料的插入、更新和刪除等操作),把這些資料變更按發生的時間順序記錄下來,寫入到訊息中介軟體(Kafka、Pulsar等)供其他應用訂閱、消費。
1.1 CDC的應用場景1.1.1 資料同步、備份和容災
在MySQL、TiDB、PostgreSQL等資料庫之間互相同步資料,可以透過CDC計算將這些資料同步到Kafka等訊息中介軟體中,然後再透過Flink、Spark、ES等技術消費Kafka中的資料,供資料分析使用。使用這些工具訂閱感興趣的資料表變更,而不需要直接把分析流程接入到業務系統,起到解耦的作用。
也可以透過CDC技術對資料庫進行備份。
1.1.2 資料採集
面向資料湖或者資料倉庫源資料的採集工作。
1.1.3 微服務之間共享資料狀態
CDC可以作為微服務資料之間共享的一種解決方案,可以透過CDC獲取其他微服務資料庫的變更,從而獲取資料狀態的更新,完成自己對應的邏輯。
1.2 常見的開源CDC技術對比1.2.1 maxwell
maxwell專門用來實時解析MySQL的Binlog日誌,並生成Json格式的訊息。作為生成者將訊息傳送到Kafka、Kinesis、RabbitMQ等訊息佇列中。它的常見應用場景有ETL、維護快取、收集表級別的DML操作。maxwell提供以下功能:
支援SELECT * FROM table的方式進行全量資料初始化。支援在主庫發生failover之後,自動恢復Binlog位置(GTID)。可以偽裝為Slave,接收binlog events,然後根據schemas資訊拼裝,可以接受ddl、xid、row等各種event。1.2.2 Debezium
Debezium 是一個變更資料捕獲 (CDC) 平臺,它透過重用 Kafka 和 Kafka Connect 來實現其永續性、可靠性和容錯質量。部署到 Kafka Connect 分散式、可擴充套件、容錯服務的每個聯結器監控單個上游資料庫伺服器,捕獲所有更改並將它們記錄在一個或多個 Kafka 主題中(通常每個資料庫表一個主題)。支援監聽 MySQL,MongoDB,PostgreSQL,Oracle,SQL Server 等資料庫的變化。
1.2.3 Canal
主要用途是基於 MySQL 資料庫增量日誌解析,提供增量資料訂閱和消費。在canal1.1.4版本迎來最重要的WebUI能力,引入canal-admin工程,支援面向WebUI的canal動態管理能力,支援配置、任務、日誌等線上白屏運維能力。
1.2.4 Flink CDC
Flink CDC Connectors內部封裝了Debezium特性,可以使用Flink CDC的方式替代canal+kafka的方式,直接透過sql的方式來實現對mysql資料的同步。
二 Flink CDC2.0簡單上手例子
Flink在1.11版本開始引入Flink CDC功能,並且同時支援Table和SQL兩種形式,Flink SQL CDC基於Debezium實現的,能夠對CDC資料進行實時解析同步。當啟動MySQL CDC源時,它將獲取一個全域性讀取鎖(FLUSH TABLES WITH READ LOCK),該鎖將阻止其他資料庫的寫入,然後讀取當前binlog位置以及資料庫和表的schema,之後將釋放全域性讀取鎖。然後它掃描資料庫表並從先前記錄的位置讀取binlog,Flink將定期執行checkpoints以記錄binlog位置。如果發生故障,作業將重新啟動並從checkpoint完成的binlog位置恢復,因此它保證了僅一次的語義。
Dynamic Table是Flink內部定義的表,它和流式可以相互轉化的。可以簡單的理解為:每張MySQL表對應一個Binlog日誌,Binlog日誌隨著MySQL表的變化而變化,Dynamic Table相當於Binlog日誌流在某一時刻的物化結果。在Flink中,資料從一個運算元流向另外一個運算元的時候,都是以Changelog Stream的格式傳送到下游,在下游我們可以將其翻譯成一張表或者一條流進行操作。
2.1 社群支援的資料庫連線2.2 CDC和Flink對應版本介紹2.3 FlinkCDC2.0的maven配置
2.3 FlinkCDC2.0的maven配置
我們使用的是Flink1.13.0,因此選用FlinkCDC2.0
<dependency> <groupId>com.ververica</groupId> <artifactId>flink-connector-mysql-cdc</artifactId> <version>2.0.0</version></dependency>
2.4 MySQL資料來源的準備工作2.4.1 新建MySQL的DataBase2.4.2 建立使用者
2.4.2 建立使用者
我們知道MySQL使用者的密碼長度是由validate_password_length決定的,而validate_password_length的計算公式是:
validate_password_length = validate_password_number_count + validate_password_special_char_count + (2 * validate_password_mixed_case_count)
因為這是開發測試環境,我們設定validate_password_policy=0(僅僅對密碼長度開啟認證);另外,再設定validate_password_length=1(密碼長度只要大於等於1即可)。
set global validate_password_policy=0;set global validate_password_length=1;CREATE USER 'flinkuser' IDENTIFIED BY 'flinkpwd';GRANT SELECT,INSERT,UPDATE,DELETE,CREATE,DROP,RELOAD,SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinkuser' IDENTIFIED BY 'flinkpwd';FLUSH PRIVILEGES;
2.4.3 開啟MySQL資料庫的binlog功能
編輯/etc/my.conf,開啟flinkcdc這個Database的flinkcdc功能。
[bigdata@bigdata12 ~]$ sudo vi /etc/my.cnf# 開啟flinkcdc這個Database的flinkcdc功能binlog-do-db=flinkcdc
開啟binlog之後,需要重啟MySQL資料庫
# 重啟MySQL服務[bigdata@bigdata12 ~]$ sudo systemctl restart mysqld.service
檢視當前的最新binlog資料
建立表
create table dept( deptno int primary key, dname varchar(20), loc varchar(30));
再次檢視binlog日誌大小,說明binlog日誌生效
2.5 完整的maven配置
<properties> <flink-version>1.13.0</flink-version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink-version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>${flink-version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.12</artifactId> <version>${flink-version}</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.44</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_2.12</artifactId> <version>${flink-version}</version> </dependency> <dependency> <groupId>com.ververica</groupId> <artifactId>flink-connector-mysql-cdc</artifactId> <version>2.0.0</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.75</version> </dependency> </dependencies>
2.6 Flink 流式程式碼public class MySqlBinlogFlinkCDCStream { public static void main(String[] args) throws Exception { // 1 透過FlinkCDC構建sourceDatabase DebeziumSourceFunction<String> sourceDatabase = MySqlSource.<String>builder() .hostname("bigdata12") .port(3306) // 需要監控的database .databaseList("flinkcdc") .username("flinkuser") .password("flinkpwd") // 反序列化 .deserializer(new StringDebeziumDeserializationSchema()) .startupOptions(StartupOptions.initial()) .build(); // 2 建立執行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.enableCheckpointing(50_000); DataStreamSource<String> dataStreamSource = env.addSource(sourceDatabase); // 3 列印資料 dataStreamSource.print(); // 4 啟動任務 env.execute(); }}
2.6.1 閱讀位置(Position)設定
<properties> <flink-version>1.13.0</flink-version> </properties> <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-java</artifactId> <version>${flink-version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java_2.12</artifactId> <version>${flink-version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients_2.12</artifactId> <version>${flink-version}</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.44</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-blink_2.12</artifactId> <version>${flink-version}</version> </dependency> <dependency> <groupId>com.ververica</groupId> <artifactId>flink-connector-mysql-cdc</artifactId> <version>2.0.0</version> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.75</version> </dependency> </dependencies>
public class MySqlBinlogFlinkCDCStream { public static void main(String[] args) throws Exception { // 1 透過FlinkCDC構建sourceDatabase DebeziumSourceFunction<String> sourceDatabase = MySqlSource.<String>builder() .hostname("bigdata12") .port(3306) // 需要監控的database .databaseList("flinkcdc") .username("flinkuser") .password("flinkpwd") // 反序列化 .deserializer(new StringDebeziumDeserializationSchema()) .startupOptions(StartupOptions.initial()) .build(); // 2 建立執行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); env.enableCheckpointing(50_000); DataStreamSource<String> dataStreamSource = env.addSource(sourceDatabase); // 3 列印資料 dataStreamSource.print(); // 4 啟動任務 env.execute(); }}
2.6.1 閱讀位置(Position)設定
對於消費MySQL的CDC程式而言,StartupOptions有兩種選擇:
initial():預設方式,從snapshot開始初始化,一直讀取到最新的binlog日誌。latest():程式在啟動時,僅僅從最新的binlog日誌開始讀取資料。2.6.2 ExactlyOnce處理
Flink CDC首先從Snapshot開始讀取解析,一直解析到binlog日誌最新位置,因此能夠提供ExactlyOnce支援。
2.6.3 資料流解析
-- 1 插入兩條資料INSERT INTO dept VALUES (10,'ACCOUNTING','NEW YORK'); INSERT INTO dept VALUES (20,'RESEARCH','DALLAS'); -- 2 更新deptno=10的資料UPDATE dept SET loc='BEIJING' WHERE deptno=10;-- 3 刪除deptno=20的資料DELETE FROM dept WHERE deptno=20;
2.6.3.1 INSERT結構
-- 1 插入兩條資料INSERT INTO dept VALUES (10,'ACCOUNTING','NEW YORK'); INSERT INTO dept VALUES (20,'RESEARCH','DALLAS'); -- 2 更新deptno=10的資料UPDATE dept SET loc='BEIJING' WHERE deptno=10;-- 3 刪除deptno=20的資料DELETE FROM dept WHERE deptno=20;
op的c表示是insert操作識別符號,此時有after,沒有before。
value = Struct { after = Struct { deptno = 10, dname = ACCOUNTING, loc = NEW YORK }, source = Struct { ts_ms = 1630733740000, db = flinkcdc, table = dept, pos = 1209, row = 0 }, op = c, ts_ms = 1630733715411 }
2.6.3.2 UPDATE結構
op的u表示是update操作識別符號,此時既有before又有after。
value = Struct { before = Struct { deptno = 10, dname = ACCOUNTING, loc = NEW YORK }, after = Struct { deptno = 10, dname = ACCOUNTING, loc = BEIJING }, source = Struct { ts_ms = 1630733763000, db = flinkcdc, table = dept, pos = 1783, row = 0 }, op = u, ts_ms = 1630733738618 }
2.6.3.3 DELETE結構
op的d表示是delete操作識別符號,此時只有before,沒有after。
value = Struct { before = Struct { deptno = 20, dname = RESEARCH, loc = DALLAS }, source = Struct { ts_ms = 1630733777000, db = flinkcdc, table = dept, pos = 2097, row = 0 }, op = d, ts_ms = 1630733752100 }
2.7 欄位型別介紹
MySQL欄位型別 |
FlinkSQL欄位型別 |
TINYINT |
TINYINT |
SMALLINTTINYINT UNSIGNED |
SMALLINT |
INTMEDIUMINTSMALLINT UNSIGNED | INT |
BIGINTINT UNSIGNED |
BIGINT |
BIGINT UNSIGNED |
DECIMAL(20, 0) |
BIGINT |
BIGINT |
FLOAT |
FLOAT |
DOUBLEDOUBLE PRECISION |
DOUBLE |
NUMERIC(p, s)DECIMAL(p, s) |
DECIMAL(p, s) |
BOOLEANTINYINT(1) |
BOOLEAN |
DATE |
DATE |
TIME [(p)] |
TIME [(p)] [WITHOUT TIMEZONE] |
DATETIME [(p)] |
TIMESTAMP [(p)] [WITHOUT TIMEZONE] |
TIMESTAMP [(p)] |
TIMESTAMP [(p)]TIMESTAMP [(p)] WITH LOCAL TIME ZONE |
CHAR(n)VARCHAR(n)TEXT |
STRING |
BINARYVARBINARYBLOB |
BYTES |
2.8 Flink SQL版本原始碼public class MySqlBinlogFlinkCDCSQL { public static void main(String[] args) throws Exception { // 1 建立Flink的執行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 2 使用FlinkSQL DDL方式建立CDC表 tableEnv.executeSql("CREATE TABLE dept(" + "deptno INT primary key," + "dname STRING," + "loc STRING" + ") WITH (" + "'connector' = 'mysql-cdc'," + "'hostname' = 'bigdata12'," + "'port' = '3306'," + "'username' = 'flinkuser'," + "'password' = 'flinkpwd'," + "'database-name' = 'flinkcdc'," + "'table-name' = 'dept'"+ ")"); // 3 查詢資料並轉換為流輸出 Table table = tableEnv.sqlQuery("SELECT * FROM dept"); DataStream<Tuple2<Boolean, Row>> deptStream = tableEnv.toRetractStream(table, Row.class); deptStream.print(); // 4 啟動Flink程式 env.execute(); }}
三 FlinkSQL和Flink DataStream在CDC2.0方面的對比3.1 FlinkSQLFlinkSQL只能在Flink1.13版本使用。只能監控單個表的變更。反序列化功能已經自動完成,可以非常方便轉換為JavaBean使用,或者直接透過SQL訪問。3.2 Flink DataStreamFlink DataStream可以在1.12和1.13兩個版本使用。可以同時監控多庫(database)、多表(table)的變更。預設的反序列化器StringDebeziumDeserializationSchema使用起來不是特別方便,我們通常需要自定義反序列化器。四 總結
public class MySqlBinlogFlinkCDCSQL { public static void main(String[] args) throws Exception { // 1 建立Flink的執行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // 2 使用FlinkSQL DDL方式建立CDC表 tableEnv.executeSql("CREATE TABLE dept(" + "deptno INT primary key," + "dname STRING," + "loc STRING" + ") WITH (" + "'connector' = 'mysql-cdc'," + "'hostname' = 'bigdata12'," + "'port' = '3306'," + "'username' = 'flinkuser'," + "'password' = 'flinkpwd'," + "'database-name' = 'flinkcdc'," + "'table-name' = 'dept'"+ ")"); // 3 查詢資料並轉換為流輸出 Table table = tableEnv.sqlQuery("SELECT * FROM dept"); DataStream<Tuple2<Boolean, Row>> deptStream = tableEnv.toRetractStream(table, Row.class); deptStream.print(); // 4 啟動Flink程式 env.execute(); }}
3.1 FlinkSQLFlinkSQL只能在Flink1.13版本使用。只能監控單個表的變更。反序列化功能已經自動完成,可以非常方便轉換為JavaBean使用,或者直接透過SQL訪問。3.2 Flink DataStreamFlink DataStream可以在1.12和1.13兩個版本使用。可以同時監控多庫(database)、多表(table)的變更。預設的反序列化器StringDebeziumDeserializationSchema使用起來不是特別方便,我們通常需要自定義反序列化器。四 總結
四 總結
這篇文章介紹瞭如何開啟和使用Flink-CDC2.0,並附有測試透過的原始碼,歡迎大家評論、轉發。