Elasticsearch 与 Kibana
什么是 Elasticsearch?
Elasticsearch 是一个基于 Apache Lucene 的分布式搜索和分析引擎,专为处理大规模数据而设计。它广泛用于日志分析、全文搜索、实时数据分析和监控等场景。
info
Elasticsearch 是 ELK Stack(Elasticsearch、Logstash、Kibana)的核心组件,常用于企业级日志管理和数据分析。
Elasticsearch 核心概念
| 概念 | 说明 | 类比 |
|---|---|---|
| 索引 (Index) | 文档的集合,类似于数据库 | 数据库 |
| 类型 (Type) | 索引中的文档类型(7.x+ 已废弃) | 表 |
| 文档 (Document) | 索引中的基本数据单位 | 行 |
| 字段 (Field) | 文档的属性 | 列 |
| 分片 (Shard) | 索引的水平分割 | 分区 |
| 副本 (Replica) | 分片的备份 | 备份 |
安装与配置
使用 Docker 安装 Elasticsearch
# Pull the Elasticsearch image
docker pull docker.elastic.co/elasticsearch/elasticsearch:8.11.0
# Run Elasticsearch (single-node mode; security disabled for local development)
docker run -d \
--name elasticsearch \
-p 9200:9200 \
-p 9300:9300 \
-e "discovery.type=single-node" \
-e "xpack.security.enabled=false" \
docker.elastic.co/elasticsearch/elasticsearch:8.11.0
# Verify the installation
curl http://localhost:9200
使用 Docker Compose 安装 ELK Stack
version: '3.8'

services:
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0
    container_name: elasticsearch
    environment:
      - discovery.type=single-node
      - xpack.security.enabled=false
      # Cap the JVM heap so the stack fits on a development machine
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
    ports:
      - "9200:9200"
      - "9300:9300"
    volumes:
      # Persist index data across container restarts
      - es_data:/usr/share/elasticsearch/data

  logstash:
    image: docker.elastic.co/logstash/logstash:8.11.0
    container_name: logstash
    volumes:
      - ./logstash/config:/usr/share/logstash/config
      - ./logstash/pipeline:/usr/share/logstash/pipeline
    ports:
      - "5044:5044"
    depends_on:
      - elasticsearch

  kibana:
    image: docker.elastic.co/kibana/kibana:8.11.0
    container_name: kibana
    environment:
      - ELASTICSEARCH_HOSTS=http://elasticsearch:9200
    ports:
      - "5601:5601"
    depends_on:
      - elasticsearch

volumes:
  es_data:
# Start the whole ELK Stack
docker-compose up -d
# Open Kibana in a browser
# http://localhost:5601
基本操作
使用 REST API
- cURL
- Python
- JavaScript
# Check cluster health status
curl -X GET "localhost:9200/_cat/health?v"
# Create an index
curl -X PUT "localhost:9200/logs" -H 'Content-Type: application/json' -d'
{
"settings": {
"number_of_shards": 1,
"number_of_replicas": 1
},
"mappings": {
"properties": {
"timestamp": {
"type": "date"
},
"level": {
"type": "keyword"
},
"message": {
"type": "text"
},
"source": {
"type": "keyword"
}
}
}
}'
# Inspect the index definition
curl -X GET "localhost:9200/logs?pretty"
# Index a document (add a log entry)
curl -X POST "localhost:9200/logs/_doc" -H 'Content-Type: application/json' -d'
{
"timestamp": "2024-01-15T10:30:00Z",
"level": "INFO",
"message": "用户登录成功",
"source": "auth-service",
"user_id": "user123"
}'
# Bulk-index documents
curl -X POST "localhost:9200/logs/_bulk" -H 'Content-Type: application/json' -d'
{"index":{}}
{"timestamp":"2024-01-15T10:31:00Z","level":"ERROR","message":"数据库连接失败","source":"db-service"}
{"index":{}}
{"timestamp":"2024-01-15T10:32:00Z","level":"WARN","message":"内存使用率过高","source":"monitor-service"}
'
# Search documents
curl -X GET "localhost:9200/logs/_search?pretty" -H 'Content-Type: application/json' -d'
{
"query": {
"match": {
"message": "登录"
}
}
}'
# Filter logs by level
curl -X GET "localhost:9200/logs/_search?pretty" -H 'Content-Type: application/json' -d'
{
"query": {
"term": {
"level": "ERROR"
}
},
"sort": [
{
"timestamp": {
"order": "desc"
}
}
]
}'
from elasticsearch import Elasticsearch
from datetime import datetime
import json

# Connect to Elasticsearch.
# BUG FIX: the 8.x Python client requires the scheme in the node address;
# a {"host": ..., "port": ...} dict without "scheme" is rejected. A full
# URL works on both 7.10+ and 8.x. `timeout` is also deprecated in 8.x
# in favor of `request_timeout`.
es = Elasticsearch(
    "http://localhost:9200",
    request_timeout=30,
)

# Verify the connection before doing any work
if es.ping():
    print("成功连接到 Elasticsearch")
else:
    print("无法连接到 Elasticsearch")
# 创建索引
def create_log_index():
    """Create the 'logs' index with explicit settings and field mappings.

    Does nothing (beyond printing a notice) when the index already exists.
    """
    if es.indices.exists(index="logs"):
        print("索引 'logs' 已存在")
        return

    field_mappings = {
        "timestamp": {"type": "date"},
        "level": {"type": "keyword"},
        # Full-text field; the standard analyzer is stated explicitly
        "message": {"type": "text", "analyzer": "standard"},
        "source": {"type": "keyword"},
        "user_id": {"type": "keyword"},
    }
    definition = {
        "settings": {"number_of_shards": 1, "number_of_replicas": 1},
        "mappings": {"properties": field_mappings},
    }
    es.indices.create(index="logs", body=definition)
    print("索引 'logs' 创建成功")
# 添加日志文档
def add_log(level, message, source, user_id=None):
    """Index a single log document into the 'logs' index.

    Args:
        level: Log level string, e.g. "INFO"/"WARN"/"ERROR" (keyword field).
        message: Human-readable log text (full-text searchable).
        source: Name of the originating service.
        user_id: Optional user identifier; stored as null when omitted.

    Returns:
        The Elasticsearch index-API response.
    """
    # Local import: the snippet's top-level imports only bring in `datetime`.
    from datetime import timezone

    doc = {
        # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated and
        # produces a naive value with no offset marker in the ISO string.
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "level": level,
        "message": message,
        "source": source,
        "user_id": user_id,
    }
    result = es.index(index="logs", document=doc)
    print(f"日志已添加,ID: {result['_id']}")
    return result
# 批量添加日志
def bulk_add_logs(logs):
    """Bulk-index a list of log dicts into the 'logs' index.

    Each dict may carry its own "timestamp"; because **log is spread after
    the default, a caller-supplied timestamp wins.

    Returns:
        (success_count, errors) as reported by elasticsearch.helpers.bulk.
        NOTE(review): with bulk's default raise_on_error=True, item failures
        raise instead of being returned — confirm the intended behavior.
    """
    # Imports hoisted to the top of the function instead of sitting after
    # the action-building loop as in the original.
    from datetime import timezone
    from elasticsearch.helpers import bulk

    default_ts = datetime.now(timezone.utc).isoformat()
    actions = [
        {"_index": "logs", "_source": {"timestamp": default_ts, **log}}
        for log in logs
    ]
    success, failed = bulk(es, actions)
    print(f"批量添加完成: {success} 成功, {len(failed)} 失败")
    return success, failed
# 搜索日志
def search_logs(query_text, level=None, size=10):
    """Full-text search over `message`, optionally filtered by exact level.

    Results are sorted newest-first and capped at `size` hits; returns the
    raw hit objects from the search response.
    """
    clauses = [{"match": {"message": query_text}}]
    if level:
        clauses.append({"term": {"level": level}})

    request = {
        "query": {"bool": {"must": clauses}},
        "sort": [{"timestamp": {"order": "desc"}}],
        "size": size,
    }
    response = es.search(index="logs", body=request)
    return response["hits"]["hits"]
# 聚合统计
def aggregate_logs_by_level():
    """Return terms-aggregation buckets counting documents per log level."""
    request = {
        "size": 0,  # only the aggregation is wanted, not the matching hits
        "aggs": {
            "log_levels": {
                "terms": {"field": "level", "size": 10},
            },
        },
    }
    response = es.search(index="logs", body=request)
    return response["aggregations"]["log_levels"]["buckets"]
# 使用示例
if __name__ == "__main__":
    # Create the index (no-op if it already exists)
    create_log_index()

    # Add single log entries
    add_log("INFO", "用户登录成功", "auth-service", "user123")
    # BUG FIX: the message contained a stray space ("数据库连 接失败")
    add_log("ERROR", "数据库连接失败", "db-service")
    add_log("WARN", "内存使用率过高", "monitor-service")

    # Bulk-add log entries
    logs = [
        {"level": "INFO", "message": "订单创建成功", "source": "order-service", "user_id": "user456"},
        {"level": "ERROR", "message": "支付处理失败", "source": "payment-service"},
        {"level": "INFO", "message": "邮件发送成功", "source": "email-service"}
    ]
    bulk_add_logs(logs)

    # Search logs containing "登录"
    print("\n搜索包含'登录'的日志:")
    results = search_logs("登录")
    for hit in results:
        print(f" - [{hit['_source']['level']}] {hit['_source']['message']}")

    # Search only ERROR-level logs.
    # NOTE(review): a match query on an empty string yields no hits; if the
    # intent is "all ERROR logs", the match clause should be dropped when
    # query_text is empty — confirm.
    print("\n搜索错误日志:")
    error_logs = search_logs("", level="ERROR")
    for hit in error_logs:
        print(f" - {hit['_source']['message']}")

    # Aggregate document counts per level
    print("\n日志级别统计:")
    stats = aggregate_logs_by_level()
    for bucket in stats:
        print(f" {bucket['key']}: {bucket['doc_count']} 条")
const { Client } = require('@elastic/elasticsearch');

// Connect to Elasticsearch
const client = new Client({
  node: 'http://localhost:9200'
});

// Verify the connection; ping() rejects when the cluster is unreachable.
async function checkConnection() {
  try {
    // Fixed: the response was bound to an unused variable
    await client.ping();
    console.log('成功连接到 Elasticsearch');
  } catch (error) {
    console.error('无法连接到 Elasticsearch:', error);
  }
}
// 创建索引
async function createLogIndex() {
const indexBody = {
settings: {
number_of_shards: 1,
number_of_replicas: 1
},
mappings: {
properties: {
timestamp: { type: 'date' },
level: { type: 'keyword' },
message: { type: 'text' },
source: { type: 'keyword' },
user_id: { type: 'keyword' }
}
}
};
try {
const exists = await client.indices.exists({ index: 'logs' });
if (!exists) {
await client.indices.create({
index: 'logs',
body: indexBody
});
console.log("索引 'logs' 创建成功");
} else {
console.log("索引 'logs' 已存在");
}
} catch (error) {
console.error('创建索引失败:', error);
}
}
// 添加日志
async function addLog(level, message, source, userId = null) {
const doc = {
timestamp: new Date().toISOString(),
level,
message,
source,
user_id: userId
};
try {
const result = await client.index({
index: 'logs',
body: doc
});
console.log(`日志已添加,ID: ${result._id}`);
return result;
} catch (error) {
console.error('添加日志失败:', error);
}
}
// 搜索日志
async function searchLogs(queryText, level = null, size = 10) {
const query = {
bool: {
must: [
{
match: {
message: queryText
}
}
]
}
};
if (level) {
query.bool.must.push({
term: {
level: level
}
});
}
try {
const result = await client.search({
index: 'logs',
body: {
query: query,
sort: [
{
timestamp: {
order: 'desc'
}
}
],
size: size
}
});
return result.body.hits.hits;
} catch (error) {
console.error('搜索失败:', error);
return [];
}
}
// 使用示例
async function main() {
await checkConnection();
await createLogIndex();
// 添加日志
await addLog('INFO', '用户登录成功', 'auth-service', 'user123');
await addLog('ERROR', '数据库连接失败', 'db-service');
await addLog('WARN', '内存使用率过高', 'monitor-service');
// 搜索日志
console.log('\n搜索包含"登录"的日志:');
const results = await searchLogs('登录');
results.forEach(hit => {
console.log(` - [${hit._source.level}] ${hit._source.message}`);
});
// 搜索错误日志
console.log('\n搜索错误日志:');
const errorLogs = await searchLogs('', 'ERROR');
errorLogs.forEach(hit => {
console.log(` - ${hit._source.message}`);
});
}
main();
日志记录最佳实践
结构化日志格式
import logging
from elasticsearch import Elasticsearch
from datetime import datetime
import json
# Logging handler that ships records to Elasticsearch
class ElasticsearchHandler(logging.Handler):
    """logging.Handler that indexes each LogRecord as a structured document.

    Args:
        es_client: An Elasticsearch client (anything exposing .index()).
        index_name: Target index for the log documents.
    """

    def __init__(self, es_client, index_name):
        super().__init__()
        self.es = es_client            # client used to index documents
        self.index_name = index_name   # destination index name

    def emit(self, record):
        """Convert `record` to a JSON document and index it.

        Any failure is routed to handleError() so logging can never crash
        the host application.
        """
        # Local import: the snippet's top-level imports only bring in `datetime`.
        from datetime import timezone
        try:
            log_entry = {
                # Timezone-aware UTC timestamp; datetime.utcnow() is
                # deprecated and yields a naive value with no offset marker.
                "timestamp": datetime.now(timezone.utc).isoformat(),
                "level": record.levelname,
                "message": record.getMessage(),
                "module": record.module,
                "function": record.funcName,
                "line": record.lineno,
                "logger": record.name,
                "thread": record.threadName,
                "process": record.process
            }
            # Include the formatted traceback when exception info is attached
            if record.exc_info:
                log_entry["exception"] = self.format(record)
            # Optional correlation fields supplied via logging's `extra` kwarg
            if hasattr(record, 'user_id'):
                log_entry["user_id"] = record.user_id
            if hasattr(record, 'request_id'):
                log_entry["request_id"] = record.request_id
            self.es.index(index=self.index_name, document=log_entry)
        except Exception:
            self.handleError(record)
# Wire the handler into the standard logging machinery.
# BUG FIX: the 8.x client requires a full URL including the scheme;
# Elasticsearch(['localhost:9200']) is rejected.
es = Elasticsearch("http://localhost:9200")
handler = ElasticsearchHandler(es, 'application-logs')
handler.setLevel(logging.INFO)

logger = logging.getLogger('myapp')
logger.setLevel(logging.INFO)
logger.addHandler(handler)

# Usage: fields passed via `extra` become document fields in the handler
logger.info("用户操作", extra={"user_id": "user123", "request_id": "req456"})
logger.error("处理失败", exc_info=True)
日志轮转和索引管理
from elasticsearch import Elasticsearch
from datetime import datetime, timedelta
# BUG FIX: the 8.x client requires the scheme in the node URL
es = Elasticsearch("http://localhost:9200")
# 创建带日期的索引模板
def create_index_template():
    """Register a composable index template for daily 'logs-*' indices.

    The template wires the indices to the 'logs-policy' ILM policy and the
    'logs' rollover alias.
    """
    template = {
        "index_patterns": ["logs-*"],
        # BUG FIX: the composable _index_template API requires settings and
        # mappings to be nested under "template"; the flat layout used before
        # is only valid for the legacy _template API and is rejected here.
        "template": {
            "settings": {
                "number_of_shards": 1,
                "number_of_replicas": 1,
                "index.lifecycle.name": "logs-policy",
                "index.lifecycle.rollover_alias": "logs"
            },
            "mappings": {
                "properties": {
                    "timestamp": {"type": "date"},
                    "level": {"type": "keyword"},
                    "message": {"type": "text"},
                    "source": {"type": "keyword"}
                }
            }
        }
    }
    es.indices.put_index_template(name="logs-template", body=template)
    print("索引模板创建成功")
# 创建每日索引
def get_daily_index():
    """Return today's daily index name, 'logs-YYYY.MM.DD' (local time)."""
    return "logs-" + datetime.now().strftime("%Y.%m.%d")
# 删除旧索引(保留30天)
def cleanup_old_indices(days_to_keep=30):
cutoff_date = datetime.now() - timedelta(days=days_to_keep)
cutoff_str = cutoff_date.strftime("%Y.%m.%d")
# 列出所有日志索引
indices = es.indices.get_alias(index="logs-*")
for index_name in indices:
# 提取日期
date_str = index_name.split("-")[-1]
if date_str < cutoff_str:
es.indices.delete(index=index_name)
print(f"已删除旧索引: {index_name}")
# 使用示例
if __name__ == "__main__":
    # Register the index template (idempotent)
    create_index_template()

    # Write a test document into today's daily index
    from datetime import timezone  # file-level import only brings in datetime
    daily_index = get_daily_index()
    es.index(index=daily_index, document={
        # Timezone-aware UTC timestamp; datetime.utcnow() is deprecated
        "timestamp": datetime.now(timezone.utc).isoformat(),
        "level": "INFO",
        "message": "测试日志"
    })

    # Periodically purge indices older than 30 days
    # cleanup_old_indices(30)
Kibana 可视化
在 Kibana 中创建索引模式
- 打开 Kibana (http://localhost:5601)
- 导航到 Management > Stack Management > Index Patterns
- 点击 Create index pattern
- 输入索引模式:logs-* 或 application-logs-*
- 选择时间字段:timestamp
- 点击 Create index pattern
创建可视化图表
1. 日志级别统计(饼图)
{
"visState": {
"title": "日志级别分布",
"type": "pie",
"params": {
"addTooltip": true,
"addLegend": true
},
"aggs": [
{
"id": "1",
"type": "count",
"schema": "metric"
},
{
"id": "2",
"type": "terms",
"schema": "segment",
"params": {
"field": "level",
"size": 5,
"order": "desc",
"orderBy": "1"
}
}
]
}
}
2. 时间序列日志(折线图)
{
"visState": {
"title": "日志时间序列",
"type": "histogram",
"params": {
"addTooltip": true,
"addLegend": true
},
"aggs": [
{
"id": "1",
"type": "count",
"schema": "metric"
},
{
"id": "2",
"type": "date_histogram",
"schema": "segment",
"params": {
"field": "timestamp",
"interval": "1h",
"min_doc_count": 1
}
},
{
"id": "3",
"type": "terms",
"schema": "group",
"params": {
"field": "level",
"size": 5
}
}
]
}
}
3. 错误日志表格
在 Kibana 中创建 Discover 视图:
- 导航到 Discover
- 选择索引模式:logs-*
- 添加过滤器:level: ERROR
- 添加列:timestamp、level、message、source
- 保存视图
创建仪表板
- 导航到 Dashboard > Create dashboard
- 添加已创建的可视化图表
- 配置时间范围选择器
- 添加搜索过滤器
- 保存仪表板