--- title: "使用 INFINI Console 实现 Elasticsearch 的增量数据迁移" date: 2023-06-09 lastmod: 2023-06-09 description: "INFINI Console 1.3.0新增增量数据迁移功能,支持日志场景下的数据迁移,解决延迟、更新与删除问题,通过配置更新字段和延迟确保数据一致性,操作简便且兼容性好。" tags: ["Elasticsearch", "INFINI Console"] summary: "功能介绍 # 在 INFINI Console 1.3.0 版本里,数据迁移功能增加了对增量迁移的支持。这篇文章将会介绍增量迁移的具体使用方法和实现原理。 场景介绍 # 以常见的日志场景为例,假设 A 集群有一个用来记录线上 HTTP 请求记录的索引 request-logs,数据结构如下: { "request_body": {...}, "request_header": {...}, "method": "POST", "request_time": "2023-06-09 12:30:09+800" // 客户端记录请求的时间 "@timestamp": "2023-06-09 12:30:11+800" // 请求写入ES的时间 } 我们希望完整导入这个索引的数据到另外一个集群 B 的 request-logs。要想确保导入的数据完整,我们首先需要考虑数据写入的延迟问题: A 集群的数据写入可能会有延迟。日志往往是从不同的节点收集异步上传的,考虑到网络环境波动等情况,最终日志写入 Elasticserach 的时间会有差异。 写入 Elasticsearch 的数据并不会立刻对查询请求可见,Elasticsearch 会异步刷新写入的数据。 也就是说,我们假设每一条请求日志从采集到写入 ES 到最终可以被查询的延迟为d,每次进行增量迁移的时候,我们可以完整迁移的数据范围就是[当前时间 - 上次迁移的时间 - d, 当前时间 - d)。只要数据写入的延迟不超过d,我们就可以从集群 A 查询到完整的数据集写入集群 B。 集群 A 的数据有更新操作? # 在上述的日志场景里,我们通常不会对写入的日志文档进行后续的更新操作,每一条文档写入后都是不可变的,我们只需要筛选@timestamp字段就可以找到需要迁移的数据了,而且每条数据只需要迁移一次就可以确保目标集群的数据一致。 如果源数据有更新,那我们应该如何进行增量迁移呢?通常情况下,每次更新操作我们都会记录文档更新的时间到update_time字段,这样我们就可以使用update_time字段来进行增量数据的迁移。 假设在第一次迁移的时候,索引 A 存有以下数据,我们在进行第一次迁移操作后,数据可以完整写入目标索引:" --- ## 功能介绍 在 INFINI Console 1.3.0 版本里,数据迁移功能增加了对增量迁移的支持。这篇文章将会介绍增量迁移的具体使用方法和实现原理。 ## 场景介绍 以常见的日志场景为例,假设 A 集群有一个用来记录线上 HTTP 请求记录的索引 `request-logs`,数据结构如下: ``` { "request_body": {...}, "request_header": {...}, "method": "POST", "request_time": "2023-06-09 12:30:09+800" // 客户端记录请求的时间 "@timestamp": "2023-06-09 12:30:11+800" // 请求写入ES的时间 } ``` 我们希望完整导入这个索引的数据到另外一个集群 B 的 request-logs。要想确保导入的数据完整,我们首先需要考虑数据写入的延迟问题: 1. A 集群的数据写入可能会有延迟。日志往往是从不同的节点收集异步上传的,考虑到网络环境波动等情况,最终日志写入 Elasticserach 的时间会有差异。 2. 写入 Elasticsearch 的数据并不会立刻对查询请求可见,Elasticsearch 会异步刷新写入的数据。 也就是说,我们假设每一条请求日志从采集到写入 ES 到最终可以被查询的延迟为`d`,每次进行增量迁移的时候,我们可以完整迁移的数据范围就是`[当前时间 - 上次迁移的时间 - d, 当前时间 - d)`。只要数据写入的延迟不超过`d`,我们就可以从集群 A 查询到完整的数据集写入集群 B。 ### 集群 A 的数据有更新操作? 在上述的日志场景里,我们通常不会对写入的日志文档进行后续的更新操作,每一条文档写入后都是不可变的,我们只需要筛选`@timestamp`字段就可以找到需要迁移的数据了,而且每条数据只需要迁移一次就可以确保目标集群的数据一致。 如果源数据有更新,那我们应该如何进行增量迁移呢?通常情况下,每次更新操作我们都会记录文档更新的时间到`update_time`字段,这样我们就可以使用`update_time`字段来进行增量数据的迁移。 假设在第一次迁移的时候,索引 A 存有以下数据,我们在进行第一次迁移操作后,数据可以完整写入目标索引: {{% load-img "/img/blog/2023/data-migration/update-1.png" "Migration step 1" %}} 如果在第二次迁移之前,索引 A 中有一条旧的记录被更新,这个时候,迁移流程可以通过`update_time`字段检测到这一条数据,复制并覆盖目标集群的旧数据记录: {{% load-img "/img/blog/2023/data-migration/update-2.png" "Migraiton step 2" %}} 可以看到,即使源数据有更新,只要我们记录了每一条数据的更新时间,迁移过程最后写入集群 B 的数据依然是完整且一致的。 ### 集群 A 的数据有删除操作? 如果源集群的数据有删除操作,基于上述的数据迁移逻辑,第二次迁移过程是无法判断已经迁移的数据是否被删除的: {{% load-img "/img/blog/2023/data-migration/delete-hard.png" "Migration process with hard delete" %}} 如果我们希望迁移的数据完整,需要避免对源集群的数据进行删除操作。我们可以标记文档为`deleted`,但是不做实际删除操作,这样我们就可以通过文档删除(更新)操作的时间来进行完整的数据迁移: {{% load-img "/img/blog/2023/data-migration/delete-soft.png" "Migration process with soft delete" %}} ## 使用 INFINI Console 来进行增量数据的迁移 在 INFINI Console 里,我们可以使用 数据工具 - 数据迁移 功能来对数据进行增量迁移。作为示例,我们新建一个数据迁移任务,把`.infini_requests_logging-000002`索引的数据迁移到目标集群的`request`索引。 {{% load-img "/img/blog/2023/data-migration/step-1.png" "Create migration task in console" %}} 我们需要迁移到一个新创建的索引,在`Initialize Configuration`步骤,根据提示配置目标索引的`mapping`和`setting`信息。如果不需要特殊配置,可以使用`Auto Optimize`功能自动填充。 {{% load-img "/img/blog/2023/data-migration/step-2.png" "Update target index setting" %}} 接下来我们需要配置数据写入的更新字段和写入的延迟。请求日志通过`timestamp`字段记录请求时间,通常延迟不会超过 1 分钟,我们在`Migrate Setting`步骤里配置相应的`Incremental`信息。如果历史数据量比较大或者增量任务的运行间隔较长,我们也可以配置`Partition`分区规则来拆分任务为更细的粒度,避免因单个任务长时间导出数据对 ES 产生过高的负载。 {{% load-img "/img/blog/2023/data-migration/step-3.png" "Update migration settings" %}} 最后,在创建任务时,我们勾选`Detect Incremental Data`,然后设置任务每 15 分钟检测一次增量数据。 {{% load-img "/img/blog/2023/data-migration/step-4.png" "Update execution settings" %}} 点击开始后,增量任务就会开始运行。第一次运行时会对历史数据进行全量迁移,后续每 15 分钟会自动检测新数据并迁移到目标索引。 {{% load-img "/img/blog/2023/data-migration/step-5.png" "Task execution detail" %}} ## 总结 除了数据一致性,迁移功能的设计也需要兼顾性能、稳定性、ES 版本兼容性等,INFINI Console 提供了一套操作简便的数据迁移解决方案,可以用于各种场景的数据迁移需求。