📣 极限科技诚招搜索运维工程师(Elasticsearch/Easysearch)- 全职/北京 👉 : 立即申请加入
通过 Canal 将 MySQL 数据实时同步到 Easysearch

Canal 是阿里巴巴集团提供的一个开源产品,能够通过解析数据库的增量日志,提供增量数据的订阅和消费功能。使用 Canal 模拟成 MySQL 的 Slave,实时接收 MySQL 的增量数据 binlog,然后通过 RESTful API 将数据写入到 Easysearch 中。

前提条件 #

  1. 部署 Easysearch 集群。
  2. 部署 MySQL 数据库。
  3. 部署 Gateway,Canal Adapter 不支持使用 HTTPS 协议连接,使用 Gateway 代理 Easysearch 。
  4. 部署 Console,方便查看 Easysearch 数据。

对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下:

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

创建 canal 用户,授权 canal 连接 MySQL 具有作为 MySQL slave 的权限。

CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

操作步骤 #

在进行数据同步时支持自定义索引 Mapping,但需保证 Mapping 中定义的字段(名称+类型)与 MySQL 中一致。

1. 准备 MySQL 数据源 #

create database canal;
use canal;
CREATE TABLE `test` (
    `id` bigint(32) NOT NULL,
    `name` text NOT NULL,
    `age` smallint  NOT NULL,
    PRIMARY KEY (`id`)
) ENGINE=InnoDB
DEFAULT CHARACTER SET=utf8;

2. Easysearch 创建索引 #

PUT test
{
    "settings" : {
      "index" : {
        "number_of_shards" : "1",
        "number_of_replicas" : "1"
      }
    },
    "mappings" : {
            "properties" : {
              "id": {
                   "type": "integer"
               },
               "name": {
                    "type" : "text"
                },
                "age" : {
                    "type" : "integer"
                }
            }
    }
}

3. 安装并启动 Canal-server #

下载https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.deployer-1.1.7.tar.gz
修改配置文件 vi conf/example/instance.properties

启动 canal
sh bin/startup.sh
启动成功日志信息,logs/canal/canal.log

关闭 canal
sh bin/stop.sh

4. 安装并启动 Canal-adapter #

下载https://github.com/alibaba/canal/releases/download/canal-1.1.7/canal.adapter-1.1.7.tar.gz
修改配置文件:application.yml

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  flatMessage: true
  syncBatchSize: 1000
  retries: -1
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.batch.size: 500

  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/canal?useUnicode=true
      username: canal
      password: canal
  canalAdapters:
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es7
        properties:
          security.auth: admin:4ad8f8f792e81cd0a6de
          cluster.name: easysearch

新增 canal-adapter/conf/es7/test.yml,配置索引和表的映射关系。

dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  _index: test           # es 的索引名称
  _id: _id               # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配
  # sql映射
  sql: " select a.id as _id,a.id,a.name,a.age from test a "
  etlCondition: "where a.id>={}"
  commitBatch: 3000      # 提交批大小

启动 canal-adapter
./bin/startup.sh

5. 验证增量数据同步 #

在 MySQL 数据库中,对 test 表插入两条数据。
insert test(id,name,age) values(1,'canal_test1',11);
insert test(id,name,age) values(2,'canal_test2',22);

6. 在 Console 中,执行以下命令查询数据 #

最后 #

Canal 同步的是增量数据,不会同步之前的存量数据。要同步存量数据可参考 《使用 Logstash 同步 MySQL 到 Easysearch》

标签
2026 x
开源 x
赞助 x
开源生态 x
社区 x
低空经济 x
商业化 x
Easysearch x
数据分析 x
金猿奖 x
国产化 x
搜索引擎 x
Coco AI x
技术卓越奖 x
创新产品奖 x
IT168 x
APM x
Skywalking x
产品更新 x
Easy-Es x
Coco x
AI x
GitLab x
代码审核 x
人工智能 x
石油石化 x
performance x
Gitee x
投票 x
Meilisearch x
Rust x
轻量级 x
搜索百科 x
Docker x
Docker Compose x
Easyserach x
Console x
DevOps x
Elasticsearch x
国产替代 x
backup x
snapshot x
CCR x
Gateway x
esdump x
source_reuse x
ignore_above x
OpenSearch x
AWS x
Lucene x
Solr x
Easyearch x
发明专利 x
数据分区 x
国际专利 x
一等奖 x
人工智能应用创新大赛 x
bulk x
embedding x
OpenAI x
IK x
TDBC x
2025 x
信通院 x
可信数据库大会 x
搜索型数据库 x
中国数据库产业图谱 x
上海开源创新菁英荟 x
开源创新新星企业 x
Workshop x
AI 搜索 x
智能助手 x
Automation x
Logstash x
MongoDB x
开源中国 x
直播 x
merge x
Elasticsearch 9 x
GitCode x
AI搜索 x
Cloud x
rollup x
Kubernetes x
Operator x
Arm64 x
Snapshot x
S3 x
Grafana x
Opensearch x
Nginx x
直播活动 x
搜索客社区 x
Meetup x
ES x
企业搜索 x
DeepSeek x
RAG x
certificate x
windows x
Rollup x
TopN x
Filebeat x
Ubuntu x
请求限速 x
INFINI Console x
指标 x
Kibana x
多集群 x
client x
Spring Boot x
ECE x
ES Bulk x
vector database x
Postgres x
可搜索快照 x
SDK x
官网 x
Web 开发 x
Next.js x
React x
Three.js x
Metrics x
Helm x
filter x
querycache x
practice x
Agent x
localStorage x
响应式 x
时间组件 x
时区组件 x
极限科技 x
三周年 x
周年庆 x
国家高新技术企业 x
校园招聘 x
湖北工业大学 x
Tauri x
Web 开发人员 x
桌面应用开发 x
桌面端 x
Electron x
Pizza x
认证培训 x
报名 x
Scrapy x
爬虫 x
Rust开发者大会 x
docsearch x
文档搜索 x
Easyseach x
有奖征文 x
黑神话悟空 x
EKS x
征文系列 x
跨集群搜索 x
科技中小企业 x
白皮书 x
Python SDK x
数据库产业图谱 x
超大规模 x
分布式集群 x
写入限流 x
2024可信数据库发展大会 x
创新型中小企业 x
搜索数据库 x
正排索引 x
免费许可证 x
K8S x
DTC2024 x
实时搜索 x
ES国产化 x
Redis x
OOM x
测试 x
内存 x
趋势 x
AI绘画 x
Stable Diffusion x
Diffusion x
Model x
GAN x
语义搜索 x
知识图 x
向量数据库 x
中国信通院 x
星河(Galaxy) x
标杆案例 x
鲲鹏 x
鲲鹏技术认证 x
客户端 x
日志平台 x
LDAP x
Loadgen x
中国一汽 x
国内数据库 x
墨天轮 x
监控系统 x
集成测试 x
ZSTD x
Helm Charts x
国产适配 x
兆芯 x
Linux x
LoongArch x
信创适配 x
二维拆分算法 x
中国移动云 x
Vault x
加密 x
安全工具 x
kNN x
向量检索 x
图片搜索 x
Alerting x
SQL x
搜索 x
Embedding x
可信数据库 x
统信 x
海光 x
龙芯 x
restore x
Arm x
大数据企业证书 x
移动云大会 x
信通院产品评测 x
国内首家 x
数据可视化 x
北京软协 x
第十届理事会会员单位 x
Apache Arrow x
宣传片 x
大会分享 x
多集群管理 x
无缝数据迁移 x
Loadrun x
INFINI Gateway x
log4j x