极限网关案例分享(2):使用极限网关来加速索引写入速度
2024-12-28
背景 #
某些业务数据写入速度慢,需提高写入速度。
方案 #
设计方案如下:
- 将数据写入改造成 bulk 请求形式
- 前置网关将指定业务的 bulk 请求单独拆分出来,转发到后置网关
- 后置网关对 bulk 请求进行异步消费处理
前置网关拆分指定请求 #
网关配置如下:
path.data: data-front
path.logs: log-front
entry:
- name: my_es_entry
enabled: true
router: my_router
max_concurrency: 50000
network:
binding: :9243
tls:
enabled: true
cert_file: /etc/ssl.crt
key_file: /etc/ssl.key
skip_insecure_verify: false
flow:
- name: test-async-indexing
filter:
- elasticsearch:
elasticsearch: test-es
- flow:
when:
equals:
_ctx.response.status: 0
flows:
- default-flow
- drop:
- name: bulk_flow
filter:
- flow:
when:
and:
- contains:
_ctx.request.path: /_bulk
- prefix:
_ctx.request.host: 165ed1e903914b929746c642d92b5a21
flows:
- test-async-indexing
- flow:
flows:
- default-flow
- name: default-flow
filter:
- http:
schema: "https" #https or http
max_idle_conn_duration: "900s"
skip_failure_host: false
hosts:
- "192.168.200.209:9243"
- "192.168.200.210:9243"
- "192.168.200.211:9243"
- name: drop_flow
filter:
- drop:
- name: test-request_indexing
filter:
- ratio:
ratio: 0.5
flow: drop_flow
continue: false
- logging:
queue_name: test-request_logging
max_request_body_size: 4096
max_response_body_size: 4096
- drop:
- name: request_logging
filter:
- flow:
when:
prefix:
_ctx.request.host: 165ed1e903914b929746c642d92b5a21
flows:
- test-request_indexing
- logging:
queue_name: request_logging
max_request_body_size: 4096
max_response_body_size: 4096
when: #>1s or none-200 requests will be logged
or:
- not:
or:
- equals:
_ctx.request.path: "/favicon.ico"
- in:
_ctx.response.status: [404, 200, 201]
- in:
_ctx.request.path: ["/sw.js"]
- range:
_ctx.elapsed.gte: 1000
router:
- name: my_router
default_flow: default-flow
tracing_flow: request_logging
rules:
- method:
- "*"
pattern:
- "/_bulk"
- "/{any_index}/_bulk"
flow:
- bulk_flow
elasticsearch:
- name: logging-server
enabled: true
endpoints:
- https://d6794e84d46e4b7db21d364de10620c4.192.168.200.209.ip.es.io:9243
basic_auth:
username: elastic
password: 1qaz!QAZ
discovery:
enabled: false
- name: test-logging-server
enabled: true
endpoints:
- https://ccb82916fc594ccf94a241bba9a8d1b6.192.168.200.209.ip.es.io:9243
basic_auth:
username: elastic
password: 1qaz!QAZ
discovery:
enabled: false
- name: test-es
enabled: true
endpoints:
- https://165ed1e903914b929746c642d92b5a21.192.168.3.185.async.vip:8000
basic_auth:
username: elastic
password: 1qaz!QAZ
discovery:
enabled: false
pipeline:
- name: metrics_ingest
auto_start: true
keep_running: true
retry_delay_in_ms: 1000
processor:
- json_indexing:
index_name: ".infini_metrics"
elasticsearch: "logging-server"
input_queue: "metrics"
idle_timeout_in_seconds: 1
worker_size: 1
bulk_size_in_mb: 10 #in MB
when:
cluster_available: ["logging-server"]
- name: indexing_merge
auto_start: true
keep_running: true
processor:
- indexing_merge:
input_queue: "request_logging"
elasticsearch: "logging-server"
index_name: ".infini_requests_logging"
output_queue:
name: "gateway_requests"
label:
tag: "request_logging"
worker_size: 1
bulk_size_in_mb: 10
- name: logging_requests
auto_start: true
keep_running: true
processor:
- bulk_indexing:
bulk:
compress: true
batch_size_in_mb: 10
batch_size_in_docs: 5000
bulk_result_message_queue: ""
response_handle:
save_success_results: false
output_bulk_stats: false
queue_selector:
labels:
tag: request_logging
consumer:
fetch_max_messages: 100
queues:
type: indexing_merge
when:
cluster_available: ["logging-server"]
- name: test-logging_requests
auto_start: true
keep_running: true
processor:
- bulk_indexing:
bulk:
compress: true
batch_size_in_mb: 10
batch_size_in_docs: 5000
bulk_result_message_queue: ""
response_handle:
save_success_results: false
output_bulk_stats: false
queue_selector:
labels:
tag: test-request_logging
consumer:
fetch_max_messages: 100
queues:
type: indexing_merge
when:
cluster_available: ["test-logging-server"]
- name: test_indexing_merge
auto_start: true
keep_running: true
processor:
- indexing_merge:
input_queue: "test-request_logging"
elasticsearch: "test-logging-server"
index_name: ".infini_requests_logging"
output_queue:
name: "test-gateway_requests"
label:
tag: "test-request_logging"
worker_size: 1
bulk_size_in_mb: 10
metrics:
enabled: true
queue: metrics
instance:
enabled: true
network:
enabled: true
summary: true
badger:
value_log_max_entries: 1000000
value_log_file_size: 104857600
配置说明:
- name: test-async-indexing
filter:
- elasticsearch:
elasticsearch: test-es
- flow:
when:
equals:
_ctx.response.status: 0
flows:
- default-flow
- drop:
- name: bulk_flow
filter:
- flow:
when:
and:
- contains:
_ctx.request.path: /_bulk
- prefix:
_ctx.request.host: 165ed1e903914b929746c642d92b5a21
flows:
- test-async-indexing
- flow:
flows:
- default-flow
- 根据请求信息中的 request.host 将指定请求过滤到独立的处理流程中。
- name: test-request_indexing
filter:
- ratio:
ratio: 0.5
flow: drop_flow
continue: false
- logging:
queue_name: test-request_logging
max_request_body_size: 4096
max_response_body_size: 4096
- drop:
- name: request_logging
filter:
- flow:
when:
prefix:
_ctx.request.host: 165ed1e903914b929746c642d92b5a21
flows:
- test-request_indexing
- logging:
queue_name: request_logging
max_request_body_size: 4096
max_response_body_size: 4096
when: #>1s or none-200 requests will be logged
or:
- not:
or:
- equals:
_ctx.request.path: "/favicon.ico"
- in:
_ctx.response.status: [404, 200, 201]
- in:
_ctx.request.path: ["/sw.js"]
- range:
_ctx.elapsed.gte: 1000
- 将请求记录也按照指定的 request.host 过滤到单独的处理流程中。
后置网关异步消费请求数据 #
网关配置如下:
path.data: data-async
path.logs: log-async
pipeline:
- name: metrics_merge
auto_start: true
keep_running: true
processor:
- indexing_merge:
input_queue: "metrics"
elasticsearch: "logging-server"
index_name: ".infini_metrics"
output_queue:
name: "bulk_requests"
label:
tag: "metrics"
worker_size: 1
bulk_size_in_mb: 10
- name: request_logging_merge
auto_start: true
keep_running: true
processor:
- indexing_merge:
input_queue: "request_logging"
elasticsearch: "logging-server"
index_name: ".infini_requests_logging"
output_queue:
name: "bulk_requests"
label:
tag: "request_logging"
worker_size: 1
bulk_size_in_mb: 10
- name: failure_message_merge
auto_start: true
keep_running: true
processor:
- indexing_merge:
input_queue: "bulk_result_messages"
elasticsearch: "logging-server"
index_name: ".infini_async_bulk_results"
output_queue:
name: "bulk_requests"
label:
tag: "bulk_logging"
worker_size: 1
bulk_size_in_mb: 10
- name: ingest_merged_requests
auto_start: true
keep_running: true
processor:
- bulk_indexing:
bulk:
compress: true
batch_size_in_mb: 10
batch_size_in_docs: 5000
retry_rules:
retry_429: true
default: true
retry_4xx: false
consumer:
fetch_max_messages: 100
queues:
type: indexing_merge
when:
cluster_available: ["logging-server"]
- name: bulk_request_ingest
auto_start: true
keep_running: true
retry_delay_in_ms: 1000
processor:
- bulk_indexing:
max_connection_per_node: 1000
num_of_slices: 2
max_worker_size: 200
idle_timeout_in_seconds: 3
bulk:
compress: false
batch_size_in_mb: 20
batch_size_in_docs: 5000
dead_letter_queue: "bulk_dead_requests"
bulk_result_message_queue: "bulk_result_messages"
max_request_body_size: 10240
max_response_body_size: 10240
save_success_results: true
retry_rules:
denied:
keyword:
- illegal_state_exception
consumer:
fetch_max_messages: 1000
eof_retry_delay_in_ms: 500
queue_selector:
labels:
type: bulk_reshuffle
entry:
- name: my_es_entry
enabled: true
router: my_router
max_concurrency: 200000
network:
binding: 0.0.0.0:8000
tls:
enabled: true
cert_file: /etc/ssl.crt
key_file: /etc/ssl.key
skip_insecure_verify: false
flow:
- name: async_bulk
filter:
- bulk_reshuffle:
when:
contains:
_ctx.request.path: /_bulk
elasticsearch: prod
level: node
partition_size: 1
#shards: [1,3,5,7,9,11,13]
continue_metadata_missing: true
fix_null_id: true
- http:
schema: "https" #https or http
hosts:
- "165ed1e903914b929746c642d92b5a21.192.168.200.209.ip.es.a:9243"
- "165ed1e903914b929746c642d92b5a21.192.168.200.209.ip.es.b:9243"
- "165ed1e903914b929746c642d92b5a21.192.168.200.209.ip.es.c:9243"
router:
- name: my_router
default_flow: async_bulk
elasticsearch:
- name: prod
enabled: true
endpoints:
- https://165ed1e903914b929746c642d92b5a21.192.168.200.209.ip.es.a:9243
- https://165ed1e903914b929746c642d92b5a21.192.168.200.209.ip.es.b:9243
- https://165ed1e903914b929746c642d92b5a21.192.168.200.209.ip.es.c:9243
discovery:
enabled: false
basic_auth:
username: elastic
password: 1qaz!QAZ
- name: logging-server
enabled: true
endpoints:
- https://ccb82916fc594ccf94a241bba9a8d1b6.192.168.200.209.ip.es.io:9243
basic_auth:
username: elastic
password: 1qaz!QAZ
discovery:
enabled: false
request_timeout: 60
elastic:
enabled: true
remote_configs: false
health_check:
enabled: true
interval: 30s
availability_check:
enabled: true
interval: 60s
metadata_refresh:
enabled: true
interval: 30s
cluster_settings_check:
enabled: false
interval: 20s
disk_queue:
prepare_files_to_read: true
#max_bytes_per_file: 20971520
eof_retry_delay_in_ms: 500
compress:
delete_after_compress: true
idle_threshold: 20
metrics:
enabled: true
queue: metrics
instance:
enabled: true
network:
enabled: true
summary: true
badger:
value_log_max_entries: 1000000
value_log_file_size: 104857600
配置说明:
- name: async_bulk
filter:
- bulk_reshuffle:
when:
contains:
_ctx.request.path: /_bulk
elasticsearch: prod
level: node
partition_size: 1
#shards: [1,3,5,7,9,11,13]
continue_metadata_missing: true
fix_null_id: true
- http:
schema: "https" #https or http
hosts:
- "165ed1e903914b929746c642d92b5a21.192.168.200.209.ip.es.a:9243"
- "165ed1e903914b929746c642d92b5a21.192.168.200.209.ip.es.b:9243"
- "165ed1e903914b929746c642d92b5a21.192.168.200.209.ip.es.c:9243"
- 请求首先经过 bulk_reshuffle 过滤器,如果提交 buffer 成功,请求返回 200,数据在队列中等待消费;如果提交 buffer 失败,请求将进入 http 过滤器,直接发往 ECE Proxy。
经测试验证,改造后写入性能提升 20%。
更多极限网关配置信息请参考 官网。
关于极限网关(INFINI Gateway) #
INFINI Gateway 是一个面向搜索场景的高性能数据网关,所有请求都经过网关处理后再转发到后端的搜索业务集群。基于 INFINI Gateway,可以实现索引级别的限速限流、常见查询的缓存加速、查询请求的审计、查询结果的动态修改等等。
Gateway 现已开源(https://github.com/infinilabs/gateway),如有相关问题或建议,欢迎提交 PR 或 Issue,一起参与开源共建!