极限网关案例分享(2):使用极限网关来加速索引写入速度
Gateway
ES Bulk
2024-12-28

背景 #

某些业务数据写入速度慢,需提高写入速度。

方案 #

设计方案如下:

  • 将数据写入改造成 bulk 请求形式
  • 前置网关将指定业务的 bulk 请求单独拆分出来,转发到后置网关
  • 后置网关对 bulk 请求进行异步消费处理

前置网关拆分指定请求 #

网关配置如下:

path.data: data-front
path.logs: log-front

entry:
  - name: my_es_entry
    enabled: true
    router: my_router
    max_concurrency: 50000
    network:
      binding: :9243
    tls:
      enabled: true
      cert_file: /etc/ssl.crt
      key_file: /etc/ssl.key
      skip_insecure_verify: false

flow:
  - name: test-async-indexing
    filter:
      - elasticsearch:
          elasticsearch: test-es
      - flow:
          when:
            equals:
              _ctx.response.status: 0
          flows:
            - default-flow
      - drop:
  - name: bulk_flow
    filter:
      - flow:
          when:
            and:
              - contains:
                  _ctx.request.path: /_bulk
              - prefix:
                  _ctx.request.host: 165ed1e903914b929746c642d92b5a21
          flows:
            - test-async-indexing
      - flow:
          flows:
            - default-flow
  - name: default-flow
    filter:
      - http:
          schema: "https" #https or http
          max_idle_conn_duration: "900s"
          skip_failure_host: false
          hosts:
            - "192.168.200.209:9243"
            - "192.168.200.210:9243"
            - "192.168.200.211:9243"
  - name: drop_flow
    filter:
      - drop:
  - name: test-request_indexing
    filter:
      - ratio:
          ratio: 0.5
          flow: drop_flow
          continue: false
      - logging:
          queue_name: test-request_logging
          max_request_body_size: 4096
          max_response_body_size: 4096
      - drop:
  - name: request_logging
    filter:
      - flow:
          when:
            prefix:
              _ctx.request.host: 165ed1e903914b929746c642d92b5a21
          flows:
            - test-request_indexing
      - logging:
          queue_name: request_logging
          max_request_body_size: 4096
          max_response_body_size: 4096
          when: #>1s or none-200 requests will be logged
            or:
              - not:
                  or:
                    - equals:
                        _ctx.request.path: "/favicon.ico"
                    - in:
                        _ctx.response.status: [404, 200, 201]
                    - in:
                        _ctx.request.path: ["/sw.js"]
              - range:
                  _ctx.elapsed.gte: 1000
router:
  - name: my_router
    default_flow: default-flow
    tracing_flow: request_logging
    rules:
      - method:
          - "*"
        pattern:
          - "/_bulk"
          - "/{any_index}/_bulk"
        flow:
          - bulk_flow

elasticsearch:
  - name: logging-server
    enabled: true
    endpoints:
      - https://d6794e84d46e4b7db21d364de10620c4.192.168.200.209.ip.es.io:9243
    basic_auth:
      username: elastic
      password: 1qaz!QAZ
    discovery:
      enabled: false
  - name: test-logging-server
    enabled: true
    endpoints:
      - https://ccb82916fc594ccf94a241bba9a8d1b6.192.168.200.209.ip.es.io:9243
    basic_auth:
      username: elastic
      password: 1qaz!QAZ
    discovery:
      enabled: false
  - name: test-es
    enabled: true
    endpoints:
      - https://165ed1e903914b929746c642d92b5a21.192.168.3.185.async.vip:8000
    basic_auth:
      username: elastic
      password: 1qaz!QAZ
    discovery:
      enabled: false

pipeline:
  - name: metrics_ingest
    auto_start: true
    keep_running: true
    retry_delay_in_ms: 1000
    processor:
      - json_indexing:
          index_name: ".infini_metrics"
          elasticsearch: "logging-server"
          input_queue: "metrics"
          idle_timeout_in_seconds: 1
          worker_size: 1
          bulk_size_in_mb: 10 #in MB
          when:
            cluster_available: ["logging-server"]
  - name: indexing_merge
    auto_start: true
    keep_running: true
    processor:
      - indexing_merge:
          input_queue: "request_logging"
          elasticsearch: "logging-server"
          index_name: ".infini_requests_logging"
          output_queue:
            name: "gateway_requests"
            label:
              tag: "request_logging"
          worker_size: 1
          bulk_size_in_mb: 10
  - name: logging_requests
    auto_start: true
    keep_running: true
    processor:
      - bulk_indexing:
          bulk:
            compress: true
            batch_size_in_mb: 10
            batch_size_in_docs: 5000
            bulk_result_message_queue: ""
            response_handle:
              save_success_results: false
              output_bulk_stats: false
          queue_selector:
            labels:
              tag: request_logging
          consumer:
            fetch_max_messages: 100
          queues:
            type: indexing_merge
          when:
            cluster_available: ["logging-server"]
  - name: test-logging_requests
    auto_start: true
    keep_running: true
    processor:
      - bulk_indexing:
          bulk:
            compress: true
            batch_size_in_mb: 10
            batch_size_in_docs: 5000
            bulk_result_message_queue: ""
            response_handle:
              save_success_results: false
              output_bulk_stats: false
          queue_selector:
            labels:
              tag: test-request_logging
          consumer:
            fetch_max_messages: 100
          queues:
            type: indexing_merge
          when:
            cluster_available: ["test-logging-server"]
  - name: test_indexing_merge
    auto_start: true
    keep_running: true
    processor:
      - indexing_merge:
          input_queue: "test-request_logging"
          elasticsearch: "test-logging-server"
          index_name: ".infini_requests_logging"
          output_queue:
            name: "test-gateway_requests"
            label:
              tag: "test-request_logging"
          worker_size: 1
          bulk_size_in_mb: 10

metrics:
  enabled: true
  queue: metrics
  instance:
    enabled: true
  network:
    enabled: true
    summary: true

badger:
  value_log_max_entries: 1000000
  value_log_file_size: 104857600

配置说明:

- name: test-async-indexing
  filter:
    - elasticsearch:
        elasticsearch: test-es
    - flow:
        when:
          equals:
            _ctx.response.status: 0
        flows:
          - default-flow
    - drop:
- name: bulk_flow
  filter:
    - flow:
        when:
          and:
            - contains:
                _ctx.request.path: /_bulk
            - prefix:
                _ctx.request.host: 165ed1e903914b929746c642d92b5a21
        flows:
          - test-async-indexing
    - flow:
        flows:
          - default-flow
  • 根据请求信息中的 request.host 将指定请求过滤到独立的处理流程中。
- name: test-request_indexing
  filter:
    - ratio:
        ratio: 0.5
        flow: drop_flow
        continue: false
    - logging:
        queue_name: test-request_logging
        max_request_body_size: 4096
        max_response_body_size: 4096
    - drop:
- name: request_logging
  filter:
    - flow:
        when:
          prefix:
            _ctx.request.host: 165ed1e903914b929746c642d92b5a21
        flows:
          - test-request_indexing
    - logging:
        queue_name: request_logging
        max_request_body_size: 4096
        max_response_body_size: 4096
        when: #>1s or none-200 requests will be logged
          or:
            - not:
                or:
                  - equals:
                      _ctx.request.path: "/favicon.ico"
                  - in:
                      _ctx.response.status: [404, 200, 201]
                  - in:
                      _ctx.request.path: ["/sw.js"]
            - range:
                _ctx.elapsed.gte: 1000
  • 将请求记录也按照指定的 request.host 过滤到单独的处理流程中。

后置网关异步消费请求数据 #

网关配置如下:

path.data: data-async
path.logs: log-async

pipeline:
  - name: metrics_merge
    auto_start: true
    keep_running: true
    processor:
      - indexing_merge:
          input_queue: "metrics"
          elasticsearch: "logging-server"
          index_name: ".infini_metrics"
          output_queue:
            name: "bulk_requests"
            label:
              tag: "metrics"
          worker_size: 1
          bulk_size_in_mb: 10
  - name: request_logging_merge
    auto_start: true
    keep_running: true
    processor:
      - indexing_merge:
          input_queue: "request_logging"
          elasticsearch: "logging-server"
          index_name: ".infini_requests_logging"
          output_queue:
            name: "bulk_requests"
            label:
              tag: "request_logging"
          worker_size: 1
          bulk_size_in_mb: 10
  - name: failure_message_merge
    auto_start: true
    keep_running: true
    processor:
      - indexing_merge:
          input_queue: "bulk_result_messages"
          elasticsearch: "logging-server"
          index_name: ".infini_async_bulk_results"
          output_queue:
            name: "bulk_requests"
            label:
              tag: "bulk_logging"
          worker_size: 1
          bulk_size_in_mb: 10
  - name: ingest_merged_requests
    auto_start: true
    keep_running: true
    processor:
      - bulk_indexing:
          bulk:
            compress: true
            batch_size_in_mb: 10
            batch_size_in_docs: 5000
            retry_rules:
              retry_429: true
              default: true
              retry_4xx: false
          consumer:
            fetch_max_messages: 100
          queues:
            type: indexing_merge
          when:
            cluster_available: ["logging-server"]
  - name: bulk_request_ingest
    auto_start: true
    keep_running: true
    retry_delay_in_ms: 1000
    processor:
      - bulk_indexing:
          max_connection_per_node: 1000
          num_of_slices: 2
          max_worker_size: 200
          idle_timeout_in_seconds: 3
          bulk:
            compress: false
            batch_size_in_mb: 20
            batch_size_in_docs: 5000
            dead_letter_queue: "bulk_dead_requests"
            bulk_result_message_queue: "bulk_result_messages"
            max_request_body_size: 10240
            max_response_body_size: 10240
            save_success_results: true
            retry_rules:
              denied:
                keyword:
                  - illegal_state_exception
          consumer:
            fetch_max_messages: 1000
            eof_retry_delay_in_ms: 500
          queue_selector:
            labels:
              type: bulk_reshuffle

entry:
  - name: my_es_entry
    enabled: true
    router: my_router
    max_concurrency: 200000
    network:
      binding: 0.0.0.0:8000
    tls:
      enabled: true
      cert_file: /etc/ssl.crt
      key_file: /etc/ssl.key
      skip_insecure_verify: false

flow:
  - name: async_bulk
    filter:
      - bulk_reshuffle:
          when:
            contains:
              _ctx.request.path: /_bulk
          elasticsearch: prod
          level: node
          partition_size: 1
          #shards: [1,3,5,7,9,11,13]
          continue_metadata_missing: true
          fix_null_id: true
      - http:
          schema: "https" #https or http
          hosts:
            - "165ed1e903914b929746c642d92b5a21.192.168.200.209.ip.es.a:9243"
            - "165ed1e903914b929746c642d92b5a21.192.168.200.209.ip.es.b:9243"
            - "165ed1e903914b929746c642d92b5a21.192.168.200.209.ip.es.c:9243"

router:
  - name: my_router
    default_flow: async_bulk

elasticsearch:
  - name: prod
    enabled: true
    endpoints:
      - https://165ed1e903914b929746c642d92b5a21.192.168.200.209.ip.es.a:9243
      - https://165ed1e903914b929746c642d92b5a21.192.168.200.209.ip.es.b:9243
      - https://165ed1e903914b929746c642d92b5a21.192.168.200.209.ip.es.c:9243
    discovery:
      enabled: false
    basic_auth:
      username: elastic
      password: 1qaz!QAZ
  - name: logging-server
    enabled: true
    endpoints:
      - https://ccb82916fc594ccf94a241bba9a8d1b6.192.168.200.209.ip.es.io:9243
    basic_auth:
      username: elastic
      password: 1qaz!QAZ
    discovery:
      enabled: false
    request_timeout: 60

elastic:
  enabled: true
  remote_configs: false
  health_check:
    enabled: true
    interval: 30s
  availability_check:
    enabled: true
    interval: 60s
  metadata_refresh:
    enabled: true
    interval: 30s
  cluster_settings_check:
    enabled: false
    interval: 20s

disk_queue:
  prepare_files_to_read: true
  #max_bytes_per_file: 20971520
  eof_retry_delay_in_ms: 500
  compress:
    delete_after_compress: true
    idle_threshold: 20

metrics:
  enabled: true
  queue: metrics
  instance:
    enabled: true
  network:
    enabled: true
    summary: true

badger:
  value_log_max_entries: 1000000
  value_log_file_size: 104857600

配置说明:

- name: async_bulk
  filter:
    - bulk_reshuffle:
        when:
          contains:
            _ctx.request.path: /_bulk
        elasticsearch: prod
        level: node
        partition_size: 1
        #shards: [1,3,5,7,9,11,13]
        continue_metadata_missing: true
        fix_null_id: true
    - http:
        schema: "https" #https or http
        hosts:
          - "165ed1e903914b929746c642d92b5a21.192.168.200.209.ip.es.a:9243"
          - "165ed1e903914b929746c642d92b5a21.192.168.200.209.ip.es.b:9243"
          - "165ed1e903914b929746c642d92b5a21.192.168.200.209.ip.es.c:9243"
  • 请求首先经过 bulk_reshuffle 过滤器,如果提交 buffer 成功,请求返回 200,数据在队列中等待消费;如果提交 buffer 失败,请求将进入 http 过滤器,直接发往 ECE Proxy。

经测试验证,改造后写入性能提升 20%。

更多极限网关配置信息请参考 官网

关于极限网关(INFINI Gateway) #

INFINI Gateway 是一个面向搜索场景的高性能数据网关,所有请求都经过网关处理后再转发到后端的搜索业务集群。基于 INFINI Gateway,可以实现索引级别的限速限流、常见查询的缓存加速、查询请求的审计、查询结果的动态修改等等。

Gateway 现已开源(https://github.com/infinilabs/gateway),如有相关问题或建议,欢迎提交 PR 或 Issue,一起参与开源共建!

标签
Easysearch x
Gateway x
Console x