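Introduction to Canal
History of Canal
In the early days, Alibaba had database instances deployed in data centers in both Hangzhou and the United States. The need to synchronize data across those data centers gave rise to Canal, which at first obtained incremental changes mainly through triggers. Starting in 2010, Alibaba gradually moved to parsing database logs to obtain incremental changes for synchronization, and the incremental subscription and consumption business grew out of that work.
Canal use cases
Businesses built on incremental log subscription and consumption currently include mainly:
How Canal works
Before introducing how Canal works, let's first look at how MySQL master-slave replication works.
How MySQL master-slave replication works
Canal's working mechanism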
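Preparing the Docker environment for Canal
Because container technology is so popular, this article uses Docker to set up the development environment quickly. Once you know how to build the environment in Docker containers, you can follow the same pattern to set it up the traditional way. Since this article is mainly about Canal, it does not go deep into Docker and only covers Docker's basic concepts and commands.
What is Docker
Most people have used VMware virtual machines. When you set up an environment with VMware, you only supply a plain system image and install it; the remaining software and application configuration still has to be done inside the virtual machine, just as on a local machine. VMware also takes a large share of the host's resources, which easily makes the host sluggish, and the system image itself takes up a lot of disk space.
Docker networking
Docker has three network types: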
docker network create --subnet=172.18.0.0/16 mynetwork
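The containers created later in this article are given fixed addresses with --ip, and those addresses have to fall inside the subnet passed to docker network create. As a minimal sketch of that constraint (the class name SubnetCheck and its methods are made up for illustration and are not part of Docker or Canal), the membership test can be written as:

```java
/** Illustrative helper: checks whether an IPv4 address lies inside a CIDR block. */
final class SubnetCheck {

    /** Converts a dotted-quad IPv4 literal into an unsigned 32-bit value held in a long. */
    static long toLong(String ipv4) {
        String[] parts = ipv4.split("\\.");
        if (parts.length != 4) {
            throw new IllegalArgumentException("Not an IPv4 address: " + ipv4);
        }
        long value = 0;
        for (String part : parts) {
            int octet = Integer.parseInt(part);
            if (octet < 0 || octet > 255) {
                throw new IllegalArgumentException("Octet out of range in: " + ipv4);
            }
            value = (value << 8) | octet;
        }
        return value;
    }

    /** Returns true when ipv4 shares the network prefix of cidr, e.g. 172.18.0.0/16. */
    static boolean contains(String cidr, String ipv4) {
        String[] parts = cidr.split("/");
        int prefix = Integer.parseInt(parts[1]);
        long mask = prefix == 0 ? 0L : (0xFFFFFFFFL << (32 - prefix)) & 0xFFFFFFFFL;
        return (toLong(parts[0]) & mask) == (toLong(ipv4) & mask);
    }

    public static void main(String[] args) {
        String subnet = "172.18.0.0/16";
        // The first two addresses are container IPs used in this article;
        // the third is the host address from application.properties.
        for (String ip : new String[] {"172.18.0.6", "172.18.0.4", "192.168.124.5"}) {
            System.out.println(ip + " in " + subnet + ": " + contains(subnet, ip));
        }
    }
}
```

Only addresses inside 172.18.0.0/16 can be handed to --ip on mynetwork; the host address 192.168.124.5 used by the Spring Boot client reaches the containers through the published ports instead.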
Setting up the Canal environment
Docker can be downloaded and installed from: Docker Download.
## Create the mysql container
docker run -d --name mysql --net mynetwork --ip 172.18.0.6 -p 3306:3306 -e MYSQL_ROOT_PASSWORD=root mysql
## Create the canal-server container
docker run -d --name canal-server --net mynetwork --ip 172.18.0.4 -p 11111:11111 canal/canal-server
## Options
# --net mynetwork   use the user-defined network
# --ip              assign a fixed IP address
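Modifying the MySQL configuration
The steps above only prepare the basic environment. How do we make Canal pose as a slave and correctly read MySQL's binary log? MySQL needs the binlog enabled in ROW format:
[mysqld]
log-bin=mysql-bin   # enable the binlog
binlog-format=ROW   # use ROW format
server_id=1         # required for MySQL replication; must not be the same as Canal's slaveId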
mysql -uroot -proot
# Create the account
CREATE USER canal IDENTIFIED BY 'canal';
# Grant privileges
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
# Reload and apply
FLUSH PRIVILEGES;
show variables like 'log_bin';
show variables like 'binlog_format';
show master status;
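The values returned by these statements can be checked against what the configuration above asks for: the binlog switched on, ROW format, and a server_id that differs from Canal's slaveId. The following is a minimal sketch of that check, assuming the variables have already been read into a map (by hand or through JDBC); the class BinlogSettingsCheck and its method are hypothetical and belong to neither MySQL nor Canal.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative check of the MySQL variables Canal depends on. */
final class BinlogSettingsCheck {

    /**
     * Returns a list of problems; an empty list means the settings match
     * the my.cnf shown in this article.
     *
     * @param variables    variable name to value, as reported by SHOW VARIABLES
     * @param canalSlaveId the slaveId configured for the Canal instance
     */
    static List<String> problems(Map<String, String> variables, long canalSlaveId) {
        List<String> problems = new ArrayList<>();
        if (!"ON".equalsIgnoreCase(variables.get("log_bin"))) {
            problems.add("log_bin is not ON");
        }
        if (!"ROW".equalsIgnoreCase(variables.get("binlog_format"))) {
            problems.add("binlog_format is not ROW");
        }
        String serverId = variables.get("server_id");
        if (serverId == null || Long.parseLong(serverId.trim()) == canalSlaveId) {
            problems.add("server_id is missing or equal to Canal's slaveId");
        }
        return problems;
    }

    public static void main(String[] args) {
        Map<String, String> variables = new LinkedHashMap<>();
        variables.put("log_bin", "ON");
        variables.put("binlog_format", "ROW");
        variables.put("server_id", "1");
        // 1234 is the slaveId this article mentions for the first Canal instance.
        System.out.println("Problems: " + problems(variables, 1234L));
    }
}
```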
Modifying the canal-server configuration
Enter the canal-server container with docker exec -it canal-server bash.
docker exec -it canal-server bash
tail -100f canal-server/logs/example/example.log
Pulling data and synchronizing it to Elasticsearch
The Elasticsearch instance in this article is also set up with Docker, so readers can run the following commands:
# Pull the images
docker pull elasticsearch:7.1.1
docker pull mobz/elasticsearch-head:5-alpine
# Create and start the containers
docker run -d --name elasticsearch --net mynetwork --ip 172.18.0.2 -p 9200:9200 -p 9300:9300 -e "discovery.type=single-node" elasticsearch:7.1.1
docker run -d --name elasticsearch-head --net mynetwork --ip 172.18.0.5 -p 9100:9100 mobz/elasticsearch-head:5-alpine
Student.java
package com.example.canal.study.pojo;

import lombok.Data;

import java.io.Serializable;

// @Data generates the getter and setter methods
@Data
public class Student implements Serializable {
    private String id;
    private String name;
    private int age;
    private String sex;
    private String city;
}
CanalConfig.java
package com.example.canal.study.common;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.net.InetSocketAddress;

/**
 * @author haha
 */
@Configuration
public class CanalConfig {
    // @Value reads the values from application.properties
    @Value("${canal.server.ip}")
    private String canalIp;
    @Value("${canal.server.port}")
    private Integer canalPort;
    @Value("${canal.destination}")
    private String destination;
    @Value("${elasticSearch.server.ip}")
    private String elasticSearchIp;
    @Value("${elasticSearch.server.port}")
    private Integer elasticSearchPort;
    @Value("${zookeeper.server.ip}")
    private String zkServerIp;

    // Direct connection to a single canal-server
    @Bean
    public CanalConnector canalSimpleConnector() {
        return CanalConnectors.newSingleConnector(
                new InetSocketAddress(canalIp, canalPort), destination, "", "");
    }

    // Connection to canal-server obtained through ZooKeeper
    @Bean
    public CanalConnector canalHaConnector() {
        return CanalConnectors.newClusterConnector(zkServerIp, destination, "", "");
    }

    // Elasticsearch 7.x client
    @Bean
    public RestHighLevelClient restHighLevelClient() {
        return new RestHighLevelClient(
                RestClient.builder(new HttpHost(elasticSearchIp, elasticSearchPort)));
    }
}
CanalDataParser.java
This class is fairly long, so only the most important parts are excerpted here; the rest of the code is available on GitHub.

public static class TwoTuple<A, B> {
    public final A eventType;
    public final B columnMap;

    public TwoTuple(A a, B b) {
        eventType = a;
        columnMap = b;
    }
}

public static List<TwoTuple<EventType, Map>> printEntry(List<Entry> entries) {
    List<TwoTuple<EventType, Map>> rows = new ArrayList<>();
    for (Entry entry : entries) {
        // Time at which the binlog event was executed on MySQL
        long executeTime = entry.getHeader().getExecuteTime();
        // Delay between that execution and this application receiving the event
        long delayTime = System.currentTimeMillis() - executeTime;
        Date date = new Date(executeTime);
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        // The entry (binary log event) marks the beginning or end of a transaction
        if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN) {
                TransactionBegin begin = null;
                try {
                    begin = TransactionBegin.parseFrom(entry.getStoreValue());
                } catch (InvalidProtocolBufferException e) {
                    throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                }
                // Log the transaction header: executing thread id and delay
                logger.info(transaction_format,
                        new Object[]{entry.getHeader().getLogfileName(),
                                String.valueOf(entry.getHeader().getLogfileOffset()),
                                String.valueOf(entry.getHeader().getExecuteTime()),
                                simpleDateFormat.format(date),
                                entry.getHeader().getGtid(),
                                String.valueOf(delayTime)});
                logger.info(" BEGIN ----> Thread id: {}", begin.getThreadId());
                printXAInfo(begin.getPropsList());
            } else if (entry.getEntryType() == EntryType.TRANSACTIONEND) {
                TransactionEnd end = null;
                try {
                    end = TransactionEnd.parseFrom(entry.getStoreValue());
                } catch (InvalidProtocolBufferException e) {
                    throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
                }
                // Log the transaction commit: transaction id
                logger.info("----------------\n");
                logger.info(" END ----> transaction id: {}", end.getTransactionId());
                printXAInfo(end.getPropsList());
                logger.info(transaction_format,
                        new Object[]{entry.getHeader().getLogfileName(),
                                String.valueOf(entry.getHeader().getLogfileOffset()),
                                String.valueOf(entry.getHeader().getExecuteTime()),
                                simpleDateFormat.format(date),
                                entry.getHeader().getGtid(),
                                String.valueOf(delayTime)});
            }
            continue;
        }

        // The entry (binary log event) carries row data
        if (entry.getEntryType() == EntryType.ROWDATA) {
            RowChange rowChange = null;
            try {
                // Parse the stored content
                rowChange = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
            }
            // Event type of this change
            EventType eventType = rowChange.getEventType();
            logger.info(row_format,
                    new Object[]{entry.getHeader().getLogfileName(),
                            String.valueOf(entry.getHeader().getLogfileOffset()),
                            entry.getHeader().getSchemaName(),
                            entry.getHeader().getTableName(),
                            eventType,
                            String.valueOf(entry.getHeader().getExecuteTime()),
                            simpleDateFormat.format(date),
                            entry.getHeader().getGtid(),
                            String.valueOf(delayTime)});

            // For QUERY events and DDL statements, just log the SQL and move on to the next entry
            if (eventType == EventType.QUERY || rowChange.getIsDdl()) {
                logger.info(" sql ----> " + rowChange.getSql() + SEP);
                continue;
            }
            printXAInfo(rowChange.getPropsList());

            // Walk through the rows contained in this change
            for (RowData rowData : rowChange.getRowDatasList()) {
                List<Column> columns;
                // DELETE yields the column values before the change; every other type yields the values after it
                if (eventType == EventType.DELETE) {
                    columns = rowData.getBeforeColumnsList();
                } else {
                    columns = rowData.getAfterColumnsList();
                }
                HashMap<String, Object> map = new HashMap<>(16);
                // Copy each column's name and value into the map
                for (Column column : columns) {
                    map.put(column.getName(), column.getValue());
                }
                rows.add(new TwoTuple<>(eventType, map));
            }
        }
    }
    return rows;
}
ElasticUtils.java
package com.example.canal.study.common;

import com.alibaba.fastjson.JSON;
import com.example.canal.study.pojo.Student;
import lombok.extern.slf4j.Slf4j;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.Map;

/**
 * @author haha
 */
@Slf4j
@Component
public class ElasticUtils {
    @Autowired
    private RestHighLevelClient restHighLevelClient;

    /**
     * Create a document.
     * @param student the document source
     * @param index   the index name
     */
    public void saveEs(Student student, String index) {
        IndexRequest indexRequest = new IndexRequest(index)
                .id(student.getId())
                .source(JSON.toJSONString(student), XContentType.JSON)
                .opType(DocWriteRequest.OpType.CREATE);
        try {
            IndexResponse response = restHighLevelClient.index(indexRequest, RequestOptions.DEFAULT);
            log.info("Saved data to Elasticsearch: {}", response.getId());
        } catch (IOException e) {
            // Pass the exception as the last argument so SLF4J logs the stack trace
            log.error("Failed to save data to Elasticsearch", e);
        }
    }

    /**
     * Read a document and print its fields.
     * @param index the index name
     * @param id    the document _id
     * @throws IOException
     */
    public void getEs(String index, String id) throws IOException {
        GetRequest getRequest = new GetRequest(index, id);
        GetResponse response = restHighLevelClient.get(getRequest, RequestOptions.DEFAULT);
        // getSource() returns null when the document does not exist
        if (!response.isExists()) {
            log.info("Document {} not found in index {}", id, index);
            return;
        }
        Map<String, Object> fields = response.getSource();
        for (Map.Entry<String, Object> entry : fields.entrySet()) {
            System.out.println(entry.getKey() + ":" + entry.getValue());
        }
    }

    /**
     * Update a document, creating it if it does not exist yet.
     * @param student the new document content
     * @param index   the index name
     * @throws IOException
     */
    public void updateEs(Student student, String index) throws IOException {
        String json = JSON.toJSONString(student);
        UpdateRequest updateRequest = new UpdateRequest(index, student.getId());
        // An update request needs a doc (or script); upsert alone is rejected during validation
        updateRequest.doc(json, XContentType.JSON);
        updateRequest.upsert(json, XContentType.JSON);
        UpdateResponse response = restHighLevelClient.update(updateRequest, RequestOptions.DEFAULT);
        log.info("Updated data in Elasticsearch: {}", response.getId());
    }

    /**
     * Delete a document by id.
     * @param index the index name
     * @param id    the document _id
     * @throws IOException
     */
    public void DeleteEs(String index, String id) throws IOException {
        DeleteRequest deleteRequest = new DeleteRequest(index, id);
        DeleteResponse response = restHighLevelClient.delete(deleteRequest, RequestOptions.DEFAULT);
        log.info("Deleted data from Elasticsearch: {}", response.getId());
    }
}
BinLogElasticSearch.java
package com.example.canal.study.action;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.example.canal.study.common.CanalDataParser;
import com.example.canal.study.common.ElasticUtils;
import com.example.canal.study.pojo.Student;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.util.List;
import java.util.Map;

/**
 * @author haha
 */
@Slf4j
@Component
public class BinLogElasticSearch {
    @Autowired
    private CanalConnector canalSimpleConnector;
    @Autowired
    private ElasticUtils elasticUtils;
    // @Qualifier("canalHaConnector") selects the bean named canalHaConnector
    @Autowired
    @Qualifier("canalHaConnector")
    private CanalConnector canalHaConnector;

    public void binLogToElasticSearch() throws IOException {
        openCanalConnector(canalHaConnector);
        // Poll for data
        int batchSize = 5 * 1024;
        while (true) {
            Message message = canalHaConnector.getWithoutAck(batchSize);
            // Message message = canalSimpleConnector.getWithoutAck(batchSize);
            long id = message.getId();
            int size = message.getEntries().size();
            log.info("Number of binlog messages received: {}", size);
            if (id == -1 || size == 0) {
                try {
                    // Wait 2 seconds
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop polling
                    Thread.currentThread().interrupt();
                    return;
                }
            } else {
                // 1. Parse the message
                List<CanalEntry.Entry> entries = message.getEntries();
                List<CanalDataParser.TwoTuple<CanalEntry.EventType, Map>> rows = CanalDataParser.printEntry(entries);
                // 2. Synchronize the parsed rows to Elasticsearch
                for (CanalDataParser.TwoTuple<CanalEntry.EventType, Map> tuple : rows) {
                    if (tuple.eventType == CanalEntry.EventType.INSERT) {
                        Student student = createStudent(tuple);
                        elasticUtils.saveEs(student, "student_index");
                    } else if (tuple.eventType == CanalEntry.EventType.UPDATE) {
                        Student student = createStudent(tuple);
                        elasticUtils.updateEs(student, "student_index");
                    } else if (tuple.eventType == CanalEntry.EventType.DELETE) {
                        elasticUtils.DeleteEs("student_index", tuple.columnMap.get("id").toString());
                    }
                }
                // 3. Acknowledge the batch once, after all of its rows have been handled
                // canalSimpleConnector.ack(id);
                canalHaConnector.ack(id);
            }
        }
    }

    /**
     * Copy the column values into a Student.
     * @param tuple event type and column map of one row
     * @return the populated Student
     */
    private Student createStudent(CanalDataParser.TwoTuple<CanalEntry.EventType, Map> tuple) {
        Student student = new Student();
        student.setId(tuple.columnMap.get("id").toString());
        student.setAge(Integer.parseInt(tuple.columnMap.get("age").toString()));
        student.setName(tuple.columnMap.get("name").toString());
        student.setSex(tuple.columnMap.get("sex").toString());
        student.setCity(tuple.columnMap.get("city").toString());
        return student;
    }

    /**
     * Open the Canal connection.
     *
     * @param canalConnector
     */
    private void openCanalConnector(CanalConnector canalConnector) {
        // Connect to the Canal server
        canalConnector.connect();
        // Subscribe to the destination
        canalConnector.subscribe();
    }

    /**
     * Close the Canal connection.
     *
     * @param canalConnector
     */
    private void closeCanalConnector(CanalConnector canalConnector) {
        // Cancel the subscription while the connection is still open
        canalConnector.unsubscribe();
        // Then disconnect from the Canal server
        canalConnector.disconnect();
    }
}
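Because getWithoutAck hands out a batch without confirming it, the consumer decides when that batch counts as processed. One common arrangement is to acknowledge a batch once, after every row in it has been written, and to roll it back if a write fails so that the server delivers it again. The sketch below shows only that control flow. BatchSource and RowSink are hypothetical stand-ins for the ack(long) and rollback(long) calls on CanalConnector and for the Elasticsearch write, so the example runs without a Canal server.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative control flow: ack after a successful batch, rollback after a failed one. */
final class AckAfterBatchSketch {

    /** Hypothetical stand-in for CanalConnector.ack(long) and CanalConnector.rollback(long). */
    interface BatchSource {
        void ack(long batchId);
        void rollback(long batchId);
    }

    /** Hypothetical stand-in for the write to Elasticsearch. */
    interface RowSink {
        void write(String row) throws Exception;
    }

    /** Records the calls it receives so the flow can be inspected. */
    static final class RecordingSource implements BatchSource {
        final List<String> calls = new ArrayList<>();

        @Override
        public void ack(long batchId) {
            calls.add("ack:" + batchId);
        }

        @Override
        public void rollback(long batchId) {
            calls.add("rollback:" + batchId);
        }
    }

    /** Writes every row, then acknowledges the batch; rolls the batch back on any failure. */
    static boolean processBatch(long batchId, List<String> rows, BatchSource source, RowSink sink) {
        try {
            for (String row : rows) {
                sink.write(row);
            }
            source.ack(batchId);
            return true;
        } catch (Exception e) {
            source.rollback(batchId);
            return false;
        }
    }

    public static void main(String[] args) {
        RecordingSource source = new RecordingSource();
        // First batch: every write succeeds.
        processBatch(1L, Arrays.asList("row-a", "row-b"), source, row -> { });
        // Second batch: the write fails, so the batch is rolled back.
        processBatch(2L, Arrays.asList("row-c"), source, row -> {
            throw new IllegalStateException("simulated write failure");
        });
        System.out.println(source.calls);
    }
}
```

With this arrangement a batch is either confirmed as a whole or redelivered as a whole, so the writes to Elasticsearch should be idempotent (for example, keyed by the row id as in this article).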
CanalDemoApplication.java (Spring Boot startup class)
package com.example.canal.study;

import com.example.canal.study.action.BinLogElasticSearch;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.ApplicationArguments;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * @author haha
 */
@SpringBootApplication
public class CanalDemoApplication implements ApplicationRunner {
    @Autowired
    private BinLogElasticSearch binLogElasticSearch;

    public static void main(String[] args) {
        SpringApplication.run(CanalDemoApplication.class, args);
    }

    // The run method is executed as soon as the application has started
    @Override
    public void run(ApplicationArguments args) throws Exception {
        binLogElasticSearch.binLogToElasticSearch();
    }
}
application.properties
server.port=8081
spring.application.name = canal-demo
canal.server.ip = 192.168.124.5
canal.server.port = 11111
canal.destination = example
zookeeper.server.ip = 192.168.124.5:2181
zookeeper.sasl.client = false
elasticSearch.server.ip = 192.168.124.5
elasticSearch.server.port = 9200
Setting up a highly available Canal cluster
The sections above covered using Canal in single-node, direct-connection mode. Single-instance deployments are increasingly being replaced by highly available clusters, so how do we set up a multi-instance Canal cluster?
Obtaining Canal instances through ZooKeeper
Prepare the ZooKeeper Docker image and containers:
docker pull zookeeper
docker run -d --name zookeeper --net mynetwork --ip 172.18.0.3 -p 2181:2181 zookeeper
docker run -d --name canal-server2 --net mynetwork --ip 172.18.0.8 -p 11113:11113 canal/canal-server
canal.port=11113
canal.zkServers=172.18.0.3:2181
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
canal.instance.mysql.slaveId = 1235   # the first canal instance uses slaveId 1234; the slaveIds only need to be distinct
canal.instance.master.address = 172.18.0.6:3306
[zk: localhost:2181(CONNECTED) 15] get /otter/canal/destinations/example/running
{"active":true,"address":"172.18.0.4:11111","cid":1}
Client connection and data consumption
Given the ZooKeeper address and the Canal instance name, the canal client automatically reads the currently active server from the running node in ZooKeeper and then connects to it:
[zk: localhost:2181(CONNECTED) 0] get /otter/canal/destinations/example/running
{"active":true,"address":"172.18.0.4:11111","cid":1}
CanalConnector connector = CanalConnectors.newClusterConnector("172.18.0.3:2181", "example", "", "");
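To make the lookup concrete, the sketch below pulls the active server address out of the JSON stored in the running node, using the sample value shown above. It only illustrates what the node contains: the class RunningNode is hypothetical, the regular expressions are a shortcut for this flat JSON rather than a general parser, and the real canal client performs this lookup internally through its ZooKeeper client.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative reader for the JSON stored under .../example/running. */
final class RunningNode {
    private static final Pattern ACTIVE = Pattern.compile("\"active\"\\s*:\\s*(true|false)");
    private static final Pattern ADDRESS = Pattern.compile("\"address\"\\s*:\\s*\"([^\"]+)\"");

    /** Returns the host:port of the active server, or empty if the node is not active. */
    static Optional<String> activeAddress(String json) {
        Matcher active = ACTIVE.matcher(json);
        Matcher address = ADDRESS.matcher(json);
        if (active.find() && "true".equals(active.group(1)) && address.find()) {
            return Optional.of(address.group(1));
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Sample value copied from the zkCli output in this article.
        String running = "{\"active\":true,\"address\":\"172.18.0.4:11111\",\"cid\":1}";
        System.out.println(activeAddress(running).orElse("no active server"));
    }
}
```

When the active server changes, the address in this node changes with it, which is why the client only needs the ZooKeeper address and the instance name.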
[zk: localhost:2181(CONNECTED) 4] get /otter/canal/destinations/example/1001/running
{"active":true,"address":"192.168.124.5:59887","clientId":1001}
[zk: localhost:2181(CONNECTED) 5] get /otter/canal/destinations/example/1001/cursor
{"@type":"com.alibaba.otter.canal.protocol.position.LogPosition","identity":{"slaveId":-1,"sourceAddress":{"address":"mysql.mynetwork","port":3306}},"postion":{"included":false,"journalName":"binlog.000004","position":2169,"timestamp":1562672817000}}
docker exec -it canal-server bash
cd canal-server/bin
sh stop.sh
[zk: localhost:2181(CONNECTED) 19] get /otter/canal/destinations/example/running
{"active":true,"address":"172.18.0.8:11111","cid":1}
Exceptions and summary
elasticsearch-head cannot access Elasticsearch
es and es-head are two independent processes, so when es-head accesses the es service it runs into a cross-origin (CORS) problem. To solve it, add the following settings to the es configuration file:
[root@localhost /usr/local/elasticsearch-head-master]# cd ../elasticsearch-5.5.2/config/
[root@localhost /usr/local/elasticsearch-5.5.2/config]# vim elasticsearch.yml
# Append the following settings to the end of the file
http.cors.enabled: true
http.cors.allow-origin: "*"
elasticsearch-head queries return 406 Not Acceptable
# Line 6886
contentType: "application/x-www-form-urlencoded"
# change to
contentType: "application/json;charset=UTF-8"
# Line 7574
var inspectData = s.contentType === "application/x-www-form-urlencoded" &&
# change to
var inspectData = s.contentType === "application/json;charset=UTF-8" &&
Using elasticsearch-rest-high-level-client raises org.elasticsearch.action.index.IndexRequest.ifSeqNo
Besides adding this dependency to the pom:
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.1.1</version>
</dependency>
you also need to add:
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>7.1.1</version>
</dependency>
Why can't type be used in Elasticsearch 7.x?
See: Why did Elasticsearch remove type in 7.x?
Using spring-data-elasticsearch.jar raises org.elasticsearch.client.transport.NoNodeAvailableException
This article uses Elasticsearch 7.x. At the time of writing, spring-data-elasticsearch was built on the official TransportClient, which Elastic plans to drop, so the utility class in this article sends its requests through RestHighLevelClient, the client Elastic recommends. See the RestHighLevelClient API.
Starting Docker containers automatically at boot
If --restart=always was not specified when the container was created, it can be set with the update command:
docker update --restart=always [containerID]
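Host network mode does not take effect on Docker for Mac
Host mode exists for performance, but it breaks Docker's isolation and therefore lowers security. Where performance matters, host mode can be enabled with --network host. Note, however, that it does not work when containers are started locally on Windows or Mac, because host mode is only supported on Linux hosts.
Client connection to ZooKeeper reports authenticate using SASL(unknown error)
How to change the version of zookeeper.jar that canal.client.jar depends on
Download the official Canal source code to your machine with git clone https://github.com/alibaba/canal.git, modify the ZooKeeper entries in the pom.xml of the client module, and then run mvn install again: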
Trade-offs in technology selection