📣 极限科技诚招搜索运维工程师(Elasticsearch/Easysearch)- 全职/北京 👉 : 立即申请加入
极限网关案例分享(2):使用极限网关来加速索引写入速度

背景 #

某些业务数据写入速度慢,需提高写入速度。

方案 #

设计方案如下:

  • 将数据写入改造成 bulk 请求形式
  • 前置网关将指定业务的 bulk 请求单独拆分出来,转发到后置网关
  • 后置网关对 bulk 请求进行异步消费处理

前置网关拆分指定请求 #

网关配置如下:

path.data: data-front
path.logs: log-front

entry:
  - name: my_es_entry
    enabled: true
    router: my_router
    max_concurrency: 50000
    network:
      binding: :9243
    tls:
      enabled: true
      cert_file: /etc/ssl.crt
      key_file: /etc/ssl.key
      skip_insecure_verify: false

flow:
  - name: test-async-indexing
    filter:
      - elasticsearch:
          elasticsearch: test-es
      - flow:
          when:
            equals:
              _ctx.response.status: 0
          flows:
            - default-flow
      - drop:
  - name: bulk_flow
    filter:
      - flow:
          when:
            and:
              - contains:
                  _ctx.request.path: /_bulk
              - prefix:
                  _ctx.request.host: 165ed1e903914b929746c642d92b5a21
          flows:
            - test-async-indexing
      - flow:
          flows:
            - default-flow
  - name: default-flow
    filter:
      - http:
          schema: "https" #https or http
          max_idle_conn_duration: "900s"
          skip_failure_host: false
          hosts:
            - "192.168.200.209:9243"
            - "192.168.200.210:9243"
            - "192.168.200.211:9243"
  - name: drop_flow
    filter:
      - drop:
  - name: test-request_indexing
    filter:
      - ratio:
          ratio: 0.5
          flow: drop_flow
          continue: false
      - logging:
          queue_name: test-request_logging
          max_request_body_size: 4096
          max_response_body_size: 4096
      - drop:
  - name: request_logging
    filter:
      - flow:
          when:
            prefix:
              _ctx.request.host: 165ed1e903914b929746c642d92b5a21
          flows:
            - test-request_indexing
      - logging:
          queue_name: request_logging
          max_request_body_size: 4096
          max_response_body_size: 4096
          when: #>1s or none-200 requests will be logged
            or:
              - not:
                  or:
                    - equals:
                        _ctx.request.path: "/favicon.ico"
                    - in:
                        _ctx.response.status: [404, 200, 201]
                    - in:
                        _ctx.request.path: ["/sw.js"]
              - range:
                  _ctx.elapsed.gte: 1000
router:
  - name: my_router
    default_flow: default-flow
    tracing_flow: request_logging
    rules:
      - method:
          - "*"
        pattern:
          - "/_bulk"
          - "/{any_index}/_bulk"
        flow:
          - bulk_flow

elasticsearch:
  - name: logging-server
    enabled: true
    endpoints:
      - https://d6794e84d46e4b7db21d364de10620c4.192.168.200.209.ip.es.io:9243
    basic_auth:
      username: elastic
      password: 1qaz!QAZ
    discovery:
      enabled: false
  - name: test-logging-server
    enabled: true
    endpoints:
      - https://ccb82916fc594ccf94a241bba9a8d1b6.192.168.200.209.ip.es.io:9243
    basic_auth:
      username: elastic
      password: 1qaz!QAZ
    discovery:
      enabled: false
  - name: test-es
    enabled: true
    endpoints:
      - https://165ed1e903914b929746c642d92b5a21.192.168.3.185.async.vip:8000
    basic_auth:
      username: elastic
      password: 1qaz!QAZ
    discovery:
      enabled: false

pipeline:
  - name: metrics_ingest
    auto_start: true
    keep_running: true
    retry_delay_in_ms: 1000
    processor:
      - json_indexing:
          index_name: ".infini_metrics"
          elasticsearch: "logging-server"
          input_queue: "metrics"
          idle_timeout_in_seconds: 1
          worker_size: 1
          bulk_size_in_mb: 10 #in MB
          when:
            cluster_available: ["logging-server"]
  - name: indexing_merge
    auto_start: true
    keep_running: true
    processor:
      - indexing_merge:
          input_queue: "request_logging"
          elasticsearch: "logging-server"
          index_name: ".infini_requests_logging"
          output_queue:
            name: "gateway_requests"
            label:
              tag: "request_logging"
          worker_size: 1
          bulk_size_in_mb: 10
  - name: logging_requests
    auto_start: true
    keep_running: true
    processor:
      - bulk_indexing:
          bulk:
            compress: true
            batch_size_in_mb: 10
            batch_size_in_docs: 5000
            bulk_result_message_queue: ""
            response_handle:
              save_success_results: false
              output_bulk_stats: false
          queue_selector:
            labels:
              tag: request_logging
          consumer:
            fetch_max_messages: 100
          queues:
            type: indexing_merge
          when:
            cluster_available: ["logging-server"]
  - name: test-logging_requests
    auto_start: true
    keep_running: true
    processor:
      - bulk_indexing:
          bulk:
            compress: true
            batch_size_in_mb: 10
            batch_size_in_docs: 5000
            bulk_result_message_queue: ""
            response_handle:
              save_success_results: false
              output_bulk_stats: false
          queue_selector:
            labels:
              tag: test-request_logging
          consumer:
            fetch_max_messages: 100
          queues:
            type: indexing_merge
          when:
            cluster_available: ["test-logging-server"]
  - name: test_indexing_merge
    auto_start: true
    keep_running: true
    processor:
      - indexing_merge:
          input_queue: "test-request_logging"
          elasticsearch: "test-logging-server"
          index_name: ".infini_requests_logging"
          output_queue:
            name: "test-gateway_requests"
            label:
              tag: "test-request_logging"
          worker_size: 1
          bulk_size_in_mb: 10

metrics:
  enabled: true
  queue: metrics
  instance:
    enabled: true
  network:
    enabled: true
    summary: true

badger:
  value_log_max_entries: 1000000
  value_log_file_size: 104857600

配置说明:

- name: test-async-indexing
  filter:
    - elasticsearch:
        elasticsearch: test-es
    - flow:
        when:
          equals:
            _ctx.response.status: 0
        flows:
          - default-flow
    - drop:
- name: bulk_flow
  filter:
    - flow:
        when:
          and:
            - contains:
                _ctx.request.path: /_bulk
            - prefix:
                _ctx.request.host: 165ed1e903914b929746c642d92b5a21
        flows:
          - test-async-indexing
    - flow:
        flows:
          - default-flow
  • 根据请求信息中的 request.host 将指定请求过滤到独立的处理流程中。
- name: test-request_indexing
  filter:
    - ratio:
        ratio: 0.5
        flow: drop_flow
        continue: false
    - logging:
        queue_name: test-request_logging
        max_request_body_size: 4096
        max_response_body_size: 4096
    - drop:
- name: request_logging
  filter:
    - flow:
        when:
          prefix:
            _ctx.request.host: 165ed1e903914b929746c642d92b5a21
        flows:
          - test-request_indexing
    - logging:
        queue_name: request_logging
        max_request_body_size: 4096
        max_response_body_size: 4096
        when: #>1s or none-200 requests will be logged
          or:
            - not:
                or:
                  - equals:
                      _ctx.request.path: "/favicon.ico"
                  - in:
                      _ctx.response.status: [404, 200, 201]
                  - in:
                      _ctx.request.path: ["/sw.js"]
            - range:
                _ctx.elapsed.gte: 1000
  • 将请求记录也按照指定的 request.host 过滤到单独的处理流程中。

后置网关异步消费请求数据 #

网关配置如下:

path.data: data-async
path.logs: log-async

pipeline:
  - name: metrics_merge
    auto_start: true
    keep_running: true
    processor:
      - indexing_merge:
          input_queue: "metrics"
          elasticsearch: "logging-server"
          index_name: ".infini_metrics"
          output_queue:
            name: "bulk_requests"
            label:
              tag: "metrics"
          worker_size: 1
          bulk_size_in_mb: 10
  - name: request_logging_merge
    auto_start: true
    keep_running: true
    processor:
      - indexing_merge:
          input_queue: "request_logging"
          elasticsearch: "logging-server"
          index_name: ".infini_requests_logging"
          output_queue:
            name: "bulk_requests"
            label:
              tag: "request_logging"
          worker_size: 1
          bulk_size_in_mb: 10
  - name: failure_message_merge
    auto_start: true
    keep_running: true
    processor:
      - indexing_merge:
          input_queue: "bulk_result_messages"
          elasticsearch: "logging-server"
          index_name: ".infini_async_bulk_results"
          output_queue:
            name: "bulk_requests"
            label:
              tag: "bulk_logging"
          worker_size: 1
          bulk_size_in_mb: 10
  - name: ingest_merged_requests
    auto_start: true
    keep_running: true
    processor:
      - bulk_indexing:
          bulk:
            compress: true
            batch_size_in_mb: 10
            batch_size_in_docs: 5000
            retry_rules:
              retry_429: true
              default: true
              retry_4xx: false
          consumer:
            fetch_max_messages: 100
          queues:
            type: indexing_merge
          when:
            cluster_available: ["logging-server"]
  - name: bulk_request_ingest
    auto_start: true
    keep_running: true
    retry_delay_in_ms: 1000
    processor:
      - bulk_indexing:
          max_connection_per_node: 1000
          num_of_slices: 2
          max_worker_size: 200
          idle_timeout_in_seconds: 3
          bulk:
            compress: false
            batch_size_in_mb: 20
            batch_size_in_docs: 5000
            dead_letter_queue: "bulk_dead_requests"
            bulk_result_message_queue: "bulk_result_messages"
            max_request_body_size: 10240
            max_response_body_size: 10240
            save_success_results: true
            retry_rules:
              denied:
                keyword:
                  - illegal_state_exception
          consumer:
            fetch_max_messages: 1000
            eof_retry_delay_in_ms: 500
          queue_selector:
            labels:
              type: bulk_reshuffle

entry:
  - name: my_es_entry
    enabled: true
    router: my_router
    max_concurrency: 200000
    network:
      binding: 0.0.0.0:8000
    tls:
      enabled: true
      cert_file: /etc/ssl.crt
      key_file: /etc/ssl.key
      skip_insecure_verify: false

flow:
  - name: async_bulk
    filter:
      - bulk_reshuffle:
          when:
            contains:
              _ctx.request.path: /_bulk
          elasticsearch: prod
          level: node
          partition_size: 1
          #shards: [1,3,5,7,9,11,13]
          continue_metadata_missing: true
          fix_null_id: true
      - http:
          schema: "https" #https or http
          hosts:
            - "165ed1e903914b929746c642d92b5a21.192.168.200.209.ip.es.a:9243"
            - "165ed1e903914b929746c642d92b5a21.192.168.200.209.ip.es.b:9243"
            - "165ed1e903914b929746c642d92b5a21.192.168.200.209.ip.es.c:9243"

router:
  - name: my_router
    default_flow: async_bulk

elasticsearch:
  - name: prod
    enabled: true
    endpoints:
      - https://165ed1e903914b929746c642d92b5a21.192.168.200.209.ip.es.a:9243
      - https://165ed1e903914b929746c642d92b5a21.192.168.200.209.ip.es.b:9243
      - https://165ed1e903914b929746c642d92b5a21.192.168.200.209.ip.es.c:9243
    discovery:
      enabled: false
    basic_auth:
      username: elastic
      password: 1qaz!QAZ
  - name: logging-server
    enabled: true
    endpoints:
      - https://ccb82916fc594ccf94a241bba9a8d1b6.192.168.200.209.ip.es.io:9243
    basic_auth:
      username: elastic
      password: 1qaz!QAZ
    discovery:
      enabled: false
    request_timeout: 60

elastic:
  enabled: true
  remote_configs: false
  health_check:
    enabled: true
    interval: 30s
  availability_check:
    enabled: true
    interval: 60s
  metadata_refresh:
    enabled: true
    interval: 30s
  cluster_settings_check:
    enabled: false
    interval: 20s

disk_queue:
  prepare_files_to_read: true
  #max_bytes_per_file: 20971520
  eof_retry_delay_in_ms: 500
  compress:
    delete_after_compress: true
    idle_threshold: 20

metrics:
  enabled: true
  queue: metrics
  instance:
    enabled: true
  network:
    enabled: true
    summary: true

badger:
  value_log_max_entries: 1000000
  value_log_file_size: 104857600

配置说明:

- name: async_bulk
  filter:
    - bulk_reshuffle:
        when:
          contains:
            _ctx.request.path: /_bulk
        elasticsearch: prod
        level: node
        partition_size: 1
        #shards: [1,3,5,7,9,11,13]
        continue_metadata_missing: true
        fix_null_id: true
    - http:
        schema: "https" #https or http
        hosts:
          - "165ed1e903914b929746c642d92b5a21.192.168.200.209.ip.es.a:9243"
          - "165ed1e903914b929746c642d92b5a21.192.168.200.209.ip.es.b:9243"
          - "165ed1e903914b929746c642d92b5a21.192.168.200.209.ip.es.c:9243"
  • 请求首先经过 bulk_reshuffle 过滤器,如果提交 buffer 成功,请求返回 200,数据在队列中等待消费;如果提交 buffer 失败,请求将进入 http 过滤器,直接发往 ECE Proxy。

经测试验证,改造后写入性能提升 20%。

更多极限网关配置信息请参考 官网

关于极限网关(INFINI Gateway) #

INFINI Gateway 是一个面向搜索场景的高性能数据网关,所有请求都经过网关处理后再转发到后端的搜索业务集群。基于 INFINI Gateway,可以实现索引级别的限速限流、常见查询的缓存加速、查询请求的审计、查询结果的动态修改等等。

Gateway 现已开源(https://github.com/infinilabs/gateway),如有相关问题或建议,欢迎提交 PR 或 Issue,一起参与开源共建!

标签
Easysearch x
产品更新 x
performance x
2026 x
开源 x
赞助 x
开源生态 x
社区 x
Coco AI x
二等奖 x
兴智杯 x
人工智能 x
赛事 x
低空经济 x
商业化 x
数据分析 x
金猿奖 x
国产化 x
搜索引擎 x
技术卓越奖 x
创新产品奖 x
IT168 x
APM x
Skywalking x
Easy-Es x
Coco x
AI x
GitLab x
代码审核 x
石油石化 x
Gitee x
投票 x
Meilisearch x
Rust x
轻量级 x
搜索百科 x
Docker x
Docker Compose x
Easyserach x
Console x
DevOps x
Elasticsearch x
国产替代 x
backup x
snapshot x
CCR x
Gateway x
esdump x
source_reuse x
ignore_above x
OpenSearch x
AWS x
Lucene x
Solr x
Easyearch x
发明专利 x
数据分区 x
国际专利 x
一等奖 x
人工智能应用创新大赛 x
bulk x
embedding x
OpenAI x
IK x
TDBC x
2025 x
信通院 x
可信数据库大会 x
搜索型数据库 x
中国数据库产业图谱 x
上海开源创新菁英荟 x
开源创新新星企业 x
Workshop x
AI 搜索 x
智能助手 x
Automation x
Logstash x
MongoDB x
开源中国 x
直播 x
merge x
Elasticsearch 9 x
GitCode x
AI搜索 x
Cloud x
rollup x
Kubernetes x
Operator x
Arm64 x
Snapshot x
S3 x
Grafana x
Opensearch x
Nginx x
直播活动 x
搜索客社区 x
Meetup x
ES x
企业搜索 x
DeepSeek x
RAG x
certificate x
windows x
Rollup x
TopN x
Filebeat x
Ubuntu x
请求限速 x
INFINI Console x
指标 x
Kibana x
多集群 x
client x
Spring Boot x
ECE x
ES Bulk x
vector database x
Postgres x
可搜索快照 x
SDK x
官网 x
Web 开发 x
Next.js x
React x
Three.js x
Metrics x
Helm x
filter x
querycache x
practice x
Agent x
localStorage x
响应式 x
时间组件 x
时区组件 x
极限科技 x
三周年 x
周年庆 x
国家高新技术企业 x
校园招聘 x
湖北工业大学 x
Tauri x
Web 开发人员 x
桌面应用开发 x
桌面端 x
Electron x
Pizza x
认证培训 x
报名 x
Scrapy x
爬虫 x
Rust开发者大会 x
docsearch x
文档搜索 x
Easyseach x
有奖征文 x
黑神话悟空 x
EKS x
征文系列 x
跨集群搜索 x
科技中小企业 x
白皮书 x
Python SDK x
数据库产业图谱 x
超大规模 x
分布式集群 x
写入限流 x
2024可信数据库发展大会 x
创新型中小企业 x
搜索数据库 x
正排索引 x
免费许可证 x
K8S x
DTC2024 x
实时搜索 x
ES国产化 x
Redis x
OOM x
测试 x
内存 x
趋势 x
AI绘画 x
Stable Diffusion x
Diffusion x
Model x
GAN x
语义搜索 x
知识图 x
向量数据库 x
中国信通院 x
星河(Galaxy) x
标杆案例 x
鲲鹏 x
鲲鹏技术认证 x
客户端 x
日志平台 x
LDAP x
Loadgen x
中国一汽 x
国内数据库 x
墨天轮 x
监控系统 x
集成测试 x
ZSTD x
Helm Charts x
国产适配 x
兆芯 x
Linux x
LoongArch x
信创适配 x
二维拆分算法 x
中国移动云 x
Vault x
加密 x
安全工具 x
kNN x
向量检索 x
图片搜索 x
Alerting x
SQL x
搜索 x
Embedding x
可信数据库 x
统信 x
海光 x
龙芯 x
restore x
Arm x
大数据企业证书 x
移动云大会 x
信通院产品评测 x
国内首家 x
数据可视化 x
北京软协 x
第十届理事会会员单位 x
Apache Arrow x
宣传片 x
大会分享 x
多集群管理 x
无缝数据迁移 x
Loadrun x
INFINI Gateway x
log4j x