@Configuration
public class CanalConfig {
@Value("${canal.serverIp}")
private String serverIp;
@Value("${canal.port}")
private int port;
@Value("${canal.destination}")
private String destination;
@Bean
public CanalConnector canalConnector() {
CanalConnector connector = null;
try {
// 创建连接
connector = CanalConnectors.newSingleConnector(new InetSocketAddress(serverIp, port), destination, "", "");
// 连接
connector.connect();
// 订阅数据库表,全部表
connector.subscribe(".*\\..*");
// 回滚到未进行的位点
connector.rollback();
} catch (IOException e) {
e.printStackTrace();
}
return connector;
}
@Bean
public ServiceCanalConnector serviceCanalConnector(CanalConnector connector) {
return new ServiceCanalConnector(connector);
}
}
// 使用ServiceCanalConnector处理数据
public class ServiceCanalConnector {
private final CanalConnector connector;
public ServiceCanalConnector(CanalConnector connector) {
this.connector = connector;
}
public void start() {
while (true) {
try {
// 开始事务
connector.rollback();
boolean isRunning = true;
while (isRunning) {
Message message = connector.getWithoutAck(1024); // 获取指定数量的数据
long batchId = message.getId();
if (batchId == -1 || message.getEntries().isEmpty()) {
// 没有数据,继续获取
Thread.sleep(1000);
} else {
// 处理数据
processData(message.getEntries());
// 确认数据
connector.ack(batchId);
// 结束循环
isRunning = false;
}
}
} catch (InterruptedException | IOException e) {
e.printStackTrace();
}
}
}
private void processData(List<Entry> entrys) {
for (Entry entry : entrys) {
if (EntryType.ROWDATA == entry.getEntryType()) {
RowChange rowChange = null;
try {
rowChange = RowChange.par
评论已关闭