--- title: "使用 Logstash 同步 MySQL 到 Easysearch" date: 2023-08-14 lastmod: 2023-08-14 description: "本文介绍使用Logstash将MySQL数据同步至Easysearch的方案,需确保MySQL表有主键和时间字段以支持增量同步。通过配置Logstash,实现定期同步、数据修改及新增的实时更新,并提供软删除和监控方案。" tags: ["Easysearch"] summary: "从 MySQL 同步数据到 ES 有多种方案,这次我们使用 ELK 技术栈中的 Logstash 来将数据从 MySQL 同步到 Easysearch 。 方案前提 # MySQL 表记录必须有主键,比如 id 字段。通过该字段,可将 Easysearch 索引数据与 MySQL 表数据形成一对一映射关系,支持修改。 MySQL 表记录必须有时间字段,以支持增量同步。 如果上述条件具备,便可使用 logstash 定期同步新写入或修改后的数据到 Easysearch 中。 方案演示 # 版本信息 # MySQL: 5.7 Logstash: 7.10.2 Easysearch: 1.5.0 MySQL 设置 # 创建演示用的表。 CREATE DATABASE es_db; USE es_db; DROP TABLE IF EXISTS es_table; CREATE TABLE es_table ( id BIGINT(20) UNSIGNED NOT NULL, PRIMARY KEY (id), UNIQUE KEY unique_id (id), client_name VARCHAR(32) NOT NULL, modification_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, insertion_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ); 说明" --- 从 MySQL 同步数据到 ES 有多种方案,这次我们使用 ELK 技术栈中的 Logstash 来将数据从 MySQL 同步到 Easysearch 。 ## 方案前提 1. MySQL 表记录必须有主键,比如 id 字段。通过该字段,可将 Easysearch 索引数据与 MySQL 表数据形成一对一映射关系,支持修改。 2. MySQL 表记录必须有时间字段,以支持增量同步。 如果上述条件具备,便可使用 logstash 定期同步新写入或修改后的数据到 Easysearch 中。 ## 方案演示 ### 版本信息 MySQL: 5.7 Logstash: 7.10.2 Easysearch: 1.5.0 ### **MySQL 设置** 创建演示用的表。 ```yaml CREATE DATABASE es_db; USE es_db; DROP TABLE IF EXISTS es_table; CREATE TABLE es_table ( id BIGINT(20) UNSIGNED NOT NULL, PRIMARY KEY (id), UNIQUE KEY unique_id (id), client_name VARCHAR(32) NOT NULL, modification_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, insertion_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP ); ``` **说明** - id 字段: 主键、唯一键,将作为 Easysearch 索引中的 doc id 字段。 - modification_time 字段: 表记录的插入和修改都会记录在此。 - client_name: 代表用户数据。 - insertion_time: 可省略,用来记录数据插入到 MySQL 数据的时间。 #### 插入数据 ```yaml INSERT INTO es_table (id, client_name) VALUES (1, 'test 1'); INSERT INTO es_table (id, client_name) VALUES (2, 'test 2'); INSERT INTO es_table (id, client_name) VALUES (3, 'test 3'); ``` ### **Logstash** 配置文件 {{< expand "展开查看配置文件" "..." >}} ```yaml input { jdbc { jdbc_driver_library => "./mysql-connector-j-8.1.0/mysql-connector-j-8.1.0.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://192.168.56.3:3306/es_db" jdbc_user => "root" jdbc_password => "password" jdbc_paging_enabled => true tracking_column => "unix_ts_in_secs" use_column_value => true tracking_column_type => "numeric" last_run_metadata_path => "./.mysql-es_table-sql_last_value.yml" schedule => "*/5 * * * * *" statement => "SELECT *, UNIX_TIMESTAMP(modification_time) AS unix_ts_in_secs FROM es_table WHERE (UNIX_TIMESTAMP(modification_time) > :sql_last_value AND modification_time < NOW()) ORDER BY modification_time ASC" } jdbc { jdbc_driver_library => "./mysql-connector-j-8.1.0/mysql-connector-j-8.1.0.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_connection_string => "jdbc:mysql://192.168.56.3:3306/es_db" jdbc_user => "root" jdbc_password => "password" schedule => "*/5 * * * * *" statement => "SELECT count(*) AS count,'es_table' AS table_name from es_table" } } filter { if ![table_name] { mutate { copy => { "id" => "[@metadata][_id]"} remove_field => ["@version", "unix_ts_in_secs","@timestamp"] add_field => { "[@metadata][target_index]" => "mysql_es_table" } } } else { mutate { add_field => { "[@metadata][target_index]" => "table_counts" } remove_field => ["@version"] } uuid { target => "[@metadata][_id]" overwrite => true } } } output { # stdout { codec => rubydebug { metadata => true } } elasticsearch { hosts => ["https://localhost:9200"] user => "admin" password => "f0c6fc61fe5f7b084c00" ssl_certificate_verification => "false" index => "%{[@metadata][target_index]}" manage_template => "false" document_id => "%{[@metadata][_id]}" } } ``` {{< /expand >}} - 每 5 秒钟同步一次 es_table 表的数据到 mysql_sync_idx 索引。 - 每 5 秒统计一次 es_table 表的记录条数到 table_counts 索引,用于监控。 #### 启动 logstash ```yaml ./bin/logstash -f sync_es_table.conf ``` 查看同步结果, 3 条数据都已同步到索引。 {{% load-img "/img/blog/2023/sync-mysql-to-es-using-logstash/1.png" "" %}} Mysql 数据库新增记录 ```yaml INSERT INTO es_table (id, client_name) VALUES (4, 'test 4'); ``` Easysearch 确认新增 {{% load-img "/img/blog/2023/sync-mysql-to-es-using-logstash/2.png" "" %}} Mysql 数据库修改记录 ```yaml UPDATE es_table SET client_name = 'test 0001' WHERE id=1; ``` Easysearch 确认修改 {{% load-img "/img/blog/2023/sync-mysql-to-es-using-logstash/3.png" "" %}} ### **删除数据** Logstash 无法直接删除操作到 ES ,有两个方案: 1. 在表中增加 is_deleted 字段,实现软删除,可达到同步的目的。查询过滤掉 is_deleted : true 的记录,后续通过脚本等方式定期清理 is_deleted : true 的数据。 2. 执行删除操作的程序,删除完 MySQL 中的记录后,继续删除 Easysearch 中的记录。 ### **同步监控** 数据已经在 ES 中了,我们可利用 INFINI Console 的数据看板来监控数据是否同步,展示表记录数、索引记录数及其变化。 {{% load-img "/img/blog/2023/sync-mysql-to-es-using-logstash/4.png" "" %}}