本文档主要围绕某客户关于如何基于 Elasticsearch 网关来实现 Elasticsearch 集群的异地容灾解决方案及提供一份性能测试报告作为参考。
方案介绍
# 传统的容灾方案存在各种弊端,这里针对客户的实际需求实现了一套基于极限网关的多集群异地容灾方案,可以完美的解决传统容灾方案在实际运行中的各种问题,并结合客户的实际需求,对双机房资源的利用,网络带宽是限制等方面进行了更加全面的考虑。
总体架构
# 当前客户的容灾需求为建设一个双机房的高可用业务架构,当一端机房出现不可用的情况下,另外的机房可以随时接管业务,针对 Elasticsearch 集群的总体架构设计如下图所示:
如上图,Gateway 组件将承载业务的查询和写入,并且在日常的业务运行过程中,通过 Gateway 还可以将一半的查询流量转发给机房 B 的 Elasticsearch 集群,从而充分利用所有在线的计算资源,降低 A 机房的负载,提升整体的查询体验。 客户目前的数据采用的是每天晚上完全重建的方式,使用 CDP 平台的离线任务,将最新业务数据按照新的结构在 Elasticsearch 创建一份新的索引,待索引重建完成之后通过别名进行切换,从而完成新旧索引的替换。 通过在 CDP Platform 和 Elasticsearch 集群之间增加一层网关,可以让重建的写入请求无缝复制一份到机房 B 的对象存储,对于异地跨专线的数据复制,这个方案有两个优点:
基于文档操作进行复制,而不是生成的最终索引文件,机房 AB 没有版本和架构的依赖 复制的内容更小,更利于压缩,压缩比 ~16:1,对专线带宽的消耗更低,长距离传输时间更短 复制速度更快,将消息批量压缩之后上传到对象存储,机房 B 本地解压再批量进行消费 两个机房分别独立部署业务系统、Elasticsearch 网关和 Elasticsearch 集群,业务系统正常情况下通过本机房的网关来访问 Elasticsearch,备机房的网关和业务系统正常情况下没有流量,当故障发生之后,可以随时启用备机房的应用和集群,主备机房的数据变更分别记录并管理,当主集群恢复之后,会立即将队列里面的数据进行同步,并最终保持一致。
数据复制方式
# 通过极限网关进行数据复制的方式,只需要将业务端代码里面访问的 Elasticsearch 集群地址替换为极限网关的访问地址即可,业务端通过访问网关而不是直接访问 Elasticsearch 来进行相关读写操作,网关会进行读写操作进行分离,读操作会直接转发给主集群,同步操作,和正常查询 Elasticsearch 集群没有任何区别。
如果是写操作,会走单独的复制流程,也就是会进行操作级别的复制,请求会先记录到消息队列持久化,然后异步的方式复制到备集群,也就是同步操作主集群,异步操作备集群的方式。
业务端收到的是正常的 Elasticsearch 的响应,操作逻辑和直接访问 Elasticsearch 主集群没有任何区别。
服务高可用设计
# 极限网关自带实现了基于 VRRP 协议的浮动 IP,无需依赖第三方外部组件,即可实现双网关节点的主备热切换。
客户端访问的是一个虚拟的浮动 IP,当浮动 IP 所在的网关节点出现网络故障或者宕机情况下,浮动 IP 会自动漂移到另外一台备份网关节点上,切换过程迅速,业务系统无感知,整个切换过程不会造成服务中断。 此外,极限网关也支持无状态的方式部署,通过使用分布式消息队列,极限网关可以水平扩展,如下图:
通过在极限网关前面再加一层四层流量负载均衡,每个网关都是独立承担业务流量,可以按需水平扩展。
数据一致性设计
# 极限网关在设计过程中,妥善考虑了各种场景下的数据不一致问题,比如使用本地磁盘队列的双节点互备模式,如下图:
业务端通过浮动 IP 访问网关,网关本地会通过磁盘来记录请求变更,如果变更中同步到备集群之前出现宕机,备网关节点接管之后会承担流量的转发,也会本地记录变更,不过备网关不会立即进行消费,而是会先暂停消费,并尝试和主网关进行通信,当主网关在线且消费完毕之后才会自动进行消费,从而确保主备本地磁盘队列数据是时间轴上面的一致性。主备网关在切换过程中一般会结合告警系统触发消息通知。
此外,网关在代理 Elasticsearch 写入操作的过程中,会对请求的响应做各种检测,如下图:
网关会检测各种写入操作的返回信息,根据不同的状态码和错误信息来判断是否需要记录变更和是否需要重试,并最终按照顺序依次复制到备集群。对于故障的请求会有单独的故障队列进行处理,如下图:
正常请求会检测是否存在前序故障消息待处理,确保消息处理的先后顺序,可以认为请求最终会在备集群依次复制,如下图:
复制时效性设计
# 在变更顺序复制的过程中,如何提高复制的速度也是极限网关在考虑的事情,极限网关实现了 Elasticsearch 的非标准 Hash 算法,并通过获取 Elasticsearch 内部的元数据信息,感知 Elasticsearch 集群内部的拓扑结构,可以提前进行文档的分拆合并,如下图:
通过对相同目标节点的文档请求合并再一起,然后按照固定的批次直接转发给对应的目标节点,如下图:
从而实现了请求的精准投递,并实现索引操作的快慢分离,避免常规批次请求因为再次分发而造成的性能瓶颈,提升整体的写入效率,一般都可以提高 30%~50% 的写入速度。
极限网关对于单个队列内数据的消费速度优化,使用了运行时动态分配 Slice 的技术,可以再提高写入并发的前提下,不影响数据的一致性,从而提高写入速度。
另外,极限网关在传输过程中通过压缩流量,可以节省异地机房直接的带宽占用,从而进一步提高批量写入效率。
最后,在网络延迟良好,并且两侧 Elasticsearch 集群性能大致相当的情况下,主备集群基本上可以做到秒级的实时同步。
系统容错性设计
# 极限网关充分考虑到了日常运行过程中可能出现的情形,当主 Elasticsearch 集群出现故障的情况下,我们可能希望通过其自愈或者预定容错策略来降低故障的影响,从而提高服务的总体可用性和改善业务质量。
极限网关提供了两种容错策略,可以根据需要灵活配置。
策略一:写入降级,查询不影响
当这种策略配置之后,如果主集群不可用,网关会自动转发查询请求到备集群,业务查询不受任何影响,但是写入则因为主集群不可用会进行降级,适合写入场景一致性要求比较高或者需要写入操作需要同步返回的场景。
策略二:查询写入不中断
该策略以延迟为代价,当主集群不可用的情况下,网关会自动转发查询请求到备集群,业务查询无影响,但是写入会分别写入到主和备两个队列,主集群不可用,索引队列数据会先堆积起来,备集群在线并实时消费,所以查询操作理论上也可以很快进行查询,不过因为业务操作不是同步操作写入和返回,会对业务操作存在一定的影响。
以上两种策略适合主集群故障发生后,网关进行的自动处理,如果主集群故障时间比较长或者未知,还是需要进行常规的容灾切换。
容灾切换流程
# 当主集群发生故障且时间比较长的情况下,我们可能需要将备集群直接启用,也就让备集群来进行请求的透明转发,写入操作同步转发给备集群,即通过网关和正常访问备集群行为一致,具体切换逻辑流程如下:
通过配置切换,业务写入变成了同步写备集群,异步进主集群,主集群不在线,变更正常记录,主集群恢复之后可以进行追加,并最终保持一致。
容灾回切流程
# 如果主集群的故障处理好,并且可以进行回切了,我们只需要将配置进行调整,如下:
也就是恢复之前的同步写主,异步写备的方式,即可。
数据一致性保障
# 极限网关除了在转发和复制过程中保障数据的一致,极限网关也实现了完整的校验检测机制,如下图:
通过增量+实时的方式来对新增的数据进行分块比对,并将比对结果保存起来,方便及时发现数据不一致的问题并进行更正,如下图:
流量转发规则
# 为了充分利用两个机房的集群资源,只需要在网关配置好响应的转发规则即可,总体架构如下图:
参考的配置文件如下:
flow:
- name: primary-flow
filter:
- http:
schema: "http"
hosts:
- "localhost:8080"
- name: secondary-flow
filter:
- http:
schema: "http"
hosts:
- "localhost:8081"
- name: ratio_traffic_split-flow
filter:
- ratio:
ratio: 0.4 # 40% of traffics will go to localhost:8081
flow: secondary-flow
continue: false
- flow:
flow: primary-flow
entry:
- name: my_entry
enabled: true
router: my_router
max_concurrency: 10000
network:
binding: 0.0.0.0:8000
router:
- name: my_router
default_flow: ratio_traffic_split-flow
限流限速规则
# 客户的业务可能存在突发流量发给 Elasticsearch 集群的可能,如果不进行妥善的管控,可能造成 Elasticsearch 集群的过载,从而造成整体服务的不可用甚至节点内存溢出,可以在网关里面配置一些限速规则来进行管控:
不管是查询流量还是写入流量,都需要进行管控,除了保障 Elasticsearch 集群工作中稳定压力下,另外还可以保障专线流量的稳定。
Elasticsearch 网关支持以下限流方式:
Elasticsearch 节点级别的总体请求次数/流量限制 Elasticsearch 索引级别的总体请求次数/流量限制 根据访问来源的 IP 地址来进行请求次数/流量限流 根据访问目标的主机名来进行请求次数/流量的限制 根据请求头信息进行请求次数/流量的限制 等等 通过网关来统一进行读、写流量的限流,避免一方流量打满整个专线带宽。另外,根据业务的高峰和空闲时段可以灵活调整流量的限制比例。
实施计划
# 针对客户的 Elasticsearch 异地容灾建设的实施计划指定如下:
事项 说明 预计时间 搭建测试环境 需要准备两套 ES 集群、配置网关服务器和对象存储服务 5 天 POC 测试演练 模拟数据导入、查询分流、索引切换、数据校验等能力 5 天 编写上线脚本 按照生产环境编写时间的实施流程和变更脚本 3 天 实施变更上线 备机房部署搭建,网关搭建配置,数据迁移,应用配置变更 2 天 项目试运行 验证导入和查询分流等功能,项目试运行 7 天 验收完结 验收完结,文档整理编写 1 天
具体实施计划以实际为准。
性能测试
# 测试准备
# 部署网关程序
# 系统调优
参考这里的文档:
系统调优
下载程序
修改配置
将网关提供的示例配置拷贝,并根据实际集群的信息进行相应的修改,如下:
[root@iZbp1gxkifg8uetb33pvcoZ gateway]# cp sample-configs/cross-cluster-replication-for-logging gateway.yml
修改集群的注册信息,如下:
如果主备集群的配置信息不一样,需要覆盖集群请求的身份信息,如下:
根据需要修改网关监听的端口,以及是否开启 TLS(如果应用客户端通过http://协议访问ES,请将entry.tls.enabled值改为false),如下:
不同的集群可以使用不同的配置,分别监听不同的端口,用于业务的分开访问。
启动网关 启动网关并指定刚刚创建的配置,如下:
[root@iZbp1gxkifg8uetb33pvcoZ gateway]# ./gateway-linux-amd64 -config gateway.yml
___ _ _____ __ __ __ _
/ _ \ /_\ /__ \/__\/ / /\ \ \/_\ /\_/\
/ /_\///_\\ / /\/_\ \ \/ \/ //_\\\_ _/
/ /_\\/ _ \/ / //__ \ /\ / _ \/ \
\____/\_/ \_/\/ \__/ \/ \/\_/ \_/\_/
[GATEWAY] A light-weight, powerful and high-performance elasticsearch gateway.
[GATEWAY] 1.6.0_SNAPSHOT, 2022-05-18 11:09:54, 2023-12-31 10:10:10, 73408e82a0f96352075f4c7d2974fd274eeafe11
[05-19 13:35:43] [INF] [app.go:174] initializing gateway.
[05-19 13:35:43] [INF] [app.go:175] using config: /opt/gateway/gateway.yml.
[05-19 13:35:43] [INF] [instance.go:72] workspace: /opt/gateway/data1/gateway/nodes/ca2tc22j7ad0gneois80
[05-19 13:35:43] [INF] [app.go:283] gateway is up and running now.
[05-19 13:35:50] [INF] [actions.go:358] elasticsearch [primary] is available
[05-19 13:35:50] [INF] [api.go:262] api listen at: http://0.0.0.0:2900
[05-19 13:35:50] [INF] [reverseproxy.go:261] elasticsearch [primary] hosts: [] => [192.168.0.19:9200]
[05-19 13:35:50] [INF] [reverseproxy.go:261] elasticsearch [backup] hosts: [] => [xxxxxxxx-backup:9200]
[05-19 13:35:50] [INF] [reverseproxy.go:261] elasticsearch [primary] hosts: [] => [192.168.0.19:9200]
[05-19 13:35:50] [INF] [reverseproxy.go:261] elasticsearch [backup] hosts: [] => [xxxxxxxx-primary:9200]
[05-19 13:35:50] [INF] [reverseproxy.go:261] elasticsearch [primary] hosts: [] => [192.168.0.19:9200]
[05-19 13:35:50] [INF] [entry.go:322] entry [my_es_entry/] listen at: https://0.0.0.0:8000
[05-19 13:35:50] [INF] [module.go:116] all modules are started
启动服务 快速安装网关为系统服务,操作方式如下:
[root@iZbp1gxkifg8uetb33pvcpZ console]# ./gateway-linux-amd64 -service install
Success
[root@iZbp1gxkifg8uetb33pvcpZ console]# ./gateway-linux-amd64 -service start
Success
部署管理后台
# 为了方便在多个集群之间快速切换,使用 Console 来进行管理。
下载安装 将提供的安装程序解压即可完成安装,如下:
[root@iZbp1gxkifg8uetb33pvcpZ console]# tar vxzf console-0.3.0_SNAPSHOT-596-linux-amd64.tar.gz
console-linux-amd64
console.yml
修改配置 使用 http://10.0.1.2:9200
作为 Console 的系统集群,保留监控指标和元数据信息,修改配置如下:
[root@iZbp1gxkifg8uetb33pvcpZ console]# cat console.yml
elasticsearch:
- name: default
enabled: true
monitored: false
endpoint: http://10.0.1.2:9200
basic_auth:
username: elastic
password: xxxxx
discovery:
enabled: false
...
启动服务 [root@iZbp1gxkifg8uetb33pvcpZ console]# ./console-linux-amd64 -service install
Success
[root@iZbp1gxkifg8uetb33pvcpZ console]# ./console-linux-amd64 -service start
Success
访问后台 访问该主机的 9000 端口,即可打开 Console 后台,http://10.0.x.x:9000/#/cluster/overview
打开菜单 [System ][Cluster ] ,注册当前需要管理的 Elasticsearch 集群和网关地址,用来快速管理,如下:
注册网关 打开 GATEWAY 的注册功能,设置为网关的 API 地址来进行管理,如下:
测试 Gateway
# 为了验证网关是否正常工作,我们通过 Console 来快速验证一下。
首先通过走网关的接口来创建一个索引,并写入一个文档,如下:
首先查看主集群的数据情况,如下:
继续查看备集群的数据情况,如下:
两边集群都返回相同的数据,说明网关配置都正常,验证结束。
安装 Loadgen
# 测试机器同样需要调优,参考网关的调优说明。
在测试机上面,下载安装 Loadgen,如下: [root@vm10-0-0-69 opt]# tar vxzf loadgen-1.4.0_SNAPSHOT-50-linux-amd64.tar.gz
下载一个 Nginx 日志样本,保存为 nginx.log
,如下:展开查看
...
[root@vm10-0-0-69 opt]# head nginx.log
175.10.75.216 - - [28/Jul/2020:21:20:26 +0800] "GET / HTTP/1.1" 200 8676 "-" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.89 Safari/537.36"
175.10.75.216 - - [28/Jul/2020:21:20:26 +0800] "GET /vendor/bootstrap/css/bootstrap.css HTTP/1.1" 200 17235 "http://dl-console.elasticsearch.cn/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.89 Safari/537.36"
175.10.75.216 - - [28/Jul/2020:21:20:26 +0800] "GET /vendor/daterangepicker/daterangepicker.css HTTP/1.1" 200 1700 "http://dl-console.elasticsearch.cn/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.89 Safari/537.36"
175.10.75.216 - - [28/Jul/2020:21:20:26 +0800] "GET /vendor/fork-awesome/css/v5-compat.css HTTP/1.1" 200 2091 "http://dl-console.elasticsearch.cn/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.89 Safari/537.36"
175.10.75.216 - - [28/Jul/2020:21:20:26 +0800] "GET /assets/font/raleway.css HTTP/1.1" 200 145 "http://dl-console.elasticsearch.cn/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.89 Safari/537.36"
175.10.75.216 - - [28/Jul/2020:21:20:26 +0800] "GET /vendor/fork-awesome/css/fork-awesome.css HTTP/1.1" 200 8401 "http://dl-console.elasticsearch.cn/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.89 Safari/537.36"
175.10.75.216 - - [28/Jul/2020:21:20:26 +0800] "GET /assets/css/overrides.css HTTP/1.1" 200 2524 "http://dl-console.elasticsearch.cn/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.89 Safari/537.36"
175.10.75.216 - - [28/Jul/2020:21:20:26 +0800] "GET /assets/css/theme.css HTTP/1.1" 200 306 "http://dl-console.elasticsearch.cn/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.89 Safari/537.36"
175.10.75.216 - - [28/Jul/2020:21:20:26 +0800] "GET /vendor/fancytree/css/ui.fancytree.css HTTP/1.1" 200 3456 "http://dl-console.elasticsearch.cn/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.89 Safari/537.36"
175.10.75.216 - - [28/Jul/2020:21:20:26 +0800] "GET /syncthing/development/logbar.js HTTP/1.1" 200 486 "http://dl-console.elasticsearch.cn/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.89 Safari/537.36"
修改 Loadgen 的配置文件 修改其中的变量,将 message 指向刚刚准备好的 nginx 日志,并修改 es 的地址和身份信息,Loadgen 将随机构造写入请求,具体配置如下:
启动 Loadgen 进行测试 指定相关运行时间参数 -d
和 并发参数 -c
,开启请求压缩,如下:
[root@vm10-0-0-117 opt]# ./loadgen-linux-amd64 -d 60000 -c 200 --compress
__ ___ _ ___ ___ __ __
/ / /___\/_\ / \/ _ \ /__\/\ \ \
/ / // ///_\\ / /\ / /_\//_\ / \/ /
/ /__/ \_// _ \/ /_// /_\\//__/ /\ /
\____|___/\_/ \_/___,'\____/\__/\_\ \/
[LOADGEN] A http load generator and testing suit.
[LOADGEN] 1.4.0_SNAPSHOT, 2022-06-01 09:58:17, 2023-12-31 10:10:10, b6a73e2434ac931d1d43bce78c0f7622a1d08b2e
[06-14 18:47:29] [INF] [app.go:174] initializing loadgen.
[06-14 18:47:29] [INF] [app.go:175] using config: /opt/loadgen.yml.
[06-14 18:47:29] [INF] [module.go:116] all modules are started
[06-14 18:47:30] [INF] [instance.go:72] workspace: /opt/data/loadgen/nodes/cajfdg0ke012ka748j30
[06-14 18:47:30] [INF] [app.go:283] loadgen is up and running now.
[06-14 18:47:30] [INF] [loader.go:320] warmup started
[06-14 18:47:30] [INF] [loader.go:329] [POST] http://10.0.128.58:8000/_bulk -{"took":115,"errors":false,"items":[{"create":{"_index":"test-11","_type":"_doc","_id":"cak6eggke0184a2dcc70","_version":1,"result":"created","_shards":{"total":1,"successful":1,"failed":0},"_seq_no":39707421,"_primary_term":1,"status":201}},{"create":{"_i
[06-14 18:47:30] [INF] [loader.go:330] status: 200,<nil>,{"took":115,"errors":false,"items":[{"create":{"_index":"test-11","_type":"_doc","_id":"cak6eggke0184a2dcc70","_version":1,"result":"created","_shards":{"total":1,"successful":1,"failed":0},"_seq_no":39707421,"_primary_term":1,"status":201}},{"create":{"_i
[06-14 18:47:30] [INF] [loader.go:338] warmup finished
在另外一台压测机执行同样的安装操作,不重复描述。
测试环境
# 主集群:http://10.0.1.2:9200 , 用户名: elastic 密码:*** ,9节点 , 硬件规格:12C64GB (31GB JVM) 备集群:http://10.0.1.15:9200 , 用户名: elastic 密码:*** ,9节点 , 硬件规格:12C64GB (31GB JVM) 网关服务器1(公网 IP:x.x.x.x,内网 IP:192.168.0.24) 硬件规格:40C 256GB 3.7T NVME SSD 压测服务器1(内网 IP: 10.0.0.117) 硬件规格:24C 48GB 压测服务器2(内网 IP: 10.0.0.69) 硬件规格:24C 48GB 测试说明
# 本次测试主要验证网关复制的可操作性,以及评估达到不同复制性能所需要的硬件规格,用于实际生产环境的部署配置参考。
场景描述
# 将已有的 ELK 日志集群写入操作同步复制到备份集群,业务不停写、不做代码改动。采用 Nginx 日志作为数据源。使用 Loadgen 来随机化生成测试样板,确保字段内容的随机组合,避免数据的过度集中。
数据描述
# 以 Loadgen 自动生成的 Nginx 数据为例来介绍,如何从主集群 7.10.0 迁移到备份集群进行说明,执行步骤依次说明。数据样例:
{
"_index": "test-10",
"_type": "_doc",
"_id": "cak5emoke01flcq9q760",
"_source": {
"batch_number": "2328917",
"id": "cak5emoke01flcq9r19g",
"ip": "192.168.0.1",
"message": "175.10.75.216 - webmaster [29/Jul/2020:17:01:26 +0800] \"GET /rest/system/status HTTP/1.1\" 200 1838 \"http://dl-console.elasticsearch.cn/\" \"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/84.0.4147.89 Safari/537.36\"",
"now_local": "2022-06-14 17:39:39.420724895 +0800 CST",
"now_unix": "1655199579",
"random_no": "13",
"routing_no": "cak5emoke01flcq9pvu0"
}
}
数据架构
# 通过将应用端流量走网关的方式,请求同步转发给备份 ES,网关记录所有的写入请求,并确保顺序在备份 ES 上重放请求,两侧集群的各种故障都妥善进行了处理,从而实现透明的集群双写,实现安全无缝的数据迁移。
因为是日志场景的集群,数据主要是 append 新增,且写入量比较大,考虑到操作的顺序性不是主要考虑的问题,可以考虑部署多台网关来负载写入流量,客户端如果支持配置多个 ip 地址,可以将多个网关的地址都配置进去。
测试方法
# 准备模板
# 创建一个默认的索引模板,用于优化写入性能:
PUT _template/test
{
"index_patterns": [
"test*"
],
"settings": {
"index.translog.durability": "async",
"refresh_interval": "-1",
"number_of_shards": 3,
"number_of_replicas": 0
},
"mappings": {
"dynamic_templates": [
{
"strings": {
"mapping": {
"ignore_above": 256,
"type": "keyword"
},
"match_mapping_type": "string"
}
}
]
}
}
开启压测
# 分别在压测机器上面执行压测工具:
[root@vm10-0-0-117 opt]# ./loadgen-linux-amd64 -d 60000 -c 200 --compress
观察吞吐
# 打开 Console 工具来查看两个集群的吞吐情况,打开监控菜单,点击顶部的下拉选项,可以快速切换不同集群,如下:
首先,查看主集群的吞吐情况,如下:
查看备集群的吞吐情况,如下:
可以看到集群的吞吐非常接近,我们暂停写入,然后查看索引的数量,如下:
可以看到两个集群的索引文档数量一致。
查看延迟
# 如果备集群的处理能力跟不上主集群,就会出现两边数据延迟的情况,我们可以查看当前网关的队列消费情况,打开网关的实例,如下:
点击 Queue 菜单,即可看到当前队列的列表和消费情况,如下:
右侧 Depth 为 0 即代表没有消息堆积。
限制CPU
# 为了测试不同 CPU资源下的网关性能,我们使用 taskset 来绑定进程的CPU, 如下:
测试过程
# 网关使用配置如下:
展开查看
...
allow_multi_instance: true
path.data: data
path.logs: log
entry:
- name: my_es_entry
enabled: true
router: my_router
max_concurrency: 10000
network:
binding: 0.0.0.0:8000
tls:
enabled: false
flow:
- name: set-auth-for-backup-flow
filter:
- set_basic_auth:
username: elastic
password: xxxx
- name: primary-read-flow
filter:
- if:
cluster_available: ["primary"]
then:
- elasticsearch:
elasticsearch: "primary"
refresh:
enabled: true
interval: 30s
else:
- elasticsearch:
elasticsearch: "backup"
refresh:
enabled: true
interval: 30s
- name: primary-write-flow #正常的主写流程
filter:
- if:
and:
- consumer_has_lag:
queue: "primary-failure"
group: primary
name: primary-failure
- cluster_available: ["primary"]
then: #集群可用但是集群有堆积的情况,不处理客户端请求,待服务恢复之后再提供服务
- set_response:
status: 503
content_type: "application/json"
body: '{"error":true,"message":"503 Service Unavailable"}'
- drop:
else: # 集群不可用或者集群可用且没有堆积的情况,都直接转发给集群先处理
- if: #集群如果已经变成不可用状态,则直接丢弃请求,让客户端选择处理,或者也可落地队列确保不丢数据
not:
cluster_available: ["primary"]
then: #如果集群不可用,则直接拒绝客户端请求
- set_response:
status: 503
content_type: "application/json"
body: '{"error":true,"message":"503 Service Unavailable"}'
- elasticsearch_health_check: #由请求触发的限速模式下的主动检查后端监控情况
elasticsearch: "primary"
- drop:
else:
#- bulk_request_mutate: #修复自动生成 id 的文档,主动生成 id
# fix_null_id: true
# fix_null_type: true
# default_type: _doc
# type_rename:
# "*": _doc #统一索引的 Type 类型,适合旧版本多 Type 迁移到新版本集群
# remove_pipeline: true
# generate_enhanced_id: true
- elasticsearch: #集群可用,直接处理请求
elasticsearch: "primary"
max_connection_per_node: 1000
max_retry_times: 0
refresh:
enabled: true
interval: 30s
- bulk_response_process: #bulk 出错不继续执行后续 flow,因为成功、失败、非法的请求都已经入队,可以直接退出
success_queue: "backup"
failure_queue: "primary-failure" #失败的请求,两边的集群都要做一次,避免脏数据
invalid_queue: "primary-invalid"
- flow: # 非 bulk 请求继续判断
flows:
- primary-response-check
- name: primary-response-check
filter:
- if: #不合法的请求
and:
- not:
in:
_ctx.response.status: [ 429 ] #400_500 之间但不包括 429
- range:
_ctx.response.status.gte: 400
_ctx.response.status.lt: 500
then:
- queue:
queue_name: "primary-invalid"
- drop:
- if: #正常的请求, 复制到备份集群
in:
_ctx.response.status: [200,201]
then: #仅正常处理的集群才转发给后端集群
- flow:
flows:
- set-auth-for-backup-flow
- queue:
queue_name: "backup"
- drop:
else: #集群可用的情况下但是失败了,可能存在脏写,将请求放入写入失败队列,后续可以选择两边集群都重做一次,最终确保一致性,写 translog,后续提供 UI 可以进行三方检查:主、备集群和本地日志
- queue:
queue_name: "primary-failure"
- name: primary-failure-primary-post-processing #主集群的故障处理,重试,处理通过 commit,处理失败重试,非法请求丢弃,失败的请求
filter:
- if:
not:
cluster_available: ["primary"]
then:
- elasticsearch_health_check:
elasticsearch: "primary"
- sleep:
sleep_in_million_seconds: 5000
- drop:
#- bulk_request_mutate: #修复自动生成 id 的文档,主动生成 id
# fix_null_id: true
# fix_null_type: true
# default_type: _doc
# type_rename:
# "*": _doc #统一索引的 Type 类型,适合旧版本多 Type 迁移到新版本集群
# remove_pipeline: true
# generate_enhanced_id: true
- elasticsearch:
elasticsearch: "primary"
max_connection_per_node: 1000
max_retry_times: 0
refresh:
enabled: true
interval: 30s
- if: #请求失败了,继续重试,不用复制给备份集群
in:
_ctx.response.status: [ 429,0,500,503 ]
then:
- drop:
- bulk_response_process: #bulk 成功执行了,非法消息都入队,其他消息都不用管,继续重试,只有全部成功或者失败的时候才 commit,请求丢给 backup,否则提前结束
invalid_queue: "primary-invalid" #保存用来查看消息日志
tag_on_all_invalid: [ "commit_message_allowed" ] #只有都是非法请求的情况下才 commit
tag_on_all_success: [ "commit_message_allowed" ] #只有都是成功的情况下才 commit
continue_on_all_error: true #bulk 请求整体响应不是200,继续交由后面的 filter 进行处理
- if: #其他的非 bulk 请求处理,先处理不合法的请求,主集群都失败了,副集群就不用考虑了
and:
- not:
in:
_ctx.response.status: [ 429 ] #400_500 之间但不包括 429
- range:
_ctx.response.status.gte: 400
_ctx.response.status.lt: 500
then:
- tag:
add: [ "commit_message_allowed" ]
- queue:
queue_name: "primary-invalid"
- drop:
- if:
in:
_ctx.response.status: [ 200,201 ]
then:
- tag:
add: [ "commit_message_allowed" ]
- name: primary-failure-backup-post-processing #备集群的故障处理,重试,处理通过 commit,处理失败重试,非法请求丢弃
filter:
- if:
not:
cluster_available: ["backup"]
then:
- elasticsearch_health_check:
elasticsearch: "backup"
- sleep:
sleep_in_million_seconds: 5000
- drop:
- flow:
flows:
- set-auth-for-backup-flow
#- bulk_request_mutate: #修复自动生成 id 的文档,主动生成 id
# fix_null_id: true
# fix_null_type: true
# default_type: _doc
# type_rename:
# "*": _doc #统一索引的 Type 类型,适合旧版本多 Type 迁移到新版本集群
# remove_pipeline: true
# generate_enhanced_id: true
- elasticsearch:
elasticsearch: "backup"
max_connection_per_node: 1000
max_retry_times: 0
refresh:
enabled: true
interval: 30s
- if: #请求失败了,继续重试
in:
_ctx.response.status: [ 429,0,500,503 ]
then:
- drop:
- bulk_response_process: #bulk 成功执行了,非法消息都入队,其他消息都不用管,继续重试,只有全部成功或者失败的时候才 commit,请求丢给 backup,否则提前结束
invalid_queue: "backup-invalid" #保存用来查看消息日志
tag_on_all_invalid: [ "commit_message_allowed" ] #只有都是非法请求的情况下才 commit
tag_on_all_success: [ "commit_message_allowed" ] #只有都是成功的情况下才 commit
continue_on_all_error: true #bulk 请求整体响应不是200,继续交由后面的 filter 进行处理
- if: #其他的非 bulk 请求处理,先处理不合法的请求,主集群都失败了,副集群就不用考虑了
and:
- not:
in:
_ctx.response.status: [ 429 ] #400_500 之间但不包括 429
- range:
_ctx.response.status.gte: 400
_ctx.response.status.lt: 500
then:
- tag:
add: [ "commit_message_allowed" ]
- queue:
queue_name: "backup-invalid"
- drop:
- if:
in:
_ctx.response.status: [ 200,201 ]
then:
- tag:
add: [ "commit_message_allowed" ]
- name: backup-flow-replicate-processing
filter:
- if:
not:
cluster_available: ["backup"]
then:
- elasticsearch_health_check:
elasticsearch: "backup"
- sleep:
sleep_in_million_seconds: 5000
- drop:
- retry_limiter:
queue_name: "backup-deadletter_requests"
max_retry_times: 60
tag_on_success: ["commit_message_allowed"]
- flow:
flows:
- set-auth-for-backup-flow
# - bulk_request_mutate: #修复自动生成 id 的文档,主动生成 id
# fix_null_id: true
# fix_null_type: true
# default_type: _doc
# type_rename:
# "*": _doc #统一索引的 Type 类型,适合旧版本多 Type 迁移到新版本集群
# # index_rename:
# # test: "test-backup"
# remove_pipeline: true
# generate_enhanced_id: true
# when:
# contains:
# _ctx.request.path: /_bulk
- elasticsearch:
elasticsearch: "backup"
max_connection_per_node: 1000
max_retry_times: 0
refresh:
enabled: true
interval: 30s
- bulk_response_process: # 如果部分请求出错,保存相关的消息到队列后,直接结束,不继续后续流程的处理
invalid_queue: "backup-invalid"
failure_queue: "backup-failure"
tag_on_all_success: ["commit_message_allowed"]
tag_on_all_invalid: ["commit_message_allowed"]
tag_on_any_error: ["commit_message_allowed"]
continue_on_all_error: true #bulk 请求整体响应不是200,继续交由后面的 filter 进行处理
when:
contains:
_ctx.request.path: /_bulk
- if:
in:
_ctx.response.status: [ 200,201,404,400 ]
then:
- tag:
add: ["commit_message_allowed"]
- name: backup-flow-reshuffle-replicate-processing
filter:
- if:
not:
cluster_available: ["backup"]
then:
- elasticsearch_health_check:
elasticsearch: "backup"
- sleep:
sleep_in_million_seconds: 5000
- drop:
# - bulk_request_mutate: #修复自动生成 id 的文档,主动生成 id
# fix_null_id: true
# fix_null_type: true
# default_type: _doc
# type_rename:
# "*": "_doc" #统一索引的 Type 类型,适合旧版本多 Type 迁移到新版本集群
# # index_rename:
# # test: "test-backup"
# remove_pipeline: true # pipeline should be enabled manually, please ensure each cluster has the same setup
# generate_enhanced_id: true
# when:
# contains:
# _ctx.request.path: /_bulk
# - record:
# stdout: true
- flow:
flows:
- set-auth-for-backup-flow
- bulk_reshuffle:
when:
contains:
_ctx.request.path: /_bulk
elasticsearch: "backup"
queue_name_prefix: "async_bulk"
skip_info_missing: true
level: node #cluster,node,index,shard
#partition_size: 3
fix_null_id: true
tag_on_success: ["commit_message_allowed"]
- elasticsearch:
elasticsearch: "backup"
max_retry_times: 0
max_connection_per_node: 1000
refresh:
enabled: true
interval: 30s
- bulk_response_process: # 如果部分请求出错,保存相关的消息到队列后,直接结束,不继续后续流程的处理
invalid_queue: "backup-invalid"
tag_on_all_invalid: ["commit_message_allowed"]
tag_on_all_success: ["commit_message_allowed"]
continue_on_all_error: true #bulk 请求整体响应不是200,继续交由后面的 filter 进行处理
when:
contains:
_ctx.request.path: /_bulk
- if: #不合法的请求
and:
- not:
in:
_ctx.response.status: [ 429 ] #400_500 之间但不包括 429
- range:
_ctx.response.status.gte: 400
_ctx.response.status.lt: 500
then:
- queue:
queue_name: "backup-invalid"
- tag:
add: [ "commit_message_allowed" ] # 非法请求不处理了,commit 继续往后处理
- drop:
- if:
in:
_ctx.response.status: [ 200,201 ]
then:
- tag:
add: ["commit_message_allowed"]
- name: request_logging # this flow is used for request logging, refer to `router`'s `tracing_flow`
filter:
- context_filter:
context: _ctx.request.path
exclude:
- /favicon.ico
- logging:
queue_name: request_logging
max_request_body_size: 102400
max_response_body_size: 102400
when:
or:
- equals:
_ctx.response.header.X-BulkRequest-Failed: "true"
- not:
in:
_ctx.response.status: [ 200,201,404 ]
- name: deny_flow # this flow is used for request logging, refer to `router`'s `tracing_flow`
filter:
- set_response:
body: "request not allowed"
status: 500
#_delete_by_query not supported
router:
- name: my_router
default_flow: primary-write-flow
# tracing_flow: request_logging
rules:
- method:
- "GET"
- "HEAD"
pattern:
- "/{any:*}"
flow:
- primary-read-flow
- method:
- "*"
pattern:
- "/_cat"
- "/_sql"
- "/_cluster"
- "/_refresh"
- "/_count"
- "/_search"
- "/_msearch"
- "/_mget"
- "/{any_index}/_eql/search"
- "/{any_index}/_count"
- "/{any_index}/_search"
- "/{any_index}/_msearch"
- "/{any_index}/_mget"
flow:
- primary-read-flow
- method:
- "*"
pattern:
- "/_reindex"
- "/_delete_by_query"
- "/_update_by_query"
- "/{any_index}/_reindex"
- "/{any_index}/_delete_by_query"
- "/{any_index}/_update_by_query"
flow:
- deny_flow
- method:
- "DELETE"
pattern:
- "/{any_index}"
- "/{any_index}/{any_type}"
flow:
- deny_flow
elasticsearch:
- name: backup
enabled: true
endpoints:
- http://10.0.1.15:9200
basic_auth:
username: elastic
password: xxx
discovery:
enabled: true
refresh:
enabled: true
interval: 60s
- name: primary
enabled: true
endpoints:
- http://10.0.1.2:9200
basic_auth:
username: elastic
password: xxx
discovery:
enabled: true
refresh:
enabled: true
interval: 60s
pipeline:
# pipelines for logging
- name: consume-request_logging_index-to-backup
auto_start: true
keep_running: true
processor:
- json_indexing:
index_name: "gateway_requests"
elasticsearch: "backup"
input_queue: "request_logging"
when:
cluster_available: [ "backup" ]
# pipelines for primary cluster
- name: consume-queue_primary-dead_retry-to-primary
auto_start: false
keep_running: false
processor:
- flow_runner:
input_queue: "primary-deadletter_requests"
flow: primary-failure-primary-post-processing
commit_on_tag: "commit_message_allowed"
when:
cluster_available: [ "primary" ]
- name: consume-queue_primary-failure-to-primary
auto_start: true
keep_running: true
processor:
- flow_runner:
input_queue: "primary-failure"
flow: primary-failure-primary-post-processing
commit_on_tag: "commit_message_allowed"
consumer:
group: primary
name: primary-failure
when:
cluster_available: [ "primary" ]
- name: consume-queue_primary-failure-to-backup
auto_start: true
keep_running: true
processor:
- flow_runner:
input_queue: "primary-failure"
flow: primary-failure-backup-post-processing
commit_on_tag: "commit_message_allowed"
consumer:
group: backup
name: primary-failure
when:
cluster_available: [ "backup" ]
# option B - pipelines for backup cluster, reshuffle bulk requests
- name: consume-queue_backup-to-backup
auto_start: true
keep_running: true
processor:
- disorder_flow_runner:
input_queue: "backup"
worker_size: 12
flow: backup-flow-reshuffle-replicate-processing
- name: consume-queue_backup-bulk_request_ingestion-to-backup
auto_start: true
keep_running: true
processor:
#- bulk_indexing:
- disorder_bulk_indexing:
worker_size: 6
max_worker_size: 1000
bulk:
compress: true
batch_size_in_mb: 20
batch_size_in_docs: 10000
consumer:
fetch_max_messages: 1000
waiting_after:
- "backup-failure"
queues:
type: bulk_reshuffle
when:
cluster_available: [ "backup" ]
- name: consume-queue_backup-failure-to-backup
auto_start: true
keep_running: true
processor:
- flow_runner:
input_queue: "backup-failure"
flow: backup-flow-replicate-processing
commit_on_tag: "commit_message_allowed"
when:
cluster_available: [ "backup" ]
disk_queue:
sync_every_records: 10000
sync_timeout_in_ms: 10000
retention:
max_num_of_local_files: 3
compress_on_message_payload:
enabled: true
两台 Loagen 的默认压测参数:
[root@vm10-0-0-117 opt]# ./loadgen-linux-amd64 -d 60000 -c 200 --compress
网关 16C
# 给网关分配 16C 的 CPU 资源,全部跑满,内存开销 ~8GB,稳定处理 750k eps。
[root@10-0-128-58 gateway]# taskset -a -pc 1-6 14054
主集群写入~750k eps:
备集群实时追平:
队列没有延迟:
网关的其他监控:
网关 8C
# 将网关的 CPU 减半,观察吞吐的变化,网关可以稳定处理 600k eps,内存~8GB,如下:
网关 4C
# CPU 调整为 4C,使用两个压测端:
复制出现较大延迟,关闭一个压测程序,降低写入,如下:
降低写入压力,我们需要测出 4C 下没有延迟情况下的稳定处理能力。
经过多轮测试,4C 情况下,压测端使用参数 -r 35
限流,网关可以稳定处理 320k eps,内存~8GB。
[root@vm10-0-0-69 opt]# ./loadgen-linux-amd64 -c 500 -d 66000 -compress -r 35
网关 6C
# 继续一个压测,提升为 6C
使用 -r 40
限流 40w,网关可以稳定在 40w,继续尝试 42w、45w,网关可以稳定处理 ~430k eps。
网关 16C
# 恢复 16C
备集群的堆积开始追。
网关 32C
#
网关升级配置已无性能提升,瓶颈都在后端 Elasticsearch,后端需要扩容。
最后再比对一下文档数。
两边数目完全一致。
网关 8C
# 处理能力稳定在 ~600k eps。
[root@vm10-0-0-69 opt]# ./loadgen-linux-amd64 -c 500 -d 66000 -compress -r 64
[root@vm10-0-0-117 opt]# ./loadgen-linux-amd64 -c 500 -d 66000 -compress -r 60
网关 2C
# 网关稳定处理 160k eps。
网关 1C
# ~79-81k eps.
测试结果
# 网关 CPU 核数 复制能力 (events per seond) 内存 备注 1C ~80k ~8GB 2C ~160k ~8GB 4C ~320k ~9GB 8C ~600k ~8GB 16C ~750k ~8GB 后端 ES 处理能力已接近饱和 32C ~750k ~8GB 后端 ES 处理能力已接近饱和
小结
# 极限网关功能简单,性能强悍,使用简单,通过使用极限网关,自建 Elasticsearch 集群可以安全无缝的复制到异地备份集群,在复制的过程中,两套集群通过网关进行了解耦,两套集群的版本也可以不一样,在迁移的过程中还能实现 Elasticsearch 版本的无缝升级。生产环境的硬件建议配备 2*16C32GB NVME RAID10 4TB 的规格,并结合实际的业务数据进行压测。