首頁>技術>

目錄前言canal架構安裝配置高可用架構控制檯總結(原始碼bug)前言

之前3篇文章老顧都介紹了基於binlog的一些應用場景,今天我們來分享一下Canal元件的具體的使用方法,以及叢集的部署。

Canal架構單應用

上面中我們需要啟動一個Canal Server,負責偽裝mysql的slave訂閱binlog的;我們還需要一個Client監聽Server。Canal最近的版本提供了一個client的實現----Canal Adapter。然後由Client解析後把相關資料同步的DB/ES/Redis。

上面的架構可以在開發環境進行,但不適合生產環境;因為由幾個問題存在。

1)Canal Server和Client 都存在單點問題

2)如果我們流量很大,Canal Server收到的binlog量太大,導致Canal Adapter來不及處理,很有可能會把Client搞崩潰掉。

下面我們會介紹高可用的架構,我們先了解一下Canal基本用法

安裝配置環境

Canal的github的地址https://github.com/alibaba/canal/releases,目前最新版本1.1.4

注意:如果小夥伴們需要adapter client同步的es7,需要使用1.1.5版本才行哦

老顧正在使用的是1.1.5。老顧有特殊需求

java jdk1.8需要提前安裝哦。

安裝canal server

下載canal.deployer-1.1.4.tar.gz

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gz

解壓檔案

tar -zxvf canal.deployer-1.1.4.tar.gz

進入解壓後的資料夾,有5個目錄

 bin         //可執行的指令碼檔案 conf        //配置檔案 lib         //需要用到的核心jar包 logs        //啟動執行的日誌檔案 plugin      //支援的適配外掛

我們需要修改一些配置檔案,進入conf

canal.properties
canal.id = 1 # 每個canal server例項的唯一標識,暫無實際意義canal.ip = 192.111.112.103 # canal server繫結的本地IP資訊,如果不配置,預設選擇一個本機IP進行啟動服務canal.port = 11111 # canal server提供socket服務的埠canal.metrics.pull.port = 11112canal.zkServers = 192.168.1.111:2181 #canal server連結zookeeper叢集的連結資訊,叢集模式需要用到,單機模式可以不配置# flush data to zkcanal.zookeeper.flush.period = 1000 #canal持久化資料到zookeeper上的更新頻率,單位毫秒canal.withoutNetty = false # tcp, kafka, RocketMQcanal.serverMode = tcp #這個非常重要,代表是binlog的變化資訊是直接用tcp方式放送給下游的client,還是先放到MQ中,暫時支援kafka、RocketMQ# flush meta cursor/parse position to filecanal.file.data.dir = ${canal.conf.dir}canal.file.flush.period = 1000## memory store RingBuffer size, should be Math.pow(2,n)canal.instance.memory.buffer.size = 16384## memory store RingBuffer used memory unit size , default 1kbcanal.instance.memory.buffer.memunit = 1024 ## meory store gets mode used MEMSIZE or ITEMSIZEcanal.instance.memory.batch.mode = MEMSIZEcanal.instance.memory.rawEntry = true## detecing configcanal.instance.detecting.enable = false#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()canal.instance.detecting.sql = select 1canal.instance.detecting.interval.time = 3canal.instance.detecting.retry.threshold = 3canal.instance.detecting.heartbeatHaEnable = false# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions deliverycanal.instance.transaction.size =  1024# mysql fallback connected to new master should fallback timescanal.instance.fallbackIntervalInSeconds = 60# network configcanal.instance.network.receiveBufferSize = 16384canal.instance.network.sendBufferSize = 16384canal.instance.network.soTimeout = 30# binlog filter configcanal.instance.filter.druid.ddl = truecanal.instance.filter.query.dcl = falsecanal.instance.filter.query.dml = false #這個也非常重要,true表示不監聽queue查詢命令;只監聽insert、update、delete命令;false即全部監聽canal.instance.filter.query.ddl = falsecanal.instance.filter.table.error = falsecanal.instance.filter.rows = falsecanal.instance.filter.transaction.entry = false# binlog format/image checkcanal.instance.binlog.format = ROW,STATEMENT,MIXED canal.instance.binlog.image = FULL,MINIMAL,NOBLOB# binlog ddl isolationcanal.instance.get.ddl.isolation = false# parallel parser configcanal.instance.parser.parallel = true## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()#canal.instance.parser.parallelThreadSize = 16## disruptor ringbuffer size, must be power of 2canal.instance.parser.parallelBufferSize = 256# table meta tsdb infocanal.instance.tsdb.enable = falsecanal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;canal.instance.tsdb.dbUsername = canalcanal.instance.tsdb.dbPassword = password# dump snapshot interval, default 24 hourcanal.instance.tsdb.snapshot.interval = 24# purge snapshot expire , default 360 hour(15 days)canal.instance.tsdb.snapshot.expire = 360# aliyun ak/sk , support rds/mqcanal.aliyun.accessKey =canal.aliyun.secretKey =##########################################################               destinations            ############# #################################################canal.destinations = example_01,example_02  # 當前server上部署的instance列表,instance很重要的概念,就是需要監聽的表例項。# conf root dircanal.conf.dir = ../conf# auto scan instance dir add/remove and start/stop instancecanal.auto.scan = truecanal.auto.scan.interval = 5#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xmlcanal.instance.global.mode = spring # 全域性配置載入方式canal.instance.global.lazy = false#canal.instance.global.manager.address = 127.0.0.1:1099#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml#canal.instance.global.spring.xml = classpath:spring/file-instance.xmlcanal.instance.global.spring.xml = classpath:spring/default-instance.xml###########################################################                    MQ                      ################################################################如果監聽模式canal.serverMode為 kafka、RocketMq,需要配置以下配置canal.mq.servers = 127.0.0.1:6667canal.mq.retries = 0canal.mq.batchSize = 16384canal.mq.maxRequestSize = 1048576canal.mq.lingerMs = 100canal.mq.bufferMemory = 33554432canal.mq.canalBatchSize = 50canal.mq.canalGetTimeout = 100canal.mq.flatMessage = truecanal.mq.compressionType = nonecanal.mq.acks = all# use transaction for kafka flatMessage batch producecanal.mq.transaction = false#canal.mq.
配置說明

上面核心的配置,需要注意的

canal.serverModecanal.instance.filter.query.dmlcanal.destinationscanal.mq.servers #如果需要同步到MQ

注意:MQ的配置,1.1.5版本會有點區別,分開了kafka和rocketmq;小夥伴們一看就知道

針對例項instance資料位點,儲存配置canal.instance.global.spring.xml

memory-instance.xml: 所有的元件(parser , sink , store)都選擇了記憶體版模式,記錄位點的都選擇了memory模式,重啟後又會回到初始位點進行解析default-instance.xml: store選擇了記憶體模式,其餘的parser/sink依賴的位點管理選擇了持久化模式,目前持久化的方式主要是寫入zookeeper,保證資料叢集共享.group-instance.xml: 主要針對需要進行多庫合併時,可以將多個物理instance合併為一個邏輯instance,提供客戶端訪問多個destination配置在canal.properties裡邊配置canal.destinations , 用英文逗號分隔在conf路徑下建立對應的路徑並新增對應的instance.properties

配置多個destination, 需要在conf下建立對應的目錄

mkdir conf/example_01mkdir conf/example_02

在對應的目錄下邊編寫配置檔案instance.properties

#canal.instance.mysql.slaveId=canal.instance.gtidon=false  #這個在迴環問題存在的時候開啟true,可以看老顧前一篇文章# position infocanal.instance.master.address=    #資料庫連線地址canal.instance.master.journal.name=  #資料庫的binlog檔名canal.instance.master.position=      #資料庫的binlog位點canal.instance.master.timestamp=canal.instance.master.gtid=# rds oss binlogcanal.instance.rds.accesskey=canal.instance.rds.secretkey=canal.instance.rds.instanceId=# table meta tsdb infocanal.instance.tsdb.enable=false# username/passwordcanal.instance.dbUsername=usernamecanal.instance.dbPassword=passwordcanal.instance.defaultDatabaseName=dbNamecanal.instance.connectionCharset = UTF-8# enable druid Decrypt database passwordcanal.instance.enableDruid=false# table regexcanal.instance.filter.regex=.*\\..*# mq configcanal.mq.topic=example# dynamic topic route by schema or table regex#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*canal.mq.partition=0# hash partition config#canal.mq.partitionsNum=3#canal.mq.partitionHash=test.table:id^name,.*
配置說明canal.instance.master.address 資料庫連線地址canal.instance.master.journal.name + canal.instance.master.position : 精確指定一個binlog位點,進行啟動canal.instance.master.timestamp : 指定一個時間戳。如果上面的位點不配置,時間戳生效;canal會自動遍歷mysql binlog,找到對應時間戳的binlog位點後,進行啟動如果時間戳也不配置,即不指定任何資訊:預設從當前資料庫的位點,進行啟動canal.instance.dbUsername 資料庫賬戶canal.instance.dbPassword 資料庫密碼canal.instance.connectionCharset 連線字元型別canal.instance.filter.regex
1.  所有表:.*   or  .*\\..*2.  canal schema下所有表: canal\\..*3.  canal下的以canal打頭的表:canal\\.canal.*4.  canal schema下的一張表:canal.test15.  多個規則組合使用:canal\\..*,mysql.test1,mysql.test2 (逗號分隔)
canal.mq.topic 如果傳送到MQ,那此例項的訊息傳送到哪個主題,也可以支援動態主題

根據canal.mq.dynamicTopic設定

canal.mq.partition 指定分割槽佇列號,預設為0;即所有訊息傳送到0號佇列canal.mq.partitionsNum + canal.mq.partitionHash 如果需要分散到其他佇列,可以提供訊息消費的速度;即可用到這2個配置,partitionsNum佇列數,partitionHash分配規則啟動

進入到路徑bin下邊,有幾個指令碼

canal.pid     # 記錄服務的程序IDrestart.sh    # 重啟服務startup.sh    # 啟動指令碼stop.sh       # 停止服務

執行./startup.sh就可以啟動了

檢視日誌服務啟動日誌(logs/canal/canal.log)例項執行日誌 (logs/example/example.log)canal-adapter的安裝

下載安裝包

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gz

解壓

tar xzvf canal.adapter-1.1.4.tar.gz

修改配置檔案

修改conf/application.yml
server:  port: 8081spring:  jackson:    date-format: yyyy-MM-dd HH:mm:ss    time-zone: GMT+8    default-property-inclusion: non_nullcanal.conf:  mode: tcp #這個就對應server端配置的 同步mode  zookeeperHosts: 192.111.111.173:2181#  mqServers: 127.0.0.1:9092 #or rocketmq#  flatMessage: true  batchSize: 500  syncBatchSize: 1000  retries: 0  timeout:  accessKey:  secretKey:  srcDataSources: #源資料庫地址,可以多個    defaultDS:      url: jdbc:mysql://192.168.1.100:3306/test?useUnicode=true      username: username      password: password    defaultDS2:      url: jdbc:mysql://192.168.1.101:3306/test?useUnicode=true      username: username      password: password  canalAdapters: #可以配置多個例項配置  - instance: example_01    groups:    - groupId: g1      outerAdapters: #輸出介面卡,可以配置多個      - name: logger  #日誌列印介面卡      - name: es    #es同步介面卡        hosts: 192.168.1.110:9300        properties:          cluster.name: okami-application  - instance: example_02    groups:    - groupId: g1      outerAdapters:      - name: logger      - name: es        hosts: 192.168.1.111:9300        properties:          cluster.name: okami-application

在conf/es/路徑下新增配置檔案example_01.yml

vi conf/es/example_01.ymldataSourceKey: defaultDSdestination: example_01groupId: g1esMapping:  _index: indexName  _type: typeName  _id: _id  upsert: true#  pk: id  sql: "select a.id as _id, a.name as _name, a.role_id as _role_id, b.role_name as _role_name,        a.c_time as _c_time from user a        left join role b on b.id=a.role_id"#  objFields:#    _labels: array:;#  etlCondition: "where a.c_time>='{0}'"  commitBatch: 3000

example_02.yml

dataSourceKey: defaultDS2destination: example_02groupId: g1esMapping:  _index: indexName  _type: typeName  _id: _id  upsert: true#  pk: id  sql: "select a.id as _id, a.name as _name, a.role_id as _role_id, b.role_name as _role_name,        a.c_time as _c_time from user a        left join role b on b.id=a.role_id"#  objFields:#    _labels: array:;#  etlCondition: "where a.c_time>='{0}'"  commitBatch: 300
配置說明

一份資料可以被多個group同時消費, 多個group之間會是一個並行執行, 一個group內部是一個序列執行多個outerAdapters

啟動

進入到路徑bin下邊,有幾個指令碼

adapter.pid     # 記錄的程序IDrestart.sh    # 重啟服務startup.sh    # 啟動指令碼stop.sh       # 停止服務
檢視日誌
tail -f logs/adapter/adapter.log 

上面介紹了canal的server和adapter的啟動應用。下面我們看看搞可用架構的思路

高可用架構

canal server的高可用方案是結合了zookeeper,主要原理是:

需求:如果有3臺Canal server,同步10個例項instance的配置。

1)canal server啟動後,會把相關啟動資訊註冊到zk上面。

即可以透過zk知道現在一共有幾個canal server

2)並且會把10個instance 分配給這臺canal server

3)再啟動第2臺canal server,又會註冊到zk上面。發現線上有1臺canal server訂閱了10個instance。第2臺會分擔第1臺的工作,會分配5個instance給自己,留下另外5個給第一臺

2臺canal server均勻分配了instance同步binlog工作

4)再啟動第3臺,流程和第2臺一樣,繼續分擔instance同步工作

最後形成1臺同步4個instance,1臺同步3個instance,1臺同步3個instance

5)如果有一臺canal server掛了;透過zk的協調機制,通知另外2臺canal server;分擔那個掛掉server的instance工作。

這樣就整體實現了canal server高可用了哦

Canal Admin控制檯

我們發現配置canal server的時候,是比較麻煩的;需要配置 application配置,以及各個instance目錄的配置,1臺canal server配置工作還行,如果叢集模式下,多臺canal server都需要配置,那就比較痛苦了。

還好canal最近的版本提供了admin控制檯,可以進行遠端配置

這個可以在官網上面找到如何安裝,還是比較簡單的。小夥伴可以自行去檢視

注意:如果canal server採用 admin 遠端配置,那conf目錄下就用使用 canal_local.perproies這個配置檔案,在啟動canal server時需加上引數local;才會啟動遠端配置

./startup.sh local
總結

今天老顧整體介紹了canal的安裝以及配置,其實這些東西都比較簡單;老顧只是介紹了一些核心的知識點;方便小夥伴們在使用canal的時候,可以快速理解。

其實canal最核心的就是 adapter client的配置,如何結合業務?老顧在使用了時候,為了符合公司的業務,修改了幾處adapter的原始碼【涉及到動態es索引,主鍵id字首,以及只同步每一個命令的業務】;小夥伴可以研究一下adapter原始碼,值得一看。謝謝!!!

注意:canal-server同步資訊到kafka時,會產生異常日誌exception=java.io.IOException: Connection reset by peer 但不影響同步業務;已經同步給官方issue;希望官方儘快解決

---End---

老顧的微服務閘道器分享課程,請大家多多支援

推薦閱讀

大廠如何基於binlog解決多機房同步mysql資料(一)?

大廠如何基於binlog解決多機房同步mysql資料(二)?

可用於大型應用的微服務生態灰度釋出如何實現?

一線大廠級別公共Redis叢集監控,細化到每個專案例項

Sharding-jdbc的實戰入門之水平分表(一)

Sharding-Jdbc之水平分庫和讀寫分離(二)

a、dubbo如何處理業務異常,這個一定要知道哦!

b、企業級SpringBoot應用多個子專案配置檔案規劃、多環境支援(一)

c、企業級SpringBoot應用多個子專案配置檔案規劃、多環境支援(二)

d、企業級SpringBoot應用多個子專案配置檔案之配置中心(三)

e、利用阿里開源工具進行排查線上CPU居高問題

f、阿里二面:如何快速排查死鎖?如何避免死鎖?

g、微服務分散式架構中,如何實現日誌鏈路跟蹤?

h、閘道器如何聚合各個微服務的介面文件?

i、Kubernetes之POD、容器之間的網路通訊

j、K8S中的Service的存在理由

k、企業微服務專案如何進入K8S的全過程

l、阿里開源專案Sentinel限流、降級的統一處理

m、大廠二面:Redis的分散式布隆過濾器是什麼原理?

1基於RocketMq的SpringCloud Stream框架實戰入門

2、如何搭建訊息中介軟體應用框架之SpringCloud Stream

3面試必備:閘道器異常了怎麼辦?如何做全域性異常處理?

4Gateway網關係列(二):SpringCloud Gateway入門實戰,路由規則

5Gateway網關係列開篇:SpringCloud的官方閘道器Gateway介紹

6API閘道器在微服務架構中的應用,這一篇就夠了

7學習Lambda表示式看這篇就夠了,不會讓你失望的哦(續篇)

8Lambda用在哪裡?幾種場景?

9、為什麼會出現Lambda表示式,你知道嗎?

10、不說“分散式事務”理論,直接上大廠阿里的解決方案,絕對實用

11、女程式設計師問到這個問題,讓我思考了半天,Mysql的“三高”架構

12、大廠二面:CAP原則為什麼只能滿足其中兩項?而不能同時滿足

13、阿里P7二面:聊聊零複製的原理

14、秒殺系統的核心點都在這裡,快來取

15、你瞭解如何利用token方式實現分散式Session嗎?

16、Mysql索引結構演變,為什麼最終會是那個結構呢?讓你一看就懂

17、一場比賽涉及到的知識,用通俗易通的方式介紹併發協調

18、企業實戰Redis全方面思考,你思考了嗎?

19、面試題:Thread的start和run的區別

20、面試題:什麼是CAS?CAS的作用以及缺點

21、如何訪問redis中的海量資料?避免事故產生

22、如何解決Redis熱點問題?以及如何發現熱點?

23、如何設計API介面,實現統一格式返回?

24、你真的知道在生產環境下如何部署tomcat嗎?

25、分享一線網際網路大廠分散式唯一ID設計 之 snowflake方案

26、分享大廠分散式唯一ID設計方案,快來圍觀

27、你想了解一線大廠的分散式唯一ID生成方案嗎?

28、你知道如何處理大資料量嗎?(資料拆分篇)

29、如何永不遷移資料和避免熱點? 根據伺服器指標分配資料量(揭秘篇)

30、你知道怎麼分庫分表嗎?如何做到永不遷移資料和避免熱點嗎?

31、你瞭解大型網站的頁面靜態化嗎?

32、你知道如何更新快取嗎?如何保證快取和資料庫雙寫一致性?

33、你知道怎麼解決DB讀寫分離,導致資料不一致問題嗎?

34、DB讀寫分離情況下,如何解決快取和資料庫不一致性問題?

35、你真的知道怎麼使用快取嗎?

36、如何利用鎖,防止快取擊穿?重構思想的重要性

37、海量訂單產生的業務高峰期,如何避免訊息的重複消費?

38、你知道如何保障生產端100%訊息投遞成功嗎?

39、微服務下的分散式session該如何管理?

40、阿里二面:filter、interceptor、aspect應如何選擇?很多人中招

41、網際網路架構重要組員CDN,很多高階開發都沒有實操過,來看這裡

42、阿里二面:CDN快取控制原理,看看能不能難住你

43、SpringCloud Alibaba之Nacos多環境多專案管理

44、SpringCloud Alibaba系列之Nacos配置中心玩法

45、SpringCloud Alibaba之Nacos註冊中心

46、SpringCloud Plus版本之SpringCloud Alibaba

47、SpringCloud Alibaba之Nacos叢集、持久化

48、SpringCloud Alibaba之Nacos共享配置、灰度配置

49、SpringCloud Alibaba之Sentinel工作原理

50、SpringCloud Alibaba之Sentinel流控管理

51、SpringCloud Alibaba之Sentinel降級管理

52、SpringCloud Alibaba之Sentinel熱點引數限流

53、SpringCloud Alibaba之Sentinel的API實戰

8
  • BSA-TRITC(10mg/ml) TRITC-BSA 牛血清白蛋白改性標記羅丹明
  • Lazy Predict:一行程式碼,擬合和評估所有模型