ES8生产实践——ES跨集群数据迁移方案测评
// 假设有两个ES集群,cluster1和cluster2,我们要从cluster1迁移数据到cluster2
// 引入Elasticsearch客户端
const elasticsearch = require('elasticsearch');
// 创建源集群和目标集群的客户端
const client1 = new elasticsearch.Client({ host: 'http://cluster1:9200', log: 'trace' });
const client2 = new elasticsearch.Client({ host: 'http://cluster2:9200', log: 'trace' });
// 定义迁移函数
async function migrateData(index, sourceClient, destClient) {
// 获取索引映射
const mapping = await sourceClient.indices.getMapping({ index });
// 创建目标索引并设置映射
await destClient.indices.create({ index, body: mapping[index].mappings });
// 获取索引的总文档数
const { count } = await sourceClient.count({ index });
let offset = 0;
const size = 1000;
// 分批获取数据并批量写入目标索引
while (offset < count) {
const { body } = await sourceClient.search({
index,
body: {
query: { match_all: {} },
size,
from: offset
}
});
const docs = body.hits.hits.map(hit => hit._source);
await destClient.bulk({
body: docs.map(doc => ({ index: { _index: index } })).concat(docs)
});
offset += size;
}
}
// 执行迁移操作
migrateData('my_index', client1, client2)
.then(() => console.log('迁移完成'))
.catch(err => console.error('迁移过程中出现错误:', err));
这段代码展示了如何使用Elasticsearch的JavaScript客户端从一个集群迁移数据到另一个集群。它首先获取源索引的映射,然后在目标集群中创建索引并设置相同的映射。接下来,它分批获取源索引的数据,并使用Elasticsearch的bulk API批量写入到目标索引。这个过程会循环执行,直到所有的文档都迁移完成。
评论已关闭