--- title: "极限网关案例分享(2):使用极限网关来加速索引写入速度" date: 2024-12-28 lastmod: 2024-12-28 description: "通过改造业务数据写入为 bulk 请求形式,前置网关拆分指定业务请求转发至后置网关,后置网关异步消费处理,测试验证写入性能提升20%。" tags: ["Gateway", "ES Bulk"] summary: "背景 # 某些业务数据写入速度慢,需提高写入速度。 方案 # 设计方案如下: 将数据写入改造成 bulk 请求形式 前置网关将指定业务的 bulk 请求单独拆分出来,转发到后置网关 后置网关对 bulk 请求进行异步消费处理 前置网关拆分指定请求 # 网关配置如下: path.data: data-front path.logs: log-front entry: - name: my_es_entry enabled: true router: my_router max_concurrency: 50000 network: binding: :9243 tls: enabled: true cert_file: /etc/ssl.crt key_file: /etc/ssl.key skip_insecure_verify: false flow: - name: test-async-indexing filter: - elasticsearch: elasticsearch: test-es - flow: when: equals: _ctx.response.status: 0 flows: - default-flow - drop: - name: bulk_flow filter: - flow: when: and: - contains: _ctx." --- ## 背景 某些业务数据写入速度慢,需提高写入速度。 ## 方案 设计方案如下: - 将数据写入改造成 bulk 请求形式 - 前置网关将指定业务的 bulk 请求单独拆分出来,转发到后置网关 - 后置网关对 bulk 请求进行异步消费处理 ## 前置网关拆分指定请求 网关配置如下: ```yaml path.data: data-front path.logs: log-front entry: - name: my_es_entry enabled: true router: my_router max_concurrency: 50000 network: binding: :9243 tls: enabled: true cert_file: /etc/ssl.crt key_file: /etc/ssl.key skip_insecure_verify: false flow: - name: test-async-indexing filter: - elasticsearch: elasticsearch: test-es - flow: when: equals: _ctx.response.status: 0 flows: - default-flow - drop: - name: bulk_flow filter: - flow: when: and: - contains: _ctx.request.path: /_bulk - prefix: _ctx.request.host: 165ed1e903914b929746c642d92b5a21 flows: - test-async-indexing - flow: flows: - default-flow - name: default-flow filter: - http: schema: "https" #https or http max_idle_conn_duration: "900s" skip_failure_host: false hosts: - "192.168.200.209:9243" - "192.168.200.210:9243" - "192.168.200.211:9243" - name: drop_flow filter: - drop: - name: test-request_indexing filter: - ratio: ratio: 0.5 flow: drop_flow continue: false - logging: queue_name: test-request_logging max_request_body_size: 4096 max_response_body_size: 4096 - drop: - 
name: request_logging filter: - flow: when: prefix: _ctx.request.host: 165ed1e903914b929746c642d92b5a21 flows: - test-request_indexing - logging: queue_name: request_logging max_request_body_size: 4096 max_response_body_size: 4096 when: #>1s or none-200 requests will be logged or: - not: or: - equals: _ctx.request.path: "/favicon.ico" - in: _ctx.response.status: [404, 200, 201] - in: _ctx.request.path: ["/sw.js"] - range: _ctx.elapsed.gte: 1000 router: - name: my_router default_flow: default-flow tracing_flow: request_logging rules: - method: - "*" pattern: - "/_bulk" - "/{any_index}/_bulk" flow: - bulk_flow elasticsearch: - name: logging-server enabled: true endpoints: - https://d6794e84d46e4b7db21d364de10620c4.192.168.200.209.ip.es.io:9243 basic_auth: username: elastic password: 1qaz!QAZ discovery: enabled: false - name: test-logging-server enabled: true endpoints: - https://ccb82916fc594ccf94a241bba9a8d1b6.192.168.200.209.ip.es.io:9243 basic_auth: username: elastic password: 1qaz!QAZ discovery: enabled: false - name: test-es enabled: true endpoints: - https://165ed1e903914b929746c642d92b5a21.192.168.3.185.async.vip:8000 basic_auth: username: elastic password: 1qaz!QAZ discovery: enabled: false pipeline: - name: metrics_ingest auto_start: true keep_running: true retry_delay_in_ms: 1000 processor: - json_indexing: index_name: ".infini_metrics" elasticsearch: "logging-server" input_queue: "metrics" idle_timeout_in_seconds: 1 worker_size: 1 bulk_size_in_mb: 10 #in MB when: cluster_available: ["logging-server"] - name: indexing_merge auto_start: true keep_running: true processor: - indexing_merge: input_queue: "request_logging" elasticsearch: "logging-server" index_name: ".infini_requests_logging" output_queue: name: "gateway_requests" label: tag: "request_logging" worker_size: 1 bulk_size_in_mb: 10 - name: logging_requests auto_start: true keep_running: true processor: - bulk_indexing: bulk: compress: true batch_size_in_mb: 10 batch_size_in_docs: 5000 
bulk_result_message_queue: "" response_handle: save_success_results: false output_bulk_stats: false queue_selector: labels: tag: request_logging consumer: fetch_max_messages: 100 queues: type: indexing_merge when: cluster_available: ["logging-server"] - name: test-logging_requests auto_start: true keep_running: true processor: - bulk_indexing: bulk: compress: true batch_size_in_mb: 10 batch_size_in_docs: 5000 bulk_result_message_queue: "" response_handle: save_success_results: false output_bulk_stats: false queue_selector: labels: tag: test-request_logging consumer: fetch_max_messages: 100 queues: type: indexing_merge when: cluster_available: ["test-logging-server"] - name: test_indexing_merge auto_start: true keep_running: true processor: - indexing_merge: input_queue: "test-request_logging" elasticsearch: "test-logging-server" index_name: ".infini_requests_logging" output_queue: name: "test-gateway_requests" label: tag: "test-request_logging" worker_size: 1 bulk_size_in_mb: 10 metrics: enabled: true queue: metrics instance: enabled: true network: enabled: true summary: true badger: value_log_max_entries: 1000000 value_log_file_size: 104857600 ``` 配置说明: ```yaml - name: test-async-indexing filter: - elasticsearch: elasticsearch: test-es - flow: when: equals: _ctx.response.status: 0 flows: - default-flow - drop: - name: bulk_flow filter: - flow: when: and: - contains: _ctx.request.path: /_bulk - prefix: _ctx.request.host: 165ed1e903914b929746c642d92b5a21 flows: - test-async-indexing - flow: flows: - default-flow ``` - 根据请求信息中的 request.host 将指定请求过滤到独立的处理流程中。 ```yaml - name: test-request_indexing filter: - ratio: ratio: 0.5 flow: drop_flow continue: false - logging: queue_name: test-request_logging max_request_body_size: 4096 max_response_body_size: 4096 - drop: - name: request_logging filter: - flow: when: prefix: _ctx.request.host: 165ed1e903914b929746c642d92b5a21 flows: - test-request_indexing - logging: queue_name: request_logging max_request_body_size: 4096 
max_response_body_size: 4096 when: #>1s or none-200 requests will be logged or: - not: or: - equals: _ctx.request.path: "/favicon.ico" - in: _ctx.response.status: [404, 200, 201] - in: _ctx.request.path: ["/sw.js"] - range: _ctx.elapsed.gte: 1000 ``` - 将请求记录也按照指定的 request.host 过滤到单独的处理流程中。 ## 后置网关异步消费请求数据 网关配置如下: ```yaml path.data: data-async path.logs: log-async pipeline: - name: metrics_merge auto_start: true keep_running: true processor: - indexing_merge: input_queue: "metrics" elasticsearch: "logging-server" index_name: ".infini_metrics" output_queue: name: "bulk_requests" label: tag: "metrics" worker_size: 1 bulk_size_in_mb: 10 - name: request_logging_merge auto_start: true keep_running: true processor: - indexing_merge: input_queue: "request_logging" elasticsearch: "logging-server" index_name: ".infini_requests_logging" output_queue: name: "bulk_requests" label: tag: "request_logging" worker_size: 1 bulk_size_in_mb: 10 - name: failure_message_merge auto_start: true keep_running: true processor: - indexing_merge: input_queue: "bulk_result_messages" elasticsearch: "logging-server" index_name: ".infini_async_bulk_results" output_queue: name: "bulk_requests" label: tag: "bulk_logging" worker_size: 1 bulk_size_in_mb: 10 - name: ingest_merged_requests auto_start: true keep_running: true processor: - bulk_indexing: bulk: compress: true batch_size_in_mb: 10 batch_size_in_docs: 5000 retry_rules: retry_429: true default: true retry_4xx: false consumer: fetch_max_messages: 100 queues: type: indexing_merge when: cluster_available: ["logging-server"] - name: bulk_request_ingest auto_start: true keep_running: true retry_delay_in_ms: 1000 processor: - bulk_indexing: max_connection_per_node: 1000 num_of_slices: 2 max_worker_size: 200 idle_timeout_in_seconds: 3 bulk: compress: false batch_size_in_mb: 20 batch_size_in_docs: 5000 dead_letter_queue: "bulk_dead_requests" bulk_result_message_queue: "bulk_result_messages" max_request_body_size: 10240 max_response_body_size: 10240 
save_success_results: true retry_rules: denied: keyword: - illegal_state_exception consumer: fetch_max_messages: 1000 eof_retry_delay_in_ms: 500 queue_selector: labels: type: bulk_reshuffle entry: - name: my_es_entry enabled: true router: my_router max_concurrency: 200000 network: binding: 0.0.0.0:8000 tls: enabled: true cert_file: /etc/ssl.crt key_file: /etc/ssl.key skip_insecure_verify: false flow: - name: async_bulk filter: - bulk_reshuffle: when: contains: _ctx.request.path: /_bulk elasticsearch: prod level: node partition_size: 1 #shards: [1,3,5,7,9,11,13] continue_metadata_missing: true fix_null_id: true - http: schema: "https" #https or http hosts: - "165ed1e903914b929746c642d92b5a21.192.168.200.209.ip.es.a:9243" - "165ed1e903914b929746c642d92b5a21.192.168.200.209.ip.es.b:9243" - "165ed1e903914b929746c642d92b5a21.192.168.200.209.ip.es.c:9243" router: - name: my_router default_flow: async_bulk elasticsearch: - name: prod enabled: true endpoints: - https://165ed1e903914b929746c642d92b5a21.192.168.200.209.ip.es.a:9243 - https://165ed1e903914b929746c642d92b5a21.192.168.200.209.ip.es.b:9243 - https://165ed1e903914b929746c642d92b5a21.192.168.200.209.ip.es.c:9243 discovery: enabled: false basic_auth: username: elastic password: 1qaz!QAZ - name: logging-server enabled: true endpoints: - https://ccb82916fc594ccf94a241bba9a8d1b6.192.168.200.209.ip.es.io:9243 basic_auth: username: elastic password: 1qaz!QAZ discovery: enabled: false request_timeout: 60 elastic: enabled: true remote_configs: false health_check: enabled: true interval: 30s availability_check: enabled: true interval: 60s metadata_refresh: enabled: true interval: 30s cluster_settings_check: enabled: false interval: 20s disk_queue: prepare_files_to_read: true #max_bytes_per_file: 20971520 eof_retry_delay_in_ms: 500 compress: delete_after_compress: true idle_threshold: 20 metrics: enabled: true queue: metrics instance: enabled: true network: enabled: true summary: true badger: value_log_max_entries: 1000000 
value_log_file_size: 104857600 ``` 配置说明: ```yaml - name: async_bulk filter: - bulk_reshuffle: when: contains: _ctx.request.path: /_bulk elasticsearch: prod level: node partition_size: 1 #shards: [1,3,5,7,9,11,13] continue_metadata_missing: true fix_null_id: true - http: schema: "https" #https or http hosts: - "165ed1e903914b929746c642d92b5a21.192.168.200.209.ip.es.a:9243" - "165ed1e903914b929746c642d92b5a21.192.168.200.209.ip.es.b:9243" - "165ed1e903914b929746c642d92b5a21.192.168.200.209.ip.es.c:9243" ``` - 请求首先经过 bulk_reshuffle 过滤器,如果提交 buffer 成功,请求返回 200,数据在队列中等待消费;如果提交 buffer 失败,请求将进入 http 过滤器,直接发往 ECE Proxy。 经测试验证,改造后写入性能提升 20%。 更多极限网关配置信息请参考[官网](https://docs.infinilabs.com/gateway/main/zh/)。 ## 关于极限网关(INFINI Gateway) ![INFINI Gateway](/img/blog/banner/gateway_banner@2x.png) INFINI Gateway 是一个面向搜索场景的高性能数据网关,所有请求都经过网关处理后再转发到后端的搜索业务集群。基于 INFINI Gateway,可以实现索引级别的限速限流、常见查询的缓存加速、查询请求的审计、查询结果的动态修改等等。 Gateway 现已开源(<https://github.com/infinilabs/gateway>),如有相关问题或建议,欢迎提交 PR 或 Issue,一起参与开源共建!