--- title: "通过 Canal 将 MySQL 数据实时同步到 Easysearch" date: 2023-11-17 lastmod: 2023-11-17 description: "Canal是一款开源工具,可通过解析MySQL增量日志实现数据订阅与消费。本文介绍将其用于将MySQL增量数据同步至Easysearch的全流程,包括环境准备、配置Canal及Adapter、创建索引与验证同步等步骤。注意,Canal仅同步增量数据,存量数据需其他工具处理。" tags: ["Easysearch"] summary: "Canal 是阿里巴巴集团提供的一个开源产品,能够通过解析数据库的增量日志,提供增量数据的订阅和消费功能。使用 Canal 模拟成 MySQL 的 Slave,实时接收 MySQL 的增量数据 binlog,然后通过 RESTful API 将数据写入到 Easysearch 中。 前提条件 # 部署 Easysearch 集群。 部署 MySQL 数据库。 部署 Gateway,Canal Adapter 不支持使用 HTTPS 协议连接,使用 Gateway 代理 Easysearch 。 部署 Console,方便查看 Easysearch 数据。 对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下: [mysqld] log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复 创建 canal 用户,授权 canal 连接 MySQL 具有作为 MySQL slave 的权限。" --- Canal 是阿里巴巴集团提供的一个开源产品,能够通过解析数据库的增量日志,提供增量数据的订阅和消费功能。使用 Canal 模拟成 MySQL 的 Slave,实时接收 MySQL 的增量数据 binlog,然后通过 RESTful API 将数据写入到 Easysearch 中。 ## 前提条件 1. 部署 Easysearch 集群。 2. 部署 MySQL 数据库。 3. 部署 Gateway,Canal Adapter 不支持使用 HTTPS 协议连接,使用 Gateway 代理 Easysearch 。 4. 部署 Console,方便查看 Easysearch 数据。 对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下: ``` [mysqld] log-bin=mysql-bin # 开启 binlog binlog-format=ROW # 选择 ROW 模式 server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复 ``` 创建 canal 用户,授权 canal 连接 MySQL 具有作为 MySQL slave 的权限。 ``` CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES; ``` ## 操作步骤 在进行数据同步时支持自定义索引 Mapping,但需保证 Mapping 中定义的字段(名称+类型)与 MySQL 中一致。 ### 1. 准备 MySQL 数据源 ``` create database canal; use canal; CREATE TABLE `test` ( `id` bigint(32) NOT NULL, `name` text NOT NULL, `age` smallint NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARACTER SET=utf8; ``` ### 2. Easysearch 创建索引 ``` PUT test { "settings" : { "index" : { "number_of_shards" : "1", "number_of_replicas" : "1" } }, "mappings" : { "properties" : { "id": { "type": "integer" }, "name": { "type" : "text" }, "age" : { "type" : "integer" } } } } ``` ### 3. 安装并启动 Canal-server **下载**:[https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz](https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz) **修改配置文件** `vi conf/example/instance.properties` {{% load-img "/img/blog/2023/syncing-mysql-to-easysearch-using-canal/1.png" "" %}} **启动 canal** `sh bin/startup.sh` 启动成功日志信息,logs/canal/canal.log {{% load-img "/img/blog/2023/syncing-mysql-to-easysearch-using-canal/2.png" "" %}} **关闭 canal** `sh bin/stop.sh` ### 4. 安装并启动 Canal-adapter **下载**:[https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.adapter-1.1.7.tar.gz](https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.adapter-1.1.7.tar.gz) **修改配置文件**:application.yml ``` server: port: 8081 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 default-property-inclusion: non_null canal.conf: flatMessage: true syncBatchSize: 1000 retries: -1 timeout: accessKey: secretKey: consumerProperties: canal.tcp.server.host: 127.0.0.1:11111 canal.tcp.batch.size: 500 srcDataSources: defaultDS: url: jdbc:mysql://127.0.0.1:3306/canal?useUnicode=true username: canal password: canal canalAdapters: groups: - groupId: g1 outerAdapters: - name: logger - name: es7 properties: security.auth: admin:4ad8f8f792e81cd0a6de cluster.name: easysearch ``` **新增** canal-adapter/conf/es7/test.yml,配置索引和表的映射关系。 ``` dataSourceKey: defaultDS destination: example groupId: g1 esMapping: _index: test # es 的索引名称 _id: _id # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配 # sql映射 sql: " select a.id as _id,a.id,a.name,a.age from test a " etlCondition: "where a.id>={}" commitBatch: 3000 # 提交批大小 ``` **启动 canal-adapter** `./bin/startup.sh` {{% load-img "/img/blog/2023/syncing-mysql-to-easysearch-using-canal/3.png" "" %}} ### 5. 验证增量数据同步 在 MySQL 数据库中,对 test 表插入两条数据。 `insert `test`(`id`,`name`,`age`) values(1,'canal_test1',11);` `insert `test`(`id`,`name`,`age`) values(2,'canal_test2',22);` ### 6. 在 Console 中,执行以下命令查询数据 {{% load-img "/img/blog/2023/syncing-mysql-to-easysearch-using-canal/4.png" "" %}} ## 最后 Canal 同步的是增量数据,不会同步之前的存量数据。要同步存量数据可参考[《使用 Logstash 同步 MySQL 到 Easysearch》](https://infinilabs.cn/blog/2023/sync-mysql-to-es-using-logstash/)