ES 踩坑记:Set Processor 字段更新引发的 _source 污染
Easysearch
ES
2025-02-27

问题背景 #

社区的一个伙伴想对一个 integer 的字段类型添加一个 keyword 类型的子字段,然后进行精确匹配的查询优化,提高查询的速度。

整个索引数据量不大,并不想进行 reindex 这样的复杂操作,就想到了使用 update_by_query 的存量数据更新。

所以我们测试了下面这套方案,在设置完字段的子字段后,利用 set processor 来对这个子字段进行 update_by_query

操作记录:

# 测试索引
PUT /test
{
  "mappings": {
    "properties": {
      "status": {
        "type": "integer"
      }
    }
  }
}
# 测试数据
POST /test/_bulk
{"index":{}}
{"status":404}
{"index":{}}
{"status":500}

GET test/_search

# 添加子字段
PUT test/_mapping
{
  "properties": {
    "status": {
      "type": "integer",
      "fields": {
        "keyword": {
          "type": "keyword"
        }
      }
    }
  }
}

GET test/_search

#创建管道pipeline.实现更新逻辑
PUT _ingest/pipeline/copy_status_to_keyword
{
  "description": "resets the value of status and subfields",
  "processors": [
    {
      "set": {
        "field": "status",
        "value": "{{{status}}}"
      }
    }
  ]
}

#update 执行
POST test/_update_by_query?pipeline=copy_status_to_keyword
{
  "query": {
    "bool": {
      "must_not": {
        "exists": {
          "field": "status.keyword"
        }
      },
      "must": {
        "exists": {
          "field": "status"
        }
      }
    }
  }
}


GET test/_search
{
  "query": {
    "exists": {
      "field": "status.keyword"
    }
  }
}

# 返回结果
   "hits": [
      {
        "_index": "test_set",
        "_type": "_doc",
        "_id": "G7zHNpUBLvnTvXTpVIC4",
        "_score": 1,
        "_source": {
          "status": "404"
        }
      },
      {
        "_index": "test_set",
        "_type": "_doc",
        "_id": "HLzHNpUBLvnTvXTpVIC4",
        "_score": 1,
        "_source": {
          "status": "500"
        }
      }
    ]

测试检查了一下,status.keyword 可以被 search,可以满足我们的预期要求。

但是,小伙伴到正式上线的时候却发生了问题。应用程序读取发现 _source 中 status 的类型变了,开始报错字段类型不符合。

# 写入的时候
    "hits": [
      {
        "_index": "test",
        "_type": "_doc",
        "_id": "2ry5NpUBLvnTvXTp1F5z",
        "_score": 1,
        "_source": {
          "status": 404 # 这里还是 integer 类型
        }
      },
      {
        "_index": "test",
        "_type": "_doc",
        "_id": "27y5NpUBLvnTvXTp1F5z",
        "_score": 1,
        "_source": {
          "status": 500
        }
      }
    ]

# update 完成后
   "hits": [
      {
        "_index": "test",
        "_type": "_doc",
        "_id": "2ry5NpUBLvnTvXTp1F5z",
        "_score": 1,
        "_source": {
          "status": "404" # 字段内容添加上了引号,成为了 string 类型
        }
      },
      {
        "_index": "test",
        "_type": "_doc",
        "_id": "27y5NpUBLvnTvXTp1F5z",
        "_score": 1,
        "_source": {
          "status": "500"
        }
      }
    ]

解决方案 #

还好小伙伴那边有数据主备库,赶紧做了切换。然后开始对已有的数据进行修复。

最终商定了下面两个方案进行 fix。

  1. 用 script 保持数据类型重写
POST test/_update_by_query
{
"script": {
  "source": """
    if (ctx._source.status instanceof String) {
      ctx._source.status = Integer.parseInt(ctx._source.status);
    }
  """,
  "lang": "painless"
  }
}
  1. 查询结果读取 docvalue 而不是 source。这个方案可以绕过这个问题,但是需要改动应用程序。
GET test/_search
{
  "_source": false,
  "docvalue_fields": [
    "status"
  ]
}

# 返回
    "hits": [
      {
        "_index": "test",
        "_type": "_doc",
        "_id": "wLy-NpUBLvnTvXTpRGvw",
        "_score": 1,
        "fields": {
          "status": [
            404
          ]
        }
      },
      {
        "_index": "test",
        "_type": "_doc",
        "_id": "wby-NpUBLvnTvXTpRGvw",
        "_score": 1,
        "fields": {
          "status": [
            500
          ]
        }
      }
    ]

问题分析 #

好了,现在我们回过头来分析一下之前方案出现的问题,用 set proceesor 为什么会导致 source 内的字段类型从 int 变成 string 呢?

因为 script 脚本写法能够成功,而 set 会失败,我们从 set 的使用入手,去看看代码里是不是有什么线索?

set processor 问题的细节 #

让我们深入分析值类型转换的核心代码路径:

// SetProcessor.java
document.setFieldValue(field, value, ignoreEmptyValue);

这里的 value 参数类型为 ValueSource,

// SetProcessor.Factory.create()
Object value = ConfigurationUtils.readObject(TYPE, processorTag, config, "value");
ValueSource valueSource = ValueSource.wrap(value, scriptService);

其核心实现逻辑在接口 ValueSource.java 中:

// ValueSource.java 关键方法 59行
public static ValueSource wrap(Object value, ScriptService scriptService) {
    ......
    } else if (value instanceof String) {
            // This check is here because the DEFAULT_TEMPLATE_LANG(mustache) is not
            // installed for use by REST tests. `value` will not be
            // modified if templating is not available
            if (scriptService.isLangSupported(DEFAULT_TEMPLATE_LANG) && ((String) value).contains("{{")) {
                Script script = new Script(ScriptType.INLINE, DEFAULT_TEMPLATE_LANG, (String) value, Collections.emptyMap());
                return new TemplatedValue(scriptService.compile(script, TemplateScript.CONTEXT));
            } else {
                return new ObjectValue(value);
            }
        }
        ......
}

当配置中的 value 值为"{{{status}}}“字符串时,创建 TemplateValue 实例。

这里 “{{{status}}}” 的写法属于 Mustache 语法,一种轻量级的模板引擎语法。ES 在 search template 中会主要应用,在 set processor 也用了 Mustache 进行字段内容的引用。

// 在 ValueSource.java 内部
private static class TemplateValue extends ValueSource {
    private final TemplateScript.Factory template;

    @Override
        public Object copyAndResolve(Map<String, Object> model) {
            return template.newInstance(model).execute();
        }

}

继续看抽象类 TemplateScript.java#execute() ,这个方法在定义的时候已经明确声明返回的是 string

    /** Run a template and return the resulting string, encoded in utf8 bytes. */
    public abstract String execute();

而实现的子类则很明显是 MustacheExecutableScript.execute(),即 Mustache 语法引擎的实现。

private class MustacheExecutableScript extends TemplateScript {
  ......
  @Override
        public String execute() {
            final StringWriter writer = new StringWriter();
            try {
                // crazy reflection here
                SpecialPermission.check();
                AccessController.doPrivileged((PrivilegedAction<Void>) () -> {
                    template.execute(writer, params);
                    return null;
                });
            } catch (Exception e) {
                logger.error((Supplier<?>) () -> new ParameterizedMessage("Error running {}", template), e);
                throw new GeneralScriptException("Error running " + template, e);
            }
            return writer.toString();
        }
    ......

这里也可以印证了字段内容类型被强制转为字符串

类型转换过程 #

deepseek 帮我总结的类型转换过程如下:

sequenceDiagram
    participant SetProcessor
    participant ValueSource
    participant TemplateValue
    participant TemplateScript
    participant MustacheEngine

    SetProcessor->>ValueSource: wrap("{{status}}")
    ValueSource->>TemplateValue: 创建实例
    SetProcessor->>TemplateValue: copyAndResolve(doc)
    TemplateValue->>TemplateScript: newInstance(doc)
    TemplateScript->>MustacheEngine: compile("{{status}}")
    MustacheEngine-->>TemplateScript: 返回Mustache编译后模板实现
    TemplateValue->>TemplateScript: execute()
    MustacheEngine-->>TemplateScript: 在这里将结果渲染为String
    TemplateValue-->>SetProcessor: 返回"200"(String)

小结 #

所以,这里 source 内字段类型被转变的原因,是 ES 对 set processor 使用 Mustache 语法产生的结果值进行了特殊处理,将内容都处理成了 string。

假设这次使用 set 去处理的值都是一个默认值 404 ,则不会出现这个问题

PUT _ingest/pipeline/copy_status_to_keyword_1
{
  "description": "resets the value of status and subfields",
  "processors": [
    {
      "set": {
        "field": "status",
        "value": 404
      }
    }
  ]
}

#update 执行方式
POST test/_update_by_query?pipeline=copy_status_to_keyword_1
{
  "query": {
    "bool": {
      "must_not": {
        "exists": {
          "field": "status.keyword"
        }
      },
      "must": {
        "exists": {
          "field": "status"
        }
      }
    }
  }
}

GET test/_search
# 返回内容
      {
        "_index": "test",
        "_type": "_doc",
        "_id": "tN0QRZUBLvnTvXTpJMTI",
        "_score": 1,
        "_source": {
          "status": 404
        }
      },
      {
        "_index": "test",
        "_type": "_doc",
        "_id": "td0QRZUBLvnTvXTpJMTI",
        "_score": 1,
        "_source": {
          "status": 404
        }
      }

那 ES 在 set 这段 Mustache 语法的处理里,使用 string 作为返回值,大家觉得合理么?如果需要保留原来的数据内容类型,不修改 TemplateScript.java#execute()这个方法可以实现么?

标签
Easysearch x
Gateway x
Console x