Skip to main content

Elasticsearch 与 Kibana

什么是 Elasticsearch?

Elasticsearch 是一个基于 Apache Lucene 的分布式搜索和分析引擎,专为处理大规模数据而设计。它广泛用于日志分析、全文搜索、实时数据分析和监控等场景。

info

Elasticsearch 是 ELK Stack(Elasticsearch、Logstash、Kibana)的核心组件,常用于企业级日志管理和数据分析。

Elasticsearch 核心概念

| 概念 | 说明 | 类比 |
|------|------|------|
| 索引 (Index) | 文档的集合,类似于数据库 | 数据库 |
| 类型 (Type) | 索引中的文档类型(7.x+ 已废弃) | 表 |
| 文档 (Document) | 索引中的基本数据单位 | 行 |
| 字段 (Field) | 文档的属性 | 列 |
| 分片 (Shard) | 索引的水平分割 | 分区 |
| 副本 (Replica) | 分片的备份 | 备份 |

安装与配置

使用 Docker 安装 Elasticsearch

# 拉取 Elasticsearch 镜像
docker pull docker.elastic.co/elasticsearch/elasticsearch:8.11.0

# 运行 Elasticsearch(单节点模式)
docker run -d \
--name elasticsearch \
-p 9200:9200 \
-p 9300:9300 \
-e "discovery.type=single-node" \
-e "xpack.security.enabled=false" \
docker.elastic.co/elasticsearch/elasticsearch:8.11.0

# 验证安装
curl http://localhost:9200

使用 Docker Compose 安装 ELK Stack

version: '3.8'

services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ports:
      - "9200:9200"
      - "9300:9300"
    volumes:
      - es_data:/usr/share/elasticsearch/data

  logstash:
    image: docker.elastic.co/logstash/logstash:8.11.0
    container_name: logstash
    volumes:
      - ./logstash/config:/usr/share/logstash/config
      - ./logstash/pipeline:/usr/share/logstash/pipeline
    ports:
      - "5044:5044"
    depends_on:
      - elasticsearch

  kibana:
    image: docker.elastic.co/kibana/kibana:8.11.0
    container_name: kibana
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    ports:
      - "5601:5601"
    depends_on:
      - elasticsearch

volumes:
  es_data:

# 启动整个 ELK Stack
docker-compose up -d

# 访问 Kibana
# http://localhost:5601

基本操作

使用 REST API

# 检查集群健康状态
curl -X GET "localhost:9200/_cat/health?v"

# 创建索引
curl -X PUT "localhost:9200/logs" -H 'Content-Type: application/json' -d'
{
"settings": {
"number_of_shards": 1,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"timestamp": {
"type": "date"
},
"level": {
"type": "keyword"
},
"message": {
"type": "text"
},
"source": {
"type": "keyword"
}
}
}
}'

# 查看索引信息
curl -X GET "localhost:9200/logs?pretty"

# 索引文档(添加日志)
curl -X POST "localhost:9200/logs/_doc" -H 'Content-Type: application/json' -d'
{
"timestamp": "2024-01-15T10:30:00Z",
"level": "INFO",
"message": "用户登录成功",
"source": "auth-service",
"user_id": "user123"
}'

# 批量索引文档
curl -X POST "localhost:9200/logs/_bulk" -H 'Content-Type: application/json' -d'
{"index":{}}
{"timestamp":"2024-01-15T10:31:00Z","level":"ERROR","message":"数据库连接失败","source":"db-service"}
{"index":{}}
{"timestamp":"2024-01-15T10:32:00Z","level":"WARN","message":"内存使用率过高","source":"monitor-service"}
'

# 搜索文档
curl -X GET "localhost:9200/logs/_search?pretty" -H 'Content-Type: application/json' -d'
{
"query": {
"match": {
"message": "登录"
}
}
}'

# 按级别过滤日志
curl -X GET "localhost:9200/logs/_search?pretty" -H 'Content-Type: application/json' -d'
{
"query": {
"term": {
"level": "ERROR"
}
},
"sort": [
{
"timestamp": {
"order": "desc"
}
}
]
}'

日志记录最佳实践

结构化日志格式

import json
import logging
from datetime import datetime, timezone

from elasticsearch import Elasticsearch

# 配置 Elasticsearch 日志处理器
# Elasticsearch log handler: ships Python logging records to an ES index.
class ElasticsearchHandler(logging.Handler):
    """Logging handler that indexes each log record as an ES document.

    Args:
        es_client: An ``elasticsearch.Elasticsearch`` client (or any object
            exposing ``index(index=..., document=...)``).
        index_name: Name of the target index.
    """

    def __init__(self, es_client, index_name):
        super().__init__()
        self.es = es_client            # client used to index documents
        self.index_name = index_name   # target index name

    def emit(self, record):
        """Convert *record* to a structured document and index it.

        Any failure is routed to ``handleError`` so that logging can never
        crash the application.
        """
        try:
            log_entry = {
                # Timezone-aware UTC timestamp; datetime.utcnow() is
                # deprecated since Python 3.12.
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "level": record.levelname,
                "message": record.getMessage(),
                "module": record.module,
                "function": record.funcName,
                "line": record.lineno,
                "logger": record.name,
                "thread": record.threadName,
                "process": record.process,
            }

            # Attach the formatted traceback when exc_info was supplied.
            if record.exc_info:
                log_entry["exception"] = self.format(record)

            # Optional context fields injected via the logger's `extra=` kwarg.
            if hasattr(record, 'user_id'):
                log_entry["user_id"] = record.user_id
            if hasattr(record, 'request_id'):
                log_entry["request_id"] = record.request_id

            self.es.index(index=self.index_name, document=log_entry)
        except Exception:
            self.handleError(record)

# --- Logging configuration -------------------------------------------------
# NOTE: the 8.x elasticsearch-py client requires a full URL including the
# scheme; a bare "localhost:9200" host string raises a ValueError.
es = Elasticsearch('http://localhost:9200')
handler = ElasticsearchHandler(es, 'application-logs')
handler.setLevel(logging.INFO)

logger = logging.getLogger('myapp')
logger.setLevel(logging.INFO)
logger.addHandler(handler)

# Usage: fields passed via `extra=` become document fields in the handler.
logger.info("用户操作", extra={"user_id": "user123", "request_id": "req456"})
logger.error("处理失败", exc_info=True)

日志轮转和索引管理

from elasticsearch import Elasticsearch
from datetime import datetime, timedelta

es = Elasticsearch(['localhost:9200'])

# 创建带日期的索引模板
# Create an index template so every new logs-* index inherits these settings.
def create_index_template():
    """Register a composable index template for daily ``logs-*`` indices.

    With the composable template API (``PUT _index_template``) the
    settings/mappings must be nested under a top-level ``template`` key;
    the flat legacy (``_template``) layout is rejected by 8.x.
    """
    template = {
        "index_patterns": ["logs-*"],
        "template": {
            "settings": {
                "number_of_shards": 1,
                "number_of_replicas": 1,
                "index.lifecycle.name": "logs-policy",
                "index.lifecycle.rollover_alias": "logs",
            },
            "mappings": {
                "properties": {
                    "timestamp": {"type": "date"},
                    "level": {"type": "keyword"},
                    "message": {"type": "text"},
                    "source": {"type": "keyword"},
                }
            },
        },
    }

    es.indices.put_index_template(name="logs-template", body=template)
    print("索引模板创建成功")

# 创建每日索引
def get_daily_index():
    """Return today's log index name, e.g. ``logs-2024.01.15``."""
    return f"logs-{datetime.now():%Y.%m.%d}"

# 删除旧索引(保留30天)
def cleanup_old_indices(days_to_keep=30):
cutoff_date = datetime.now() - timedelta(days=days_to_keep)
cutoff_str = cutoff_date.strftime("%Y.%m.%d")

# 列出所有日志索引
indices = es.indices.get_alias(index="logs-*")

for index_name in indices:
# 提取日期
date_str = index_name.split("-")[-1]
if date_str < cutoff_str:
es.indices.delete(index=index_name)
print(f"已删除旧索引: {index_name}")

# 使用示例
# Usage example
if __name__ == "__main__":
    # Register the template first so the daily index picks it up.
    create_index_template()

    # Index a test document into today's index.
    es.index(
        index=get_daily_index(),
        document={
            "timestamp": datetime.utcnow().isoformat(),
            "level": "INFO",
            "message": "测试日志",
        },
    )

    # Periodic retention cleanup (uncomment to enable).
    # cleanup_old_indices(30)

Kibana 可视化

在 Kibana 中创建索引模式

  1. 打开 Kibana (http://localhost:5601)
  2. 导航到 Management > Stack Management > Index Patterns
  3. 点击 Create index pattern
  4. 输入索引模式:logs-* 或 application-logs-*
  5. 选择时间字段:timestamp
  6. 点击 Create index pattern

创建可视化图表

1. 日志级别统计(饼图)

{
"visState": {
"title": "日志级别分布",
"type": "pie",
"params": {
"addTooltip": true,
"addLegend": true
},
"aggs": [
{
"id": "1",
"type": "count",
"schema": "metric"
},
{
"id": "2",
"type": "terms",
"schema": "segment",
"params": {
"field": "level",
"size": 5,
"order": "desc",
"orderBy": "1"
}
}
]
}
}

2. 时间序列日志(折线图)

{
"visState": {
"title": "日志时间序列",
"type": "histogram",
"params": {
"addTooltip": true,
"addLegend": true
},
"aggs": [
{
"id": "1",
"type": "count",
"schema": "metric"
},
{
"id": "2",
"type": "date_histogram",
"schema": "segment",
"params": {
"field": "timestamp",
"interval": "1h",
"min_doc_count": 1
}
},
{
"id": "3",
"type": "terms",
"schema": "group",
"params": {
"field": "level",
"size": 5
}
}
]
}
}

3. 错误日志表格

在 Kibana 中创建 Discover 视图:

  1. 导航到 Discover
  2. 选择索引模式:logs-*
  3. 添加过滤器:
    • level: ERROR
  4. 添加列:
    • timestamp
    • level
    • message
    • source
  5. 保存视图

创建仪表板

  1. 导航到 Dashboard > Create dashboard
  2. 添加已创建的可视化图表
  3. 配置时间范围选择器
  4. 添加搜索过滤器
  5. 保存仪表板

高级查询

复杂查询示例

from elasticsearch import Elasticsearch

es = Elasticsearch(['localhost:9200'])

# 多条件查询
# Multi-condition query: full-text match + cached filters + aggregations.
def complex_search():
    """Run a bool query combining a full-text match, date/level filters and
    a source exclusion, with per-source/per-level aggregations, a daily
    error timeline, and message highlighting."""
    bool_query = {
        "must": [
            {"match": {"message": "错误"}},
        ],
        "filter": [
            {"range": {"timestamp": {"gte": "2024-01-01", "lte": "2024-01-31"}}},
            {"term": {"level": "ERROR"}},
        ],
        "must_not": [
            {"term": {"source": "test-service"}},
        ],
    }

    aggregations = {
        # Bucket by source, then by level inside each source.
        "by_source": {
            "terms": {"field": "source", "size": 10},
            "aggs": {
                "by_level": {"terms": {"field": "level"}},
            },
        },
        # One bucket per calendar day.
        "error_timeline": {
            "date_histogram": {"field": "timestamp", "calendar_interval": "1d"},
        },
    }

    body = {
        "query": {"bool": bool_query},
        "aggs": aggregations,
        "highlight": {"fields": {"message": {}}},
    }

    return es.search(index="logs-*", body=body)

# 全文搜索与模糊匹配
def fuzzy_search(query_text):
body = {
"query": {
"multi_match": {
"query": query_text,
"fields": ["message^2", "source"],
"fuzziness": "AUTO",
"type": "best_fields"
}
}
}

result = es.search(index="logs-*", body=body)
return result

# 使用示例
# Usage example
if __name__ == "__main__":
    # Run the combined query and report the hit count.
    results = complex_search()
    print(f"找到 {results['hits']['total']['value']} 条结果")

    # Walk the per-source aggregation buckets, if any came back.
    if 'aggregations' in results:
        print("\n按来源统计:")
        buckets = results['aggregations']['by_source']['buckets']
        for bucket in buckets:
            print(f" {bucket['key']}: {bucket['doc_count']} 条")

性能优化

索引优化建议

# 1. 使用批量操作
from elasticsearch.helpers import bulk

def bulk_index_logs(logs):
    """Index *logs* (iterable of source dicts) into "logs" via the bulk helper.

    Sends documents in batches of 1000 instead of one request per document.
    """
    actions = ({"_index": "logs", "_source": entry} for entry in logs)
    bulk(es, actions, chunk_size=1000)

# 2. Refresh policy: lower the refresh frequency so heavy indexing spends
# less time creating new searchable segments.
es.indices.put_settings(
    index="logs",
    body={"settings": {"refresh_interval": "30s"}},
)

# 3. Optimized mapping for log documents.
mapping = {
"properties": {
"timestamp": {"type": "date"},
"level": {"type": "keyword"}, # keyword (exact match, aggregatable) instead of text
"message": {
"type": "text",
"fields": {
"keyword": { # message.keyword sub-field for exact match / sorting
"type": "keyword",
"ignore_above": 256
}
}
}
}
}

故障排查

常见问题

问题:索引创建失败

# 检查集群状态
curl -X GET "localhost:9200/_cluster/health?pretty"

# 查看索引设置
curl -X GET "localhost:9200/logs/_settings?pretty"

问题:搜索性能慢

# Use filters instead of scoring queries (filter results are cached)
body = {
"query": {
"bool": {
"filter": [ # filter (cached, no relevance scoring) instead of must
{"term": {"level": "ERROR"}},
{"range": {"timestamp": {"gte": "2024-01-01"}}}
]
}
}
}

问题:磁盘空间不足

# 查看索引大小
curl -X GET "localhost:9200/_cat/indices?v&s=store.size:desc"

# 删除旧索引
curl -X DELETE "localhost:9200/logs-2024.01.01"

总结

Elasticsearch 是一个强大的搜索和分析引擎,特别适合日志管理和数据分析。结合 Kibana 可以创建丰富的可视化仪表板,帮助监控和分析应用日志。通过合理设计索引结构、使用批量操作和优化查询,可以构建高效可靠的日志管理系统。

相关资源