4 种 MySQL 同步 ES 方案

以下是针对“4 种 MySQL 同步 ES 方案”的简要解释和示例代码:

  1. 使用 MySQL binlog 同步数据到 Elasticsearch:



DELIMITER $$
 
CREATE TRIGGER `db_trigger` AFTER INSERT ON `db_table` FOR EACH ROW
BEGIN
    INSERT INTO es_table (id, data) VALUES (NEW.id, JSON_OBJECT('key', NEW.value));
END$$
 
DELIMITER ;
  1. 使用 Logstash 读取 MySQL 数据库并同步到 Elasticsearch:



input {
  jdbc {
    jdbc_driver_library => "/path/to/mysql-connector-java-x.x.x-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/database_name"
    jdbc_user => "your_username"
    jdbc_password => "your_password"
    schedule => "* * * * *"
    statement => "SELECT id, value FROM db_table"
  }
}
 
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "es_index"
    document_id => "%{id}"
    document_type => "es_type"
  }
}
  1. 使用 Kafka 作为中介,将 MySQL 数据发送到 Elasticsearch:



// Java 示例代码,使用了开源库Debezium
DebeziumEngine<Object> engine = DebeziumEngine.create(
    Configuration.create()
    .with(ServiceLoader.load())
    .with(Json.class)
    .with(ObjectMapperType.NONE)
    .with(List.of(
        "io.debezium.config.Configuration"
    ))
    .with("name", "my-sql-connector")
    .with("connector.class", "io.debezium.connector.mysql.MySqlConnector")
    .with("tasks.max", "1")
    .with("database.hostname", "dbserver1")
    .with("database.port", "3306")
    .with("database.user", "myuser")
    .with("database.password", "mypass")
    .with("database.server.id", "184054")
    .with("database.server.name", "my-app-connector")
    .with("database.include.list", "mydb")
    .with("database.history.kafka.bootstrap.servers", "kafka:9092")
    .with("database.history.kafka.topic", "schema-changes.mydb")
    .with("include.schema.changes", "true")
    .build()
);
  1. 使用自定义应用程序同步数据:



import pymysql
from elasticsearch import Elasticsearch, helpers
 
# 连接到 MySQL
connection = pymysql.connect(host='localhost', user='user', password='pass', db='db', charset='utf8mb4')
 
# 连接到 Elasticsearch
es = Elasticsearch(['http://localhost:9200/'])
 
# 查询 MySQL 数据
with connection.cursor() as cursor:
    cursor.execute("SELECT id, value FROM db_table")
    rows = cursor.fetchall()
 
# 将数据批量写入 Elasticsearc

评论已关闭

推荐阅读

DDPG 模型解析,附Pytorch完整代码
2024年11月24日
DQN 模型解析,附Pytorch完整代码
2024年11月24日
AIGC实战——Transformer模型
2024年12月01日
Socket TCP 和 UDP 编程基础(Python)
2024年11月30日
python , tcp , udp
如何使用 ChatGPT 进行学术润色?你需要这些指令
2024年12月01日
AI
最新 Python 调用 OpenAi 详细教程实现问答、图像合成、图像理解、语音合成、语音识别(详细教程)
2024年11月24日
ChatGPT 和 DALL·E 2 配合生成故事绘本
2024年12月01日
omegaconf,一个超强的 Python 库!
2024年11月24日
【视觉AIGC识别】误差特征、人脸伪造检测、其他类型假图检测
2024年12月01日
[超级详细]如何在深度学习训练模型过程中使用 GPU 加速
2024年11月29日
Python 物理引擎pymunk最完整教程
2024年11月27日
MediaPipe 人体姿态与手指关键点检测教程
2024年11月27日
深入了解 Taipy:Python 打造 Web 应用的全面教程
2024年11月26日
基于Transformer的时间序列预测模型
2024年11月25日
Python在金融大数据分析中的AI应用(股价分析、量化交易)实战
2024年11月25日
AIGC Gradio系列学习教程之Components
2024年12月01日
Python3 `asyncio` — 异步 I/O,事件循环和并发工具
2024年11月30日
llama-factory SFT系列教程:大模型在自定义数据集 LoRA 训练与部署
2024年12月01日
Python 多线程和多进程用法
2024年11月24日
Python socket详解,全网最全教程
2024年11月27日