StarRocks Elasticsearch 外表查询源码解析

文章随手写的,有需要的自己看看。

ES 相关概念

index:index 在 es 中不是指索引,其对应数据库中的 table 概念。一个 index 含有多个 document,一个 document 是一个 json。

mapping:针对于 index 来说的,指一个 index 的结构,对应数据库中的 table schema。

mapping type:一般也叫 type,在老版 ES 中,type 的默认值为 _doc,但是在 ES8 后移除了 type。但是 SR 有对 type 做出了兼容。

doc_values:ES 为了更加高效的支持 sorting 和 aggregations 操作,会对大部分字段被创建时自动创建对应的 doc_value(doc_values 采用列式存储)。然而 text 和 annotated_text 不支持创建对应的doc_values。

_source:可以理解为原始数据,即用户存进去是什么样就是什么样,不会被改动。比如数组打平问题,假设用户输入数组 [1, [2, 3]] ,在 _source 字段中,仍然以原样 [1, [2, 3] 存储,但是在 doc_values 中会被自动打平成 [1, 2, 3] 。

keyword :如果有些文本字段在 ES 中不希望被分词索引,可以使用 keyword 类型。

text 和 keyword 的区别:

text:

  • 会分词,然后进行索引
  • 支持模糊、精确查询
  • 不支持聚合
  • 不支持 doc_value

keyword:

  • 不进行分词,直接索引
  • 支持模糊、精确查询
  • 支持聚合,keyword 字段允许被存储在 doc_values 中

更多参考:https://cloud.tencent.com/developer/article/1672344

Create Table 流程

收到 create table 语句后,在 LocalMetastore 中执行 createEsTable() 方法。

createEsTable() 中创建 EsTable 后,将其注册到 EsRepository 中。EsRepository 会在内存中负责维护 SR 中所有的 EsTable 和 EsRestClient。其中每一个 EsTable 都有一个对应的 EsRestClient。

public class EsRepository extends LeaderDaemon {
    private Map<Long, EsTable> esTables;
    private Map<Long, EsRestClient> esClients;
}

EsTable:顾名思义,保存 ES table 相关信息。

EsRestClient:用于发起 http 请求,通过 restful 协议和 ES 交互。

EsRepository 通过继承 LeaderDaemon 类,实现在 FE 启动后定时和 ES 集群同步 EsTable 的元信息。

为什么 HiveRepository 和 IcebergRepository 不这么做。

protected void runAfterCatalogReady() {
    for (EsTable esTable : esTables.values()) {
        try {
            esTable.syncTableMetaData(esClients.get(esTable.getId()));
        } catch (Throwable e) {
            LOG.warn("Exception happens when fetch index [{}] meta data from remote es cluster", esTable.getName(),
                    e);
            esTable.setEsTablePartitions(null);
            esTable.setLastMetaDataSyncException(e);
        }
    }
}

EsTable 元信息同步

EsRepository 会定时调用 EsTable 中的 syncTableMetaData(EsRestClient client) 方法同步 EsTable 和 ES 集群中的元信息。

元信息同步由 EsMetaStateTracker 负责,在同步前会创建一个 SearchContext,这个 context 会贯穿同步的三个阶段。拿到的信息都会放在 SearchContext 中。

VersionPhase:

通过 GET ``/ 获得 ES 集群信息,其实就拿了个 version.number。

{
    "name": "00ec8680adec",
    "cluster_name": "docker-cluster",
    "cluster_uuid": "nkNEidmtS_yocGW7wIHdlQ",
    "version": {
        "number": "8.3.2",
        "build_type": "docker",
        "build_hash": "8b0b1f23fbebecc3c88e4464319dea8989f374fd",
        "build_date": "2022-07-06T15:15:15.901688194Z",
        "build_snapshot": false,
        "lucene_version": "9.2.0",
        "minimum_wire_compatibility_version": "7.17.0",
        "minimum_index_compatibility_version": "7.0.0"
    },
    "tagline": "You Know, for Search"
}

MappingPhase:

通过 GET {index}/_mapping 获取指定 index 的 mapping。

{
    "index": {
        "mappings": {
            "properties": {
                "age": {
                    "type": "long"
                },
                "array1": {
                    "type": "long"
                },
                "name": {
                    "type": "text",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        }
                    }
                }
            }
        }
    }
}

在 MappingPhase,使用如下代码解析获取的 json 数据。

for (Column col : searchContext.columns()) {
    String colName = col.getName();
    // if column exists in StarRocks Table but no found in ES's mapping, we choose to ignore this situation?
    if (!properties.has(colName)) {
        continue;
    }
    JSONObject fieldObject = properties.optJSONObject(colName);

    resolveKeywordFields(searchContext, fieldObject, colName);
    resolveDocValuesFields(searchContext, fieldObject, colName);
}

resolveKeywordFields() 会把 keyword field 放到 searchContext.resolveKeywordFields 中,resolveDocValuesFields() 会把 doc_values field 放到 searchContext.docValueFieldsContext 中

PartitionPhase

通过 GET ``/{index}/_search_shards 获取 index 在 ES 中的分片。

BUG 来源地:后面我们 be 发送请求是根据 shard 中的 ip 地址,而不是添加外表时指定的地址。如果 ES 中 shard 地址和 SR 添加外表的地址不同,会导致 BE 无法正常请求 ES。这种情况通常发生于一台机器会有多个网卡的情况,然后用户又没有指定 ES 的 IP。

{
    "nodes": {
        "wuGFWnAjSg6xXidaxX7azQ": {
            "name": "node-1",
            "ephemeral_id": "5nXFzMFTRxai5PMN_fLYMQ",
            "transport_address": "172.26.195.67:9200",
            "attributes": {}
        },
        "x-uhMBJ9SuGxYBLMGpwwWw": {
            "name": "node-2",
            "ephemeral_id": "Y1Oct7tSRaC6L-F93Sw3tw",
            "transport_address": "172.26.195.68:9200",
            "attributes": {}
        }
    },
    "indices": {
        "danny": {}
    },
    "shards": [
        [
            {
                "state": "STARTED",
                "primary": true,
                "node": "x-uhMBJ9SuGxYBLMGpwwWw",
                "relocating_node": null,
                "shard": 0,
                "index": "danny",
                "allocation_id": {
                    "id": "d-TAtX0hS4WxUVzy0S3HuA"
                }
            },
            {
                "state": "STARTED",
                "primary": false,
                "node": "wuGFWnAjSg6xXidaxX7azQ",
                "relocating_node": null,
                "shard": 0,
                "index": "danny",
                "allocation_id": {
                    "id": "-PEjbmMBQIOqbCEQcFidEw"
                }
            }
        ],
        [
            {
                "state": "STARTED",
                "primary": true,
                "node": "wuGFWnAjSg6xXidaxX7azQ",
                "relocating_node": null,
                "shard": 1,
                "index": "danny",
                "allocation_id": {
                    "id": "dKDxXub4RcKXIzf59oc3vQ"
                }
            },
            {
                "state": "STARTED",
                "primary": false,
                "node": "x-uhMBJ9SuGxYBLMGpwwWw",
                "relocating_node": null,
                "shard": 1,
                "index": "danny",
                "allocation_id": {
                    "id": "Ew-HGVb7RfW8kZPy4nBbiA"
                }
            }
        ],
        [
            {
                "state": "STARTED",
                "primary": true,
                "node": "x-uhMBJ9SuGxYBLMGpwwWw",
                "relocating_node": null,
                "shard": 2,
                "index": "danny",
                "allocation_id": {
                    "id": "cy-71kM_RLGDPJyeVwzLJw"
                }
            },
            {
                "state": "STARTED",
                "primary": false,
                "node": "wuGFWnAjSg6xXidaxX7azQ",
                "relocating_node": null,
                "shard": 2,
                "index": "danny",
                "allocation_id": {
                    "id": "ClwYb-LxQ7e319c-8yI9eg"
                }
            }
        ],
        [
            {
                "state": "STARTED",
                "primary": false,
                "node": "x-uhMBJ9SuGxYBLMGpwwWw",
                "relocating_node": null,
                "shard": 3,
                "index": "danny",
                "allocation_id": {
                    "id": "bWh7KAPITcW10_1uKYPPEA"
                }
            },
            {
                "state": "STARTED",
                "primary": true,
                "node": "wuGFWnAjSg6xXidaxX7azQ",
                "relocating_node": null,
                "shard": 3,
                "index": "danny",
                "allocation_id": {
                    "id": "p-Lw1PFkQdy5CgUfLdhPZA"
                }
            }
        ],
        [
            {
                "state": "STARTED",
                "primary": true,
                "node": "x-uhMBJ9SuGxYBLMGpwwWw",
                "relocating_node": null,
                "shard": 4,
                "index": "danny",
                "allocation_id": {
                    "id": "qva4hTx6QU2LR19453jazA"
                }
            },
            {
                "state": "STARTED",
                "primary": false,
                "node": "wuGFWnAjSg6xXidaxX7azQ",
                "relocating_node": null,
                "shard": 4,
                "index": "danny",
                "allocation_id": {
                    "id": "125HJtYCS62qxeve_onPDA"
                }
            }
        ]
    ]
}

一个 index 会有多个 shard,一个 shard 又有多个副本,然后这些 shard 会分布在多个 node 上。SR 以 EsShardPartitions 保存一个 index 所有的 shard 信息。

public class EsShardPartitions {
    private final String indexName;
    // shardid -> host1, host2, host3
    private Map<Integer, List<EsShardRouting>> shardRoutings;
}

再通过 GET /_nodes/http 获取节点地址信息,并将地址信息写入上面的 EsShardPartitions 中。

{
    "_nodes": {
        "total": 2,
        "successful": 2,
        "failed": 0
    },
    "cluster_name": "wyf",
    "nodes": {
        "wuGFWnAjSg6xXidaxX7azQ": {
            "name": "node-1",
            "transport_address": "172.26.195.67:9200",
            "host": "172.26.195.67",
            "ip": "172.26.195.67",
            "version": "6.5.3",
            "build_flavor": "default",
            "build_type": "tar",
            "build_hash": "159a78a",
            "roles": [
                "master",
                "data",
                "ingest"
            ],
            "http": {
                "bound_address": [
                    "172.26.195.67:8200"
                ],
                "publish_address": "172.26.195.67:8200",
                "max_content_length_in_bytes": 104857600
            }
        },
        "x-uhMBJ9SuGxYBLMGpwwWw": {
            "name": "node-2",
            "transport_address": "172.26.195.68:9200",
            "host": "172.26.195.68",
            "ip": "172.26.195.68",
            "version": "6.5.3",
            "build_flavor": "default",
            "build_type": "tar",
            "build_hash": "159a78a",
            "roles": [
                "master",
                "data",
                "ingest"
            ],
            "http": {
                "bound_address": [
                    "172.26.195.68:8200"
                ],
                "publish_address": "172.26.195.68:8200",
                "max_content_length_in_bytes": 104857600
            }
        }
    }
}

总的来说,PartitionPhase 就是获取一个 index 所有的 shard 信息以及该 shard 所在 node 的信息(IP 等等)。

发起查询

FE

EsScanNode 负责 fragment plan 的生成。

protected void toThrift(TPlanNode msg) {
    if (EsTable.TRANSPORT_HTTP.equals(table.getTransport())) {
        msg.node_type = TPlanNodeType.ES_HTTP_SCAN_NODE;
    } else {
        msg.node_type = TPlanNodeType.ES_SCAN_NODE;
    }
    Map<String, String> properties = Maps.newHashMap();
    properties.put(EsTable.USER, table.getUserName());
    properties.put(EsTable.PASSWORD, table.getPasswd());
    properties.put(EsTable.ES_NET_SSL, String.valueOf(table.sslEnabled()));
    TEsScanNode esScanNode = new TEsScanNode(desc.getId().asInt());
    esScanNode.setProperties(properties);
    if (table.isDocValueScanEnable()) {
        esScanNode.setDocvalue_context(table.docValueContext());
        properties.put(EsTable.DOC_VALUES_MODE, String.valueOf(useDocValueScan(desc, table.docValueContext())));
    }
    if (table.isKeywordSniffEnable() && table.fieldsContext().size() > 0) {
        esScanNode.setFields_context(table.fieldsContext());
    }
    msg.es_scan_node = esScanNode;
}

可以看到主要就是装填一些关于 ES 集群的信息,比如是否开启 SSL,ES 的鉴权用户名密码,巴拉巴拉。

主要关注 intuseDocValueScan() 方法,其判断是否让 be 使用 doc_value 进行扫描。如果 select 的字段中存在不支持 doc_value 的字段(比如 text),则降级为 _source 搜索,否则使用 doc_values 进行搜索。

其他重要方法:

List<TScanRangeLocations> computeShardLocations():计算每一个 shard 应该被分配到哪个 BE 上进行计算(争取被计算的 shard 所在节点和 BE 是同一个节点),类似于 hdfs 通过移动计算的方式减少网络开销。

getNodeExplainString():存粹输出 explain 信息用的。因为我们具体的 Query DSL 是在 BE 生成的,正常情况下 FE 是拿不到具体的 Query DSL 信息。但是为了生成 explain 信息,SR 实现了 QueryBuilder 和 QueryConverter 这两个类,模拟 BE 的行为生成了 Query DSL 信息,返回个用户。

Q:既然 FE 都已经生成了 Query DSL 信息,为什么 BE 还要再去生成一遍,直接发下去不就好了?

A:将来会改。

BE

ESDataSource 中的 open() 负责查询前的准备工作。

open()

open() 分别调用如下主要方法:

  • _build_conjuncts():将 ES 能够处理的 predicates 用 EsPredicate 包装起来,存放在 _predicates 字段中。
  • _normalize_conjuncts():扫描 _conjunct_ctxs 存放的 Expr 表达式,移除在 _build_conjuncts() 中已经下推的 Expr。不然就重复应用 Expr 了。
  • _create_scanner():创建 ESScanReader。
  • 在创建 ESScanReader 时,会调用 ESScrollQueryBuilder::build() 方法生成 Query DSL 语句。长下面这个样子:
  • JSON { "query": { "bool": { "filter": [{ "bool": { "should": [{ "term": { "age": "1" } }] } }] } }, "stored_fields": "_none_", "docvalue_fields": [ "age", "array1" ], "sort": [ "_doc" ], "size": 4096 }

ESScanReader 创建过程中,里面主要涉及请求 url 的组装,鉴权用户名密码的组装。

注意如下方法:

if (props.find(KEY_TERMINATE_AFTER) != props.end()) {
    _exactly_once = true;
    // just send a normal search  against the elasticsearch with additional terminate_after param to achieve terminate early effect when limit take effect
    if (_type.empty()) {
        _search_url = fmt::format("{}/{}/_search?terminate_after={}&preference=_shards:{}&{}", _target, _index,
                                  props.at(KEY_TERMINATE_AFTER), _shards, filter_path);
    } else {
        _search_url = fmt::format("{}/{}/{}/_search?terminate_after={}&preference=_shards:{}&{}", _target, _index,
                                  _type, props.at(KEY_TERMINATE_AFTER), _shards, filter_path);
    }
} else {
    _exactly_once = false;
    // scroll request for scanning
    // add terminate_after for the first scroll to avoid decompress all postings list
    if (_type.empty()) {
        _init_scroll_url = fmt::format("{}/{}/_search?scroll={}&preference=_shards:{}&{}", _target, _index,
                                       _scroll_keep_alive, _shards, filter_path);
    } else {
        _init_scroll_url = fmt::format("{}/{}/{}/_search?scroll={}&preference=_shards:{}&{}", _target, _index,
                                       _type, _scroll_keep_alive, _shards, filter_path);
    }
    _next_scroll_url = _target + REQUEST_SEARCH_SCROLL_PATH + "?" + filter_path;
}

可以看到 _exactly_once 变量。当上层向下发起的请求存在 limit 算子,且允许该 limit 算子下推到 ES 时,这里就会设置 _exactly_once 为 true。

只有当所有算子允许下推到 ES 时,limit 算子才允许跟着下推到 ES。

exactly_once=true 模式下,BE 会直接使用 _search 进行搜索,请求一次就行。如果 exactly_once=false 模式,BE 在执行搜索时会携带 scroll 参数,类似于一个游标,告诉 ES 我上次读取到哪里,然后你从上次读的地方继续返回(可以理解为分页查询,每次返回一点)。

数据量比较大时肯定采用 scroll 模式读取。

其后,ESDataSource 也会调用 ESScanReader的 open() 方法。因为在 SR 的架构设计中,open() 方法是做准备工作,不执行请求。然而相对于 ES 来说,只要你请求了 URL 就返回结果,不存在准备工作。所以可以看到如下代码:

Status ESScanReader::open() {
    _is_first = true;
    if (_exactly_once) {
        RETURN_IF_ERROR(_network_client.init(_search_url));
    } else {
        RETURN_IF_ERROR(_network_client.init(_init_scroll_url));
    }
    // phase open, we cached the first response for `get_next` phase
    Status status = _network_client.execute_post_request(_query, &_cached_response);
    if (!status.ok() || _network_client.get_http_status() != 200) {
        return Status::InternalError(err_msg);
    }
    VLOG(1) << "open _cached response: " << _cached_response;
    return Status::OK();
}

open() 阶段,我们发起了请求,验证了 url 的合法性,ES 就直接把结果返回给我们了。SR 不能白白浪费这一次请求,故先把返回的 json 结果暂存在 _cached_response 中,等到 get_next() 被调用的时候直接解析。

get_next()

ESDataSource

ESDataSouce 中的 get_next() 主要分为三步:

Status ESDataSource::get_next(RuntimeState* state, vectorized::ChunkPtr* chunk) {
    RETURN_IF_ERROR(_es_reader->get_next(&_batch_eof, _es_scroll_parser));
    RETURN_IF_ERROR(_es_scroll_parser->fill_chunk(state, chunk, &_line_eof));
    RETURN_IF_ERROR(ExecNode::eval_conjuncts(_conjunct_ctxs, ck));

    return Status::OK();
}
  1. 调用 ESScanReader 的 get_next() 方法,在 ESScanReader 中,会直接调用 ScrollParser 的 parse() 方法。如果是第一次请求,会使用 open() 阶段缓存的 _cached_response。
  2. 调用 ScrollParser 的 fill_chunk() 方法,把 parse() 的结果保存到 chunk 中。chunk 中有很多列,就一个一个 column 填充。目前按行填充,效率低?
  3. eval_conjuncts() 对 fill 后的 chunk 执行表达式。执行的是那些没有被下推到 ES 的表达式。
ESScanReader

ESScanReader 中的 get_next() 逻辑比较简单,不做阐述。主要就是处理 scroll 读取逻辑,每次读一部分。然后再调用 ScrollParser 的 parse() 方法解析得到的 json 数据。

ScrollParser

这里稍微说明下 ScrollParser 的 fill_chunk() 方法,parse() 其实只是保存了 json 数据,真正的解析工作由 fill_chunk() 方法负责。

Status ScrollParser::fill_chunk(RuntimeState* state, ChunkPtr* chunk, bool* line_eos) {

    *chunk = std::make_shared<Chunk>();
    std::vector<SlotDescriptor*> slot_descs = _tuple_desc->slots();

    // 剩余待插入数据条数
    size_t left_sz = _size - _cur_line;
    // 保证待插入数据条数不超过 chunk_size()
    size_t fill_sz = std::min(left_sz, (size_t)state->chunk_size());

    // 初始化 chunk 里面的 column,可以使用 ChunkHelper 简化?
    for (auto& slot_desc : slot_descs) {
        ColumnPtr column = ColumnHelper::create_column(slot_desc->type(), slot_desc->is_nullable());
        column->reserve(fill_sz);
        (*chunk)->append_column(std::move(column), slot_desc->id());
    }

    auto slots = _tuple_desc->slots();
    // TODO: we could fill chunk by column rather than row
    for (size_t i = 0; i < fill_sz; ++i) {
        const rapidjson::Value& obj = _inner_hits_node[_cur_line + i];
        // 判断是否读取 doc_values,在返回的结果中,不可能同时存在 fields 和 _source 字段。
        // fields 字段就是 doc_values 的意思。
        bool pure_doc_value = _is_pure_doc_value(obj);
        bool has_source = obj.HasMember(FIELD_SOURCE);
        bool has_fields = obj.HasMember(FIELD_FIELDS);

        // fields 和 _source 同时不存在,就直接给 column 插默认值。
        if (!has_source && !has_fields) {
            for (size_t col_idx = 0; col_idx < slots.size(); ++col_idx) {
                SlotDescriptor* slot_desc = slot_descs[col_idx];
                ColumnPtr& column = (*chunk)->get_column_by_slot_id(slot_desc->id());
                if (slot_desc->is_nullable()) {
                    column->append_default();
                } else {
                    return Status::DataQualityError(
                            fmt::format("col `{}` is not null, but value from ES is null", slot_desc->col_name()));
                }
            }
            continue;
        }
        // 检查,前面说了,fields 和 _source 必定存在一个,而且两个不可能同时存在
        DCHECK(has_source ^ has_fields);
        const rapidjson::Value& line = has_source ? obj[FIELD_SOURCE] : obj[FIELD_FIELDS];

        for (size_t col_idx = 0; col_idx < slots.size(); ++col_idx) {
            SlotDescriptor* slot_desc = slot_descs[col_idx];
            ColumnPtr& column = (*chunk)->get_column_by_slot_id(slot_desc->id());

            // _id field must exists in every document, this is guaranteed by ES
            // if _id was found in tuple, we would get `_id` value from inner-hit node
            // json-format response would like below:
            //    "hits": {
            //            "hits": [
            //                {
            //                    "_id": "UhHNc3IB8XwmcbhBk1ES",
            //                    "_source": {
            //                          "k": 201,
            //                    }
            //                }
            //            ]
            //        }
            // 这个分支不会走到,留着干什么未知?而且在 ES8中,貌似根本就没有 _id 字段了?
            if (slot_desc->col_name() == FIELD_ID) {
                // actually this branch will not be reached, this is guaranteed by Doris FE.
                if (pure_doc_value) {
                    return Status::RuntimeError("obtain `_id` is not supported in doc_values mode");
                }
                PrimitiveType type = slot_desc->type().type;
                DCHECK(type == TYPE_CHAR || type == TYPE_VARCHAR);

                const auto& _id = obj[FIELD_ID];
                Slice slice(_id.GetString(), _id.GetStringLength());
                _append_data<TYPE_VARCHAR>(column.get(), slice);

                continue;
            }

            // if pure_doc_value enabled, docvalue_context must contains the key
            // todo: need move all `pure_docvalue` for every tuple outside fill_tuple
            //  should check pure_docvalue for one table scan not every tuple
            const char* col_name = pure_doc_value ? _doc_value_context->at(slot_desc->col_name()).c_str()
                                                  : slot_desc->col_name().c_str();

            auto has_col = line.HasMember(col_name);
            if (has_col) {
                const rapidjson::Value& col = line[col_name];
                // doc value
                bool is_null = col.IsNull() || (pure_doc_value && col.IsArray() && (col.Empty() || col[0].IsNull()));
                if (!is_null) {
                    // append value from ES to column
                    RETURN_IF_ERROR(_append_value_from_json_val(column.get(), slot_desc->type(), col, pure_doc_value));
                    continue;
                }
                // handle null col
                if (slot_desc->is_nullable()) {
                    _append_null(column.get());
                } else {
                    return Status::DataQualityError(
                            fmt::format("col `{}` is not null, but value from ES is null", slot_desc->col_name()));
                }
            } else {
                // if don't has col in ES , append a default value
                _append_null(column.get());
            }
        }
    }
    _cur_line += fill_sz;
    return Status::OK();
}

ScrollParser 中由_append_value_from_json_val() 具体负责将一个 value 值插入 column 中。比较简单,就是用了模版方法,包含了各种类型转换。

7 条回复 A文章作者 M管理员
  1. 为什么这个ElasticSearch不能直接对接jni scanner? 这样直接升级ES的Jar就行了。里面一坨http的啥东西,还要拼凑json.

  2. 你好。请问StarRocks支持Opensearch外部表吗,测试了一下,不行。不知道是我配置问题还是不支持的问题。

    • 他们兼容 es 的接口吗,不兼容肯定不支持啊。

    • Smith

      Opensearch是基于ES 7.10开发的,兼容ES 7.10及以下的接口。StarRocks报错是这个[42000][1064] Failed to connect to ES server, errmsg is: The requested URL returned error: 400。应该是兼容性问题

    • 可以看看 fe.log,里面有更加具体的报错信息

    • Smith

      感谢,我去看看。

  3. Smith 🐂B!

搜索