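Elasticsearch: From Quick Start to Production

Elasticsearch (ES) is an open-source distributed search and analytics engine built on Apache Lucene and designed for horizontally scalable, cloud-style deployments. It can handle petabytes of structured and unstructured data and offers near-real-time (NRT) search: newly indexed documents typically become searchable within about one second. This guide takes you from the core concepts to production practice.

1. Core Concepts and Architecture

1.1 Distributed Document Storage and the Inverted Index

1.1.1 How the Inverted Index Works

The inverted index is the core data structure behind fast full-text search in ES. It works as follows: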

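graph TD
A[Source documents] --> B[Tokenize documents]
B --> C[Build inverted index]
C --> D[Term dictionary]
C --> E[Postings lists]
D --> F[Locate matching documents quickly]
E --> F

G["Query: 'search engine'"] --> H[Look up term dictionary]
H --> I[Fetch postings lists]
I --> J[Compute relevance scores]
J --> K[Return ranked results]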

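Components of an inverted index

  • Term Dictionary: the sorted set of all unique terms
  • Postings List: for each term, the IDs of the documents that contain it
  • Term Frequency: how many times a term occurs in a document (used for scoring)
  • Position: where each occurrence of a term sits in a document (needed for phrase queries)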
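The following toy in-memory inverted index in Python shows these four components in action. It is a teaching sketch, not how Lucene actually stores data. Tokenization is a plain lowercase whitespace split standing in for a real analyzer, and `build_inverted_index` and `search` are hypothetical names.

```python
from collections import defaultdict

def tokenize(text: str) -> list[str]:
    # Stand-in analyzer: lowercase + whitespace split (real analyzers do far more)
    return text.lower().split()

def build_inverted_index(docs: dict[str, str]) -> dict[str, dict[str, list[int]]]:
    """Map term -> {doc_id: [positions]}; len(positions) is the term frequency."""
    index: dict[str, dict[str, list[int]]] = defaultdict(dict)
    for doc_id, text in docs.items():
        for pos, term in enumerate(tokenize(text)):
            index[term].setdefault(doc_id, []).append(pos)
    return index

def search(index, query: str) -> list[str]:
    """Return IDs of documents containing every query term (AND semantics)."""
    postings = [set(index.get(term, {})) for term in tokenize(query)]
    if not postings:
        return []
    return sorted(set.intersection(*postings))

docs = {
    "doc1": "Elasticsearch distributed search engine",
    "doc2": "search and analytics",
    "doc3": "distributed search search",
}
index = build_inverted_index(docs)
print(sorted(index))                         # the term dictionary
print(index["search"])                       # postings list with positions
print(search(index, "distributed search"))   # intersect postings lists
```

Intersecting postings lists is what makes multi-term lookups fast: only the short lists for the query terms are read, never the documents themselves.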

1.1.2 Example: From Document to Inverted Index

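// Source document (title: "Elasticsearch distributed search engine";
// content: "ES is a distributed search and analytics engine")
{
"title": "Elasticsearch分布式搜索引擎",
"content": "ES是一个分布式的搜索和分析引擎",
"timestamp": "2024-12-08T10:00:00Z"
}

// Simplified inverted index after tokenization: term -> IDs of documents containing it
// (doc2, doc3, ... are other documents in the same index;
// 分布式 = distributed, 搜索 = search, 引擎 = engine)
{
"elasticsearch": ["doc1", "doc5", "doc12"],
"分布式": ["doc1", "doc3", "doc8"],
"搜索": ["doc1", "doc2", "doc7"],
"引擎": ["doc1", "doc4", "doc9"]
}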

1.2 Clusters, Nodes, Shards, and Replicas

1.2.1 Cluster Architecture

graph TB
Cluster[Elasticsearch Cluster]

subgraph Node-1[Node 1 - master-eligible + data]
Node1-Master[master role]
Node1-Data[data role]
end

subgraph Node-2[Node 2 - data node]
Node2-Data[data role]
Node2-Shard1[Primary Shard 1]
Node2-Shard2[Replica Shard 2]
end

subgraph Node-3[Node 3 - data node]
Node3-Data[data role]
Node3-Shard1[Replica Shard 1]
Node3-Shard2[Primary Shard 2]
end

Cluster --> Node-1
Cluster --> Node-2
Cluster --> Node-3

Node2-Shard1 -.-> Node3-Shard1
Node3-Shard2 -.-> Node2-Shard2

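1.2.2 Core Concepts

Concept | Description | Configuration example
Cluster | One or more nodes that share the same cluster.name | cluster.name: my-application
Node | A single running ES instance; it can take on one or more roles | node.roles: [master, data]
Index | A logical namespace for documents, often compared to a table in a relational database | PUT /products
Shard | A partition of an index's data (a self-contained Lucene index); the unit of horizontal scaling | index.number_of_shards: 3
Replica | A copy of a primary shard, allocated automatically to a different node; provides high availability and extra read capacity | index.number_of_replicas: 1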

1.2.3 Shard Allocation

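# List shard allocation across the cluster
GET _cat/shards?v

# Sample output: each primary (p) and its replica (r) are on different nodes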
index shard prirep state docs store ip node
myindex 0 p STARTED 1200 5.2mb 10.0.0.1 node-1
myindex 0 r STARTED 1200 5.2mb 10.0.0.2 node-2
myindex 1 p STARTED 1350 5.8mb 10.0.0.3 node-3
myindex 1 r STARTED 1350 5.8mb 10.0.0.1 node-1
myindex 2 p STARTED 1100 4.9mb 10.0.0.2 node-2
myindex 2 r STARTED 1100 4.9mb 10.0.0.3 node-3
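The key invariant in this output is that a primary and its replica never share a node. The Python sketch below checks that invariant for `_cat/shards` text and also counts shard copies per node. It assumes the column layout shown above (requested with `?v`), and `check_allocation` is a hypothetical helper name.

```python
from collections import Counter, defaultdict

# Sample output copied from above (GET _cat/shards?v)
CAT_SHARDS = """\
index shard prirep state docs store ip node
myindex 0 p STARTED 1200 5.2mb 10.0.0.1 node-1
myindex 0 r STARTED 1200 5.2mb 10.0.0.2 node-2
myindex 1 p STARTED 1350 5.8mb 10.0.0.3 node-3
myindex 1 r STARTED 1350 5.8mb 10.0.0.1 node-1
myindex 2 p STARTED 1100 4.9mb 10.0.0.2 node-2
myindex 2 r STARTED 1100 4.9mb 10.0.0.3 node-3
"""

def check_allocation(cat_output: str) -> tuple[bool, Counter]:
    """Return (copies of each shard sit on distinct nodes, shard copies per node)."""
    lines = cat_output.strip().splitlines()
    header = lines[0].split()
    nodes_per_shard = defaultdict(list)  # (index, shard) -> nodes holding a copy
    per_node = Counter()
    for line in lines[1:]:
        row = dict(zip(header, line.split()))
        node = row.get("node")
        if node is None:
            continue  # unassigned shards have empty ip/node columns
        nodes_per_shard[(row["index"], row["shard"])].append(node)
        per_node[node] += 1
    separated = all(len(nodes) == len(set(nodes)) for nodes in nodes_per_shard.values())
    return separated, per_node

ok, per_node = check_allocation(CAT_SHARDS)
print(ok, dict(per_node))
```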

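1.3 REST API Design and Query DSL Syntax

1.3.1 REST API Conventions

ES exposes a RESTful HTTP API: every operation is an HTTP method applied to a resource path.

HTTP method | Path | Description | Example
GET | /{index}/_search | Search documents | GET /products/_search?q=iphone
PUT | /{index}/_mapping | Update the mapping (new fields can be added; existing field types cannot be changed) | PUT /products/_mapping
POST | /{index}/_doc | Create a document with an auto-generated ID | POST /products/_doc
DELETE | /{index}/_doc/{id} | Delete a document by ID | DELETE /products/_doc/1

1.3.2 Query DSL Syntax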

{
"query": {
"bool": {
"must": [
{ "match": { "title": "Elasticsearch" } },
{ "range": { "price": { "gte": 1000 } } }
],
"filter": [
{ "term": { "category": "electronics" } }
],
"should": [
{ "match": { "tags": "popular" } }
],
"must_not": [
{ "term": { "status": "out_of_stock" } }
]
}
},
"sort": [
{ "price": { "order": "desc" } },
{ "_score": { "order": "desc" } }
],
"from": 0,
"size": 10,
"_source": ["title", "price", "category"]
}
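The toy evaluator below runs over plain Python dicts and shows how the four `bool` clause types interact. It is a simplified sketch under stated assumptions:

- `match` is modeled as a case-insensitive substring test.
- Each matching `must` or `should` clause adds 1.0 to the score, whereas real ES scores with BM25.
- Results are ordered by score only.
- `evaluate_bool`, `match`, `term` and `gte` are hypothetical helpers.

The clause rules themselves follow ES:

- `must` and `filter` are both required.
- `must_not` excludes.
- Only `must` and `should` affect the score.
- `should` becomes required only when there is no `must` or `filter`.

```python
from typing import Any, Callable

Doc = dict[str, Any]
Clause = Callable[[Doc], bool]

def match(field: str, text: str) -> Clause:
    # Toy stand-in for full-text "match": case-insensitive substring test
    return lambda d: text.lower() in str(d.get(field, "")).lower()

def term(field: str, value: Any) -> Clause:
    # Exact-value test, like "term" on a keyword field
    return lambda d: d.get(field) == value

def gte(field: str, bound: float) -> Clause:
    # Like "range": {field: {"gte": bound}}
    return lambda d: field in d and d[field] >= bound

def evaluate_bool(docs, must=(), filter_=(), should=(), must_not=()):
    """Return [(score, doc)] sorted by score; scoring is a toy clause count."""
    hits = []
    for d in docs:
        if not all(c(d) for c in must) or not all(c(d) for c in filter_):
            continue
        if any(c(d) for c in must_not):
            continue
        n_should = sum(c(d) for c in should)
        if not must and not filter_ and should and n_should == 0:
            continue  # with no must/filter, at least one should clause must match
        # filter and must_not never change the score
        score = sum(c(d) for c in must) + n_should
        hits.append((float(score), d))
    return sorted(hits, key=lambda h: h[0], reverse=True)

products = [
    {"title": "Elasticsearch in Action", "price": 1200, "category": "electronics",
     "tags": "popular", "status": "in_stock"},
    {"title": "Elasticsearch Guide", "price": 1500, "category": "electronics",
     "tags": "new", "status": "in_stock"},
    {"title": "Elasticsearch Server", "price": 2000, "category": "electronics",
     "tags": "popular", "status": "out_of_stock"},
    {"title": "Kibana Basics", "price": 3000, "category": "electronics",
     "tags": "popular", "status": "in_stock"},
]
hits = evaluate_bool(
    products,
    must=[match("title", "Elasticsearch"), gte("price", 1000)],
    filter_=[term("category", "electronics")],
    should=[term("tags", "popular")],
    must_not=[term("status", "out_of_stock")],
)
for score, doc in hits:
    print(score, doc["title"])
```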

2. Installation and Basic Operations

2.1 Single-Node and Cluster Deployment

2.1.1 Single-Node Deployment (Development)

Docker

# Pull the official image
docker pull elasticsearch:8.11.0

# Run a single-node container (security disabled: development only)
docker run -d --name elasticsearch \
  -p 9200:9200 -p 9300:9300 \
  -e "discovery.type=single-node" \
  -e "ES_JAVA_OPTS=-Xms1g -Xmx1g" \
  -e "xpack.security.enabled=false" \
  -v es-data:/usr/share/elasticsearch/data \
  elasticsearch:8.11.0

# Verify the deployment
curl -X GET "localhost:9200/_cluster/health?pretty"

Native installation (tar.gz archive)

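# Linux x86_64 archive (works on Ubuntu/Debian and other distributions)
wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-8.11.0-linux-x86_64.tar.gz
tar -xzf elasticsearch-8.11.0-linux-x86_64.tar.gz
cd elasticsearch-8.11.0/

# Append basic settings to config/elasticsearch.yml.
# Elasticsearch refuses to run as root, and the data/log directories
# must exist and be writable by the user that starts it.
cat >> config/elasticsearch.yml <<'EOF'
cluster.name: my-cluster
node.name: node-1
path.data: /var/lib/elasticsearch
path.logs: /var/log/elasticsearch
network.host: 0.0.0.0
http.port: 9200
# One-node install: elect itself as master instead of waiting for other nodes
discovery.type: single-node
EOF

# Start in the foreground
./bin/elasticsearch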

2.1.2 Three-Node Cluster (Production-Style)

Docker Compose cluster

# docker-compose.yml
# Three nodes that are all master-eligible and hold data, so losing any one node
# still leaves a master quorum (2 of 3). A single master-eligible node would be
# a single point of failure.
# Prerequisite on the Docker host: vm.max_map_count >= 262144 (see 4.1.2).
# Security is disabled only to keep the example short; enable it in production (see 4.2).
version: '3.8'
services:
  es01:
    image: elasticsearch:8.11.0
    container_name: es01
    environment:
      - node.name=es01
      - cluster.name=es-cluster
      - node.roles=master,data,ingest
      - discovery.seed_hosts=es02,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - ES_JAVA_OPTS=-Xms2g -Xmx2g
      - xpack.security.enabled=false
    ports:
      - "9200:9200"
    volumes:
      - es01-data:/usr/share/elasticsearch/data

  es02:
    image: elasticsearch:8.11.0
    container_name: es02
    environment:
      - node.name=es02
      - cluster.name=es-cluster
      - node.roles=master,data,ingest
      - discovery.seed_hosts=es01,es03
      - cluster.initial_master_nodes=es01,es02,es03
      - ES_JAVA_OPTS=-Xms2g -Xmx2g
      - xpack.security.enabled=false
    volumes:
      - es02-data:/usr/share/elasticsearch/data

  es03:
    image: elasticsearch:8.11.0
    container_name: es03
    environment:
      - node.name=es03
      - cluster.name=es-cluster
      - node.roles=master,data,ingest
      - discovery.seed_hosts=es01,es02
      - cluster.initial_master_nodes=es01,es02,es03
      - ES_JAVA_OPTS=-Xms2g -Xmx2g
      - xpack.security.enabled=false
    volumes:
      - es03-data:/usr/share/elasticsearch/data

volumes:
  es01-data:
  es02-data:
  es03-data:

2.2 Creating Indices and Configuring Mappings

2.2.1 Basic Index Creation

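# Create an e-commerce product index. ik_max_word comes from the third-party IK
# analysis plugin, which must be installed on every node.
# Synonyms: 苹果 = apple, 手机 = mobile phone.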
PUT /products
{
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1,
"analysis": {
"analyzer": {
"ik_analyzer": {
"type": "custom",
"tokenizer": "ik_max_word",
"filter": ["lowercase", "synonym_filter"]
}
},
"filter": {
"synonym_filter": {
"type": "synonym",
"synonyms": ["苹果,apple", "手机,iphone"]
}
}
}
},
"mappings": {
"properties": {
"product_id": {
"type": "keyword"
},
"title": {
"type": "text",
"analyzer": "ik_analyzer",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"description": {
"type": "text",
"analyzer": "ik_analyzer"
},
"price": {
"type": "double"
},
"category": {
"type": "keyword"
},
"tags": {
"type": "keyword"
},
"inventory": {
"type": "integer"
},
"created_at": {
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
},
"location": {
"type": "geo_point"
}
}
}
}

2.2.2 Dynamic Templates

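# Dynamic templates are evaluated in order and the first match wins,
# so the specific *_date rule is listed before the catch-all string rule
PUT /logs
{
"mappings": {
"dynamic_templates": [
{
"date_fields": {
"match": "*_date",
"mapping": {
"type": "date"
}
}
},
{
"string_fields": {
"match_mapping_type": "string",
"mapping": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
}
]
}
}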
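Dynamic templates are checked in array order and the first matching one wins, so a catch-all template placed early can shadow a more specific one. The sketch below models only that selection step. It assumes simplifications: the detected JSON type is passed in directly, whereas real detection also handles numbers, objects and date detection on strings. `pick_template` is a hypothetical helper.

```python
from fnmatch import fnmatchcase

def pick_template(templates, field_name: str, detected_type: str):
    """Return the name of the first dynamic template that matches, else None."""
    for entry in templates:
        name, spec = next(iter(entry.items()))   # each entry is {"name": {...}}
        if "match_mapping_type" in spec and spec["match_mapping_type"] != detected_type:
            continue
        if "match" in spec and not fnmatchcase(field_name, spec["match"]):
            continue
        return name
    return None

string_first = [
    {"string_fields": {"match_mapping_type": "string"}},
    {"date_fields": {"match": "*_date"}},
]
date_first = list(reversed(string_first))

# A *_date value that date detection did not recognize arrives as a plain string
print(pick_template(string_first, "order_date", "string"))
print(pick_template(date_first, "order_date", "string"))
```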

2.3 Document CRUD and Bulk Operations

2.3.1 Single-Document Operations

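# Index document 1: creates it, or replaces it if it already exists
# (PUT /products/_create/1 fails instead of overwriting).
# description: "Latest Apple phone with the A17 chip"
POST /products/_doc/1
{
"product_id": "P001",
"title": "iPhone 15 Pro",
"description": "最新款苹果手机,搭载A17芯片",
"price": 8999.00,
"category": "electronics",
"tags": ["apple", "smartphone", "5G"],
"inventory": 100,
"created_at": "2024-12-08 10:00:00",
"location": {
"lat": 39.9042,
"lon": 116.4074
}
}

# Retrieve the document
GET /products/_doc/1

# Partial update: only the listed fields change
POST /products/_update/1
{
"doc": {
"price": 7999.00,
"inventory": 95
}
}

# Delete the document
DELETE /products/_doc/1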

2.3.2 Bulk Operations

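# Bulk index / update / delete in one request. The body is NDJSON:
# an action line, then (except for delete) a source line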
POST /_bulk
{ "index" : { "_index" : "products", "_id" : "2" } }
{ "product_id": "P002", "title": "MacBook Pro", "price": 12999, "category": "electronics" }
{ "update" : { "_index" : "products", "_id" : "1" } }
{ "doc" : { "inventory" : 90 } }
{ "delete" : { "_index" : "products", "_id" : "3" } }

# Multi-get: fetch several documents by ID (a lookup, not a search)
POST /products/_mget
{
"ids": ["1", "2", "3"]
}
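The `_bulk` body is newline-delimited JSON. Each operation is one action line, optionally followed by one source line (`delete` has none), and the whole body must end with a newline. The minimal Python sketch below builds such a body. `to_bulk_body` and its tuple format are hypothetical; in practice `elasticsearch.helpers.bulk` does this for you.

```python
import json

def to_bulk_body(actions) -> str:
    """actions: iterable of (op, index, doc_id, payload_or_None) tuples."""
    lines = []
    for op, index, doc_id, payload in actions:
        lines.append(json.dumps({op: {"_index": index, "_id": doc_id}}))
        if op == "delete":
            continue                          # delete has no source line
        if op == "update":
            payload = {"doc": payload}        # partial-update wrapper
        lines.append(json.dumps(payload, ensure_ascii=False))
    return "\n".join(lines) + "\n"            # the trailing newline is required

body = to_bulk_body([
    ("index", "products", "2",
     {"product_id": "P002", "title": "MacBook Pro", "price": 12999, "category": "electronics"}),
    ("update", "products", "1", {"inventory": 90}),
    ("delete", "products", "3", None),
])
print(body, end="")
```

The body is sent with `Content-Type: application/x-ndjson`. Deleting a missing ID (like `3` here) is reported per item and does not fail the whole request.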

2.3.3 Using the Python Client

from datetime import datetime

from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

# Connect (elasticsearch-py 8.x; the URL must include the scheme)
es = Elasticsearch("http://localhost:9200")

# Index settings and mappings
settings = {
    "number_of_shards": 3,
    "number_of_replicas": 1,
}
mappings = {
    "properties": {
        "title": {"type": "text"},
        "price": {"type": "double"},
        "created_at": {"type": "date"},
    }
}

# Create the index only if it does not exist yet
# (an existing "products" index keeps its own mapping)
if not es.indices.exists(index="products"):
    es.indices.create(index="products", settings=settings, mappings=mappings)

# Bulk-index 100 documents; document IDs should be strings
documents = [
    {
        "_index": "products",
        "_id": str(i),
        "_source": {
            "title": f"Product {i}",
            "price": 100 + i,
            "created_at": datetime.now(),
        },
    }
    for i in range(1, 101)
]
success_count, errors = bulk(es, documents)

# Make the new documents searchable now instead of waiting for the next refresh
es.indices.refresh(index="products")

# Range query: 150 <= price <= 200
response = es.search(
    index="products",
    query={"range": {"price": {"gte": 150, "lte": 200}}},
)

print(f"Found {response['hits']['total']['value']} documents")

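3. Advanced Search in Practice

3.1 Full-Text Search and Relevance Scoring

3.1.1 TF-IDF vs. BM25

TF-IDF (Term Frequency-Inverse Document Frequency)

  • Term frequency (TF): how often the term occurs in the document
  • Inverse document frequency (IDF): how rare the term is across the whole collection; rarer terms weigh more

BM25 (Best Matching 25)

  • The default similarity since ES 5.0; it fixes TF-IDF's unbounded growth with term frequency by making the TF contribution saturate
  • Normalizes for document length, so a match in a short field counts for more than one in a long field
  • Has two tunable parameters: k1 (how quickly TF saturates, default 1.2) and b (strength of length normalization, default 0.75)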
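A short Python sketch makes the saturation point concrete. It implements the textbook per-term BM25 score and a plain TF-IDF weight for a single term. The IDF follows Lucene's `log(1 + (N - n + 0.5) / (n + 0.5))`. Recent Lucene versions also drop the constant `(k1 + 1)` factor, which does not change ranking. The corpus statistics in the example are made-up inputs, not measurements.

```python
import math

def idf(n_docs: int, doc_freq: int) -> float:
    # Lucene-style BM25 IDF: always positive, larger for rarer terms
    return math.log(1 + (n_docs - doc_freq + 0.5) / (doc_freq + 0.5))

def bm25_term(tf: int, doc_len: int, avg_len: float, n_docs: int, doc_freq: int,
              k1: float = 1.2, b: float = 0.75) -> float:
    # Length normalization: longer-than-average docs are penalized when b > 0
    norm = k1 * (1 - b + b * doc_len / avg_len)
    # tf / (tf + norm) saturates: the score approaches idf * (k1 + 1)
    return idf(n_docs, doc_freq) * tf * (k1 + 1) / (tf + norm)

def tfidf_term(tf: int, n_docs: int, doc_freq: int) -> float:
    # Classic TF-IDF weight: grows linearly with tf (no saturation)
    return tf * math.log(n_docs / doc_freq)

for tf in (1, 2, 5, 10, 50):
    print(tf,
          round(bm25_term(tf, doc_len=100, avg_len=100, n_docs=1000, doc_freq=10), 3),
          round(tfidf_term(tf, n_docs=1000, doc_freq=10), 3))
```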
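// Custom BM25 parameters. Similarity is set when an index is created, so apply this
// to a new index (delete or rename an existing products index first).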
PUT /products
{
"settings": {
"similarity": {
"custom_bm25": {
"type": "BM25",
"k1": 1.2,
"b": 0.75
}
}
},
"mappings": {
"properties": {
"title": {
"type": "text",
"similarity": "custom_bm25"
}
}
}
}

3.1.2 Advanced Query DSL Examples

// Multi-field full-text search: title matches weigh 2x, tags 1.5x ("苹果手机" = "Apple phone")
GET /products/_search
{
"query": {
"multi_match": {
"query": "苹果手机",
"fields": [
"title^2",
"description",
"tags^1.5"
],
"type": "best_fields",
"fuzziness": "AUTO"
}
},
"highlight": {
"fields": {
"title": {},
"description": {}
},
"pre_tags": ["<mark>"],
"post_tags": ["</mark>"]
}
}

// Geo query: restaurants within 5 km of a point, sorted by distance
GET /products/_search
{
"query": {
"bool": {
"must": {
"match": {
"category": "restaurant"
}
},
"filter": {
"geo_distance": {
"distance": "5km",
"location": {
"lat": 39.9042,
"lon": 116.4074
}
}
}
}
},
"sort": [
{
"_geo_distance": {
"location": "39.9042,116.4074",
"order": "asc",
"unit": "km"
}
}
]
}

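3.2 Aggregations and Visualization

3.2.1 Aggregation Types

Aggregation type | Purpose | Examples
Metric | Compute values over a set of documents | avg, sum, min, max
Bucket | Group documents into buckets | terms, range, date_histogram
Pipeline | Compute on the output of other aggregations | moving_fn, derivative, bucket_script

(moving_avg, still listed in older guides, was removed in Elasticsearch 8.0; moving_fn replaces it.)

3.2.2 E-commerce Analytics Example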

// Price statistics, price bands, and per-category top tags with average price
GET /products/_search
{
"size": 0,
"aggs": {
"price_stats": {
"stats": {
"field": "price"
}
},
"price_ranges": {
"range": {
"field": "price",
"ranges": [
{"to": 1000, "key": "low"},
{"from": 1000, "to": 5000, "key": "medium"},
{"from": 5000, "key": "high"}
]
}
},
"category_top_tags": {
"terms": {
"field": "category",
"size": 10
},
"aggs": {
"top_tags": {
"terms": {
"field": "tags",
"size": 5
}
},
"avg_price": {
"avg": {
"field": "price"
}
}
}
}
}
}

// Time series: hourly log counts and error rate over the last 7 days
GET /logs/_search
{
"size": 0,
"query": {
"range": {
"@timestamp": {
"gte": "now-7d"
}
}
},
"aggs": {
"errors_over_time": {
"date_histogram": {
"field": "@timestamp",
"calendar_interval": "1h",
"time_zone": "Asia/Shanghai"
},
"aggs": {
"error_count": {
"filter": {
"term": {
"level": "ERROR"
}
}
},
"error_rate": {
"bucket_script": {
"buckets_path": {
"error": "error_count>_count",
"total": "_count"
},
"script": "params.total == 0 ? 0 : params.error / params.total * 100"
}
}
}
}
}
}
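In each hourly bucket, the `bucket_script` computes `error_count / doc_count * 100`. The Python sketch below reproduces that arithmetic on in-memory log events, so the result can be checked by hand. Its assumptions:

- The event data and the `error_rate_per_hour` helper are illustrative.
- Timestamps are treated as naive local times, whereas the query above uses `time_zone`.
- Only hours that contain events are emitted.

```python
from collections import defaultdict
from datetime import datetime

def error_rate_per_hour(events):
    """events: iterable of (iso_timestamp, level). Returns {hour: error %}."""
    totals = defaultdict(int)
    errors = defaultdict(int)
    for ts, level in events:
        hour = datetime.fromisoformat(ts).replace(minute=0, second=0, microsecond=0)
        totals[hour] += 1              # plays the role of the bucket's _count
        if level == "ERROR":
            errors[hour] += 1          # plays the role of error_count>_count
    return {h: errors[h] / totals[h] * 100 for h in sorted(totals)}

events = [
    ("2024-12-08T10:05:00", "INFO"),
    ("2024-12-08T10:20:00", "ERROR"),
    ("2024-12-08T10:40:00", "INFO"),
    ("2024-12-08T10:59:00", "INFO"),
    ("2024-12-08T11:10:00", "ERROR"),
]
for hour, rate in error_rate_per_hour(events).items():
    print(hour.isoformat(), f"{rate:.1f}%")
```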

3.2.3 Kibana Setup

// Create an index pattern (shown as a "data view" in Kibana 8.x).
// Send this to Kibana (default port 5601), not Elasticsearch, with the header "kbn-xsrf: true".
// Saved objects are created with POST; Kibana discovers the field list itself.
POST /api/saved_objects/index-pattern/products
{
"attributes": {
"title": "products",
"timeFieldName": "created_at"
}
}

3.3 Near-Real-Time (NRT) Search and Refresh

3.3.1 How NRT Works

ES provides near-real-time search through the following write path:

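sequenceDiagram
Client->>ES: Index a document
ES->>Memory: Add to the in-memory indexing buffer
ES->>Translog: Append to the transaction log (translog)
ES-->>Client: Acknowledge (translog fsynced per request by default)

Note over ES: Refresh, every 1s by default
ES->>Memory: Write the buffer to a new segment (filesystem cache)
ES->>Searcher: Open a new searcher, so the document becomes searchable

Note over ES: Flush, when translog size or age thresholds are reached
ES->>Disk: Lucene commit, fsync segments to disk
ES->>Translog: Trim the translog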
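In this write path, an acknowledged document is already durable because it is in the translog. It is not searchable until the next refresh. The toy model below captures only that ordering. It is a conceptual sketch with hypothetical class and method names, not how Lucene segments are implemented.

```python
class ToyShard:
    """Conceptual model of an ES shard's write path (not real internals)."""

    def __init__(self):
        self.buffer = []     # in-memory indexing buffer (not searchable)
        self.translog = []   # durability log, replayed after a crash
        self.segments = []   # refreshed segments (searchable)

    def index(self, doc: dict) -> str:
        self.buffer.append(doc)
        self.translog.append(doc)
        return "created"     # acknowledged before any refresh happens

    def refresh(self) -> None:
        # Refresh: turn the buffer into a new searchable segment
        if self.buffer:
            self.segments.append(list(self.buffer))
            self.buffer.clear()

    def flush(self) -> None:
        # Flush (simplified): persist everything as segments, then trim the translog
        self.refresh()
        self.translog.clear()

    def search(self, field: str, value) -> list:
        return [d for seg in self.segments for d in seg if d.get(field) == value]

shard = ToyShard()
shard.index({"id": 1, "title": "iPhone"})
print(shard.search("title", "iPhone"))   # -> []
shard.refresh()
print(shard.search("title", "iPhone"))   # -> [{'id': 1, 'title': 'iPhone'}]
```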

3.3.2 Refresh Settings

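# Throughput-oriented settings: refresh every 30s and fsync the translog asynchronously.
# With async durability, a crash can lose up to sync_interval of acknowledged writes.
PUT /products/_settings
{
"refresh_interval": "30s",
"index.translog.durability": "async",
"index.translog.sync_interval": "30s"
}

# Freshness-oriented settings: refresh every second. Setting refresh_interval
# explicitly (even to the default 1s) means shards keep refreshing in the background
# even when idle; leave it unset if you want search.idle.after to skip idle shards.
PUT /products/_settings
{
"refresh_interval": "1s",
"index.search.idle.after": "30s"
}

# Before a large bulk load: disable refresh and replicas
PUT /products/_settings
{
"refresh_interval": "-1",
"index.number_of_replicas": 0
}

# After the load: refresh once, then restore refresh and replicas
POST /products/_refresh
PUT /products/_settings
{
"refresh_interval": "1s",
"index.number_of_replicas": 1
}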

4. Production Tuning

4.1 Performance Tuning and JVM Configuration

4.1.1 JVM Heap Sizing

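# Put custom options in a file under config/jvm.options.d/ (e.g. heap.options)
# rather than editing the bundled config/jvm.options. ES 8.x sizes the heap
# automatically, so only override it deliberately.
# Heap: set Xms = Xmx, at most 50% of physical RAM, and below the compressed-oops
# threshold (just under 32GB; about 26GB is safe on most systems)
-Xms16g
-Xmx16g

# G1 garbage collector settings (the bundled JDK already uses G1)
-XX:+UseG1GC
-XX:MaxGCPauseMillis=200
-XX:G1HeapRegionSize=16m
-XX:G1ReservePercent=25
-XX:InitiatingHeapOccupancyPercent=30

# Other options. Avoid -XX:+DisableExplicitGC: the JDK relies on System.gc()
# to reclaim direct (off-heap) buffers, which Elasticsearch uses heavily.
-XX:+AlwaysPreTouch
-XX:+HeapDumpOnOutOfMemoryError
-XX:HeapDumpPath=/var/lib/elasticsearch/heapdump.hprof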
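The heap-sizing rule is simple arithmetic: take half of physical RAM and cap it below the compressed-oops threshold. The small sketch below assumes a 31 GB cap. The exact threshold varies by JVM and platform. You can confirm it via the `compressed ordinary object pointers [true]` entry in the node's startup log. The helper names are hypothetical.

```python
def recommended_heap_gb(ram_gb: float, cap_gb: float = 31) -> int:
    """Half of physical RAM, capped below the compressed-oops limit (cap is an assumption)."""
    heap = int(min(ram_gb / 2, cap_gb))
    if heap < 1:
        raise ValueError("less than 1 GB of heap; size it in megabytes instead")
    return heap

def jvm_heap_options(ram_gb: float, cap_gb: float = 31) -> str:
    heap = recommended_heap_gb(ram_gb, cap_gb)
    # Xms and Xmx are set equal so the heap is never resized at runtime
    return f"-Xms{heap}g\n-Xmx{heap}g\n"

print(jvm_heap_options(64), end="")
```

The remaining RAM is not wasted: Lucene relies on the operating system's filesystem cache for fast segment access.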

4.1.2 Operating System Settings

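# File descriptor limits
# /etc/security/limits.conf
elasticsearch soft nofile 65535
elasticsearch hard nofile 65535

# Virtual memory
# /etc/sysctl.conf (apply with: sudo sysctl -p)
vm.max_map_count=262144
vm.swappiness=1

# Thread pools (elasticsearch.yml). The defaults scale with the CPU count and are
# usually best; change them only after profiling. thread_pool.write.size may not
# exceed allocated processors + 1, and the default write queue_size is 10000,
# so a small queue such as 200 causes more rejected writes.
# thread_pool.search.size: 50
# thread_pool.search.queue_size: 1000
# thread_pool.write.size: 25
# thread_pool.write.queue_size: 200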

4.1.3 Index-Level Performance Settings

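# Create a new index with write-throughput-oriented settings
# (delete or rename an existing "products" index first). Notes:
# - total_shards_per_node is a hard limit: 6 primaries + 6 replicas = 12 shard copies,
#   so 3 nodes need a limit of at least 4, otherwise shards stay unassigned.
# - Keep _source enabled: without it, update, reindex and highlighting stop working.
#   includes/excludes permanently drop the other fields from the stored _source.
# - index_options "docs" and norms false shrink the index but disable phrase queries,
#   term frequencies and length normalization on title.
# - async translog durability trades crash safety for throughput.
PUT /products
{
"settings": {
"number_of_shards": 6,
"number_of_replicas": 1,
"index.refresh_interval": "30s",
"index.translog.durability": "async",
"index.translog.sync_interval": "30s",
"index.codec": "best_compression",
"index.routing.allocation.total_shards_per_node": 4,
"index.search.idle.after": "30s"
},
"mappings": {
"_source": {
"includes": ["title", "price", "category"],
"excludes": ["internal_notes"]
},
"properties": {
"title": {
"type": "text",
"index_options": "docs",
"norms": false,
"store": true
},
"price": {
"type": "double",
"doc_values": true,
"store": true
},
"created_at": {
"type": "date",
"format": "yyyy-MM-dd"
}
}
}
}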
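`index.routing.allocation.total_shards_per_node` is a hard limit, so check the arithmetic before applying it. Every shard copy needs a slot, and copies of the same shard must land on different nodes. The quick Python check below ignores other allocation deciders such as disk watermarks. `fits` is a hypothetical helper, and the node counts are example inputs.

```python
def total_shard_copies(primaries: int, replicas: int) -> int:
    return primaries * (1 + replicas)

def fits(primaries: int, replicas: int, nodes: int, per_node_limit: int) -> bool:
    """True if every shard copy can be allocated under total_shards_per_node."""
    if replicas + 1 > nodes:
        return False  # a primary and its replicas can never share a node
    return per_node_limit * nodes >= total_shard_copies(primaries, replicas)

print(total_shard_copies(6, 1))                  # shard copies to place
print(fits(6, 1, nodes=3, per_node_limit=2))     # too tight for 3 nodes
print(fits(6, 1, nodes=3, per_node_limit=4))
```

A limit that only just fits leaves no room to reallocate shards when a node fails, so in practice leave some headroom.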

4.2 Security: Authentication and Authorization

4.2.1 Enabling Built-In Security

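# Elasticsearch 8.x enables security and TLS automatically on first start.
# The manual steps below are for clusters set up without that auto-configuration.

# 1. Generate a CA and node certificates (run in the Elasticsearch home directory),
#    then copy elastic-certificates.p12 into config/ on every node
bin/elasticsearch-certutil ca
bin/elasticsearch-certutil cert --ca elastic-stack-ca.p12

# 2. config/elasticsearch.yml
xpack.security.enabled: true
xpack.security.transport.ssl.enabled: true
xpack.security.transport.ssl.verification_mode: certificate
xpack.security.transport.ssl.keystore.path: elastic-certificates.p12
xpack.security.transport.ssl.truststore.path: elastic-certificates.p12
xpack.security.http.ssl.enabled: true
xpack.security.http.ssl.keystore.path: elastic-certificates.p12
# If the .p12 file has a password, store it with bin/elasticsearch-keystore add
# (e.g. xpack.security.transport.ssl.keystore.secure_password)

# 3. Set the password of the built-in elastic user
#    (elasticsearch-setup-passwords is deprecated in 8.x)
bin/elasticsearch-reset-password -u elastic -i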

4.2.2 Roles, Users, and API Keys

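# Read-only role for the products index with field-level security:
# grant lists the visible fields, and except (which must be a subset of grant) hides some of them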
POST /_security/role/read_only_products
{
"cluster": ["monitor"],
"indices": [
{
"names": ["products"],
"privileges": ["read", "view_index_metadata"],
"field_security": {
"grant": ["*"],
"except": ["cost_price", "supplier_info"]
}
}
]
}

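# Create a user and assign roles (the password is a placeholder;
# kibana_user is deprecated in favor of kibana_admin in recent versions)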
POST /_security/user/analyst_user
{
"password": "secure_password123",
"roles": ["read_only_products", "kibana_user"],
"full_name": "Data Analyst",
"email": "analyst@company.com"
}

# Create an API key for Logstash whose privileges are limited to logstash-* indices
POST /_security/api_key
{
"name": "logstash_key",
"role_descriptors": {
"logstash_writer": {
"cluster": ["monitor", "manage_index_templates"],
"indices": [
{
"names": ["logstash-*"],
"privileges": ["write", "create", "read"]
}
]
}
}
}

4.3 Monitoring, Alerting, and Disaster Recovery

4.3.1 Cluster Monitoring APIs

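# Cluster health: green / yellow / red
GET /_cluster/health

# Per-node statistics (JVM heap, GC, file system, thread pools, ...)
GET /_nodes/stats

# Index statistics
GET /products/_stats

# Shard allocation
GET /_cat/shards?v

# Thread pool activity and rejections
GET /_cat/thread_pool?v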

4.3.2 Metrics Collection and Alert Rules

# metricbeat.yml: collect Elasticsearch metrics
metricbeat.modules:
  - module: elasticsearch
    metricsets:
      - node
      - node_stats
      - index
      - index_recovery
      - index_summary
      - shard
      - ml_job
    period: 10s
    hosts: ["http://localhost:9200"]
    username: "elastic"
    password: "changeme"   # placeholder: use the Metricbeat keystore or an API key

# Key alert rules. This block is illustrative pseudo-config, not Metricbeat syntax;
# implement the rules with Kibana alerting rules or Watcher.
alerts:
  - name: "Cluster Health Red"
    condition: "cluster_health.status == 'red'"
    action: "email_admin"

  - name: "High JVM Heap Usage"
    condition: "node_stats.jvm.mem.heap_used_percent > 85"
    action: "slack_notification"

  - name: "Low Disk Space"
    condition: "node_stats.fs.total.available_in_bytes < 10GB"
    action: "page_oncall"
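The three alert conditions can also be written as a Python function over a stats dictionary. This makes the thresholds and their boundaries explicit. The input shape, with keys `status`, `heap_used_percent` and `disk_available_bytes`, is a hypothetical flattening of the `GET /_cluster/health` and `GET /_nodes/stats` responses, not the raw API output.

```python
GB = 1024 ** 3

def evaluate_alerts(stats: dict) -> list[str]:
    """Return the names of the alert rules that fire for one node snapshot."""
    fired = []
    if stats["status"] == "red":
        fired.append("Cluster Health Red")
    if stats["heap_used_percent"] > 85:
        fired.append("High JVM Heap Usage")
    if stats["disk_available_bytes"] < 10 * GB:
        fired.append("Low Disk Space")
    return fired

print(evaluate_alerts({"status": "yellow", "heap_used_percent": 91,
                       "disk_available_bytes": 50 * GB}))
```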

4.3.3 Snapshot Backup and Restore

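# Register a shared file-system repository. The location must be listed under
# path.repo in elasticsearch.yml on every node (on a shared mount in multi-node clusters).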
PUT /_snapshot/backup_repo
{
"type": "fs",
"settings": {
"location": "/mnt/backups/elasticsearch",
"compress": true,
"chunk_size": "1gb",
"max_restore_bytes_per_sec": "100mb",
"max_snapshot_bytes_per_sec": "100mb"
}
}

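# Snapshot lifecycle policy: daily at 02:30 UTC; snapshots expire after 30 days,
# but at least 5 and at most 50 are kept (SLM appends a unique suffix to each name)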
PUT /_slm/policy/daily_snapshots
{
"schedule": "0 30 2 * * ?",
"name": "<snapshot-{now/d}>",
"repository": "backup_repo",
"config": {
"indices": ["products", "logs-*"],
"include_global_state": false,
"partial": false
},
"retention": {
"expire_after": "30d",
"min_count": 5,
"max_count": 50
}
}

# Take a one-off snapshot of products and wait for it to finish
PUT /_snapshot/backup_repo/snapshot_20241208?wait_for_completion=true
{
"indices": "products",
"ignore_unavailable": true,
"include_global_state": false
}

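# Restore products. An open index with the same name must be closed or deleted first
# (or restore under another name with rename_pattern / rename_replacement).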
POST /_snapshot/backup_repo/snapshot_20241208/_restore
{
"indices": "products",
"index_settings": {
"index.number_of_replicas": 0
},
"ignore_index_settings": ["index.refresh_interval"],
"include_global_state": false
}

5. Typical Use Cases

5.1 Log Analytics with the ELK Stack

5.1.1 Architecture

graph LR
Applications[Applications] --> Filebeat[Filebeat]
Servers[Servers] --> Metricbeat[Metricbeat]
Networks[Network devices] --> Logstash[Logstash]

Filebeat --> Kafka[Kafka]
Metricbeat --> Kafka
Logstash --> Kafka

Kafka --> Indexer[Logstash indexers - Kafka input]
Indexer --> Elasticsearch[Elasticsearch Cluster]
Elasticsearch --> Kibana[Kibana]
Elasticsearch --> Grafana[Grafana]

Kibana --> Users[Operations team]
Grafana --> Users

style Elasticsearch fill:#f9f,stroke:#333
style Kibana fill:#9f9,stroke:#333

5.1.2 Filebeat Configuration

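# filebeat.yml
# The "log" input is deprecated since 7.16 in favor of "filestream" but still works in 8.x.
filebeat.inputs:
  # Application logs: each event starts with a date; other lines (stack traces) are appended
  - type: log
    enabled: true
    paths:
      - /var/log/app/*.log
    fields:
      log_type: application
    fields_under_root: true
    multiline.pattern: '^\d{4}-\d{2}-\d{2}'
    multiline.negate: true
    multiline.match: after

  # nginx access log: one event per line and lines do not start with a date,
  # so it must not share the multiline settings above
  - type: log
    enabled: true
    paths:
      - /var/log/nginx/access.log
    fields:
      log_type: nginx_access
    fields_under_root: true

processors:
  - add_host_metadata:
      when.not.contains.tags: forwarded
  - decode_json_fields:
      fields: ["message"]
      target: ""
      overwrite_keys: true

output.kafka:
  hosts: ["kafka1:9092", "kafka2:9092"]
  topic: 'filebeat-logs'
  partition.round_robin:
    reachable_only: false
  required_acks: 1
  compression: gzip
  max_message_bytes: 1000000

logging.level: info
logging.to_files: true
logging.files:
  path: /var/log/filebeat
  name: filebeat
  keepfiles: 7
  permissions: 0644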
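With `negate: true` and `match: after`, every line that does not match the pattern is appended to the preceding line that does. The Python sketch below mimics that grouping. It also shows why a log whose lines never start with a date, such as the default nginx access log format, must not use these multiline settings. It models only the pattern/negate/match behavior. Filebeat limits such as `max_lines` and timeouts are ignored, and `group_multiline` is a hypothetical name.

```python
import re

def group_multiline(lines, pattern=r"^\d{4}-\d{2}-\d{2}"):
    """negate=true, match=after: non-matching lines join the previous event."""
    start = re.compile(pattern)
    events = []
    for line in lines:
        if start.match(line) or not events:
            events.append(line)            # a dated line starts a new event
        else:
            events[-1] += "\n" + line      # continuation line (e.g. a stack trace)
    return events

app_log = [
    "2024-12-08 10:00:00 ERROR request failed",
    "java.lang.NullPointerException",
    "    at com.example.Foo.bar(Foo.java:42)",
    "2024-12-08 10:00:01 INFO recovered",
]
nginx_log = [
    '10.0.0.1 - - [08/Dec/2024:10:00:00 +0800] "GET / HTTP/1.1" 200 612',
    '10.0.0.2 - - [08/Dec/2024:10:00:01 +0800] "GET /a HTTP/1.1" 404 153',
]
print(len(group_multiline(app_log)))    # stack trace folded into its event
print(len(group_multiline(nginx_log)))  # two requests wrongly merged into one
```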

5.1.3 Log Index Template and Analysis Queries

// Composable index template, applied to every new index whose name matches log-*
PUT /_index_template/log_template
{
"index_patterns": ["log-*"],
"template": {
"settings": {
"number_of_shards": 3,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"@timestamp": {"type": "date"},
"level": {"type": "keyword"},
"message": {"type": "text", "analyzer": "standard"},
"host": {"type": "keyword"},
"service": {"type": "keyword"},
"response_time": {"type": "double"},
"status_code": {"type": "integer"},
"user_agent": {"type": "text"},
"client_ip": {"type": "ip"}
}
}
}
}

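// Last hour: ERROR/FATAL logs per service with the 3 most recent messages each,
// plus response-time percentiles across all logs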
GET /log-*/_search
{
"size": 0,
"query": {
"range": {
"@timestamp": {
"gte": "now-1h"
}
}
},
"aggs": {
"errors_by_service": {
"filter": {
"terms": {
"level": ["ERROR", "FATAL"]
}
},
"aggs": {
"services": {
"terms": {
"field": "service",
"size": 10
},
"aggs": {
"error_messages": {
"top_hits": {
"size": 3,
"sort": [
{"@timestamp": {"order": "desc"}}
],
"_source": ["message", "@timestamp"]
}
}
}
}
}
},
"response_time_percentiles": {
"percentiles": {
"field": "response_time",
"percents": [50, 95, 99]
}
}
}
}

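5.2 E-commerce Search and Recommendations

5.2.1 Search Service Implementation

The following Python service wraps index creation, filtered search with facets, autocomplete, and content-based recommendations. It targets the elasticsearch-py 8.x client and needs the IK analysis plugin for the ik_max_word tokenizer. It also assumes that each product's document _id equals its product_id.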

from typing import Any, Dict, List, Optional

from elasticsearch import Elasticsearch


class EcommerceSearchService:
    """Product search, autocomplete and recommendations (elasticsearch-py 8.x).

    Assumes each product's document _id equals its product_id.
    """

    def __init__(self, es_host: str = "http://localhost:9200"):
        self.es = Elasticsearch(es_host)
        self.index_name = "products"

    def create_product_index(self) -> None:
        """Create the product index (requires the IK analysis plugin)."""
        settings = {
            "analysis": {
                "analyzer": {
                    "product_analyzer": {
                        "type": "custom",
                        "tokenizer": "ik_max_word",
                        "filter": ["lowercase", "product_synonyms", "stop"],
                    }
                },
                "filter": {
                    "product_synonyms": {
                        "type": "synonym",
                        # Chinese synonym sets: "Apple phone" = iphone;
                        # laptop / notebook; sports shoes / sneakers / shoes
                        "synonyms": [
                            "苹果手机,iphone",
                            "笔记本电脑,笔记本,laptop",
                            "运动鞋,球鞋,鞋子",
                        ],
                    }
                },
            }
        }
        mappings = {
            "properties": {
                "product_id": {"type": "keyword"},
                "title": {
                    "type": "text",
                    "analyzer": "product_analyzer",
                    "fields": {
                        "suggest": {"type": "completion"},  # autocomplete
                        "keyword": {"type": "keyword"},
                    },
                },
                "description": {"type": "text", "analyzer": "product_analyzer"},
                "price": {"type": "double"},
                "original_price": {"type": "double"},
                "discount": {"type": "double"},
                "brand": {"type": "keyword"},
                "category": {"type": "keyword"},
                "sub_category": {"type": "keyword"},
                "tags": {"type": "keyword"},
                "features": {"type": "keyword"},
                "rating": {"type": "float"},
                "review_count": {"type": "integer"},
                "sales_count": {"type": "integer"},
                "stock": {"type": "integer"},
                "images": {"type": "keyword"},
                "created_at": {"type": "date"},
                "updated_at": {"type": "date"},
                "location": {"type": "geo_point"},
            }
        }
        if not self.es.indices.exists(index=self.index_name):
            self.es.indices.create(index=self.index_name, settings=settings, mappings=mappings)

    def search_products(self, query: str, filters: Optional[Dict[str, Any]] = None,
                        sort_by: str = "relevance", from_: int = 0, size: int = 20) -> Dict:
        """Full-text product search with filters, boosts, highlighting and facets."""
        must_clauses: List[Dict] = []
        filter_clauses: List[Dict] = []

        # Full-text match across weighted fields
        if query:
            must_clauses.append({
                "multi_match": {
                    "query": query,
                    "fields": ["title^3", "description^2", "brand^2", "category", "tags"],
                    "type": "best_fields",
                    "fuzziness": "AUTO",
                }
            })

        # Filters: exact conditions that do not affect the score
        filters = filters or {}
        if filters.get("category"):
            filter_clauses.append({"term": {"category": filters["category"]}})
        if filters.get("price_range"):
            price_range = filters["price_range"]
            filter_clauses.append({"range": {"price": {
                "gte": price_range.get("min", 0),
                "lte": price_range.get("max", 999999),
            }}})
        if filters.get("brand"):
            filter_clauses.append({"terms": {"brand": filters["brand"]}})
        if filters.get("rating"):
            filter_clauses.append({"range": {"rating": {"gte": filters["rating"]}}})

        # Optional relevance boosts. A should clause can only add score, so in-stock
        # items are boosted (boosting stock == 0 would rank out-of-stock items higher).
        should_clauses = [
            {"range": {"stock": {"gt": 0, "boost": 1.2}}},
            {"range": {"rating": {"gte": 4.5, "boost": 1.5}}},
            {"range": {"sales_count": {"gte": 100, "boost": 1.2}}},
        ]

        search_body = {
            "query": {
                "bool": {
                    "must": must_clauses,
                    "filter": filter_clauses,
                    "should": should_clauses,
                    "minimum_should_match": 0,
                }
            },
            "sort": self._get_sort_criteria(sort_by),
            "from": from_,
            "size": size,
            "highlight": {
                "fields": {
                    "title": {"pre_tags": ["<em>"], "post_tags": ["</em>"]},
                    "description": {"pre_tags": ["<em>"], "post_tags": ["</em>"]},
                }
            },
            # Facets for the search results page
            "aggs": {
                "categories": {"terms": {"field": "category", "size": 10}},
                "brands": {"terms": {"field": "brand", "size": 20}},
                "price_ranges": {
                    "range": {
                        "field": "price",
                        "ranges": [
                            {"to": 1000, "key": "low"},
                            {"from": 1000, "to": 5000, "key": "medium"},
                            {"from": 5000, "to": 10000, "key": "high"},
                            {"from": 10000, "key": "premium"},
                        ],
                    }
                },
                "avg_rating": {"avg": {"field": "rating"}},
            },
        }
        return self.es.search(index=self.index_name, body=search_body)

    def _get_sort_criteria(self, sort_by: str) -> List:
        """Map a sort option name to an ES sort clause (default: relevance)."""
        sort_map = {
            "relevance": ["_score"],
            "price_asc": [{"price": {"order": "asc"}}],
            "price_desc": [{"price": {"order": "desc"}}],
            "rating": [{"rating": {"order": "desc"}}, {"_score": {"order": "desc"}}],
            "sales": [{"sales_count": {"order": "desc"}}, {"_score": {"order": "desc"}}],
            "newest": [{"created_at": {"order": "desc"}}],
        }
        return sort_map.get(sort_by, ["_score"])

    def get_search_suggestions(self, prefix: str, size: int = 5) -> List[str]:
        """Autocomplete titles with the completion suggester on title.suggest."""
        suggest_query = {
            "suggest": {
                "product_suggest": {
                    "prefix": prefix,
                    "completion": {
                        "field": "title.suggest",
                        "size": size,
                        "fuzzy": {"fuzziness": "AUTO"},
                    },
                }
            }
        }
        response = self.es.search(index=self.index_name, body=suggest_query)
        options = response["suggest"]["product_suggest"][0]["options"]
        return [option["_source"]["title"] for option in options]

    def get_recommendations(self, product_id: str,
                            user_behavior: Optional[Dict[str, Any]] = None) -> Dict:
        """Content-based recommendations for one product."""
        product = self.es.get(index=self.index_name, id=product_id)["_source"]

        # Similar text, plus shared category, brand and tags when present
        should: List[Dict] = [{
            "more_like_this": {
                "fields": ["title", "description"],
                "like": [{"_index": self.index_name, "_id": product_id}],
                "min_term_freq": 1,
                "max_query_terms": 12,
                "boost": 2,
            }
        }]
        if product.get("category"):
            should.append({"term": {"category": {"value": product["category"], "boost": 3}}})
        if product.get("brand"):
            should.append({"term": {"brand": {"value": product["brand"], "boost": 2}}})
        if product.get("tags"):
            should.append({"terms": {"tags": product["tags"], "boost": 1.5}})

        # Personalization from the user's preferences
        if user_behavior:
            if user_behavior.get("preferred_brands"):
                should.append({"terms": {"brand": user_behavior["preferred_brands"], "boost": 2.5}})
            if user_behavior.get("price_preference"):
                price_pref = user_behavior["price_preference"]
                should.append({"range": {"price": {
                    "gte": price_pref["min"],
                    "lte": price_pref["max"],
                    "boost": 1.8,
                }}})

        recommendation_query = {
            "query": {
                "bool": {
                    "must_not": [{"term": {"product_id": product_id}}],  # exclude itself
                    "should": should,
                }
            },
            "size": 8,
            "_source": ["product_id", "title", "price", "rating", "images"],
        }
        return self.es.search(index=self.index_name, body=recommendation_query)


if __name__ == "__main__":
    search_service = EcommerceSearchService()
    search_service.create_product_index()

    # Search for "苹果手机" ("Apple phone") with filters, sorted by rating
    results = search_service.search_products(
        query="苹果手机",
        filters={
            "category": "electronics",
            "price_range": {"min": 3000, "max": 10000},
            "brand": ["Apple", "华为"],  # 华为 = Huawei
            "rating": 4.0,
        },
        sort_by="rating",
        from_=0,
        size=20,
    )

    # Recommendations for product P001
    recommendations = search_service.get_recommendations(
        product_id="P001",
        user_behavior={
            "preferred_brands": ["Apple"],
            "price_preference": {"min": 5000, "max": 15000},
        },
    )

5.3 Geospatial Queries

5.3.1 Geo Index Design and Queries

from typing import Any, Dict, List, Optional

from elasticsearch import Elasticsearch


class GeoLocationService:
    """Location search with geo_point queries (elasticsearch-py 8.x)."""

    def __init__(self, es_host: str = "http://localhost:9200"):
        self.es = Elasticsearch(es_host)
        self.index_name = "locations"

    def create_location_index(self) -> None:
        """Create the index. "location" must be mapped as geo_point before any data
        is indexed; otherwise dynamic mapping turns it into a plain object."""
        days = ["monday", "tuesday", "wednesday", "thursday", "friday", "saturday", "sunday"]
        mappings = {
            "properties": {
                "location_id": {"type": "keyword"},
                "name": {"type": "text", "fields": {"keyword": {"type": "keyword"}}},
                "location": {"type": "geo_point"},
                "address": {"type": "text"},
                "category": {"type": "keyword"},
                "rating": {"type": "float"},
                "open_hours": {
                    "type": "object",
                    "properties": {day: {"type": "keyword"} for day in days},
                },
                "services": {"type": "keyword"},
                "created_at": {"type": "date"},
            }
        }
        if not self.es.indices.exists(index=self.index_name):
            self.es.indices.create(
                index=self.index_name,
                settings={"number_of_shards": 3, "number_of_replicas": 1},
                mappings=mappings,
            )

    def add_location(self, location_data: Dict[str, Any]) -> None:
        """Index one location, using location_id as the document _id."""
        self.es.index(index=self.index_name, id=location_data["location_id"],
                      document=location_data)

    @staticmethod
    def _add_category(query: Dict, category: Optional[str]) -> Dict:
        # Optional exact-match category filter (does not affect scoring)
        if category:
            query["query"]["bool"]["filter"].append({"term": {"category": category}})
        return query

    def search_nearby(self, lat: float, lon: float, distance: str = "5km",
                      category: Optional[str] = None, size: int = 10) -> Dict:
        """Locations within `distance` of (lat, lon), nearest first, then by rating."""
        query = {
            "query": {"bool": {"filter": [
                {"geo_distance": {"distance": distance, "location": {"lat": lat, "lon": lon}}}
            ]}},
            "sort": [
                {"_geo_distance": {
                    "location": {"lat": lat, "lon": lon},
                    "order": "asc",
                    "unit": "km",
                    "distance_type": "plane",  # faster, slightly less accurate than "arc"
                }},
                {"rating": {"order": "desc"}},
            ],
            "size": size,
        }
        return self.es.search(index=self.index_name, body=self._add_category(query, category))

    def search_in_bbox(self, top_left: Dict[str, float], bottom_right: Dict[str, float],
                       category: Optional[str] = None) -> Dict:
        """Locations inside a bounding box given by its top-left and bottom-right corners."""
        query = {
            "query": {"bool": {"filter": [
                {"geo_bounding_box": {"location": {"top_left": top_left,
                                                   "bottom_right": bottom_right}}}
            ]}},
            "size": 100,
        }
        return self.es.search(index=self.index_name, body=self._add_category(query, category))

    def search_in_polygon(self, polygon_points: List[Dict[str, float]],
                          category: Optional[str] = None) -> Dict:
        """Locations inside a polygon given as [{"lat": ..., "lon": ...}, ...].

        geo_polygon is deprecated, so this uses a geo_shape query, which also works
        on geo_point fields. GeoJSON order is [lon, lat] and the ring must be closed.
        """
        ring = [[p["lon"], p["lat"]] for p in polygon_points]
        if ring[0] != ring[-1]:
            ring.append(ring[0])  # close the ring
        query = {
            "query": {"bool": {"filter": [
                {"geo_shape": {"location": {
                    "shape": {"type": "polygon", "coordinates": [ring]},
                    "relation": "intersects",
                }}}
            ]}},
            "size": 100,
        }
        return self.es.search(index=self.index_name, body=self._add_category(query, category))

    def aggregate_by_distance(self, lat: float, lon: float,
                              distance_ranges: List[Dict[str, Any]]) -> Dict:
        """Count locations per distance band (ranges in km) with per-band facets."""
        agg_query = {
            "size": 0,
            "aggs": {
                "distance_ranges": {
                    "geo_distance": {
                        "field": "location",
                        "origin": {"lat": lat, "lon": lon},
                        "unit": "km",  # the default unit is meters
                        "ranges": distance_ranges,
                    },
                    "aggs": {
                        "categories": {"terms": {"field": "category", "size": 10}},
                        "avg_rating": {"avg": {"field": "rating"}},
                    },
                }
            },
        }
        return self.es.search(index=self.index_name, body=agg_query)


if __name__ == "__main__":
    geo_service = GeoLocationService()
    # Create the index first so that "location" is mapped as geo_point
    geo_service.create_location_index()

    places = [
        {
            "location_id": "rest_001",
            "name": "海底捞火锅",  # Haidilao Hotpot
            "location": {"lat": 39.9042, "lon": 116.4074},
            "category": "restaurant",
            "rating": 4.5,
            "services": ["wifi", "parking", "delivery"],
        },
        {
            "location_id": "rest_002",
            "name": "星巴克咖啡",  # Starbucks Coffee
            "location": {"lat": 39.9142, "lon": 116.3974},
            "category": "coffee",
            "rating": 4.2,
            "services": ["wifi", "outdoor_seating"],
        },
    ]
    for place in places:
        geo_service.add_location(place)
    # Make the new documents searchable immediately
    geo_service.es.indices.refresh(index=geo_service.index_name)

    # Restaurants within 2 km
    results = geo_service.search_nearby(lat=39.9042, lon=116.4074, distance="2km",
                                        category="restaurant", size=5)

    # Counts per distance band (km)
    agg_results = geo_service.aggregate_by_distance(
        lat=39.9042,
        lon=116.4074,
        distance_ranges=[
            {"to": 1, "key": "within 1km"},
            {"from": 1, "to": 3, "key": "1-3km"},
            {"from": 3, "to": 5, "key": "3-5km"},
            {"from": 5, "key": "beyond 5km"},
        ],
    )
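`geo_distance` compares great-circle distances: the default `distance_type` is `arc`, while the sort above uses the faster, less precise `plane`. A haversine sketch is enough to sanity-check results by hand, for example the two sample locations above. The Earth radius constant is an assumption (the mean radius, about 6371 km), so small differences from Elasticsearch's own figures are expected.

```python
import math

EARTH_RADIUS_KM = 6371.0  # mean Earth radius (assumed constant)

def haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance between two (lat, lon) points in kilometers."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))

d = haversine_km(39.9042, 116.4074, 39.9142, 116.3974)
print(f"{d:.2f} km")  # distance between the two sample locations
```

At about 1.4 km, the second location falls inside the 2 km radius used in `search_nearby`. It is excluded only by the category filter.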

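Summary and Best Practices

Key takeaways

  1. Architecture: understand the distributed architecture, sharding, and the inverted index
  2. Querying: use the Query DSL fluently and design index mappings deliberately
  3. Performance: apply best practices for JVM, operating system, and index settings
  4. Security: combine authentication, authorization, network isolation, and encryption
  5. Operations: monitoring and alerting, backup and restore, disaster recovery

Indicative performance figures

These are rough orders of magnitude. Actual numbers depend heavily on document size, mappings, and hardware, so benchmark your own workload before relying on them.

Scenario | Configuration | Metric
Single-node indexing | 16GB RAM, 4 CPU cores | 5,000 docs/s
Three-node cluster indexing | 32GB RAM, 8 CPU cores | 15,000 docs/s
Search latency | SSD storage, 10GB index | < 100 ms
Aggregation query | 10 million documents | < 500 ms
Geo search | 1 million locations | < 200 ms

Production checklist

  • Set the JVM heap to no more than 50% of physical RAM and below the ~32GB compressed-oops threshold
  • Disable swap (or at least set vm.swappiness=1, or enable bootstrap.memory_lock)
  • Keep shard counts reasonable (a common rule of thumb: at most 20 shards per GB of heap)
  • Enable monitoring and alerting
  • Configure scheduled snapshot backups
  • Run at least three master-eligible nodes to avoid split brain (minimum_master_nodes was removed in 7.0; the cluster now manages its voting quorum automatically)
  • Configure authentication and role-based access control
  • Prepare a disaster recovery plan

Further learning

  1. Internals: study Lucene's implementation and segment merging
  2. Machine learning: ES machine learning features, anomaly detection, and forecasting
  3. Performance analysis: the Profile API and the Explain API
  4. Cluster management: cross-cluster search, cross-cluster replication (CCR), and index lifecycle management (ILM)
  5. Cloud native: Elastic Cloud and Kubernetes deployments

This guide has covered Elasticsearch from core concepts to production-grade applications. Practice these techniques step by step in real projects, and keep up with changes in new Elasticsearch releases.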