首頁>技術>

一 CDC概述

CDC的全稱是Change Data Capture,翻譯過來就是“變動資料捕獲”。它的核心思想就是,檢測並捕獲資料庫的變動(包括資料的插入、更新和刪除等操作),把這些資料變更按發生的時間順序記錄下來,寫入到訊息中介軟體(Kafka、Pulsar等)供其他應用訂閱、消費。

1.1 CDC的應用場景

1.1.1 資料同步、備份和容災

在MySQL、TiDB、PostgreSQL等資料庫之間互相同步資料,可以透過CDC計算將這些資料同步到Kafka等訊息中介軟體中,然後再透過Flink、Spark、ES等技術消費Kafka中的資料,供資料分析使用。使用這些工具訂閱感興趣的資料表變更,而不需要直接把分析流程接入到業務系統,起到解耦的作用。

也可以透過CDC技術對資料庫進行備份。

1.1.2 資料採集

面向資料湖或者資料倉庫源資料的採集工作。

1.1.3 微服務之間共享資料狀態

CDC可以作為微服務資料之間共享的一種解決方案,可以透過CDC獲取其他微服務資料庫的變更,從而獲取資料狀態的更新,完成自己對應的邏輯。

1.2 常見的開源CDC技術對比

1.2.1 maxwell

maxwell專門用來實時解析MySQL的Binlog日誌,並生成Json格式的訊息。作為生成者將訊息傳送到Kafka、Kinesis、RabbitMQ等訊息佇列中。它的常見應用場景有ETL、維護快取、收集表級別的DML操作。maxwell提供以下功能:

支援SELECT * FROM table的方式進行全量資料初始化。支援在主庫發生failover之後,自動恢復Binlog位置(GTID)。可以偽裝為Slave,接收binlog events,然後根據schemas資訊拼裝,可以接受ddl、xid、row等各種event。

1.2.2 Debezium

Debezium 是一個變更資料捕獲 (CDC) 平臺,它透過重用 Kafka 和 Kafka Connect 來實現其永續性、可靠性和容錯質量。部署到 Kafka Connect 分散式、可擴充套件、容錯服務的每個聯結器監控單個上游資料庫伺服器,捕獲所有更改並將它們記錄在一個或多個 Kafka 主題中(通常每個資料庫表一個主題)。支援監聽 MySQL,MongoDB,PostgreSQL,Oracle,SQL Server 等資料庫的變化。

1.2.3 Canal

主要用途是基於 MySQL 資料庫增量日誌解析,提供增量資料訂閱和消費。在canal1.1.4版本迎來最重要的WebUI能力,引入canal-admin工程,支援面向WebUI的canal動態管理能力,支援配置、任務、日誌等線上白屏運維能力。

1.2.4 Flink CDC

Flink CDC Connectors內部封裝了Debezium特性,可以使用Flink CDC的方式替代canal+kafka的方式,直接透過sql的方式來實現對mysql資料的同步。

二 Flink CDC2.0簡單上手例子

Flink在1.11版本開始引入Flink CDC功能,並且同時支援Table和SQL兩種形式,Flink SQL CDC基於Debezium實現的,能夠對CDC資料進行實時解析同步。當啟動MySQL CDC源時,它將獲取一個全域性讀取鎖(FLUSH TABLES WITH READ LOCK),該鎖將阻止其他資料庫的寫入,然後讀取當前binlog位置以及資料庫和表的schema,之後將釋放全域性讀取鎖。然後它掃描資料庫表並從先前記錄的位置讀取binlog,Flink將定期執行checkpoints以記錄binlog位置。如果發生故障,作業將重新啟動並從checkpoint完成的binlog位置恢復,因此它保證了僅一次的語義。

Dynamic Table是Flink內部定義的表,它和流式可以相互轉化的。可以簡單的理解為:每張MySQL表對應一個Binlog日誌,Binlog日誌隨著MySQL表的變化而變化,Dynamic Table相當於Binlog日誌流在某一時刻的物化結果。在Flink中,資料從一個運算元流向另外一個運算元的時候,都是以Changelog Stream的格式傳送到下游,在下游我們可以將其翻譯成一張表或者一條流進行操作。

2.1 社群支援的資料庫連線

2.2 CDC和Flink對應版本介紹

2.3 FlinkCDC2.0的maven配置

我們使用的是Flink1.13.0,因此選用FlinkCDC2.0

<dependency>  <groupId>com.ververica</groupId>  <artifactId>flink-connector-mysql-cdc</artifactId>  <version>2.0.0</version></dependency>

2.4 MySQL資料來源的準備工作

2.4.1 新建MySQL的DataBase

2.4.2 建立使用者

我們知道MySQL使用者的密碼長度是由validate_password_length決定的,而validate_password_length的計算公式是:

validate_password_length = validate_password_number_count + validate_password_special_char_count + (2 * validate_password_mixed_case_count)

因為這是開發測試環境,我們設定validate_password_policy=0(僅僅對密碼長度開啟認證);另外,再設定validate_password_length=1(密碼長度只要大於等於1即可)。

set global validate_password_policy=0;set global validate_password_length=1;CREATE USER 'flinkuser' IDENTIFIED BY 'flinkpwd';GRANT SELECT,INSERT,UPDATE,DELETE,CREATE,DROP,RELOAD,SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'flinkuser' IDENTIFIED BY 'flinkpwd';FLUSH PRIVILEGES;

2.4.3 開啟MySQL資料庫的binlog功能

編輯/etc/my.conf,開啟flinkcdc這個Database的flinkcdc功能。

[bigdata@bigdata12 ~]$ sudo vi /etc/my.cnf# 開啟flinkcdc這個Database的flinkcdc功能binlog-do-db=flinkcdc

開啟binlog之後,需要重啟MySQL資料庫

# 重啟MySQL服務[bigdata@bigdata12 ~]$ sudo systemctl restart mysqld.service 

檢視當前的最新binlog資料

建立表

create table dept(    deptno      int primary key,    dname       varchar(20),    loc         varchar(30));

再次檢視binlog日誌大小,說明binlog日誌生效

2.5 完整的maven配置
    <properties>        <flink-version>1.13.0</flink-version>    </properties>		<dependencies>        <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-java</artifactId>            <version>${flink-version}</version>        </dependency>        <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-streaming-java_2.12</artifactId>            <version>${flink-version}</version>        </dependency>        <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-clients_2.12</artifactId>            <version>${flink-version}</version>        </dependency>        <dependency>            <groupId>mysql</groupId>            <artifactId>mysql-connector-java</artifactId>            <version>5.1.44</version>        </dependency>        <dependency>            <groupId>org.apache.flink</groupId>            <artifactId>flink-table-planner-blink_2.12</artifactId>            <version>${flink-version}</version>        </dependency>        <dependency>            <groupId>com.ververica</groupId>            <artifactId>flink-connector-mysql-cdc</artifactId>            <version>2.0.0</version>        </dependency>        <dependency>            <groupId>com.alibaba</groupId>            <artifactId>fastjson</artifactId>            <version>1.2.75</version>        </dependency>    </dependencies>

2.6 Flink 流式程式碼
public class MySqlBinlogFlinkCDCStream {    public static void main(String[] args) throws Exception {        // 1 透過FlinkCDC構建sourceDatabase        DebeziumSourceFunction<String> sourceDatabase = MySqlSource.<String>builder()                .hostname("bigdata12")                .port(3306)                // 需要監控的database                .databaseList("flinkcdc")                .username("flinkuser")                .password("flinkpwd")                // 反序列化                .deserializer(new StringDebeziumDeserializationSchema())                .startupOptions(StartupOptions.initial())                .build();        // 2 建立執行環境        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.setParallelism(1);        env.enableCheckpointing(50_000);        DataStreamSource<String> dataStreamSource = env.addSource(sourceDatabase);        // 3 列印資料        dataStreamSource.print();        // 4 啟動任務        env.execute();    }}

2.6.1 閱讀位置(Position)設定

對於消費MySQL的CDC程式而言,StartupOptions有兩種選擇:

initial():預設方式,從snapshot開始初始化,一直讀取到最新的binlog日誌。latest():程式在啟動時,僅僅從最新的binlog日誌開始讀取資料。

2.6.2 ExactlyOnce處理

Flink CDC首先從Snapshot開始讀取解析,一直解析到binlog日誌最新位置,因此能夠提供ExactlyOnce支援。

2.6.3 資料流解析
-- 1 插入兩條資料INSERT INTO dept VALUES (10,'ACCOUNTING','NEW YORK'); INSERT INTO dept VALUES (20,'RESEARCH','DALLAS'); -- 2 更新deptno=10的資料UPDATE dept SET loc='BEIJING' WHERE deptno=10;-- 3 刪除deptno=20的資料DELETE FROM dept WHERE deptno=20;

2.6.3.1 INSERT結構

op的c表示是insert操作識別符號,此時有after,沒有before。

value = Struct {		after = Struct {			deptno = 10, dname = ACCOUNTING, loc = NEW YORK		}, source = Struct {			ts_ms = 1630733740000, db = flinkcdc, table = dept, pos = 1209, row = 0		}, op = c, ts_ms = 1630733715411	}

2.6.3.2 UPDATE結構

op的u表示是update操作識別符號,此時既有before又有after。

value = Struct {		before = Struct {			deptno = 10, dname = ACCOUNTING, loc = NEW YORK		}, after = Struct {			deptno = 10, dname = ACCOUNTING, loc = BEIJING		}, source = Struct {			ts_ms = 1630733763000, db = flinkcdc, table = dept, pos = 1783, row = 0		}, op = u, ts_ms = 1630733738618	}

2.6.3.3 DELETE結構

op的d表示是delete操作識別符號,此時只有before,沒有after。

value = Struct {		before = Struct {			deptno = 20, dname = RESEARCH, loc = DALLAS		}, source = Struct {			ts_ms = 1630733777000, db = flinkcdc, table = dept, pos = 2097, row = 0		}, op = d, ts_ms = 1630733752100	}

2.7 欄位型別介紹

MySQL欄位型別

FlinkSQL欄位型別

TINYINT

TINYINT

SMALLINTTINYINT UNSIGNED

SMALLINT

INTMEDIUMINTSMALLINT UNSIGNED

INT

BIGINTINT UNSIGNED

BIGINT

BIGINT UNSIGNED

DECIMAL(20, 0)

BIGINT

BIGINT

FLOAT

FLOAT

DOUBLEDOUBLE PRECISION

DOUBLE

NUMERIC(p, s)DECIMAL(p, s)

DECIMAL(p, s)

BOOLEANTINYINT(1)

BOOLEAN

DATE

DATE

TIME [(p)]

TIME [(p)] [WITHOUT TIMEZONE]

DATETIME [(p)]

TIMESTAMP [(p)] [WITHOUT TIMEZONE]

TIMESTAMP [(p)]

TIMESTAMP [(p)]TIMESTAMP [(p)] WITH LOCAL TIME ZONE

CHAR(n)VARCHAR(n)TEXT

STRING

BINARYVARBINARYBLOB

BYTES

2.8 Flink SQL版本原始碼
public class MySqlBinlogFlinkCDCSQL {    public static void main(String[] args) throws Exception {        // 1 建立Flink的執行環境        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();        env.setParallelism(1);        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);        // 2 使用FlinkSQL DDL方式建立CDC表        tableEnv.executeSql("CREATE TABLE dept(" +            "deptno INT primary key," +            "dname STRING," +            "loc STRING" +            ") WITH (" +            "'connector' = 'mysql-cdc'," +            "'hostname' = 'bigdata12'," +            "'port' = '3306'," +            "'username' = 'flinkuser'," +            "'password' = 'flinkpwd'," +            "'database-name' = 'flinkcdc'," +            "'table-name' = 'dept'"+            ")");        // 3 查詢資料並轉換為流輸出        Table table = tableEnv.sqlQuery("SELECT * FROM dept");        DataStream<Tuple2<Boolean, Row>> deptStream = tableEnv.toRetractStream(table, Row.class);        deptStream.print();        // 4 啟動Flink程式        env.execute();    }}

三 FlinkSQL和Flink DataStream在CDC2.0方面的對比

3.1 FlinkSQLFlinkSQL只能在Flink1.13版本使用。只能監控單個表的變更。反序列化功能已經自動完成,可以非常方便轉換為JavaBean使用,或者直接透過SQL訪問。

3.2 Flink DataStreamFlink DataStream可以在1.12和1.13兩個版本使用。可以同時監控多庫(database)、多表(table)的變更。預設的反序列化器StringDebeziumDeserializationSchema使用起來不是特別方便,我們通常需要自定義反序列化器。

四 總結

這篇文章介紹瞭如何開啟和使用Flink-CDC2.0,並附有測試透過的原始碼,歡迎大家評論、轉發。

10
最新評論
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • python3 mysql簡單封裝