Redis与MySQL数据库数据一致性保持策略‌

Redis与MySQL数据库数据一致性保持策略

在高并发系统中,Redis 常被用作缓存层,MySQL 作为持久化存储。如何保证两者之间数据的一致性,是设计时必须解决的关键问题。本文将从以下几个方面展开讲解,并配以代码示例Mermaid 图解详细说明,帮助读者快速理解并上手实践。


1. 引言

  • 背景

    • Redis:高性能内存缓存,读写速度极快。
    • MySQL:可靠的关系型数据库,负责持久化存储。
  • 挑战

    • 当数据在 Redis(缓存)和 MySQL(数据库)之间存在更新操作时,如果操作顺序或策略不当,就可能导致“脏数据”或“缓存击穿”等问题。
    • 典型场景:应用先修改数据库,再同步/删除缓存;或先删除缓存,再修改数据库;中间一旦出现异常或并发,就会出现一致性问题。
  • 目标

    • 介绍主流的缓存一致性模式:Cache Aside、Write Through、Write Behind、延迟双删等。
    • 用代码示例体现核心思想,并通过 Mermaid 图解展示整体数据流。

2. 数据一致性挑战

2.1 缓存与数据库的常见不一致场景

  1. 先写缓存,后写数据库,写数据库失败

    • 现象:缓存已更新,但数据库写入出错,导致数据库中仍是旧值,一旦缓存失效,读取到旧值。
  2. 先写数据库,后删除缓存,删除失败

    • 现象:缓存仍存旧值,业务读取到脏数据。
  3. 并发更新导致的“脏写”

    • 两个线程同时更新某条数据,线程 A 先删除缓存、更新数据库;线程 B 读取数据库写入缓存,导致 A 的更新被 B 的旧值覆盖。

2.2 常见一致性指标

  • 强一致性:对所有客户端而言,读到的数据与最新写操作保持一致。
  • 最终一致性:允许短暂的不一致,但经过一定时间后,缓存与数据库最终会达到一致。
  • 弱一致性:对并发操作不作保证,不一致窗口可能较长。

在绝大多数业务场景里,我们追求最终一致性,并通过设计将不一致窗口尽可能缩短。


3. 基本缓存策略概述

Redis 与 MySQL 保持一致性,通常依赖以下几种模式:

  1. Cache Aside(旁路缓存,懒加载 + 延迟双删)
  2. Write Through(写缓存同时写数据库)
  3. Write Behind(写缓存后异步落库)
  4. Read Through(先读缓存,缓存未命中则读库并回写缓存)
  5. 分布式锁 + 事务补偿/事务消息
  6. 两阶段提交 / TCC 方案(对于强一致性要求极高的场景)

下面依次展开。


4. Cache Aside 模式

4.1 概述

  • 核心思想

    • 业务先操作数据库,再删除/更新缓存。
    • 读取时:先查 Redis 缓存,若命中则直接返回;若未命中,再从 MySQL 读取,并将结果回写到 Redis。
  • 优点

    • 简单易懂,适用广泛。
    • 读多写少场景下,能极大提升读性能。
  • 缺点

    • 写操作存在短暂的不一致窗口(数据库提交到缓存删除/更新之间)。
    • 需要结合“延迟双删”或“分布式锁”来进一步缩短不一致时间。

4.2 延迟双删防止并发写导致脏数据

当并发写操作发生时,单纯的“先删除缓存,再写数据库”并不能完全消除脏数据。常见的延迟双删策略如下:

  1. 线程 A / B 都准备更新 key=K:

    • 先删除缓存:DEL K
    • 更新数据库
    • 等待一定时间(例如 50ms)
    • 再次删除缓存:DEL K

通过两次删除,尽量避免另一线程在数据库更新完成后把旧值重新写入缓存。

4.3 工作流程图(Mermaid 图解)

flowchart LR
    subgraph 读请求
        A1[应用] -->|get(K)| B1[Redis: GET K]
        B1 -->|命中| C1[返回数据]
        B1 -->|未命中| D1[MySQL: SELECT * FROM table WHERE id=K]
        D1 --> E1[返回结果]
        E1 -->|SET K ...| B1
        E1 --> F1[返回数据]
    end

    subgraph 写请求(延迟双删)
        A2[应用] -->|DEL K| B2[Redis: DEL K]
        B2 -->|执行| C2[MySQL: UPDATE table SET ... WHERE id=K]
        C2 --> D2[等待 ∆t (如 50ms)]
        D2 --> E2[Redis: DEL K]
    end
  • 图示说明图示说明

    上图展示了读请求和写请求的主要流程,其中写请求使用了“延迟双删”策略:先删缓存、更新数据库、最后再删一次缓存。

4.4 代码示例(Java + Jedis + JDBC)

以下示例代码演示如何在 Java 中使用 Jedis 操作 Redis,并使用 JDBC 操作 MySQL,实现 Cache Aside + 延迟双删。

import redis.clients.jedis.Jedis;
import java.sql.*;
import java.time.Duration;

public class CacheAsideExample {
    private static final String REDIS_HOST = "localhost";
    private static final int REDIS_PORT = 6379;
    private static final String JDBC_URL = "jdbc:mysql://localhost:3306/testdb";
    private static final String JDBC_USER = "root";
    private static final String JDBC_PASS = "password";
    private Jedis jedis;
    private Connection conn;

    public CacheAsideExample() throws SQLException {
        jedis = new Jedis(REDIS_HOST, REDIS_PORT);
        conn = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASS);
    }

    /**
     * 读取操作:先查缓存,未命中则查库并回写缓存
     */
    public String getUserById(String userId) throws SQLException {
        String cacheKey = "user:" + userId;
        // 1. 先查询 Redis 缓存
        String userJson = jedis.get(cacheKey);
        if (userJson != null) {
            return userJson; // 缓存命中
        }
        // 2. 缓存未命中,查询 MySQL
        String sql = "SELECT id, name, age FROM users WHERE id = ?";
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, userId);
            ResultSet rs = ps.executeQuery();
            if (rs.next()) {
                // 假设将用户信息转换为 JSON 字符串
                userJson = String.format("{\"id\":\"%s\",\"name\":\"%s\",\"age\":%d}",
                        rs.getString("id"), rs.getString("name"), rs.getInt("age"));
                // 3. 回写 Redis,设置合理过期时间
                jedis.setex(cacheKey, (int) Duration.ofMinutes(5).getSeconds(), userJson);
                return userJson;
            } else {
                return null;
            }
        }
    }

    /**
     * 写操作:延迟双删策略
     */
    public void updateUser(String userId, String newName, int newAge) throws SQLException, InterruptedException {
        String cacheKey = "user:" + userId;
        // 1. 删除缓存
        jedis.del(cacheKey);

        // 2. 更新数据库
        String sqlUpdate = "UPDATE users SET name = ?, age = ? WHERE id = ?";
        try (PreparedStatement ps = conn.prepareStatement(sqlUpdate)) {
            ps.setString(1, newName);
            ps.setInt(2, newAge);
            ps.setString(3, userId);
            ps.executeUpdate();
        }

        // 3. 延迟一段时间再次删除缓存,防止脏数据
        Thread.sleep(50); // 延迟 50ms
        jedis.del(cacheKey);
    }

    public void close() {
        jedis.close();
        try { conn.close(); } catch (SQLException ignored) {}
    }

    public static void main(String[] args) throws Exception {
        CacheAsideExample example = new CacheAsideExample();

        // 演示写操作
        example.updateUser("1001", "张三", 30);

        // 演示读操作
        String userData = example.getUserById("1001");
        System.out.println("User Data: " + userData);

        example.close();
    }
}

代码说明

  1. getUserById

    • 先尝试从 Redis 获取 user:1001
    • 如果命中直接返回,如果未命中则查询 MySQL,得到结果后写入 Redis 并设置过期时间(5 分钟)。
  2. updateUser

    • 第一次 jedis.del(cacheKey) 删除缓存,防止旧值被读取。
    • 执行 MySQL 更新。
    • 睡眠 50ms 后,再次 jedis.del(cacheKey) 二次删除,以避免并发写入脏数据。
注意:延迟时长 50ms 并非固定值,根据业务场景可调整,但要确保比典型数据库写入并发场景稍长,足以避免同一时刻另一个线程将“旧值”写入缓存。

5. Write Through 模式

5.1 概述

  • 核心思想

    • 应用对数据的 写操作先写入 Redis 缓存,然后再写入 MySQL。
    • 同时也可将写操作封装在一个统一接口中,保证读写一致性。
  • 优点

    • 读写均在缓存层完成,读速度非常快。
    • 保证了缓存与数据库数据几乎同时更新,若写数据库失败(回滚),需要同步将缓存回滚或删除。
  • 缺点

    • 写操作的吞吐量受 Redis & MySQL 并发写性能影响,通常写延迟较高。
    • 写失败时,需要考虑保证缓存与数据库回滚一致,否则会出现脏数据。

5.2 工作流程图(Mermaid 图解)

flowchart LR
    A[应用] -->|SET K->V| B[Redis: SET K V]
    B -->|OK| C[MySQL: INSERT/UPDATE table SET ...]
    C -->|失败?| D{失败?}
    D -- 是 --> E[Redis: DEL K 或 回滚操作]
    D -- 否 --> F[写操作结束,返回成功]

5.3 代码示例(Java + Jedis + JDBC)

public class WriteThroughExample {
    private Jedis jedis;
    private Connection conn;

    public WriteThroughExample() throws SQLException {
        jedis = new Jedis("localhost", 6379);
        conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/testdb", "root", "password");
    }

    /**
     * 写操作:先写 Redis,再写 MySQL。
     */
    public void saveUser(String userId, String name, int age) {
        String cacheKey = "user:" + userId;
        String userJson = String.format("{\"id\":\"%s\",\"name\":\"%s\",\"age\":%d}", userId, name, age);

        // 1. 写 Redis 缓存
        jedis.setex(cacheKey, 300, userJson); // 5 分钟过期

        // 2. 写 MySQL
        String sql = "REPLACE INTO users(id, name, age) VALUES(?, ?, ?)";
        try (PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setString(1, userId);
            ps.setString(2, name);
            ps.setInt(3, age);
            ps.executeUpdate();
        } catch (SQLException e) {
            // 3. 如果写数据库失败,则删除缓存,避免脏数据
            jedis.del(cacheKey);
            throw new RuntimeException("保存用户失败,已删除缓存", e);
        }
    }

    public void close() {
        jedis.close();
        try { conn.close(); } catch (SQLException ignored) {}
    }

    public static void main(String[] args) throws Exception {
        WriteThroughExample example = new WriteThroughExample();
        example.saveUser("1002", "李四", 28);
        example.close();
    }
}

代码说明

  1. 先写 Redis:确保缓存层保存了最新数据,后续读操作会从缓存命中。
  2. 再写 MySQL:若插入/更新 MySQL 成功,流程结束;若失败则删除缓存,避免数据不一致。

注意事项

  • 事务原子性:若存在复杂逻辑,需要确保 Redis 和 MySQL 的写操作要么同时成功,要么同时失败。
  • 在高并发场景下,Write Through 会降低写性能,因为必须等待两端都写完才能返回。

6. Write Behind 模式

6.1 概述

  • 核心思想

    • 应用只写入 Redis 缓存,不立即写数据库。
    • Cache Layer 维护一个异步队列/队列缓存,将写请求累积并在后台定期或触发条件时批量刷入 MySQL。
  • 优点

    • 写操作速度非常快,仅操作 Redis。
    • 利用批量写库,提升数据库写入吞吐量。
  • 缺点

    • 如果异步刷库任务出现故障或服务宕机,将导致数据丢失。
    • 数据最终一致性延迟较高,不适合对实时性要求高的场景。

6.2 工作流程图(Mermaid 图解)

flowchart LR
    A[应用] -->|SET K->V| B[Redis: SET K V 并将 K 加入待刷库队列]
    B --> C[返回成功]
    subgraph 刷库线程
        D[检查待刷库队列] -->|批量取出若干条| E[MySQL: BATCH UPDATE]
        E -->|刷入成功?| F{成功?}
        F -- 是 --> G[从队列移除相应 Key]
        F -- 否 --> H[日志/重试机制]
    end

6.3 代码示例(Java + Jedis)

以下示例演示一种简化版的 Write Behind:

  • 使用 Redis 列表(List)维护待刷库的 Key 列表。
  • 后台线程每隔固定时间(如 1s)批量从队列读取,一次性执行 MySQL 更新。
import redis.clients.jedis.Jedis;
import redis.clients.jedis.Pipeline;

import java.sql.*;
import java.util.List;
import java.util.Timer;
import java.util.TimerTask;

public class WriteBehindExample {
    private static final String REDIS_HOST = "localhost";
    private static final int REDIS_PORT = 6379;
    private Jedis jedis;
    private Connection conn;
    private static final String QUEUE_KEY = "cache_to_db_queue";

    public WriteBehindExample() throws SQLException {
        jedis = new Jedis(REDIS_HOST, REDIS_PORT);
        conn = DriverManager.getConnection(
                "jdbc:mysql://localhost:3306/testdb", "root", "password");
        // 启动后台刷库定时任务
        startFlushTimer();
    }

    /**
     * 写操作:写 Redis 缓存,并将 Key 放入队列
     */
    public void saveUserAsync(String userId, String name, int age) {
        String cacheKey = "user:" + userId;
        String userJson = String.format("{\"id\":\"%s\",\"name\":\"%s\",\"age\":%d}", userId, name, age);

        // 1. 写 Redis,并将待刷库的 Key 放入 List 列表
        Pipeline p = jedis.pipelined();
        p.setex(cacheKey, 300, userJson); // 5 分钟过期
        p.lpush(QUEUE_KEY, cacheKey);
        p.sync();
    }

    /**
     * 后台定时任务:批量刷库
     */
    private void startFlushTimer() {
        Timer timer = new Timer(true);
        timer.schedule(new TimerTask() {
            @Override
            public void run() {
                flushCacheToDb();
            }
        }, 1000, 1000); // 延迟 1s 启动,每 1s 执行一次
    }

    /**
     * 从 Redis 列表中批量取出待刷库 Key,查询对应缓存值并写入 MySQL
     */
    private void flushCacheToDb() {
        try {
            // 一次性取出最多 100 条待刷库 key
            List<String> keys = jedis.lrange(QUEUE_KEY, 0, 99);
            if (keys == null || keys.isEmpty()) {
                return;
            }

            // 开启事务
            conn.setAutoCommit(false);
            String sql = "REPLACE INTO users(id, name, age) VALUES(?, ?, ?)";
            try (PreparedStatement ps = conn.prepareStatement(sql)) {
                for (String cacheKey : keys) {
                    String userJson = jedis.get(cacheKey);
                    if (userJson == null) {
                        // 缓存可能已过期或被删除,跳过
                        jedis.lrem(QUEUE_KEY, 0, cacheKey);
                        continue;
                    }
                    // 简单解析 JSON(生产环境请使用更健壮的 JSON 序列化库)
                    // 假设格式为 {"id":"1003","name":"王五","age":25}
                    String[] parts = userJson.replaceAll("[{}\"]", "")
                            .split(",");
                    String id = parts[0].split(":")[1];
                    String name = parts[1].split(":")[1];
                    int age = Integer.parseInt(parts[2].split(":")[1]);

                    ps.setString(1, id);
                    ps.setString(2, name);
                    ps.setInt(3, age);
                    ps.addBatch();
                }
                ps.executeBatch();
                conn.commit();
                // 批量删除已刷库 key
                jedis.ltrim(QUEUE_KEY, keys.size(), -1);
            } catch (SQLException e) {
                conn.rollback();
                // 日志记录,生产环境可加入重试机制
                System.err.println("刷库失败,稍后重试:" + e.getMessage());
            } finally {
                conn.setAutoCommit(true);
            }
        } catch (Exception ex) {
            // 捕获 Redis 或其他异常,保证定时任务不中断
            System.err.println("flushCacheToDb 异常:" + ex.getMessage());
        }
    }

    public void close() {
        jedis.close();
        try { conn.close(); } catch (SQLException ignored) {}
    }

    public static void main(String[] args) throws Exception {
        WriteBehindExample example = new WriteBehindExample();
        // 演示写操作
        example.saveUserAsync("1003", "王五", 25);
        // 程序可继续处理其他逻辑,后台线程负责刷库
    }
}

代码说明

  1. saveUserAsync

    • 仅写入 Redis,并把 user:1003 压入 cache_to_db_queue 列表,表示待落库。
  2. flushCacheToDb

    • 定时任务每秒执行一次,从列表中批量获取待落库的 Key,比如最多 100 条。
    • 对每个 Key,从 Redis 中读取缓存值(JSON 字符串),将解析后的字段写入 MySQL。
    • 成功后调用 ltrim 将已处理的队列数据清除。
    • 若写库失败则回滚,并记录日志,下一次任务会重新读取队列继续写入。

风险提示

  • 如果应用或后台线程进程意外挂掉,Redis 列表中的数据可能长时间无法落库,导致缓存与数据库不一致。
  • 建议在生产环境结合消息队列(如 Kafka、RabbitMQ)或 Redis Stream,以保证刷库任务的高可靠性。

7. 分布式锁与事务补偿

7.1 分布式锁

当并发写同一条数据时,可通过 Redis 分布式锁(如 Redisson、Jedis 的 SETNX)为写操作上锁,保证同一时刻只有一台应用实例执行更新,从而避免脏写。例如:

// 简化示例,建议使用 Redisson 等成熟的分布式锁库
public void updateUserWithLock(String userId, String newName, int newAge) throws InterruptedException {
    String lockKey = "lock:user:" + userId;
    String requestId = UUID.randomUUID().toString();
    // 尝试获取锁
    boolean locked = jedis.set(lockKey, requestId, "NX", "PX", 5000) != null;
    if (!locked) {
        throw new RuntimeException("获取锁失败,请稍后重试");
    }
    try {
        // 1. 删除缓存
        jedis.del("user:" + userId);
        // 2. 更新数据库
        // ...
        // 3. 延迟双删或直接更新缓存
        Thread.sleep(50);
        jedis.del("user:" + userId);
    } finally {
        // 释放锁,必须确保 requestId 一致才删除
        String val = jedis.get(lockKey);
        if (requestId.equals(val)) {
            jedis.del(lockKey);
        }
    }
}

7.2 事务补偿 / 消息队列

对于写入数据库失败后,缓存已被更新/删除但数据库未提交的场景,还可以结合本地事务消息二阶段提交进行补偿。典型思路:

  1. 写本地事务消息表

    • 将待执行的缓存操作与数据库操作放在同一个本地事务里。
    • 如果数据库提交成功,则消息表写入成功;若提交失败,则本地事务回滚,缓存也不更新。
  2. 异步投递/确认

    • 后台异步线程扫描消息表,将消息投递到消息队列(如 Kafka)。
    • 消费端收到消息后执行缓存更新与数据库最终落库或补偿逻辑。

该方案较为复杂,适用于对强一致性要求极高的场景。


8. 其他一致性模式简介

8.1 Read Through

  • 描述:应用直接对缓存层发起读请求,若缓存未命中,缓存层自身会从数据库加载并回写缓存。
  • 特点:用起来更像“透明缓存”,业务不需要显式编写“先查缓存、未命中查库、回写缓存”的逻辑。但需要使用支持 Read Through 功能的缓存客户端或中间件(如某些商业缓存解决方案)。

8.2 两阶段提交(2PC)/ TCC

  • 2PC(两阶段提交)

    • 要求分布式事务协调者(Coordinator)协调缓存更新与数据库更新两个阶段。
    • 阶段 1(Prepare):通知各参与者预备提交;如果所有参与者都准备就绪,则进入阶段 2。
    • 阶段 2(Commit/Rollback):通知各参与者正式提交或回滚。
  • TCC(Try-Confirm-Cancel)

    • Try:各参与者尝试预占资源(如锁定缓存、预写日志等)。
    • Confirm:各参与者确认实际提交。
    • Cancel:各参与者进行回滚。
  • 优缺点

    • 优点:能保证严格的强一致性
    • 缺点:性能开销大,编程复杂度高,且存在锁等待、阻塞等问题,不适用于极高吞吐场景。

9. 总结与最佳实践

  1. 优先采用 Cache Aside(延迟双删 + 分布式锁)模式

    • 简单、易实现,对于大部分读多写少场景能满足一致性要求。
    • 延迟双删能够在高并发下显著减少脏数据出现概率。
    • 分布式锁可以进一步控制并发更新并发写时对缓存的多次操作顺序。
  2. 针对写多场景,可考虑 Write Through 或 Write Behind

    • Write Through:适合对读取延迟要求极高、写性能要求相对一般的场景。
    • Write Behind:适合对写性能要求极高,但可容忍一定最终一致性延迟的场景。注意后台刷库任务的高可靠性及消息持久化。
  3. 严谨场景下可使用分布式事务或 TCC

    • 对一致性要求绝对严格且能够接受额外延迟与复杂度的业务,比如金融系统的流水账务。
    • 尽量减少全链路分布式事务的使用范围,只将关键操作纳入。
  4. 合理设计缓存过期时间与热点数据策略

    • 常见做法是:热点数据设置较长的过期时间,非热点数据使用合理的过期策略以节省内存。
    • 对于热点“雪崩”场景,可结合随机化过期时间、互斥锁重建缓存或提前预热等方式。
  5. 监控与报警

    • 建立缓存命中率监控、数据库写入失败监控、后台刷库积压监控等。
    • 及时发现缓存与数据库不一致的风险,并进行人工或自动补偿。

10. 全文小结

  • Redis 与 MySQL 保持数据一致性,核心在于设计合理的缓存读写策略,将不一致窗口尽量缩短,并根据业务需求权衡性能与一致性。
  • 本文重点介绍了常见的 Cache Aside(延迟双删)、Write Through、Write Behind 模式,并配以 Mermaid 图解,帮助你快速理解整体流程。
  • 代码示例(Java + Jedis + JDBC) 则直观演示各模式下的具体实现细节。
  • 最后,还简要介绍了分布式锁、事务补偿、两阶段提交等进阶方案,供对一致性要求更高的场景参考。
最后修改于:2025年06月09日 11:11

评论已关闭

推荐阅读

DDPG 模型解析,附Pytorch完整代码
2024年11月24日
DQN 模型解析,附Pytorch完整代码
2024年11月24日
AIGC实战——Transformer模型
2024年12月01日
Socket TCP 和 UDP 编程基础(Python)
2024年11月30日
python , tcp , udp
如何使用 ChatGPT 进行学术润色?你需要这些指令
2024年12月01日
AI
最新 Python 调用 OpenAi 详细教程实现问答、图像合成、图像理解、语音合成、语音识别(详细教程)
2024年11月24日
ChatGPT 和 DALL·E 2 配合生成故事绘本
2024年12月01日
omegaconf,一个超强的 Python 库!
2024年11月24日
【视觉AIGC识别】误差特征、人脸伪造检测、其他类型假图检测
2024年12月01日
[超级详细]如何在深度学习训练模型过程中使用 GPU 加速
2024年11月29日
Python 物理引擎pymunk最完整教程
2024年11月27日
MediaPipe 人体姿态与手指关键点检测教程
2024年11月27日
深入了解 Taipy:Python 打造 Web 应用的全面教程
2024年11月26日
基于Transformer的时间序列预测模型
2024年11月25日
Python在金融大数据分析中的AI应用(股价分析、量化交易)实战
2024年11月25日
AIGC Gradio系列学习教程之Components
2024年12月01日
Python3 `asyncio` — 异步 I/O,事件循环和并发工具
2024年11月30日
llama-factory SFT系列教程:大模型在自定义数据集 LoRA 训练与部署
2024年12月01日
Python 多线程和多进程用法
2024年11月24日
Python socket详解,全网最全教程
2024年11月27日