Redis与MySQL数据库数据一致性保持策略
Redis与MySQL数据库数据一致性保持策略
在高并发系统中,Redis 常被用作缓存层,MySQL 作为持久化存储。如何保证两者之间数据的一致性,是设计时必须解决的关键问题。本文将从以下几个方面展开讲解,并配以代码示例、Mermaid 图解及详细说明,帮助读者快速理解并上手实践。
1. 引言
背景
- Redis:高性能内存缓存,读写速度极快。
- MySQL:可靠的关系型数据库,负责持久化存储。
挑战
- 当数据在 Redis(缓存)和 MySQL(数据库)之间存在更新操作时,如果操作顺序或策略不当,就可能导致“脏数据”或“缓存击穿”等问题。
- 典型场景:应用先修改数据库,再同步/删除缓存;或先删除缓存,再修改数据库;中间一旦出现异常或并发,就会出现一致性问题。
目标
- 介绍主流的缓存一致性模式:Cache Aside、Write Through、Write Behind、延迟双删等。
- 用代码示例体现核心思想,并通过 Mermaid 图解展示整体数据流。
2. 数据一致性挑战
2.1 缓存与数据库的常见不一致场景
先写缓存,后写数据库,写数据库失败
- 现象:缓存已更新,但数据库写入出错,导致数据库中仍是旧值,一旦缓存失效,读取到旧值。
先写数据库,后删除缓存,删除失败
- 现象:缓存仍存旧值,业务读取到脏数据。
并发更新导致的“脏写”
- 两个线程同时更新某条数据,线程 A 先删除缓存、更新数据库;线程 B 读取数据库写入缓存,导致 A 的更新被 B 的旧值覆盖。
2.2 常见一致性指标
- 强一致性:对所有客户端而言,读到的数据与最新写操作保持一致。
- 最终一致性:允许短暂的不一致,但经过一定时间后,缓存与数据库最终会达到一致。
- 弱一致性:对并发操作不作保证,不一致窗口可能较长。
在绝大多数业务场景里,我们追求最终一致性,并通过设计将不一致窗口尽可能缩短。
3. 基本缓存策略概述
Redis 与 MySQL 保持一致性,通常依赖以下几种模式:
- Cache Aside(旁路缓存,懒加载 + 延迟双删)
- Write Through(写缓存同时写数据库)
- Write Behind(写缓存后异步落库)
- Read Through(先读缓存,缓存未命中则读库并回写缓存)
- 分布式锁 + 事务补偿/事务消息
- 两阶段提交 / TCC 方案(对于强一致性要求极高的场景)
下面依次展开。
4. Cache Aside 模式
4.1 概述
核心思想
- 业务先操作数据库,再删除/更新缓存。
- 读取时:先查 Redis 缓存,若命中则直接返回;若未命中,再从 MySQL 读取,并将结果回写到 Redis。
优点
- 简单易懂,适用广泛。
- 读多写少场景下,能极大提升读性能。
缺点
- 写操作存在短暂的不一致窗口(数据库提交到缓存删除/更新之间)。
- 需要结合“延迟双删”或“分布式锁”来进一步缩短不一致时间。
4.2 延迟双删防止并发写导致脏数据
当并发写操作发生时,单纯的“先删除缓存,再写数据库”并不能完全消除脏数据。常见的延迟双删策略如下:
线程 A / B 都准备更新 key=K:
- 先删除缓存:
DEL K
- 更新数据库
- 等待一定时间(例如 50ms)
- 再次删除缓存:
DEL K
- 先删除缓存:
通过两次删除,尽量避免另一线程在数据库更新完成后把旧值重新写入缓存。
4.3 工作流程图(Mermaid 图解)
flowchart LR
subgraph 读请求
A1[应用] -->|get(K)| B1[Redis: GET K]
B1 -->|命中| C1[返回数据]
B1 -->|未命中| D1[MySQL: SELECT * FROM table WHERE id=K]
D1 --> E1[返回结果]
E1 -->|SET K ...| B1
E1 --> F1[返回数据]
end
subgraph 写请求(延迟双删)
A2[应用] -->|DEL K| B2[Redis: DEL K]
B2 -->|执行| C2[MySQL: UPDATE table SET ... WHERE id=K]
C2 --> D2[等待 ∆t (如 50ms)]
D2 --> E2[Redis: DEL K]
end
4.4 代码示例(Java + Jedis + JDBC)
以下示例代码演示如何在 Java 中使用 Jedis 操作 Redis,并使用 JDBC 操作 MySQL,实现 Cache Aside + 延迟双删。
import redis.clients.jedis.Jedis;
import java.sql.*;
import java.time.Duration;
public class CacheAsideExample {
private static final String REDIS_HOST = "localhost";
private static final int REDIS_PORT = 6379;
private static final String JDBC_URL = "jdbc:mysql://localhost:3306/testdb";
private static final String JDBC_USER = "root";
private static final String JDBC_PASS = "password";
private Jedis jedis;
private Connection conn;
public CacheAsideExample() throws SQLException {
jedis = new Jedis(REDIS_HOST, REDIS_PORT);
conn = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASS);
}
/**
* 读取操作:先查缓存,未命中则查库并回写缓存
*/
public String getUserById(String userId) throws SQLException {
String cacheKey = "user:" + userId;
// 1. 先查询 Redis 缓存
String userJson = jedis.get(cacheKey);
if (userJson != null) {
return userJson; // 缓存命中
}
// 2. 缓存未命中,查询 MySQL
String sql = "SELECT id, name, age FROM users WHERE id = ?";
try (PreparedStatement ps = conn.prepareStatement(sql)) {
ps.setString(1, userId);
ResultSet rs = ps.executeQuery();
if (rs.next()) {
// 假设将用户信息转换为 JSON 字符串
userJson = String.format("{\"id\":\"%s\",\"name\":\"%s\",\"age\":%d}",
rs.getString("id"), rs.getString("name"), rs.getInt("age"));
// 3. 回写 Redis,设置合理过期时间
jedis.setex(cacheKey, (int) Duration.ofMinutes(5).getSeconds(), userJson);
return userJson;
} else {
return null;
}
}
}
/**
* 写操作:延迟双删策略
*/
public void updateUser(String userId, String newName, int newAge) throws SQLException, InterruptedException {
String cacheKey = "user:" + userId;
// 1. 删除缓存
jedis.del(cacheKey);
// 2. 更新数据库
String sqlUpdate = "UPDATE users SET name = ?, age = ? WHERE id = ?";
try (PreparedStatement ps = conn.prepareStatement(sqlUpdate)) {
ps.setString(1, newName);
ps.setInt(2, newAge);
ps.setString(3, userId);
ps.executeUpdate();
}
// 3. 延迟一段时间再次删除缓存,防止脏数据
Thread.sleep(50); // 延迟 50ms
jedis.del(cacheKey);
}
public void close() {
jedis.close();
try { conn.close(); } catch (SQLException ignored) {}
}
public static void main(String[] args) throws Exception {
CacheAsideExample example = new CacheAsideExample();
// 演示写操作
example.updateUser("1001", "张三", 30);
// 演示读操作
String userData = example.getUserById("1001");
System.out.println("User Data: " + userData);
example.close();
}
}
代码说明
getUserById
- 先尝试从 Redis 获取
user:1001
。 - 如果命中直接返回,如果未命中则查询 MySQL,得到结果后写入 Redis 并设置过期时间(5 分钟)。
- 先尝试从 Redis 获取
updateUser
- 第一次
jedis.del(cacheKey)
删除缓存,防止旧值被读取。 - 执行 MySQL 更新。
- 睡眠 50ms 后,再次
jedis.del(cacheKey)
二次删除,以避免并发写入脏数据。
- 第一次
注意:延迟时长 50ms
并非固定值,根据业务场景可调整,但要确保比典型数据库写入并发场景稍长,足以避免同一时刻另一个线程将“旧值”写入缓存。
5. Write Through 模式
5.1 概述
核心思想
- 应用对数据的 写操作先写入 Redis 缓存,然后再写入 MySQL。
- 同时也可将写操作封装在一个统一接口中,保证读写一致性。
优点
- 读写均在缓存层完成,读速度非常快。
- 保证了缓存与数据库数据几乎同时更新,若写数据库失败(回滚),需要同步将缓存回滚或删除。
缺点
- 写操作的吞吐量受 Redis & MySQL 并发写性能影响,通常写延迟较高。
- 写失败时,需要考虑保证缓存与数据库回滚一致,否则会出现脏数据。
5.2 工作流程图(Mermaid 图解)
flowchart LR
A[应用] -->|SET K->V| B[Redis: SET K V]
B -->|OK| C[MySQL: INSERT/UPDATE table SET ...]
C -->|失败?| D{失败?}
D -- 是 --> E[Redis: DEL K 或 回滚操作]
D -- 否 --> F[写操作结束,返回成功]
5.3 代码示例(Java + Jedis + JDBC)
public class WriteThroughExample {
private Jedis jedis;
private Connection conn;
public WriteThroughExample() throws SQLException {
jedis = new Jedis("localhost", 6379);
conn = DriverManager.getConnection(
"jdbc:mysql://localhost:3306/testdb", "root", "password");
}
/**
* 写操作:先写 Redis,再写 MySQL。
*/
public void saveUser(String userId, String name, int age) {
String cacheKey = "user:" + userId;
String userJson = String.format("{\"id\":\"%s\",\"name\":\"%s\",\"age\":%d}", userId, name, age);
// 1. 写 Redis 缓存
jedis.setex(cacheKey, 300, userJson); // 5 分钟过期
// 2. 写 MySQL
String sql = "REPLACE INTO users(id, name, age) VALUES(?, ?, ?)";
try (PreparedStatement ps = conn.prepareStatement(sql)) {
ps.setString(1, userId);
ps.setString(2, name);
ps.setInt(3, age);
ps.executeUpdate();
} catch (SQLException e) {
// 3. 如果写数据库失败,则删除缓存,避免脏数据
jedis.del(cacheKey);
throw new RuntimeException("保存用户失败,已删除缓存", e);
}
}
public void close() {
jedis.close();
try { conn.close(); } catch (SQLException ignored) {}
}
public static void main(String[] args) throws Exception {
WriteThroughExample example = new WriteThroughExample();
example.saveUser("1002", "李四", 28);
example.close();
}
}
代码说明
- 先写 Redis:确保缓存层保存了最新数据,后续读操作会从缓存命中。
- 再写 MySQL:若插入/更新 MySQL 成功,流程结束;若失败则删除缓存,避免数据不一致。
注意事项
- 事务原子性:若存在复杂逻辑,需要确保 Redis 和 MySQL 的写操作要么同时成功,要么同时失败。
- 在高并发场景下,Write Through 会降低写性能,因为必须等待两端都写完才能返回。
6. Write Behind 模式
6.1 概述
核心思想
- 应用只写入 Redis 缓存,不立即写数据库。
- Cache Layer 维护一个异步队列/队列缓存,将写请求累积并在后台定期或触发条件时批量刷入 MySQL。
优点
- 写操作速度非常快,仅操作 Redis。
- 利用批量写库,提升数据库写入吞吐量。
缺点
- 如果异步刷库任务出现故障或服务宕机,将导致数据丢失。
- 数据最终一致性延迟较高,不适合对实时性要求高的场景。
6.2 工作流程图(Mermaid 图解)
flowchart LR
A[应用] -->|SET K->V| B[Redis: SET K V 并将 K 加入待刷库队列]
B --> C[返回成功]
subgraph 刷库线程
D[检查待刷库队列] -->|批量取出若干条| E[MySQL: BATCH UPDATE]
E -->|刷入成功?| F{成功?}
F -- 是 --> G[从队列移除相应 Key]
F -- 否 --> H[日志/重试机制]
end
6.3 代码示例(Java + Jedis)
以下示例演示一种简化版的 Write Behind:
- 使用 Redis 列表(List)维护待刷库的 Key 列表。
- 后台线程每隔固定时间(如 1s)批量从队列读取,一次性执行 MySQL 更新。
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;
import java.sql.*;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
public class WriteBehindExample {
private static final String REDIS_HOST = "localhost";
private static final int REDIS_PORT = 6379;
private Jedis jedis;
private Connection conn;
private static final String QUEUE_KEY = "cache_to_db_queue";
public WriteBehindExample() throws SQLException {
jedis = new Jedis(REDIS_HOST, REDIS_PORT);
conn = DriverManager.getConnection(
"jdbc:mysql://localhost:3306/testdb", "root", "password");
// 启动后台刷库定时任务
startFlushTimer();
}
/**
* 写操作:写 Redis 缓存,并将 Key 放入队列
*/
public void saveUserAsync(String userId, String name, int age) {
String cacheKey = "user:" + userId;
String userJson = String.format("{\"id\":\"%s\",\"name\":\"%s\",\"age\":%d}", userId, name, age);
// 1. 写 Redis,并将待刷库的 Key 放入 List 列表
Pipeline p = jedis.pipelined();
p.setex(cacheKey, 300, userJson); // 5 分钟过期
p.lpush(QUEUE_KEY, cacheKey);
p.sync();
}
/**
* 后台定时任务:批量刷库
*/
private void startFlushTimer() {
Timer timer = new Timer(true);
timer.schedule(new TimerTask() {
@Override
public void run() {
flushCacheToDb();
}
}, 1000, 1000); // 延迟 1s 启动,每 1s 执行一次
}
/**
* 从 Redis 列表中批量取出待刷库 Key,查询对应缓存值并写入 MySQL
*/
private void flushCacheToDb() {
try {
// 一次性取出最多 100 条待刷库 key
List<String> keys = jedis.lrange(QUEUE_KEY, 0, 99);
if (keys == null || keys.isEmpty()) {
return;
}
// 开启事务
conn.setAutoCommit(false);
String sql = "REPLACE INTO users(id, name, age) VALUES(?, ?, ?)";
try (PreparedStatement ps = conn.prepareStatement(sql)) {
for (String cacheKey : keys) {
String userJson = jedis.get(cacheKey);
if (userJson == null) {
// 缓存可能已过期或被删除,跳过
jedis.lrem(QUEUE_KEY, 0, cacheKey);
continue;
}
// 简单解析 JSON(生产环境请使用更健壮的 JSON 序列化库)
// 假设格式为 {"id":"1003","name":"王五","age":25}
String[] parts = userJson.replaceAll("[{}\"]", "")
.split(",");
String id = parts[0].split(":")[1];
String name = parts[1].split(":")[1];
int age = Integer.parseInt(parts[2].split(":")[1]);
ps.setString(1, id);
ps.setString(2, name);
ps.setInt(3, age);
ps.addBatch();
}
ps.executeBatch();
conn.commit();
// 批量删除已刷库 key
jedis.ltrim(QUEUE_KEY, keys.size(), -1);
} catch (SQLException e) {
conn.rollback();
// 日志记录,生产环境可加入重试机制
System.err.println("刷库失败,稍后重试:" + e.getMessage());
} finally {
conn.setAutoCommit(true);
}
} catch (Exception ex) {
// 捕获 Redis 或其他异常,保证定时任务不中断
System.err.println("flushCacheToDb 异常:" + ex.getMessage());
}
}
public void close() {
jedis.close();
try { conn.close(); } catch (SQLException ignored) {}
}
public static void main(String[] args) throws Exception {
WriteBehindExample example = new WriteBehindExample();
// 演示写操作
example.saveUserAsync("1003", "王五", 25);
// 程序可继续处理其他逻辑,后台线程负责刷库
}
}
代码说明
saveUserAsync
- 仅写入 Redis,并把
user:1003
压入cache_to_db_queue
列表,表示待落库。
- 仅写入 Redis,并把
flushCacheToDb
- 定时任务每秒执行一次,从列表中批量获取待落库的 Key,比如最多 100 条。
- 对每个 Key,从 Redis 中读取缓存值(JSON 字符串),将解析后的字段写入 MySQL。
- 成功后调用
ltrim
将已处理的队列数据清除。 - 若写库失败则回滚,并记录日志,下一次任务会重新读取队列继续写入。
风险提示
- 如果应用或后台线程进程意外挂掉,Redis 列表中的数据可能长时间无法落库,导致缓存与数据库不一致。
- 建议在生产环境结合消息队列(如 Kafka、RabbitMQ)或 Redis Stream,以保证刷库任务的高可靠性。
7. 分布式锁与事务补偿
7.1 分布式锁
当并发写同一条数据时,可通过 Redis 分布式锁(如 Redisson、Jedis 的 SETNX)为写操作上锁,保证同一时刻只有一台应用实例执行更新,从而避免脏写。例如:
// 简化示例,建议使用 Redisson 等成熟的分布式锁库
public void updateUserWithLock(String userId, String newName, int newAge) throws InterruptedException {
String lockKey = "lock:user:" + userId;
String requestId = UUID.randomUUID().toString();
// 尝试获取锁
boolean locked = jedis.set(lockKey, requestId, "NX", "PX", 5000) != null;
if (!locked) {
throw new RuntimeException("获取锁失败,请稍后重试");
}
try {
// 1. 删除缓存
jedis.del("user:" + userId);
// 2. 更新数据库
// ...
// 3. 延迟双删或直接更新缓存
Thread.sleep(50);
jedis.del("user:" + userId);
} finally {
// 释放锁,必须确保 requestId 一致才删除
String val = jedis.get(lockKey);
if (requestId.equals(val)) {
jedis.del(lockKey);
}
}
}
7.2 事务补偿 / 消息队列
对于写入数据库失败后,缓存已被更新/删除但数据库未提交的场景,还可以结合本地事务消息或二阶段提交进行补偿。典型思路:
写本地事务消息表
- 将待执行的缓存操作与数据库操作放在同一个本地事务里。
- 如果数据库提交成功,则消息表写入成功;若提交失败,则本地事务回滚,缓存也不更新。
异步投递/确认
- 后台异步线程扫描消息表,将消息投递到消息队列(如 Kafka)。
- 消费端收到消息后执行缓存更新与数据库最终落库或补偿逻辑。
该方案较为复杂,适用于对强一致性要求极高的场景。
8. 其他一致性模式简介
8.1 Read Through
- 描述:应用直接对缓存层发起读请求,若缓存未命中,缓存层自身会从数据库加载并回写缓存。
- 特点:用起来更像“透明缓存”,业务不需要显式编写“先查缓存、未命中查库、回写缓存”的逻辑。但需要使用支持 Read Through 功能的缓存客户端或中间件(如某些商业缓存解决方案)。
8.2 两阶段提交(2PC)/ TCC
2PC(两阶段提交)
- 要求分布式事务协调者(Coordinator)协调缓存更新与数据库更新两个阶段。
- 阶段 1(Prepare):通知各参与者预备提交;如果所有参与者都准备就绪,则进入阶段 2。
- 阶段 2(Commit/Rollback):通知各参与者正式提交或回滚。
TCC(Try-Confirm-Cancel)
- Try:各参与者尝试预占资源(如锁定缓存、预写日志等)。
- Confirm:各参与者确认实际提交。
- Cancel:各参与者进行回滚。
优缺点
- 优点:能保证严格的强一致性。
- 缺点:性能开销大,编程复杂度高,且存在锁等待、阻塞等问题,不适用于极高吞吐场景。
9. 总结与最佳实践
优先采用 Cache Aside(延迟双删 + 分布式锁)模式
- 简单、易实现,对于大部分读多写少场景能满足一致性要求。
- 延迟双删能够在高并发下显著减少脏数据出现概率。
- 分布式锁可以进一步控制并发更新并发写时对缓存的多次操作顺序。
针对写多场景,可考虑 Write Through 或 Write Behind
- Write Through:适合对读取延迟要求极高、写性能要求相对一般的场景。
- Write Behind:适合对写性能要求极高,但可容忍一定最终一致性延迟的场景。注意后台刷库任务的高可靠性及消息持久化。
严谨场景下可使用分布式事务或 TCC
- 对一致性要求绝对严格且能够接受额外延迟与复杂度的业务,比如金融系统的流水账务。
- 尽量减少全链路分布式事务的使用范围,只将关键操作纳入。
合理设计缓存过期时间与热点数据策略
- 常见做法是:热点数据设置较长的过期时间,非热点数据使用合理的过期策略以节省内存。
- 对于热点“雪崩”场景,可结合随机化过期时间、互斥锁重建缓存或提前预热等方式。
监控与报警
- 建立缓存命中率监控、数据库写入失败监控、后台刷库积压监控等。
- 及时发现缓存与数据库不一致的风险,并进行人工或自动补偿。
10. 全文小结
- Redis 与 MySQL 保持数据一致性,核心在于设计合理的缓存读写策略,将不一致窗口尽量缩短,并根据业务需求权衡性能与一致性。
- 本文重点介绍了常见的 Cache Aside(延迟双删)、Write Through、Write Behind 模式,并配以 Mermaid 图解,帮助你快速理解整体流程。
- 代码示例(Java + Jedis + JDBC) 则直观演示各模式下的具体实现细节。
- 最后,还简要介绍了分布式锁、事务补偿、两阶段提交等进阶方案,供对一致性要求更高的场景参考。
评论已关闭