--- title: "谈谈 ES 6.8 到 7.10 的功能变迁(5)- 任务和集群管理" date: 2025-02-09 lastmod: 2025-02-09 description: "本文详细对比了ES 7.10与6.8在集群管理与任务优化的功能改进,包括断联查询自动取消、投票节点角色、异步查询及可搜索快照四大特性,提升任务灵活性、主节点选举稳定性及成本优化,特别适合日志场景应用。" tags: ["Elasticsearch"] summary: "这一篇我们继续了解 ES 7.10 相较于 ES 6.8 调优的集群管理和任务管理的方法,主要有断联查询的主动取消、投票节点角色、异步查询和可搜索快照四个功能。 Query 自动取消 # 对于一个完善的产品来说,当一个任务发起链接主动断联的时候,服务端与之相关的任务应该也都被回收。但是这个特性到了 elasticsearch 7.4 版本才有了明确的声明。 Elasticsearch now automatically terminates queries sent through the _search endpoint when the initiating connection is closed. 相关的 PR 和 issue 在这里,对源码有兴趣的同学可以挖掘一下。 PR:https://github.com/elastic/elasticsearch/pull/43332 issue:https://github.com/elastic/elasticsearch/issues/43105 简单来说,ES 接受在某个查询的 http 链接断掉的时候,与其相关的父子任务的自动取消。原来的场景下可能需要手工一个个关闭。 实际测试 # 利用 painless 模拟复杂查询,下面这个查询在测试集群上能维持 5s 左右 GET /_search?max_concurrent_shard_requests=1 { "query": { "bool": { "must": [ { "script": { "script": { "lang": "painless", "source": """ long sum = 0; for (int i = 0; i < 100000; i++) { sum += i; } return true; """ } } }, { "script": { "script": { "lang": "painless", "source": """ long product = 1; for (int i = 1; i < 100000; i++) { product *= i; } return true; """ } } }, { "script": { "script": { "lang": "painless", "source": """ long factorial = 1; for (int i = 1; i < 100000; i++) { factorial *= i; } long squareSum = 0; for (int j = 0; j < 100000; j++) { squareSum += j * j; } return true; """ } } }, { "script": { "script": { "lang": "painless", "source": """ long fib1 = 0; long fib2 = 1; long next; for (int i = 0; i < 100000; i++) { next = fib1 + fib2; fib1 = fib2; fib2 = next; } return true; """ } } } ] } } } 查看任务被终止的状态" --- 这一篇我们继续了解 ES 7.10 相较于 ES 6.8 调优的集群管理和任务管理的方法,主要有断联查询的主动取消、投票节点角色、异步查询和可搜索快照四个功能。 ## Query 自动取消 对于一个完善的产品来说,当一个任务发起链接主动断联的时候,服务端与之相关的任务应该也都被回收。但是这个特性到了 elasticsearch 7.4 版本才有了明确的声明。 > Elasticsearch now automatically terminates queries sent through the \_search endpoint when the initiating connection is closed. 相关的 PR 和 issue 在这里,对源码有兴趣的同学可以挖掘一下。 PR:https://github.com/elastic/elasticsearch/pull/43332 issue:https://github.com/elastic/elasticsearch/issues/43105 简单来说,ES 接受在某个查询的 http 链接断掉的时候,与其相关的父子任务的自动取消。原来的场景下可能需要手工一个个关闭。 ### 实际测试 利用 painless 模拟复杂查询,下面这个查询在测试集群上能维持 5s 左右 ``` GET /_search?max_concurrent_shard_requests=1 { "query": { "bool": { "must": [ { "script": { "script": { "lang": "painless", "source": """ long sum = 0; for (int i = 0; i < 100000; i++) { sum += i; } return true; """ } } }, { "script": { "script": { "lang": "painless", "source": """ long product = 1; for (int i = 1; i < 100000; i++) { product *= i; } return true; """ } } }, { "script": { "script": { "lang": "painless", "source": """ long factorial = 1; for (int i = 1; i < 100000; i++) { factorial *= i; } long squareSum = 0; for (int j = 0; j < 100000; j++) { squareSum += j * j; } return true; """ } } }, { "script": { "script": { "lang": "painless", "source": """ long fib1 = 0; long fib2 = 1; long next; for (int i = 0; i < 100000; i++) { next = fib1 + fib2; fib1 = fib2; fib2 = next; } return true; """ } } } ] } } } ``` 查看任务被终止的状态 ``` GET /_tasks?detailed=true&actions=*search* ``` 测试脚本,判断上面该查询被取消后是否还可以查到任务 ```python import requests import multiprocessing import time from requests.exceptions import RequestException from datetime import datetime # Elasticsearch 地址 #ES_URL = "http://localhost:9210" # 6.8版本地址 ES_URL = "http://localhost:9201" # 耗时查询的 DSL LONG_RUNNING_QUERY = {"size":0, "query": { "bool": { "must": [ { "script": { "script": { "lang": "painless", "source": """ long sum = 0; for (int i = 0; i < 100000; i++) { sum += i; } return true; """ } } }, { "script": { "script": { "lang": "painless", "source": """ long product = 1; for (int i = 1; i < 100000; i++) { product *= i; } return true; """ } } }, { "script": { "script": { "lang": "painless", "source": """ long factorial = 1; for (int i = 1; i < 100000; i++) { factorial *= i; } long squareSum = 0; for (int j = 0; j < 100000; j++) { squareSum += j * j; } return true; """ } } }, { "script": { "script": { "lang": "painless", "source": """ long fib1 = 0; long fib2 = 1; long next; for (int i = 0; i < 100000; i++) { next = fib1 + fib2; fib1 = fib2; fib2 = next; } return true; """ } } } ] } } } # 用于同步的事件对象 query_finished = multiprocessing.Event() # 新增:进程终止标志位 process_terminated = multiprocessing.Event() # 定义一个函数用于添加时间戳到日志 def log_with_timestamp(message,*message1): timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S") print(f"[{timestamp}] {message}+{message1}") # 发起查询的函数 def run_query(): try: log_with_timestamp("发起查询...") session = requests.Session() response = session.post( f"{ES_URL}/_search", json=LONG_RUNNING_QUERY, stream=True # 启用流式请求,允许后续中断 ) try: # 尝试读取响应内容(如果连接未被中断) if response.status_code == 200: log_with_timestamp("查询完成,结果:", response.json()) else: log_with_timestamp("查询失败,错误信息:", response.text) except RequestException as e: log_with_timestamp("请求被中断:", e) finally: # 标记查询完成 query_finished.set() # 中断连接的信号函数 def interrupt_signal(): time.sleep(1) # 等待 1 秒 log_with_timestamp("发出中断查询信号...") # 标记可以中断查询了 query_finished.set() # 检测任务是否存在的函数 def check_task_exists(): # 等待进程终止标志位 process_terminated.wait() max_retries = 3 retries = 0 time.sleep(1) #1s后检查 while retries < max_retries: log_with_timestamp("检查任务是否存在...") tasks_url = f"{ES_URL}/_tasks?detailed=true&actions=*search*" try: tasks_response = requests.get(tasks_url) if tasks_response.status_code == 200: tasks = tasks_response.json().get("nodes") if tasks: log_with_timestamp("任务仍存在:", tasks) else: log_with_timestamp("任务已消失") break else: log_with_timestamp("获取任务列表失败,错误信息:", tasks_response.text) except RequestException as e: log_with_timestamp(f"检测任务失败(第 {retries + 1} 次重试): {e}") retries += 1 time.sleep(1) # 等待 1 秒后重试 if retries == max_retries: log_with_timestamp("达到最大重试次数,无法检测任务状态。") # 主函数 def main(): # 启动查询进程 query_process = multiprocessing.Process(target=run_query) query_process.start() # 启动中断信号进程 interrupt_process = multiprocessing.Process(target=interrupt_signal) interrupt_process.start() # 等待中断信号 query_finished.wait() # 检查查询进程是否还存活并终止它 if query_process.is_alive(): log_with_timestamp("尝试中断查询进程...") query_process.terminate() log_with_timestamp("查询进程已终止") # 新增:设置进程终止标志位 process_terminated.set() # 启动任务检测进程 check_process = multiprocessing.Process(target=check_task_exists) check_process.start() # 等待所有进程完成 query_process.join() interrupt_process.join() check_process.join() if __name__ == "__main__": main() ``` 实际测试结果: ```shell # 6.8 版本 [2025-02-08 15:17:21] 发起查询...+() [2025-02-08 15:17:22] 发出中断查询信号...+() [2025-02-08 15:17:22] 尝试中断查询进程...+() [2025-02-08 15:17:22] 查询进程已终止+() [2025-02-08 15:17:23] 检查任务是否存在...+() [2025-02-08 15:17:23] 任务仍存在:+({'fYMNv_KxQGCGzhgfMxPXuA': {......}},) ``` 可以看到在查询任务被终止后 1s 再去检查,任务仍然存在 ```shell # 7.10 版本 [2025-02-08 15:18:16] 发起查询...+() [2025-02-08 15:18:17] 发出中断查询信号...+() [2025-02-08 15:18:17] 尝试中断查询进程...+() [2025-02-08 15:18:17] 查询进程已终止+() [2025-02-08 15:18:18] 检查任务是否存在...+() [2025-02-08 15:18:18] 任务已消失+() ``` 这里可以看到任务已经检测不到了。 ### 关于 timeout 配置 这里展开讨论下,timeout 配置。超时回收处理是一个‘best effort’行为。 > (Optional, time units) Specifies the period of time to wait for a response. If no response is received before the timeout expires, the request fails and returns an error. Defaults to no timeout. > the search request is more of a best effort and does not guarantee that the request will never last longer than the specified amount of time. ## 异步搜索 ### 使用方法 可以让用户进行异步的搜索,可以通过相关参数进行检查维护该搜索的状态和结果。比较合适查询量较大但对延迟要求较低的查询,进行精细化的管理控制。 注意:这里的参数基本都是添加到 url 上的,并不是添加到 request body 上的。 ``` POST test_index/_async_search?keep_on_completion=true { "query": { "match_all": {} } } ``` 注:这里为了产生查询结果 id 使用了 keep_on_completion 参数,这个参数的使用见下面解释。 返回结果,和一般的查询结果不同的是,添加了结果 id 和查询的一些状态数据。 ``` { "id": "Fmk2b0VjM2FEVE9Dbk9TemVyOTlkMncbOFlwRGU2OWZTa2kxNEpoT0Q2bVZrZzozODIz",//结果id,可以用于后续的复查 "is_partial": false,//是否为部分完成结果 "is_running": false,//是否还在查询 "start_time_in_millis": 1738978637287,//查询产生时间戳 "expiration_time_in_millis": 1739410637287,//查询结果过期时间戳 "response": { "took": 1, "timed_out": false, "_shards": { "total": 1, "successful": 1, "skipped": 0, "failed": 0 }, "hits": { "total": { "value": 3, "relation": "eq" }, "max_score": 1, "hits": [······] } } } ``` 管理查询结果 ``` //查询结果和第一次返回的内容一致 GET /_async_search/Fmk2b0VjM2FEVE9Dbk9TemVyOTlkMncbOFlwRGU2OWZTa2kxNEpoT0Q2bVZrZzozODIz //主动删除查询结果 DELETE /_async_search/Fmk2b0VjM2FEVE9Dbk9TemVyOTlkMncbOFlwRGU2OWZTa2kxNEpoT0Q2bVZrZzozODIz ``` #### 关键参数 - wait_for_completion_timeout:参数(默认为 1 秒),这个参数用来设置异步查询的等待时间。**当异步搜索在此时间内完成时,响应将不包括 ID,结果也不会存储在集群中**。 - keep_on_completion:参数(默认为 false)可以设置为 true,**可以强制存储查询结果**,即便在 wait_for_completion_timeout 设置时间内完成搜索,该结果也能被查询到。 - keep_alive:指定异步搜索结果可以被保存多长时间,默认为 5d(5 天)。在此期间之后,正在进行的异步搜索和任何保存的搜索结果将被删除。 - batched_reduce_size:是 Elasticsearch 中的一个配置参数,默认值为 5。它的作用是控制分片结果的部分归并频率,具体来说,它决定了协调节点(coordinating node)在接收到多少个分片的响应后,会执行一次部分结果归并(partial reduction)。 - pre_filter_shard_size:是 Elasticsearch 中与查询执行相关的一个参数,它的默认值为 1,并且不可更改。这个参数的作用是强制 Elasticsearch 在执行查询之前,先进行一轮预过滤(pre-filter),以确定哪些分片(shard)可能包含与查询匹配的文档,从而跳过那些肯定不包含匹配文档的分片。 #### 查询结果存储位置 异步查询的结果部分存储在`.async-search`中,但是进行了程序加密,内容对使用者不可见。 ``` GET .async-search/_search // 返回的结果 ··· "hits": [ { "_index": ".async-search", "_type": "_doc", "_id": "bPNotcTCTV-gSIiZLuK0IA", "_score": 1, "_source": { "result": "i6+xAwFERm1KUVRtOTBZMVJEVkZZdFoxTkphVnBNZFVzd1NVRWJPRmx3UkdVMk9XWlRhMmt4TkVwb1QwUTJiVlpyWnpvek1EWTEAAQEDAD+AAAADP4AAAAAAABR0Sm9yNDVRQlQ3bzBsZTdsYmp0TgAAAARfZG9jAP//////////AwALeyJhIjoxMTExfQoAAAAAAAAAAQEAAAAWOFlwRGU2OWZTa2kxNEpoT0Q2bVZrZwp0ZXN0X2luZGV4Fk5fYmphNXM1UWtpcnU4RXdleVlGSUEAAAA/gAAAAAAAFHRab3I0NVFCVDdvMGxlN2xlVHNrAAAABF9kb2MA//////////8DAAt7ImEiOjExMTJ9CgAAAAAAAAABAQAAABY4WXBEZTY5ZlNraTE0SmhPRDZtVmtnCnRlc3RfaW5kZXgWTl9iamE1czVRa2lydThFd2V5WUZJQQAAAD+AAAAAAAAUdHBvcjQ1UUJUN28wbGU3bGZqc28AAAAEX2RvYwD//////////wMAC3siYSI6MTExM30KAAAAAAAAAAEBAAAAFjhZcERlNjlmU2tpMTRKaE9ENm1Wa2cKdGVzdF9pbmRleBZOX2JqYTVzNVFraXJ1OEV3ZXlZRklBAAAAAAAAAAAAAgABAQEAAAAAAAsAAAAAAAABlOMuvCQAAAGU/O6IJA==", "headers": {}, "expiration_time": 1739410278436, "response_headers": {} } }, ··· ``` ## 只投票候选节点 这是一个主候选节点角色的优化,能相对固定 master 节点的位置,减少了选举候选节点过多的问题。 ### 作用 Voting - only master - eligible node(仅参与投票的具备主节点资格的节点)在 Elasticsearch 集群中有以下作用: 1. **参与主节点选举**:该节点参与主节点选举过程,但本身不会成为集群选出的主节点,主要作为选举中的决胜因素(打破平局)。 2. **保障高可用性**:在高可用性(HA)集群中,至少需要三个具备主节点资格的节点,其中至少两个不能是仅参与投票的节点,这样即使有一个节点故障,集群仍能选出主节点。 3. **分担选举及状态发布任务**:和普通具备主节点资格的节点一样,在集群状态发布期间承担特定任务。 4. **灵活承担其他角色**:可以同时承担集群中的其他角色,如数据节点;也可以作为专用节点,不承担其他角色。 ### 配置 三个节点的集群:可以配置两个普通主节点资格节点和一个仅参与投票的节点。这样在一个普通主节点故障时,剩下的普通主节点和仅参与投票的节点一起可以完成主节点选举,保证集群的正常运行。 理论上,**主候选节点数量能满足不同区域间的主备切换要求**即可,其余可以都是投票节点。 ## 可搜索快照 注意:这是一个收费功能 ### 实现机制 可搜索快照让你能够通过使用快照来保障数据恢复能力,而非在集群内维护副本分片,从而降低运营成本。 当你将快照中的索引挂载为可搜索快照时,Elasticsearch 会将索引分片复制到集群内的本地存储中。这能确保搜索性能与搜索其他任何索引相当,并尽量减少对访问快照存储库的需求。如果某个节点发生故障,可搜索快照索引的分片会自动从快照存储库中恢复。 搜索可搜索快照索引与搜索其他任何索引的方式相同。**搜索性能与常规索引相当,因为在挂载可搜索快照时,分片数据会被复制到集群中的节点上**。 如果某个节点发生故障,且需要从快照中恢复可搜索快照分片,在 Elasticsearch 将分片分配到其他节点的短暂时间内,集群健康状态将不会显示为绿色。在这些分片重新分配完成之前,对这些分片的搜索将会失败或返回部分结果。 对于搜索频率较低的数据,这能显著节省成本。使用可搜索快照,不再需要额外的索引分片副本以避免数据丢失,这有可能将搜索该数据所需的节点本地存储容量减少一半。同时可搜索快照依赖于备份使用的快照,也不需要额外的空间。 ### 使用建议 1. 从含多索引的快照挂载单个索引时,建议进行使用分隔,创建仅含目标索引的快照副本并挂载,方便独立管理备份与可搜索快照生命周期。 2. 挂载为可搜索快照索引前,建议将索引强制合并为每分片一个段,减少从存储库读取数据的操作和成本。 ### 实际测试 #### 基础配置 前提条件:需要一个镜像使用存储,这里使用 minIO 作为测试 1. 安装 S3 插件,并注册快照库信息 ``` # 在线安装插件 elasticsearch-plugin install repository-s3 # 设置访问minio的信息,elasticsearch的bin目录下,使用minIO中设置的用户名密码 ./elasticsearch-keystore add s3.client.default.access_key ./elasticsearch-keystore add s3.client.default.secret_key # 重载安全设置,然后重启节点 POST _nodes/reload_secure_settings # 注册快照库 PUT _snapshot/my-minio-repository { "type": "s3", "settings": { "bucket": "es-bucket", "endpoint": "http://127.0.0.1:9002", "compress": true } } ``` 2. 挂载需要的快照索引 ``` POST /_snapshot/my-minio-repository/snapshot_es_prp_cmain_20240829/_mount?wait_for_completion=true { "index": "es_prp_cmain_insured_itemkind_detail_formal_20240829", "renamed_index": "test_searchable_snapshot",//挂载时对索引进行重命名 "index_settings": { "index.number_of_replicas": 0 }, "ignore_index_settings": [ "index.refresh_interval" ] } ``` 3. 检查空间占用 ``` GET _cat/indices/test_searchable_snapshot?v health status index uuid pri rep docs.count docs.deleted store.size pri.store.size green open test_searchable_snapshot qROj2flcRdiGOZaejeAmQQ 1 0 10000 0 21.3mb 21.3mb ``` 在系统上也看到了对应 uuid 的文件目录 ``` [root@hcss-ecs 0]# ls _state snapshot_cache translog [root@hcss-ecs 0]# pwd /data/elasticsearch-7.10.2/data/nodes/0/indices/qROj2flcRdiGOZaejeAmQQ/0 ``` ## 小结 这篇的内容讲解测试的相对较细,对于查询的自动取消和异步查询增加了 ES 查询任务的灵活性;只投票节点也是加强了主节点选举的稳定性;可搜索快照是成本和功能的均衡方法,对于日志场景的使用是一个不错的选择。