elasticsearch-dump、logstash、reindex、snapshot方式进行数据迁移,实际上这几种工具大体上可以分为以下几类: scroll query + bulk:批量读取旧集群的数据然后再批量写入新集群,elasticsearch-dump、logstash、reindex都是采用这种方 snapshot:直接把旧集群的底层的文件进行备份,在新的集群中恢复出来,相比较scroll query + bulk的方式,snapshot的方式迁移速度最快。 从源 ES 集群通过备份api创建数据快照,然后在目标 ES 集群中进行恢复,无网络互通要求、迁移速度快、运维配置简单、适用于数据量大,接受离线数据迁移的场景,Snapshot and restore 模块允许创建单个索引或者整个集群的快照到远程仓库。所以首先需要创建一个存储快照的地方,存储方案可以选择一个NFS的共享存储,或者对象存储。

方案对比

根据以上汇总出如下四种方案,并对其各项特点进行分析,最终选择适合我们自身的迁移方案。

方案elasticsearch-dumpreindexsnapshotlogstash
基本原理逻辑备份,类似mysqldump将数据一条一条导出后再执行导入reindex 是 Elasticsearch 提供的一个 API 接口,可以把数据从一个集群迁移到另外一个集群从源集群通过Snapshot API 创建数据快照,然后在目标集群中进行恢复从一个集群中读取数据然后写入到另一个集群
网络要求集群间互导需要网络互通,先导出文件再通过文件导入集群则不需要网络互通网络需要互通无网络互通要求网络需要互通
迁移速度一般
适合场景适用于数据量小的场景适用于数据量不大,在线迁移数据的场景适用于数据量大,接受离线数据迁移的场景适用于数据量一般,近实时数据传输
配置复杂度中等简单复杂中等

方案测试

开始对以上各种方案进行测试对比,准备两台测试用的elasticsearch,要求有kibana,x-pack白金版

http://10.23.188.99:5601/  ###一台在采集/var/log/messages日志的ELK全套
http://10.23.191.99:5601/  ###一台空的elasticsearch

后面将以188和191区分二者

测试:snapshot

因为只是测试,选择最基础的配置,在elasticsearch的配置文件末尾加上path.repo: ["/data/backup"]创建对应的文件夹,并且需要对es用户赋权限,因为我们的elasticsearch是专门创建了一个es的用户去运行的,然后重启elasticsearch生效。

访问188的kibana的开发工具界面,执行以下。

#这里注册了一个名为backup的共享文件系统仓库,快照会存储在/home/backup目录,需要给es用户所有权,不然创建失败
PUT _snapshot/backup
{
    "type": "fs",
    "settings": {
        "location": "/data/backup",
        "compress": true
    }
}
GET _snapshot?pretty    #查看文件仓库创建的执行结果?pretty是美化返回结果
GET _snapshot/_all?pretty   #查看所有的仓库
 
PUT /_snapshot/backup/snapshot_test   #创建一个快照名叫snapshot_test,它包含所有以`messages_`开头的索引,经过测试它是支持通配符的。
{
    "indices": "messages_*",
    "ignore_unavailable": "true",   
    "include_global_state": false,
}

各项参数解释,为了提高快照和恢复的速度,保证过程不出错,需要详细对每一项参数掌握。

以下是创建快照的可选参数

SettingDescription
indicesThe indices you want to include in the snapshot. You can use , to create a list of indices, * to specify an index pattern, and - to exclude certain indices. Don’t put spaces between items. Default is all indices.(不建议把旧es的kibana索引制作快照同步到新es,会导致kibana出错)
ignore_unavailableIf an index from the indices list doesn’t exist, whether to ignore it rather than fail the snapshot. Default is false.
include_global_stateWhether to include cluster state in the snapshot. Default is true.
partialWhether to allow partial snapshots. Default is false, which fails the entire snapshot if one or more shards fails to store.是否允许部分快照。默认为 false,如果一个或多个分片存储失败,则整个快照失败。

以下是设置快照仓库的可选参数

location指定快照的存储位置。必须要有
compress指定是否对快照文件进行压缩. 默认是 true.
chunk_size如果需要在做快照的时候大文件可以被分解成几块。这个参数指明了每块的字节数。也可用不同的单位标识。 比如,1g,10m,5k等。默认是 null (表示不限制块大小)。
max_restore_bytes_per_sec每个节点恢复数据的最高速度限制. 默认是 20mb/s
max_snapshot_bytes_per_sec每个节点做快照的最高速度限制。默认是 20mb/s

以下是恢复快照的可选参数

SettingDescription
indicesThe indices you want to restore. You can use , to create a list of indices, * to specify an index pattern, and - to exclude certain indices. Don’t put spaces between items. Default is all indices.
ignore_unavailableIf an index from the indices list doesn’t exist, whether to ignore it rather than fail the restore operation. Default is false.
include_global_stateWhether to restore the cluster state. Default is false.
include_aliasesWhether to restore aliases alongside their associated indices. Default is true.
partialWhether to allow the restoration of partial snapshots. Default is false.
rename_patternIf you want to rename indices as you restore them, use this option to specify a regular expression that matches all indices you want to restore. Use capture groups (()) to reuse portions of the index name.
rename_replacementIf you want to rename indices as you restore them, use this option to specify the replacement pattern. Use $0 to include the entire matching index name, $1 to include the content of the first capture group, etc.
index_settingsIf you want to change index settings on restore, specify them here.
ignore_index_settingsRather than explicitly specifying new settings with index_settings, you can ignore certain index settings in the snapshot and use the cluster defaults on restore.

在开发工具界面看到执行成功后,可以前往刚才配置的仓库路径查看

[root@network_test_elk home]# cd backup/
[root@network_test_elk backup]# ll
总用量 32
-rw-r--r--  1 es es 12931 1月   3 19:30 index-0
-rw-r--r--  1 es es     8 1月   3 19:30 index.latest
drwxr-xr-x 51 es es  4096 1月   3 19:30 indices
-rw-r--r--  1 es es   193 1月   3 19:30 meta-82OPqtLbS-y1sgz6hZBCuw.dat
-rw-r--r--  1 es es   419 1月   3 19:30 snap-82OPqtLbS-y1sgz6hZBCuw.dat
####################################这些就是创建一次快照产生的文件#############################################
[root@network_test_elk backup]# ll >ll.txt ###将ll的结果保存到ll.txt,然后第二条重新创建了一次快照,对比文件变化。
[root@network_test_elk backup]# ll
总用量 36
-rw-r--r--  1 es   es   12931 1月   3 19:30 index-0
-rw-r--r--  1 es   es       8 1月   3 19:30 index.latest
drwxr-xr-x 51 es   es    4096 1月   3 19:30 indices
-rw-r--r--  1 root root   383 1月   4 14:54 ll.txt
-rw-r--r--  1 es   es     193 1月   3 19:30 meta-82OPqtLbS-y1sgz6hZBCuw.dat
-rw-r--r--  1 es   es     419 1月   3 19:30 snap-82OPqtLbS-y1sgz6hZBCuw.dat
[root@network_test_elk backup]# ll
总用量 48
-rw-r--r--  1 es   es   17517 1月   4 14:54 index-1
-rw-r--r--  1 es   es       8 1月   4 14:54 index.latest
drwxr-xr-x 52 es   es    4096 1月   4 14:54 indices
-rw-r--r--  1 root root   383 1月   4 14:54 ll.txt
-rw-r--r--  1 es   es     193 1月   3 19:30 meta-82OPqtLbS-y1sgz6hZBCuw.dat
-rw-r--r--  1 es   es     193 1月   4 14:54 meta-e129JOLcScyCjseeC-K6Fg.dat
-rw-r--r--  1 es   es     419 1月   3 19:30 snap-82OPqtLbS-y1sgz6hZBCuw.dat
-rw-r--r--  1 es   es     420 1月   4 14:54 snap-e129JOLcScyCjseeC-K6Fg.dat
[root@network_test_elk backup]# cat ll.txt 
总用量 32
-rw-r--r--  1 es   es   12931 1月   3 19:30 index-0
-rw-r--r--  1 es   es       8 1月   3 19:30 index.latest
drwxr-xr-x 51 es   es    4096 1月   3 19:30 indices
-rw-r--r--  1 root root     0 1月   4 14:54 ll.txt
-rw-r--r--  1 es   es     193 1月   3 19:30 meta-82OPqtLbS-y1sgz6hZBCuw.dat
-rw-r--r--  1 es   es     419 1月   3 19:30 snap-82OPqtLbS-y1sgz6hZBCuw.dat

创建快照的工作已经完成,在191新建一个文件仓库(跟前一个的es操作一样),重启后打开kibana界面

将刚才在另一台的快照文件通过scp -r 发送到当前es,然后可以通过kibana查看快照是否被正常识别

GET _snapshot/backup/_all?pretty
# 查看所有的快照
POST _snapshot/backup/snapshot_test/_restore?pretty
# 恢复snapshot_test这个快照

其实在kibana的开发工具的操作,也是可以通过可视化界面通过鼠标操作实现,测试中一共导入了85M,50个索引,制作快照和恢复快照的时间累计不超过五秒,虽然snapshot是最快的方案,但是仍要对其速度有所了解,准备一个1G的索引再次测试。

测试:elasticsearch-dump

elasticsearch-dump是一个开源的Elasticsearch数据迁移工具,详细信息请参见elasticsearch-dump官方文档

安装

yum install nodejs -y
yum install npm -y 
npm install elasticdump -g

在网络ELK中选取一个大小为1G的索引,用elasticdump ,填写基本用户认证开始测试。

elasticdump \
  --input=http://name:passw@localhost:9200/netops_pa_device-2022-12-15 \
  --output=http://name:passw@10.23.191.99:9200/netops_pa_device-2022-12-15 \
  --type=mapping

image-20230106182859431

直接从原es导入到新es,导入mapping用户基本可以忽略。

elasticdump \
  --input=http://name:passw@localhost:9200/netops_pa_device-2022-12-15 \
  --output=http://name:password@10.23.191.99:9200/netops_pa_device-2022-12-15 \
  --type=data

img

导入1G索引的data,用时十分漫长,由于时间太长,会话中断后,传输也会被中断,于是我们借助一个工具

yum install screen -y

screen 是一个非常有用的命令,提供从单个 SSH 会话中使用多个 shell 窗口(会话)的能力。当会话被分离或网络中断时,screen 会话中启动的进程仍将运行,你可以随时重新连接到 screen 会话

screen -S session_name           # 新建一个叫session_name的session
screen -ls(或者screen -list)     # 列出当前所有的session
screen -r session_name            # 回到session_name这个session
screen -d session_name           # 远程detach某个session
screen -d -r session_name        # 结束当前session并回到session_name这个session

具体操作如下

#!/bin/sh
echo `date` >~/dump.log
elasticdump \
  --input=http://elastic:Xhs@12345678@localhost:9200/netops_pa_device-2022-12-15 \
  --output=http://elastic:Xhs@12345678@10.23.191.99:9200/netops_pa_device-2022-12-15 \
  --type=mapping >>~/dump.log

elasticdump \
  --input=http://elastic:Xhs@12345678@localhost:9200/netops_pa_device-2022-12-15 \
  --output=http://elastic:Xhs@12345678@10.23.191.99:9200/netops_pa_device-2022-12-15 \
  --type=data >> ~/dump.log 

echo `date` >>~/dump.log

用screen新建一个会话,在会话中执行这个脚本。最后将两个时间相减就能够得到整体的运行耗时。

[root@es_network ~]# grep 日 dump.log 
2023年 01月 05日 星期四 13:05:59 CST
2023年 01月 05日 星期四 20:03:48 CST

七小时后终于把1GB的索引传到了新的es。

测试:logstash

迁移全量数据

input{
    elasticsearch{
        # 源端ES地址。
        hosts =>  ["http://localhost:9200"]
        # 安全集群配置登录用户名密码。
        user => "xxxxxx"
        password => "xxxxxx"
        # 需要迁移的索引列表,多个索引以英文以逗号(,)分隔。
        index => "kibana_sample_data_*"
        # 以下三项保持默认即可,包含线程数和迁移数据大小和Logstash JVM配置相关。
        docinfo=>true
        slices => 5
        size => 5000
    }
}

filter {
  # 去掉一些Logstash自己加的字段。
  mutate {
    remove_field => ["@timestamp", "@version"]
  }
}

output{
    elasticsearch{
        # 目标端ES地
        hosts => ["http://8.8.8.8:9200"]
        # 安全集群配置登录用户名密码。
        user => "elastic"
        password => "xxxxxx"
        # 目标端索引名称,以下配置表示索引与源端保持一致。
        index => "%{[@metadata][_index]}"
        # 目标端索引type,以下配置表示索引类型与源端保持一致。
        document_type => "%{[@metadata][_type]}"
        # 目标端数据的id,如果不需要保留原id,可以删除以下这行,删除后性能会更好。
        document_id => "%{[@metadata][_id]}"
        ilm_enabled => false
        manage_template => false
    }
}

迁移增量数据

input{
    elasticsearch{
        # 源端ES地址。
        hosts =>  ["http://localhost:9200"]
        # 安全集群配置登录用户名密码。
        user => "xxxxxx"
        password => "xxxxxx"
        # 需要迁移的索引列表,多个索引使用英文逗号(,)分隔。
        index => "kibana_sample_data_logs"
        # 按时间范围查询增量数据,以下配置表示查询最近5分钟的数据。
        query => '{"query":{"range":{"@timestamp":{"gte":"now-5m","lte":"now/m"}}}}'
        # 定时任务,以下配置表示每分钟执行一次。
        schedule => "* * * * *"
        scroll => "5m"
        docinfo=>true
        size => 5000
    }
}
filter {
  # 去掉一些Logstash自己加的字段.
  mutate {
    remove_field => ["@timestamp", "@version"]
  }
}
output{
    elasticsearch{
        # 目标端ES地址
        hosts => ["http://8.8.8.8:9200"]
        # 安全集群配置登录用户名密码.
        user => "elastic"
        password => "xxxxxx"
        # 目标端索引名称,以下配置表示索引与源端保持一致。
        index => "%{[@metadata][_index]}"
        # 目标端索引type,以下配置表示索引类型与源端保持一致。
        document_type => "%{[@metadata][_type]}"
        # 目标端数据的id,如果不需要保留原id,可以删除以下这行,删除后性能会更好。
        document_id => "%{[@metadata][_id]}"
        ilm_enabled => false
        manage_template => false
    }
}

logstash的迁移速度跟正常发送差不多,相对来说也是很慢的。

测试:reindex

ES在创建好索引后,mapping的properties属性类型是不能更改的,只能添加。如果说需要修改字段就需要重新建立索引然后把旧数据导到新索引。这时候就可以用reindex

首先为新的索引新建一个mapping,mapping中可以修改你要修改的字段的类型,再加入以下内容,优化导入效率。

修改了number_of_replicas和refresh_interval。

设置number_of_replicas为0防止我们迁移文档的同时又发送到副本节点,影响性能。

设置refresh_interval为-1是限制其刷新。默认是1秒

当我们数据导入完成后再把上面两个值进行修改即可。

POST /_reindex
{
  "source": {
    "index": "topic"
      },
  "dest": {
    "index": "topic-new"
  }
}

以上是reindex在同一集群中的使用,在跨集群时也能用于索引迁移。

在目标 ES 集群中配置 elasticsearch.yml中的reindex.remote.whitelist 参数,指明能够 reindex 的远程集群的白名单。

和同集群数据迁移基本一样,就是多了一个设置白名单而已。

设置好索引、number_of_replicas: 0、refresh_interval: -1

在remote中设置远程集群的地址与账号密码(如果配置了的话)。

也可以添加query属性,只查询符号条件的。

在新机器的kibana开发工具界面执行如下,实测1GB索引导入用时10分钟

POST _reindex
{
  "conflicts": "proceed", 
  "source": {
    "remote": {
      "host": "http://10.23.188.108:9200",
      "username": "elastic",
      "password": "**********",
      "socket_timeout": "1m",
      "connect_timeout": "1m"
    },
    "index": "netops_pa_device-2022-12-15",
    "size": 5000
  },
  "dest": {
    "index": "netops_pa_device-2022-12-15"
  }
}

完成之后记得重新配置number_of_replicas、refresh_interval。

Using wait_for_completion=false as query param will return the task id and you will be able to monitor the reindex progress using GET /_tasks/<task_id>.
In order to improve the speed, you should reduce the replicas to 0 and set refresh_interval=-1 in the destination index bevore reindexing and reset the values afterwards.