--- title: "如何使用 DataX 连接 Easysearch" date: 2025-02-06 lastmod: 2025-02-06 description: "DataX是阿里开源的离线数据同步工具,支持多种异构数据源间的高效数据传输。本文介绍如何使用DataX将数据写入Easysearch,包括任务配置、运行及优化方法,示例中实现50万个文档10秒内写入。" tags: ["Easysearch"] summary: "DataX # DataX 是阿里开源的一款离线数据同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。 本篇主要介绍 DataX 如何将数据写入到 Easysearch,对于各种数据源的连接不会做深入的探讨,感兴趣的小伙伴可以访问 DataX 的 Github 仓库查看详情。 下载与安装 # DataX 无需安装,下载后解压即可使用。 系统需求: JDK 1.8 及以上 Python2 或 3 创建任务配置文件 # 每个数据同步的操作可称为一个任务,任务的配置文件定义了数据源(reader)、数据目的(writer) ,以及任务的设置信息,如并发数、速度控制等。DataX 集成了如此多的数据源,如果靠纯手工编写任务配置显然不现实。官方也出了个命令可以根据指定的数据源和数据目的帮助大家生成任务配置。 python datax.py -r {YOUR_READER} -w {YOUR_WRITER} 测试配置文件 此次演示使用 streamreader 和 elasticsearchwriter 作为数据源和数据目的,任务配置如下: { "job": { "content": [ { "reader": { "name": "streamreader", "parameter": { "sliceRecordCount": 10000, "column": [ { "type": "long", "value": "10" }, { "type": "string", "value": "hello,你好,世界-DataX" }, { "type": "string", "value": "hello,你好,Easysearch" } ] } }, "writer": { "name": "elasticsearchwriter", "parameter": { "endpoint": "http://localhost:9200", "accessId": "admin", "accessKey": "1ef0c661d8562aaa06be", "index": "yf-test", "column": [ {"name": "no", "type": "long"}, { "name": "content","type": "keyword" }, { "name": "content2","type": "keyword" } ] } } } ], "setting": { "speed": { "channel": 50 } } } } streamreader 是一个从内存读取数据的插件, 它主要用来快速生成期望的数据并对写入插件进行测试。" --- ## DataX DataX 是阿里开源的一款离线数据同步工具,致力于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间稳定高效的数据同步功能。 本篇主要介绍 DataX 如何将数据写入到 Easysearch,对于各种数据源的连接不会做深入的探讨,感兴趣的小伙伴可以访问 [DataX](https://github.com/alibaba/DataX/blob/master/introduction.md) 的 Github 仓库查看详情。 ## 下载与安装 DataX 无需安装,下载后解压即可使用。 系统需求: 1. JDK 1.8 及以上 2. Python2 或 3 ## 创建任务配置文件 每个数据同步的操作可称为一个任务,任务的配置文件定义了数据源(reader)、数据目的(writer) ,以及任务的设置信息,如并发数、速度控制等。DataX 集成了如此多的数据源,如果靠纯手工编写任务配置显然不现实。官方也出了个命令可以根据指定的数据源和数据目的帮助大家生成任务配置。 ```plain python datax.py -r {YOUR_READER} -w {YOUR_WRITER} ``` **测试配置文件** 此次演示使用 streamreader 和 elasticsearchwriter 作为数据源和数据目的,任务配置如下: ```plain { "job": { "content": [ { "reader": { "name": "streamreader", "parameter": { "sliceRecordCount": 10000, "column": [ { "type": "long", "value": "10" }, { "type": "string", "value": "hello,你好,世界-DataX" }, { "type": "string", "value": "hello,你好,Easysearch" } ] } }, "writer": { "name": "elasticsearchwriter", "parameter": { "endpoint": "http://localhost:9200", "accessId": "admin", "accessKey": "1ef0c661d8562aaa06be", "index": "yf-test", "column": [ {"name": "no", "type": "long"}, { "name": "content","type": "keyword" }, { "name": "content2","type": "keyword" } ] } } } ], "setting": { "speed": { "channel": 50 } } } } ``` streamreader 是一个从内存读取数据的插件, 它主要用来快速生成期望的数据并对写入插件进行测试。 我们用 streamreader 构造了 10000 个文档,文档含三个字段,任务启动了 50 个 channel 进行数据发送,结果就是共计发送 50w 个文档。 elasticssearchwriter 指定了 Easysearch 的连接信息: + endpoint: Easysearch 的地址和端口 + accessId: 用户名 + accessKey: 密码 + index: 写入索引名 + column: 对 reader 发来数据的 schema 定义 + batchsize: 默认 1000 这次我们 Easysearch 开启的 http 服务,因为 DataX 的 elasticsearchwriter 无法跳过证书验证。对于必须使用 https 的场景,可使用 [INFINI Gateway](https://infinilabs.cn/docs/latest/gateway/) 代理 ES 服务,提供 http 通道给离线数据同步专用。 ⚠️注意: 不同的 reader、writer 对 sliceRecordCount 和 channel 会有不同的行为。 ## Easysearch 本次测试使用的 [Easysearch](https://infinilabs.cn/download/) 版本是 1.9.0,需要注意是 Easysearch 要开启兼容性参数: ```plain elasticsearch.api_compatibility: true ``` 否则创建索引报错退出。(实际索引创建成功了但是 mapping 信息是空的) {{% load-img "/img/blog/2025/how-to-connect-easysearch-using-datax/1.png" "" %}} ## 运行任务 编辑好任务配置文件后,下一步就是执行任务。 ```plain python3 datax.py yf-test.json ``` {{% load-img "/img/blog/2025/how-to-connect-easysearch-using-datax/2.png" "" %}} 写入数据时索引不存在,Datax 根据 schema 定义创建了索引。 {{% load-img "/img/blog/2025/how-to-connect-easysearch-using-datax/3.png" "" %}} OK 任务执行完毕,写入 50w 个文档耗时 10 秒。如果有其他问题欢迎与我联系。 {{% load-img "/img/blog/banner/about_yangf.png" "" %}}