4 Ways to Sync MySQL to Elasticsearch
Below are brief explanations and example code for four ways to sync MySQL data to Elasticsearch (ES):
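- Capturing MySQL changes (binlog-style) and syncing them to Elasticsearch. Note that the example below uses a trigger rather than reading the binlog itself: each inserted row is copied into a table named `es_table`, and a separate process still has to forward those rows to Elasticsearch: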
DELIMITER $$
CREATE TRIGGER `db_trigger` AFTER INSERT ON `db_table` FOR EACH ROW
BEGIN
INSERT INTO es_table (id, data) VALUES (NEW.id, JSON_OBJECT('key', NEW.value));
END$$
DELIMITER ;
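For comparison, a version of this approach that reads the binlog directly could look roughly like the sketch below. It assumes the third-party `python-mysql-replication` package (`pymysqlreplication`), a server running with `binlog_format=ROW`, and a replication-capable account; the index name `es_index`, the `server_id` value, and the helper name `row_event_to_action` are illustrative assumptions, not part of the original example.

```python
def row_event_to_action(kind, row, index="es_index", id_column="id"):
    """Map one binlog row change to an Elasticsearch bulk action (hypothetical helper)."""
    if kind == "delete":
        return {"_op_type": "delete", "_index": index, "_id": row["values"][id_column]}
    # Update events carry before/after images; inserts carry a single "values" dict.
    values = row["after_values"] if kind == "update" else row["values"]
    return {"_op_type": "index", "_index": index, "_id": values[id_column], "_source": values}


def stream_binlog_to_es():
    # Imported lazily so the mapping helper above works without these packages installed.
    from elasticsearch import Elasticsearch, helpers
    from pymysqlreplication import BinLogStreamReader
    from pymysqlreplication.row_event import DeleteRowsEvent, UpdateRowsEvent, WriteRowsEvent

    kinds = {WriteRowsEvent: "insert", UpdateRowsEvent: "update", DeleteRowsEvent: "delete"}
    es = Elasticsearch(["http://localhost:9200/"])
    stream = BinLogStreamReader(
        connection_settings={"host": "localhost", "port": 3306, "user": "user", "passwd": "pass"},
        server_id=184055,        # assumed value; must be unique among replication clients
        only_events=list(kinds),
        only_tables=["db_table"],
        blocking=True,           # keep waiting for new events
        resume_stream=True,
    )
    try:
        for event in stream:
            kind = kinds[type(event)]
            helpers.bulk(es, (row_event_to_action(kind, row) for row in event.rows))
    finally:
        stream.close()
```

Unlike the trigger, this adds no write overhead inside MySQL transactions and also sees updates and deletes, at the cost of running and monitoring a separate reader process.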
- Using Logstash to read from the MySQL database and sync to Elasticsearch:
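input {
  jdbc {
    jdbc_driver_library => "/path/to/mysql-connector-java-x.x.x-bin.jar"
    # With Connector/J 8.x the driver class is "com.mysql.cj.jdbc.Driver"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/database_name"
    jdbc_user => "your_username"
    jdbc_password => "your_password"
    # Run once a minute; this statement re-reads the whole table on every run
    schedule => "* * * * *"
    statement => "SELECT id, value FROM db_table"
  }
}
output {
  elasticsearch {
    hosts => ["localhost:9200"]
    index => "es_index"
    # Reusing the MySQL primary key makes repeated runs overwrite instead of duplicate
    document_id => "%{id}"
    # document_type => "es_type" was set here originally; mapping types are
    # deprecated in Elasticsearch 7 and removed in 8, so it is left out
  }
}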
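- Using Kafka as an intermediary to carry MySQL changes to Elasticsearch. The example captures changes with Debezium's embedded engine; publishing each event to Kafka and consuming it into Elasticsearch is left as a placeholder:
// Java example using the embedded engine from the open-source Debezium library.
// Property names follow Debezium 1.x; 2.x renames several of them.
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import java.util.Properties;
import java.util.concurrent.Executors;

public class MySqlChangeCapture {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("name", "my-sql-connector");
        props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
        // The embedded engine needs somewhere to store offsets; this file path is an assumption.
        props.setProperty("offset.storage", "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        props.setProperty("offset.storage.file.filename", "/tmp/offsets.dat");
        props.setProperty("database.hostname", "dbserver1");
        props.setProperty("database.port", "3306");
        props.setProperty("database.user", "myuser");
        props.setProperty("database.password", "mypass");
        props.setProperty("database.server.id", "184054");
        props.setProperty("database.server.name", "my-app-connector");
        props.setProperty("database.include.list", "mydb");
        props.setProperty("database.history.kafka.bootstrap.servers", "kafka:9092");
        props.setProperty("database.history.kafka.topic", "schema-changes.mydb");
        props.setProperty("include.schema.changes", "true");

        DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
            .using(props)
            // Placeholder handler: publish event.value() to a Kafka topic here.
            .notifying(event -> System.out.println(event.value()))
            .build();
        // The engine is a Runnable; it keeps running until engine.close() is called.
        Executors.newSingleThreadExecutor().execute(engine);
    }
}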
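On the consuming side, each Debezium change event has to be turned into an Elasticsearch write. The sketch below shows one way to do that mapping, assuming events arrive as JSON text in Debezium's standard envelope (`op`, `before`, `after`, optionally wrapped in `payload`). The function name `debezium_event_to_action`, the index name `es_index`, and the `id` key column are assumptions for illustration.

```python
import json


def debezium_event_to_action(raw_value, index="es_index", id_field="id"):
    """Turn one Debezium change-event value (JSON text) into a bulk action, or None."""
    if raw_value is None:
        return None  # tombstone record that follows a delete
    event = json.loads(raw_value)
    # With schemas enabled the row data sits under "payload"; otherwise it is top-level.
    payload = event.get("payload", event)
    op = payload.get("op")
    if op in ("c", "u", "r"):  # create, update, snapshot read
        after = payload["after"]
        return {"_op_type": "index", "_index": index, "_id": after[id_field], "_source": after}
    if op == "d":  # delete: only the "before" image is populated
        return {"_op_type": "delete", "_index": index, "_id": payload["before"][id_field]}
    return None  # schema-change and other non-row events


# Usage example with a hand-written event:
sample = json.dumps({"payload": {"op": "c", "before": None, "after": {"id": 7, "value": "hello"}}})
print(debezium_event_to_action(sample))
```

The resulting dictionaries have the shape expected by `elasticsearch.helpers.bulk`, so a Kafka consumer loop could batch them and send them in one request.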
- Using a custom application to sync the data:
import pymysql
from elasticsearch import Elasticsearch, helpers

# Connect to MySQL
connection = pymysql.connect(host='localhost', user='user', password='pass', database='db', charset='utf8mb4')
# Connect to Elasticsearch
es = Elasticsearch(['http://localhost:9200/'])
# Query the MySQL data
with connection.cursor() as cursor:
    cursor.execute("SELECT id, value FROM db_table")
    rows = cursor.fetchall()
# Bulk-write the data to Elasticsearch
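The listing above stops at the bulk-write step. One possible way to finish it is sketched below; this is not the original author's code. The index name `es_index` and the helper names `build_actions` and `sync_table` are assumptions, and a server-side cursor (`pymysql.cursors.SSCursor`) is swapped in for `fetchall()` so that large tables are streamed rather than loaded into memory.

```python
def build_actions(rows, index="es_index"):
    """Yield one bulk 'index' action per (id, value) row tuple."""
    for row_id, value in rows:
        yield {"_index": index, "_id": row_id, "_source": {"value": value}}


def sync_table():
    # Imported lazily so build_actions can be tried without the client libraries.
    import pymysql
    from elasticsearch import Elasticsearch, helpers

    connection = pymysql.connect(host="localhost", user="user", password="pass",
                                 database="db", charset="utf8mb4")
    es = Elasticsearch(["http://localhost:9200/"])
    try:
        # SSCursor streams rows from the server instead of buffering the full result.
        with connection.cursor(pymysql.cursors.SSCursor) as cursor:
            cursor.execute("SELECT id, value FROM db_table")
            success_count, errors = helpers.bulk(es, build_actions(cursor))
            print(f"indexed {success_count} documents, {len(errors)} errors")
    finally:
        connection.close()
```

Using the MySQL `id` as the document `_id` keeps the job idempotent: running `sync_table()` again overwrites existing documents instead of creating duplicates.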