@Component
public class RedisMysqlSyncService {
@Autowired
private RedisTemplate<String, String> redisTemplate;
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private CanalClient canalClient;
// ... 其他代码
// 处理Redis事件
public void processRedisEvent(RedisEvent event) {
String key = event.getKey();
String command = event.getCommand();
String value = event.getValue();
// 根据不同的命令执行不同的操作
switch (command) {
case "set":
jdbcTemplate.update("REPLACE INTO your_table (id, data) VALUES (?, ?)", key, value);
break;
case "del":
jdbcTemplate.update("DELETE FROM your_table WHERE id = ?", key);
break;
// ... 其他命令处理
}
}
// 监听Canal变更事件
public void listenCanalEvent() {
canalClient.connect();
canalClient.subscribe("your_canal_filter_rule");
canalClient.rollback();
while (true) {
Message message = canalClient.getWithoutAck(100); // 获取100条数据
if (message == null) {
// 没有数据,休眠一会儿
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
} else {
// 处理数据
processCanalEvent(message);
canalClient.ack(message.getId()); // 确认消息
}
}
}
// 处理Canal事件
public void processCanalEvent(Message message) {
for (Entry entry : message.getEntries()) {
if (EntryType.ROWDATA == entry.getEntryType()) {
RowChange rowChange = RowChange.parseFrom(entry.getStoreValue());
EventType eventType = rowChange.getEventType();
for (RowData rowData : rowChange.getRowDatasList()) {
if (eventType == EventType.DELETE) {
redisTemplate.delete(rowData.getBeforeColumnsList().get(0).getValue());
} else if (eventType == EventType.INSERT || eventType == EventType.UPDATE) {
redisTemplate.opsForValue().set(rowData.getAfterColumnsList().get(0).getValue(),
rowData.getAfterColumnsList().get(1).getValue());
}
}
评论已关闭