Business background

When troubleshooting an incident, we want to look at the user's monitoring in their Grafana dashboards, but in practice the only way to get it is photos of their screens, which is slow and low quality. The goal is a workflow that lets us grab the 24 hours of monitoring data preceding the problem, import it locally, and browse it conveniently in a local Grafana. Prometheus itself only offers an API for querying and has no data export feature, and the bundled promtool only covers things like validating rule and config files and debugging.

References

Analyzing Prometheus data with external tools

Prometheus backfilling

Solution 1: Export via the API and convert to CSV

Query the data through the HTTP API and convert the results to CSV; conveniently, Grafana has a plugin that can use CSV files as a data source.
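
For reference, a minimal sketch of the conversion step, assuming a plain /api/v1/query_range call against a local Prometheus and a hypothetical output file name:

# Dump a query_range result to CSV for Grafana's CSV data-source plugin.
# Assumptions: Prometheus at localhost:9090, metric "up", output file "up.csv".
import csv
import requests

resp = requests.get(
    "http://localhost:9090/api/v1/query_range",
    params={"query": "up", "start": "2023-11-11T00:00:00Z",
            "end": "2023-11-12T00:00:00Z", "step": "60s"},
)
resp.raise_for_status()

with open("up.csv", "w") as f:
    writer = csv.writer(f)
    writer.writerow(["timestamp", "labels", "value"])
    for series in resp.json()["data"]["result"]:
        labels = ",".join("{}={}".format(k, v) for k, v in series["metric"].items())
        for ts, value in series["values"]:  # each sample is [unix_seconds, "value"]
            writer.writerow([ts, labels, value])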

Summary

The experiment did not go particularly smoothly: the CSV could be read, but no graphs were successfully rendered. In addition, some dashboard queries rely on dashboard variables, which a CSV data source cannot provide.

Solution 2: Copy the Prometheus data files

Prometheus groups samples into two-hour windows and stores each window in a block. Every block is a separate directory containing all the sample data for that window (chunks), a metadata file (meta.json), and an index file (index); the index maps metric names and labels to the time series in the sample data. If time series are deleted through the API during this period, the deletion records are kept in a separate logical file called tombstones.

To avoid losing monitoring data that is still buffered in memory and not yet written to disk, Prometheus uses a write-ahead log (WAL). The WAL is split into file segments with a default size of 128 MB (earlier versions defaulted to 256 MB), named with eight-digit zero-padded numbers. The WAL is written in pages of 32 KB, so every segment size must be a multiple of the page size; if a single write needs more pages than the current segment has free, a new segment is created so that one write never spans two segments. This data is not yet persisted as blocks; the TSDB writes it to disk through the WAL (uncompressed, so it is relatively large), and after a crash Prometheus replays the WAL with multiple goroutines on startup to recover the data.

[mingming.chen@m162p65 data]$ tree
.
├── 01E2MA5GDWMP69GVBVY1W5AF1X
│   ├── chunks               # compressed time-series data; each chunk segment file is capped at 512M, after which a new one is created
│   │   └── 000001
│   ├── index                # index of offsets into the chunks
│   ├── meta.json            # block metadata, e.g. the samples' start/end time, number of chunks and data size
│   └── tombstones           # soft deletes issued through the API are recorded here (API deletion does not immediately remove data from the chunks files)
├── 01E2MH175FV0JFB7EGCRZCX8NF
│   ├── chunks
│   │   └── 000001
│   ├── index
│   ├── meta.json
│   └── tombstones
├── 01E2MQWYDFQAXXPB3M1HK6T20A
│   ├── chunks
│   │   └── 000001
│   ├── index
│   ├── meta.json
│   └── tombstones
├── lock
├── queries.active
└── wal                      # guards against data loss (freshly scraped data sits in memory first; the WAL records it)
    ├── 00000366             # each segment is at most 128M and by default holds about two hours of data
    ├── 00000367
    ├── 00000368
    ├── 00000369
    └── checkpoint.000365
        └── 00000000

Both the block data and the WAL data can simply be packed up and moved to a local Prometheus. Two things to watch: the versions should be compatible, and the local Prometheus must not already hold data; if the local data directory is not empty, the import will run into problems (because of conflicting time ranges). Since only recent data is valuable and older data is not worth copying, you can check each block's time range via the meta.json inside it. For example, meta.json contains something like:

{
  "ulid": "01E2MA5GDWMP69GVBVY1W5AF1X",
  "minTime": 1609459200000,
  "maxTime": 1609545600000
}
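
minTime and maxTime are millisecond Unix timestamps. A quick way to see which blocks cover the window you care about is a small sketch like the following, assuming DATA_DIR points at the data directory shown in the tree above:

# Print the time range covered by every block in a Prometheus data directory.
import datetime
import glob
import json
import os

DATA_DIR = "/path/to/prometheus/data"  # adjust to your environment

for meta_path in sorted(glob.glob(os.path.join(DATA_DIR, "*", "meta.json"))):
    with open(meta_path) as f:
        meta = json.load(f)
    start = datetime.datetime.utcfromtimestamp(meta["minTime"] / 1000.0)
    end = datetime.datetime.utcfromtimestamp(meta["maxTime"] / 1000.0)
    print("{}  {} -> {}".format(meta["ulid"], start, end))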

Taking a TSDB snapshot requires Prometheus to have been started with --web.enable-admin-api. Sending a POST request to the snapshot endpoint makes Prometheus create hard links to the block files under its data directory as a snapshot, which can then be transferred to another machine and restored. Both copying the data directory and taking a TSDB snapshot can migrate data to another Prometheus; the trade-offs are:

- file copy: you can flexibly decide block by block what to copy, but version compatibility may be shaky.
- snapshot: simple and hard to get wrong, and version support is better than hand-copying files, but it can only snapshot the whole database, with no way to select a range.
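
A snapshot can be triggered with a plain POST to the admin API; a minimal sketch, assuming Prometheus runs locally on port 9090 with --web.enable-admin-api:

# Trigger a TSDB snapshot and print the snapshot name returned by Prometheus.
import requests

resp = requests.post("http://localhost:9090/api/v1/admin/tsdb/snapshot")
resp.raise_for_status()
# The snapshot is created under <data-dir>/snapshots/<name> as hard links to the blocks.
print(resp.json()["data"]["name"])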

Summary

This lets us move the customer's monitoring data onto our own Prometheus (the amount of data is not very controllable); Grafana keeps working as before and the dashboards are unaffected.

Solution 3: tsdb command-line tool / promtool tsdb dump

There is a TSDB PR that adds an export feature to tsdb, but as Prometheus and TSDB evolved, TSDB was merged into the Prometheus repository, and the official Prometheus documentation now describes promtool tsdb dump as the way to export TSDB data.

The usage of promtool tsdb dump is covered in the official documentation. It reads the files in the Prometheus data directory and prints the samples to stdout; we can redirect that to a text file and filter it with tools like grep (for example, keeping only the Elasticsearch-related series), and then work out how to import the result into our own Prometheus.

The data exported by promtool tsdb dump is not in OpenMetrics format. It is a lower-level format that directly reflects the storage layout, mainly intended for debugging and low-level data manipulation.

Prometheus 2.24.0 (2021-01-06) added backfilling, which can import OpenMetrics-formatted data into Prometheus.

So someone proposed that if promtool tsdb dump emitted OpenMetrics, exporting and importing would be much more convenient; the Prometheus author's position was that the correct way to migrate data is the TSDB snapshot feature.

Summary

If we export with promtool tsdb dump and then convert the output into standard OpenMetrics text with a shell or Python script, it can be imported with promtool; this has been tested and the import succeeded. The problem is that promtool tsdb dump cannot filter the data, so time is wasted dumping series we do not need.
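
For illustration, a rough sketch of that conversion, assuming each dump line looks like {labels} value timestamp_ms (check against your own promtool output before relying on it):

# Convert `promtool tsdb dump` output into an OpenMetrics-style text file that
# `promtool tsdb create-blocks-from openmetrics` can ingest.
# Assumed input line format: {__name__="up", job="x"} 1 1609459200000
import re
import sys

LINE_RE = re.compile(r'^\{__name__="([^"]+)",?\s*(.*)\}\s+(\S+)\s+(-?\d+)$')

def convert(dump_file, out_file):
    with open(dump_file) as src, open(out_file, "w") as dst:
        for line in src:
            m = LINE_RE.match(line.strip())
            if not m:
                continue  # skip anything we do not recognize
            name, labels, value, ts_ms = m.groups()
            labels = labels.replace('", ', '",')  # drop the space printed after commas
            ts_s = int(ts_ms) // 1000  # OpenMetrics timestamps are in seconds
            if labels:
                dst.write("{}{{{}}} {} {}\n".format(name, labels, value, ts_s))
            else:
                dst.write("{} {} {}\n".format(name, value, ts_s))
        dst.write("# EOF\n")

if __name__ == "__main__":
    convert(sys.argv[1], sys.argv[2])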

Solution 4: Query the metrics for a given time range via the API and convert to OpenMetrics

Prometheus 2.24.0 (2021-01-06) added backfilling, which can import OpenMetrics-formatted data into Prometheus.

The API lets us query exactly the data we want (custom time range, custom metric names, custom step), with no wasted effort.

Summary

The script below queries and exports the metrics as intended, and its only dependency is requests; if necessary the same work could be done in shell to cut dependencies further, so it can simply be run as-is. Exporting a bit more than 200 time series covering the previous 30 hours produced a text file just under 500 MB, which is acceptable; the size comes mainly from the heavy repetition of label text in the OpenMetrics format.

# -*- coding: utf-8 -*-
# !/usr/bin/python
import multiprocessing
import subprocess
import threading
import datetime
import tarfile
import shutil
import json
import gzip
import time
import sys
import re
import os
import io
from multiprocessing.pool import ThreadPool
from multiprocessing import Pool
from threading import Thread

if sys.version_info.major == 2:
    from Queue import Queue, Empty
    from urllib import quote

    reload(sys)
    sys.setdefaultencoding('utf-8')
else:
    from queue import Queue, Empty
    from urllib.parse import quote

GRAFANA_METRICS_BASE = ["elasticsearch_jvm_gc_collection_seconds_count", "elasticsearch_jvm_gc_collection_seconds_sum",
                        "elasticsearch_jvm_memory_max_bytes", "elasticsearch_jvm_memory_pool_peak_used_bytes",
                        "elasticsearch_jvm_memory_pool_used_bytes", "elasticsearch_jvm_memory_used_bytes",
                        "elasticsearch_nodes_roles", "elasticsearch_os_cpu_percent", "elasticsearch_os_load1",
                        "elasticsearch_os_load15", "elasticsearch_os_load5", "elasticsearch_process_open_files_count",
                        "elasticsearch_search_active_queries", "elasticsearch_thread_pool_active_count",
                        "elasticsearch_thread_pool_completed_count", "elasticsearch_thread_pool_queue_count",
                        "elasticsearch_thread_pool_rejected_count", "elasticsearch_transport_rx_size_bytes_total",
                        "elasticsearch_transport_tx_size_bytes_total", "elasticsearch_breakers_estimated_size_bytes",
                        "elasticsearch_breakers_limit_size_bytes",
                        "elasticsearch_breakers_tripped"]
GRAFANA_METRICS_FILE = ["elasticsearch_filesystem_data_available_bytes", "elasticsearch_filesystem_data_size_bytes",
                        "elasticsearch_filesystem_data_used_percent",
                        "elasticsearch_filesystem_io_stats_device_operations_count",
                        "elasticsearch_filesystem_io_stats_device_read_operations_count",
                        "elasticsearch_filesystem_io_stats_device_read_size_kilobytes_sum",
                        "elasticsearch_filesystem_io_stats_device_write_operations_count",
                        "elasticsearch_filesystem_io_stats_device_write_size_kilobytes_sum"]
GRAFANA_METRICS_CLUSTER = ["elasticsearch_cluster_health_active_primary_shards",
                           "elasticsearch_cluster_health_active_shards",
                           "elasticsearch_cluster_health_delayed_unassigned_shards",
                           "elasticsearch_cluster_health_initializing_shards",
                           "elasticsearch_cluster_health_number_of_data_nodes",
                           "elasticsearch_cluster_health_number_of_nodes",
                           "elasticsearch_cluster_health_number_of_pending_tasks",
                           "elasticsearch_cluster_health_relocating_shards", "elasticsearch_cluster_health_status",
                           "elasticsearch_cluster_health_unassigned_shards"]
GRAFANA_METRICS_XDCR = ["elasticsearch_xdcr_tasks_index_number",
                        "elasticsearch_xdcr_tasks_repository", "elasticsearch_cat_xdcr_localDocs",
                        "elasticsearch_cat_xdcr_localGCP", "elasticsearch_cat_xdcr_localLCP",
                        "elasticsearch_cat_xdcr_localSeqNo", "elasticsearch_cat_xdcr_remoteDocs",
                        "elasticsearch_cat_xdcr_remoteGCP", "elasticsearch_cat_xdcr_remoteLCP",
                        "elasticsearch_cat_xdcr_remoteSeqNo"]
GRAFANA_METRICS_ABOUT_NODE_COUNT = ["elasticsearch_indices_docs",
                                    "elasticsearch_indices_docs_deleted",
                                    "elasticsearch_indices_fielddata_evictions",
                                    "elasticsearch_indices_fielddata_memory_size_bytes",
                                    "elasticsearch_indices_filter_cache_evictions",
                                    "elasticsearch_indices_flush_time_seconds",
                                    "elasticsearch_indices_flush_total",
                                    "elasticsearch_indices_get_exists_time_seconds",
                                    "elasticsearch_indices_get_exists_total",
                                    "elasticsearch_indices_get_missing_time_seconds",
                                    "elasticsearch_indices_get_missing_total",
                                    "elasticsearch_indices_get_time_seconds",
                                    "elasticsearch_indices_get_total",
                                    "elasticsearch_indices_indexing_delete_time_seconds_total",
                                    "elasticsearch_indices_indexing_delete_total",
                                    "elasticsearch_indices_indexing_index_time_seconds_total",
                                    "elasticsearch_indices_indexing_index_total",
                                    "elasticsearch_indices_merges_docs_total",
                                    "elasticsearch_indices_merges_total",
                                    "elasticsearch_indices_merges_total_size_bytes_total",
                                    "elasticsearch_indices_merges_total_time_seconds_total",
                                    "elasticsearch_indices_query_cache_evictions",
                                    "elasticsearch_indices_query_cache_memory_size_bytes",
                                    "elasticsearch_indices_refresh_time_seconds_total",
                                    "elasticsearch_indices_refresh_total",
                                    "elasticsearch_indices_search_fetch_time_seconds",
                                    "elasticsearch_indices_search_fetch_total",
                                    "elasticsearch_indices_search_query_time_seconds",
                                    "elasticsearch_indices_search_query_total",
                                    "elasticsearch_indices_segments_count",
                                    "elasticsearch_indices_segments_memory_bytes",
                                    "elasticsearch_indices_store_size_bytes",
                                    "elasticsearch_indices_store_throttle_time_seconds_total",
                                    "elasticsearch_indices_translog_operations",
                                    "elasticsearch_indices_translog_size_in_bytes"]
GRAFANA_METRICS_ABOUT_INDEX_COUNT = ["elasticsearch_index_stats_index_current",
                                     "elasticsearch_index_stats_indexing_index_time_seconds_total",
                                     "elasticsearch_index_stats_indexing_index_total",
                                     "elasticsearch_index_stats_search_fetch_time_seconds_total",
                                     "elasticsearch_index_stats_search_query_time_seconds_total",
                                     "elasticsearch_index_stats_search_query_total",
                                     "elasticsearch_indices_deleted_docs_total",
                                     "elasticsearch_indices_docs_primary",
                                     "elasticsearch_indices_docs_total",
                                     "elasticsearch_indices_segment_count_primary",
                                     "elasticsearch_indices_segment_count_total",
                                     "elasticsearch_indices_segment_doc_values_memory_bytes_primary",
                                     "elasticsearch_indices_segment_doc_values_memory_bytes_total",
                                     "elasticsearch_indices_segment_fields_memory_bytes_primary",
                                     "elasticsearch_indices_segment_fields_memory_bytes_total",
                                     "elasticsearch_indices_segment_fixed_bit_set_memory_bytes_primary",
                                     "elasticsearch_indices_segment_fixed_bit_set_memory_bytes_total",
                                     "elasticsearch_indices_segment_index_writer_memory_bytes_primary",
                                     "elasticsearch_indices_segment_index_writer_memory_bytes_total",
                                     "elasticsearch_indices_segment_memory_bytes_primary",
                                     "elasticsearch_indices_segment_memory_bytes_total",
                                     "elasticsearch_indices_segment_norms_memory_bytes_primary",
                                     "elasticsearch_indices_segment_norms_memory_bytes_total",
                                     "elasticsearch_indices_segment_points_memory_bytes_primary",
                                     "elasticsearch_indices_segment_points_memory_bytes_total",
                                     "elasticsearch_indices_segment_terms_memory_primary",
                                     "elasticsearch_indices_segment_terms_memory_total",
                                     "elasticsearch_indices_segment_version_map_memory_bytes_primary",
                                     "elasticsearch_indices_segment_version_map_memory_bytes_total",
                                     "elasticsearch_indices_store_size_bytes_primary",
                                     "elasticsearch_indices_store_size_bytes_total"]
GRAFANA_METRICS = {
    '1': GRAFANA_METRICS_BASE,
    '2': GRAFANA_METRICS_FILE,
    '3': GRAFANA_METRICS_CLUSTER,
    '4': GRAFANA_METRICS_XDCR,
    '5': GRAFANA_METRICS_ABOUT_NODE_COUNT,
    '6': GRAFANA_METRICS_ABOUT_INDEX_COUNT
}
GLOBAL_CPU_CORES = None
GLOBAL_THREAD_COUNT = None

log_lock = threading.Lock()

session = None
"""设置session,降低请求耗时"""


def setup_session():
    global session
    try:
        import requests
        from requests.adapters import HTTPAdapter
        from urllib3.util.retry import Retry
        from requests.packages.urllib3.exceptions import InsecureRequestWarning
        session = requests.Session()
        retries = Retry(total=5, backoff_factor=0.1, status_forcelist=[500, 502, 503, 504])
        session.mount('https://', HTTPAdapter(max_retries=retries))
        requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    except ImportError:
        session = None


def setting_threading():
    global GLOBAL_THREAD_COUNT
    global GLOBAL_CPU_CORES
    GLOBAL_CPU_CORES = multiprocessing.cpu_count()
    log_info("当前机器的CPU核心数是{}。".format(GLOBAL_CPU_CORES))
    log_info("设置合理的线程数,取决于本机的核心数,也取决于Prometheus的负载压力。")
    while True:
        threading_input = one_input("请输入希望设置的线程数(建议值为核心数的2倍,回车确认): ")
        if not threading_input:
            GLOBAL_THREAD_COUNT = GLOBAL_CPU_CORES * 2
            break
        try:
            threading_count = int(threading_input)
            if threading_count > 0:
                GLOBAL_THREAD_COUNT = threading_count
                break
            else:
                log_warning("线程数必须是正整数,请重新输入。")
        except ValueError:
            log_warning("无效输入,请输入一个正整数。")
    return GLOBAL_THREAD_COUNT


def one_input(prompt):
    return raw_input(prompt) if sys.version_info.major == 2 else input(prompt)


def log_info(message, *args):
    """为了防止多线程打日志太快造成混乱"""
    with log_lock:
        print('[[ INFO ]] ' + message.format(*args))


def log_warning(message, *args):
    with log_lock:
        print('[[ WARNING ]] ' + message.format(*args))


def log_error(message, *args):
    with log_lock:
        print('[[ ERROR ]] ' + message.format(*args))


def query_prometheus(url, auth, headers=None):
    """通用的请求函数,支持requests和curl"""
    try:
        if session:
            response = session.get(url, auth=auth, verify=False, headers=headers)
            response.raise_for_status()
            log_info("request请求的链接是 "
                     "{}", response.url)
            return response.json()
        else:
            cmd = [
                'curl', '-s', '-k', '-L',  # silent output, ignore certificates, follow redirects
                '--keepalive',  # reuse the TCP connection across requests
                '--compressed',  # accept compressed responses
                '--connect-timeout', '5',  # connection timeout of 5 seconds
                '--max-time', '10',  # cap the total request time at 10 seconds
                '--retry', '3',  # retry up to 3 times
                '--retry-delay', '2',  # wait 2 seconds between retries
            ]
            if auth is not None:
                cmd += ['-u', '{}:{}'.format(auth[0], auth[1])]
            if headers is not None:
                for key, value in headers.items():
                    cmd += ['-H', '{}: {}'.format(key, value)]
            cmd.append(url)
            log_info("curl请求的链接是{}", subprocess.list2cmdline(cmd))
            response = subprocess.check_output(cmd)
            return json.loads(response)
    except Exception as e:
        log_error("获取数据失败,请检查{},原因:{}", url, str(e))
        return None


def setting_prometheus_url():
    """等待用户输入Prometheus_url"""
    while True:
        prometheus_url = one_input("请输入要查询的Prometheus地址(域名或IP+端口,结尾不要斜线):")
        if re.match(r'^https?://', prometheus_url):
            return prometheus_url
        else:
            log_warning("请输入正确的地址,形如https://xxx.com或者http://x.x.x.x:9090")


def setting_auth():
    """等待用户输入认证"""
    while True:
        username = one_input("请输入要查询的Prometheus用户名/无认证可回车跳过:")
        password = one_input("请输入要查询的Prometheus密码/无认证可回车跳过:")
        if username and not password:
            log_warning("请提供完整的用户名和密码/无认证可回车跳过。")
            continue
        elif not username and password:
            log_warning("请提供完整的用户名和密码/无认证可回车跳过。")
            continue
        auth = (username, password) if username and password else None
        return auth


def setting_time(step):
    """等待用户输入查询的时间范围或小时数"""
    while True:
        time_str = one_input("请输入往前查的时间范围,例如:20231111-20231112 或者小时数:")
        if re.match(r'\d{8}-\d{8}', time_str):
            start_date, end_date = time_str[:8], time_str[9:]
            try:
                start_time = datetime.datetime.strptime(start_date, "%Y%m%d").strftime("%Y-%m-%dT%H:%M:%SZ")
                end_time = datetime.datetime.strptime(end_date, "%Y%m%d").strftime("%Y-%m-%dT%H:%M:%SZ")
                hours = int((datetime.datetime.strptime(end_date, "%Y%m%d") -
                             datetime.datetime.strptime(start_date, "%Y%m%d")).total_seconds() / 3600)
                if check_query_limit(hours, step):
                    log_warning("查询数据量超过11000,请重新输入")
                else:
                    return start_time, end_time, hours
            except ValueError:
                log_warning("请输入正确的日期格式,例如:20231111-20231112")
        elif time_str.isdigit():
            hours = int(time_str)
            if check_query_limit(hours, step):
                log_warning("查询数据量超过11000,请重新输入")
            else:
                end_time = datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%SZ")
                query_time = datetime.datetime.now() - datetime.timedelta(hours=hours)
                start_time = query_time.strftime("%Y-%m-%dT%H:%M:%SZ")
                return start_time, end_time, hours
        else:
            log_warning("请输入正确的时间范围,例如:20231111-20231112 或者小时数")


def check_query_limit(hours, step):
    """检查是否超出Prometheus限制"""
    return hours * 60 * 60 / step > 11000


def get_clusters(prometheus_url, auth):
    """查询Prometheus中全部的集群列表"""
    cluster_url = '{}/api/v1/label/cluster/values'.format(prometheus_url)
    clusters = query_prometheus(cluster_url, auth)
    if not clusters:
        log_error("没有查询到任何集群")
        sys.exit(1)
    print("监控的集群有:")
    for cluster in clusters['data']:
        print(cluster)
    return clusters


def setting_cluster(clusters):
    """等待用户输入要查询的集群"""
    while True:
        cluster = one_input("请输入要查询集群名称:")
        if clusters and cluster in clusters['data']:
            return cluster
        else:
            log_warning("输入不存在,请输入正确的集群名称")


def get_scrape_interval(config, cluster):
    """匹配配置文件中的全局scrape_interval和每个集群的scrape_interval"""
    scrape_interval_pattern = re.compile(
        r'job_name:\s+.*?{}/.*?\s+.*?scrape_interval:\s+(\d+)s'.format(re.escape(cluster)),
        re.DOTALL
    )
    scrape_interval_match = scrape_interval_pattern.search(config)
    if scrape_interval_match:
        return int(scrape_interval_match.group(1))
    global_scrape_interval_pattern = re.compile(r'global:\s+scrape_interval:\s+(\d+)s')
    global_scrape_interval_match = global_scrape_interval_pattern.search(config)
    if global_scrape_interval_match:
        return int(global_scrape_interval_match.group(1))
    return None


def setting_step(prometheus_url, auth, cluster):
    """等待用户设置查询间隔"""
    config_url = '{}/api/v1/status/config'.format(prometheus_url)
    config = query_prometheus(config_url, auth)['data']['yaml']
    scrape_interval = get_scrape_interval(config, cluster)
    if scrape_interval is None:
        log_warning("无法找到集群{}或全局配置的爬取间隔".format(cluster))
        return None
    while True:
        step_input = one_input(
            "集群{}配置的爬取间隔是{}秒,回车使用默认值,或输入任意正整数值:".format(cluster, scrape_interval)
        )
        if not step_input:
            log_info("最大可以往前查{}小时".format(int(11000 / (3600 / scrape_interval))))
            return scrape_interval
        try:
            step = int(step_input)
            if step > 0:
                log_info("最大可以往前查{}小时".format(int(11000 / (3600 / step))))
                return step
            else:
                log_warning("请输入一个正整数")
        except ValueError:
            log_warning("请输入一个正整数")


def setting_metrics(step, hours):
    """等待用户选择要查询的指标"""
    total_query_count = int(3600 / step * hours)
    metrics_message = ("""输入数字代表要查询的指标集(无需分隔)
1.jvm_gc_thread_pool(jvm/gc相关指标,建议选择)
2.filesystem_io(文件系统相关指标,建议选择)
3.cluster_health(集群健康相关指标,建议选择)
4.xdcr_tasks(未开启跨集群复制,请勿选择此项)
5.elasticsearch_indices.*(每个elasticsearch_indices开头的指标,有[[{}*节点数]]行数据)
6.elasticsearch_index.*(每个elasticsearch_index开头的指标,有[[{}*索引数]]行数据)
[[ WARNING ]] 在下面的步骤中,可以自定义输入要额外查询的指标,但是请记住上面的两条"""
                       .format(total_query_count, total_query_count))
    print(metrics_message)
    metrics_list = []
    unique_options = []
    while True:
        options = one_input("请输入要查询集合如123,回车跳过: ")
        if not options:
            log_warning("已跳过,请在下一轮输入准确的所需指标名")
            break
        if not options.isdigit():
            log_warning("输入包含不合规字符,请重新输入。")
            continue
        invalid_metrics = [metric for metric in options if metric not in GRAFANA_METRICS]
        if invalid_metrics:
            log_warning("不合规的输入:{}".format(' '.join(invalid_metrics)))
            continue
        for single_option in options:
            if single_option not in unique_options:
                unique_options.append(single_option)
                metrics_list.extend(GRAFANA_METRICS[single_option])
        break
    return metrics_list, unique_options


def get_all_metrics(prometheus_url, auth):
    """查询Prometheus中有哪些metrics"""
    series_url = '{}/api/v1/label/__name__/values'.format(prometheus_url)
    try:
        series = query_prometheus(series_url, auth)
        if not series:
            raise ValueError("获取指标失败,返回结果为空")
        all_metrics = [metric for metric in series.get('data', []) if 'elasticsearch' in metric]
        return all_metrics
    except Exception as e:
        log_error("查询指标时发生错误: {}".format(e))
        sys.exit(1)


def setting_extra_metrics(metrics_list, prometheus_url, auth):
    """列出所有可用且未被选择的Elasticsearch指标供用户选择,支持用户添加额外的查询指标"""
    all_metrics = get_all_metrics(prometheus_url, auth)
    available_metrics = set(all_metrics) - set(metrics_list)
    if available_metrics:
        log_info("以下是未添加的可用的Elasticsearch指标列表,可以在指导下添加:")
        print(",".join(sorted(available_metrics)))
        while True:
            add_metrics = one_input("可以在指导下补充额外的查询指标,逗号分隔,回车跳过:")
            if not add_metrics:
                break
            add_metrics_list = [metric.strip() for metric in add_metrics.split(',')]
            for metric in add_metrics_list:
                if metric:
                    if metric in metrics_list:
                        log_info("{}指标已经添加,跳过".format(metric))
                    elif metric in all_metrics:
                        metrics_list.append(metric)
                        log_info("添加指标:{}".format(metric))
                    else:
                        log_warning("不支持的指标{}".format(metric))
    else:
        log_info("所有的Elasticsearch指标已添加。")
    return metrics_list


def get_active_indices(prometheus_url, auth, cluster, start_time, hours):
    top_n = None
    log_info("为了提高查询效率,我们提供了查看特定数量最活跃或最大的索引的功能。")
    while top_n is None:
        user_input = one_input("请输入要查看的索引数量,(仅列出,供复制)")
        if user_input.isdigit():
            top_n = int(user_input)
        else:
            log_warning("输入错误,请输入一个有效的数字表示要查看的索引数量。")
    start_time_dt = datetime.datetime.strptime(start_time, '%Y-%m-%dT%H:%M:%SZ')
    end_time_dt = start_time_dt + datetime.timedelta(hours=hours)
    end_time_str = end_time_dt.strftime('%Y-%m-%dT%H:%M:%SZ')
    top_docs_query = (
        'topk({top_n}, max_over_time(elasticsearch_indices_docs_total{{cluster="{cluster}"}}[{hours}h]))'
        .format(top_n=top_n, cluster=cluster, hours=hours))
    top_docs_query = quote(top_docs_query)
    top_docs_url = ('{}/api/v1/query?query={}&time={}'
                    .format(prometheus_url, top_docs_query, end_time_str))
    top_docs_data = query_prometheus(top_docs_url, auth)
    if not top_docs_data:
        log_error("查询文档数最多的索引失败,请检查了是否采集了elasticsearch_indices_docs_total指标")
        sys.exit(1)
    top_activity_query = (
        'topk({top_n}, max_over_time(elasticsearch_index_stats_indexing_index_total{{cluster="{cluster}"}}[{hours}h]) '
        '+ max_over_time(elasticsearch_index_stats_search_query_total{{cluster="{cluster}"}}[{hours}h]))'
        .format(top_n=top_n, cluster=cluster, hours=hours))
    top_activity_query = quote(top_activity_query)
    top_activity_url = ('{}/api/v1/query?query={}&time={}'
                        .format(prometheus_url, top_activity_query, end_time_str))
    top_activity_data = query_prometheus(top_activity_url, auth)
    if not top_activity_data:
        log_error("查询活跃度最高的索引失败,请检查是否采集了elasticsearch_index_stats_indexing_index_total指标")
        sys.exit(1)
    top_docs_indices = [result['metric']['index'] for result in top_docs_data['data']['result']]
    top_activity_indices = [result['metric']['index'] for result in top_activity_data['data']['result']]
    big_and_active_indices = set(top_docs_indices) | set(top_activity_indices)
    log_info("文档数最多的前{}个索引:\n{}".format(len(top_docs_indices), ', '.join(top_docs_indices)))
    log_info("活跃度最高的前{}个索引:\n{}".format(len(top_activity_indices), ', '.join(top_activity_indices)))
    log_info("文档数最多和活跃度最高的前{}个索引:\n{}"
             .format(len(big_and_active_indices), ', '.join(big_and_active_indices)))
    return top_docs_indices, top_activity_indices, big_and_active_indices


def setting_indices(prometheus_url, auth, cluster, start_time, hours, unique_options):
    """等待用户选择要查询的索引"""
    indices_list = []
    if '6' in unique_options:
        top_docs_indices, top_activity_indices, big_and_active_indices = get_active_indices(prometheus_url, auth,
                                                                                            cluster, start_time, hours)
        while True:
            indices = one_input("选择了查询特定索引的指标(直接回车可以取消),请指定要查询的索引,逗号分隔,回车结束:")
            if not indices:
                log_info("没有输入任何索引,将取消选项6的查询。")
                unique_options.remove('6')
                break
            else:
                indices_list = [index.strip() for index in indices.split(',') if index.strip()]
                if all(index in big_and_active_indices for index in indices_list):
                    log_info("已选择索引: {}".format(", ".join(indices_list)))
                    return indices_list, unique_options
                else:
                    indices_list = []  # reset indices_list so the user can re-enter
                    log_warning("输入的索引不存在于最活跃或最大的索引列表中,请重新输入有效的索引。")
    return indices_list, unique_options


def generate_promql(cluster, metrics_list, indices_list, metrics_with_index):
    """promql生成,如果是pass就使用pod,如果是不pass就使用cluster,如果指标带有index并且提供了indices就填充promql"""
    promql_list = []
    template_with_index = '{}{{cluster=~"{}.*", index="{}"}}'
    template_without_index = '{}{{cluster=~"{}.*"}}'
    for metric in metrics_list:
        needs_index = metric in GRAFANA_METRICS['6'] or metric in metrics_with_index
        if needs_index and indices_list:
            for index in indices_list:
                promql_list.append(template_with_index.format(metric, cluster, index))
        else:
            promql_list.append(template_without_index.format(metric, cluster))
    if not promql_list:
        log_error("没有生成任何PromQL查询")
        sys.exit(1)
    return promql_list


def get_buffer_size():
    """计算合适的文件缓冲区大小。"""
    mem_bytes = os.sysconf('SC_PAGE_SIZE') * os.sysconf('SC_PHYS_PAGES')
    mem_gib = mem_bytes / (1024 ** 3)  # convert to GiB
    if mem_gib >= 16:
        return 262144  # 256KiB
    elif mem_gib >= 8:
        return 131072  # 128KiB
    elif mem_gib >= 4:
        return 65536  # 64KiB
    else:
        return 32768  # 32KiB


def get_metrics_with_index_label(prometheus_url, auth, start_time):
    """查询Prometheus中在指定时间范围内所有带有index标签的指标"""
    series_api_url = ("{}/api/v1/series?match[]=%7Bindex%3D~%22.%2B%22%7D&start={}&end={}"
                      .format(prometheus_url, quote(start_time), quote(start_time)))
    try:
        data = query_prometheus(series_api_url, auth)
        metrics_with_index = set()
        for series in data.get('data', []):
            metric_name = series['__name__']
            metrics_with_index.add(metric_name)
        return metrics_with_index
    except Exception as e:
        log_error("查询带有index的指标时发生错误:{}".format(e))
        sys.exit(1)


def get_metrics(prometheus_url, auth, promql_list, start_time, end_time, cluster, unique_options, hours, step):
    """使用线程池对Prometheus进行请求,计算进度,使用队列边查边写边压缩,节省时间"""
    metrics_data_queue = Queue()
    query_pool = ThreadPool(GLOBAL_THREAD_COUNT)
    write_thread = Thread(target=write_metrics_to_file,
                          args=(metrics_data_queue, cluster, unique_options, hours, step, promql_list))
    write_thread.start()
    tasks = []
    for promql in promql_list:
        encoded_promql = quote(promql)
        metrics_url = "{}/api/v1/query_range?query={}&start={}&end={}&step={}s".format(prometheus_url, encoded_promql,
                                                                                       start_time, end_time, step)
        headers = {'Content-Type': 'application/json'}
        task = query_pool.apply_async(query_prometheus, (metrics_url, auth, headers))
        tasks.append(task)
    total_queries = len(tasks)
    queries_done = 0
    try:
        for task in tasks:
            try:
                prometheus_data = task.get(timeout=300)
                if 'data' in prometheus_data and 'result' in prometheus_data['data']:
                    for result in prometheus_data['data']['result']:
                        metrics_data_queue.put(result)
                queries_done += 1
                if queries_done % 10 == 0 or queries_done == total_queries:
                    percent_complete = queries_done * 100.0 / total_queries  # 100.0 keeps this correct under Python 2
                    log_info(
                        '#################  进度:{}/{} ({:.2f}%)'.format(queries_done, total_queries, percent_complete))
            except Exception as e:
                log_error("解析数据出错{}".format(e))
                queries_done += 1
                if queries_done % 10 == 0 or queries_done == total_queries:
                    percent_complete = queries_done * 100.0 / total_queries  # 100.0 keeps this correct under Python 2
                    log_info(
                        '#################  进度:{}/{} ({:.2f}%)'.format(queries_done, total_queries, percent_complete))
            # queue_size = metrics_data_queue.qsize()
            # log_info("current queue size: {}".format(queue_size))  # for debugging the query/write queue state
            # log_info('active threads in the pool: {}'.format(len([t for t in query_pool._pool if t.is_alive()])))
    except Exception as e:
        log_error('在处理任务时发生错误: {}'.format(e))
    finally:
        metrics_data_queue.put(None)
        query_pool.close()
        query_pool.join()
    write_thread.join()


def write_metrics_to_file(metrics_data_queue, cluster, unique_options, hours, step, promql_list):
    """将指标写入文件,按文件行数分割后压缩"""
    compress_pool = Pool(GLOBAL_CPU_CORES // 2)  # CPU密集任务,使用多进程,比多进程更快,使用多核
    unique_options = ''.join(unique_options)
    file_index = 1
    line_count = 0
    total_query_count = int(3600 / step * hours)
    total_line = total_query_count * len(promql_list)
    file_count = 32  # target number of output files; used to derive the line count per file
    max_lines_per_file = total_line // file_count
    max_lines_per_file = max(max_lines_per_file, 50000)  # lower bound so we do not create lots of tiny files
    filename_pattern = "openmetrics{}_{}_{}_{}h_{}.txt"
    filename = filename_pattern.format(file_index, cluster, unique_options, hours, step)
    buffer_size = get_buffer_size()
    with io.open(filename, 'w', buffering=buffer_size) as f:
        while True:
            try:
                result = metrics_data_queue.get(timeout=10)  # wait up to 10 seconds for new data
            except Empty:
                log_warning("队列已经10秒没有数据了,检查是否所有查询都已经完成。")
                continue
            if result is None:
                f.write(u'# EOF\n')  # terminate the current OpenMetrics file before finishing
                break
            metric_name = result['metric']['__name__']
            labels = ','.join(
                '{}="{}"'.format(key, value) for key, value in result['metric'].items() if key != '__name__')
            for value in result['values']:
                timestamp, metric_value = value
                if line_count >= max_lines_per_file:  # file is full: write the EOF marker and rotate to the next file index
                    f.write(u'# EOF\n')
                    f.flush()
                    f.close()
                    compress_pool.apply_async(compress_single, (filename,))  # hand the finished file to the compression pool
                    file_index += 1
                    filename = filename_pattern.format(file_index, cluster, unique_options, hours, step)
                    f = io.open(filename, 'w', buffering=buffer_size)  # open the next file with the same buffer size
                    line_count = 0
                f.write(u"{}{{{}}} {} {}\n".format(metric_name, labels, metric_value, timestamp))
                line_count += 1
    f.close()  # safe even if already closed by the with block; flushes and closes the last rotated file
    compress_pool.apply_async(compress_single, (filename,))
    compress_pool.close()
    compress_pool.join()


def compress_single(filename):
    """压缩单个txt文件,压缩后删除"""
    print("开始压缩文件:{}, 请不要终止脚本".format(filename))
    file_size = os.path.getsize(filename)
    if file_size > 1024 * 1024:
        gzip_filename = filename + '.gz'
        with open(filename, 'rb') as f_in, gzip.open(gzip_filename, 'wb') as f_out:
            shutil.copyfileobj(f_in, f_out)
        os.remove(filename)


def compress_all(directory, output_filename, cluster, unique_options, hours, step):
    """打包压缩全部符合格式的gz文件"""
    unique_options = ''.join(unique_options)
    expected_filename_part = "{}_{}_{}h_{}".format(cluster, unique_options, hours, step)
    with tarfile.open(output_filename, "w:gz") as tar:
        for filename in os.listdir(directory):
            if filename.endswith('.gz') and expected_filename_part in filename:
                tar.add(os.path.join(directory, filename), arcname=filename)


def initialize(step, prometheus_url, auth):
    """初始化,拿到用户输入的时间和选择的指标列表和选择项"""
    start_time, end_time, hours = setting_time(step)
    metrics_list, unique_options = setting_metrics(step, hours)
    metrics_list = setting_extra_metrics(metrics_list, prometheus_url, auth)
    return start_time, end_time, hours, metrics_list, unique_options


def main():
    intro_message = """##############################################################
脚本将通过Prometheus内置的API接口导出ElasticSearch相关的监控数据
只会获取ElasticSearch的各项状态信息,不会获取ElasticSearch的数据
设置的爬取间隔越大,能够查询的范围就越长,需要保证所在路径空间大于30G
脚本会对Prometheus造成小幅查询压力,运行前请检查Prometheus负载状态
脚本涉及到压缩和批量请求,在配置更高的机器上,导出的效率更高,耗时更短
脚本导出后,开始压缩不要终止,将当前路径的transportMe.tar.gz传回即可
##############################################################"""
    print(intro_message)
    setup_session()
    prometheus_url = setting_prometheus_url()
    auth = setting_auth()
    setting_threading()
    clusters = get_clusters(prometheus_url, auth)
    cluster = setting_cluster(clusters)
    step = setting_step(prometheus_url, auth, cluster)
    start_time, end_time, hours, metrics_list, unique_options = initialize(step, prometheus_url, auth)
    indices_list, unique_options = setting_indices(prometheus_url, auth, cluster, start_time, hours, unique_options)
    metrics_with_index = get_metrics_with_index_label(prometheus_url, auth, start_time)
    promql_list = generate_promql(cluster, metrics_list, indices_list, metrics_with_index)
    start_query_time = time.time()
    get_metrics(prometheus_url, auth, promql_list, start_time, end_time, cluster, unique_options, hours, step)
    compress_all(".", "transportMe.tar.gz", cluster, unique_options, hours, step)
    end_query_time = time.time()
    query_duration = round(end_query_time - start_query_time, 2)
    print("导出成功,总耗时{}秒,将transportMe.tar.gz传回即可".format(query_duration))


if __name__ == "__main__":
    main()

This is currently the most feasible option; export and import have been verified both locally and in the test environment. Because OpenMetrics text is highly repetitive it compresses very well: a 4 GB text file shrinks to about 40 MB. After importing into a local Prometheus and restarting it (so it reloads the block files), the data can be queried normally from both Prometheus and Grafana.
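
On the import side, a small sketch of what we run locally, assuming promtool is on PATH, ./data is the local Prometheus data directory, and the archive came from the script above (create-blocks-from openmetrics is the documented backfilling command):

# Unpack transportMe.tar.gz and backfill every OpenMetrics file into local TSDB blocks.
import glob
import gzip
import shutil
import subprocess
import tarfile

with tarfile.open("transportMe.tar.gz", "r:gz") as tar:
    tar.extractall("./export")

for gz_path in sorted(glob.glob("./export/*.gz")):
    txt_path = gz_path[:-3]
    with gzip.open(gz_path, "rb") as f_in, open(txt_path, "wb") as f_out:
        shutil.copyfileobj(f_in, f_out)
    # Writes new blocks under ./data; restart the local Prometheus afterwards so it picks them up.
    subprocess.check_call(["promtool", "tsdb", "create-blocks-from", "openmetrics", txt_path, "./data"])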

Time estimates

Test environment: 10 nodes, 846 indices, using requests, roughly 4 minutes.

Production environment: 61 nodes, 2168 indices, using requests, roughly 17 minutes.

Update log

2023-12-04 update:

Added a second request path: even without requests installed, the script can fall back to curl, so it has no external dependencies at all.

2023-12-08 update:

Added re-prompting on invalid input: when the user's input does not meet the requirements, the script asks again instead of exiting.

Added time-range queries: if the problem happened last week, the script now accepts either a number of hours or an explicit date range.

2023-12-14 update:

Added support for extra metrics, since the Grafana dashboards do not use every metric the exporter collects.

Added chunked queries that write to multiple files, which speeds up the later import and lets querying, compressing and writing run concurrently.

2023-12-19 update:

Added index filtering: the script lists the largest and most active monitored indices so the user can copy from them.

Added a thread pool to speed up requests to Prometheus, at the cost of slightly more query load on Prometheus.

Added a progress estimate so the user can gauge the remaining time and decide whether to cancel the query.