2024-09-05



import redis.clients.jedis.Jedis;
 
public class RateLimiter {
    private final Jedis jedis;
    private final String keyPrefix;
    private final int limit;
    private final int seconds;
 
    public RateLimiter(Jedis jedis, String keyPrefix, int limit, int seconds) {
        this.jedis = jedis;
        this.keyPrefix = keyPrefix;
        this.limit = limit;
        this.seconds = seconds;
    }
 
    public boolean isAllowed(String userId) {
        String key = keyPrefix + ":" + userId;
        long now = System.currentTimeMillis() / 1000;
        long count = jedis.incr(key);
        if (count == 1) {
            jedis.expire(key, seconds); // 如果是第一次访问,设置键的过期时间
        }
        // 如果当前次数超过限制,则不允许访问,并删除该键以便下次重新计数
        if (count > limit) {
            jedis.del(key);
            return false;
        }
        // 如果没有超过限制,则允许访问
        return true;
    }
}

这段代码定义了一个名为RateLimiter的类,它使用Redis的INCR命令来跟踪给定用户ID的请求次数,并在达到限制时删除键。它展示了如何使用Redis来实现简单的限流工具,并且是学习Redis限流机制的一个很好的起点。

2024-09-05

Redis提供了两种持久化方式:RDB(Redis DataBase)和AOF(Append Only File)。

RDB:定时将内存中的数据快照保存到磁盘的一个压缩二进制文件中。

AOF:每个写命令都通过append操作保存到文件中。

RDB 持久化

配置




save 900 1        # 900秒内至少1个键被修改则触发保存
save 300 10       # 300秒内至少10个键被修改则触发保存
save 60 10000     # 60秒内至少10000个键被修改则触发保存
 
dbfilename dump.rdb            # RDB文件名
dir /path/to/your/redis/dir     # RDB文件存储目录

使用

Redis会根据配置定时执行RDB快照保存操作。

AOF 持久化

配置




appendonly yes           # 开启AOF持久化
appendfilename "appendonly.aof"  # AOF文件名
dir /path/to/your/redis/dir     # AOF文件存储目录
 
# AOF文件的更新频率
appendfsync always       # 每次写入都同步,最慢但最安全
# appendfsync everysec  # 每秒同步一次,折衷
# appendfsync no        # 完全依赖操作系统,最快但不安全

使用

当AOF持久化开启后,Redis会将每个写命令保存到AOF文件中。

其他注意事项

  • 为了性能考虑,可以配置不同的保存策略,并且可以设置不同的同步频率。
  • 可以同时使用RDB和AOF,Redis启动时会优先加载AOF文件来恢复数据。
  • 在大规模或者数据变更频繁的情况下,建议开启AOF,并适当调整同步策略。
  • 对于不太关心数据完整性,或者数据修改不频繁的场景,可以使用RDB。

以上是Redis RDB和AOF的基本概念和配置方法,实际应用中还需根据具体需求进行调整。

2024-09-05

在处理Redis和数据库中的数据一致性问题时,通常推荐的做法是先更新数据库,然后删除或更新Redis缓存。这确保了数据的一致性,并且在缓存和数据库之间维护了最新的数据。

以下是一个简单的Python示例,演示了这种做法:




import redis
import pymysql
 
# 连接Redis
r = redis.StrictRedis(host='localhost', port=6379, db=0)
 
# 连接MySQL
conn = pymysql.connect(host='localhost', user='your_user', password='your_password', db='your_db')
 
def update_database(data):
    # 假设data是要更新的数据
    # 使用pymysql更新MySQL数据库
    with conn.cursor() as cursor:
        cursor.execute("UPDATE your_table SET column = %s WHERE id = %s", (data['value'], data['id']))
        conn.commit()
 
def delete_cache(key):
    # 删除Redis缓存
    r.delete(key)
 
# 假设有一个数据更新操作
data_to_update = {'id': 1, 'value': 'new_value'}
 
# 更新数据库
update_database(data_to_update)
 
# 删除缓存(这里的key是缓存数据时使用的键)
delete_cache(f'cache_key:{data_to_update["id"]}')

在这个例子中,我们首先通过update_database函数更新了数据库中的数据,然后调用delete_cache函数删除了与该数据相关的Redis缓存。这样,下次读取数据时,就会从数据库中读取最新的数据,并将其更新到缓存中。

2024-09-05

处理Redis大key的常见方法包括:

  1. 使用哈希表(Hashes)或有序集合(Sorted Sets)来存储组成大key的小数据单元,而不是存储整个大数据结构。
  2. 对于列表(Lists)、集合(Sets)和有序集合(Sorted Sets),使用SSCANSMEMBERS等命令分批获取元素,而不是一次获取全部。
  3. 对于字符串(Strings),可以使用GETRANGESETRANGE命令分批获取或替换字符串的一部分。
  4. 使用SCAN命令来迭代数据库键空间,避免大key阻塞服务器。

示例代码(Python使用redis-py库):




import redis
 
# 连接Redis
r = redis.Redis(host='localhost', port=6379, db=0)
 
# 大key的键名
large_key = 'large_key'
 
# 对于哈希表
for field, value in r.hscan_iter(large_key, count=100):
    # 处理每个field-value对
    print(field, value)
 
# 对于列表
cursor = '0'
while cursor != 0:
    cursor, items = r.sscan(large_key, cursor=cursor, count=100)
    for item in items:
        # 处理每个item
        print(item)
 
# 分批获取字符串
max_chunk_size = 100
chunk_size = min(max_chunk_size, r.strlen(large_key))
offset = 0
while chunk_size > 0:
    chunk = r.getrange(large_key, offset, offset + chunk_size - 1)
    # 处理字符串块
    print(chunk)
    offset += chunk_size
    chunk_size = min(max_chunk_size, r.strlen(large_key) - offset)

注意:在处理大key时,应该避免在服务器负载较高时执行这些操作,以免影响Redis性能。

2024-09-05

Redis可以通过发布/订阅模式实现消息队列,同时可以通过KEYS命令和notify-keyspace-events配置来监控key的变化。

发布订阅示例:




import redis
 
# 连接Redis
r = redis.Redis(host='localhost', port=6379, db=0)
 
# 发布消息
channel = 'my_channel'
message = 'Hello, World!'
r.publish(channel, message)
 
# 订阅消息
def callback(message):
    print(f"Received: {message['data']}")
 
# 创建一个新的订阅对象
pubsub = r.pubsub()
pubsub.subscribe(**{channel: callback})
pubsub.run_in_thread()

监控key示例:

首先,确保Redis配置了合适的notify-keyspace-events。例如,如果你想监控所有类型的key事件,可以在Redis配置文件中设置:




notify-keyspace-events KEA

然后,你可以使用psubscribe方法来监听所有键空间通知:




import redis
 
# 连接Redis
r = redis.Redis(host='localhost', port=6379, db=0)
 
# 创建一个新的订阅对象
pubsub = r.pubsub()
pubsub.psubscribe('__keyspace@0__:*')
 
# 处理接收到的通知
for message in pubsub.listen():
    if message['type'] == 'pmessage':
        print(f"Key event: {message['data']}")

在这个例子中,__keyspace@0__:*表示监听数据库0中所有key的事件。当key被修改、删除时,会打印出相应的事件。

2024-09-05

Redis 提供了 RDB 和 AOF 两种持久化机制,可以配置不同的持久化策略以满足不同的需求。混合模式是指同时使用 RDB 和 AOF 两种持久化方式,这样可以结合两者的优势,实现快速恢复和数据完整性的保障。

配置参数




# 开启 RDB 持久化
save 900 1      # 900 秒内至少 1 个键被修改则触发 BGSAVE
save 300 10     # 300 秒内至少 10 个键被修改则触发 BGSAVE
save 60 10000   # 60 秒内至少 10000 个键被修改则触发 BGSAVE

# 开启 AOF 持久化
appendonly yes  # 开启 AOF 持久化
appendfsync everysec  # 每秒同步到 AOF 文件

具体原理

RDB:定时将内存中的数据快照保存到磁盘的一个压缩二进制文件中。

AOF:记录每一个写操作,通过append模式追加到文件末尾。

触发方式

RDB:根据配置定时执行BGSAVE命令。

AOF:根据配置策略将写操作追加到文件末尾。

优点与缺点

RDB:

  • 优点:恢复快,适合备份,不会影响正常使用。
  • 缺点:可能会丢失最后一次持久化之后的数据。

AOF:

  • 优点:数据完整性高,可靠性好。
  • 缺点:恢复慢,文件大,对性能有影响。

混合模式:

  • 结合两者优点,快速恢复和数据完整性保障。
2024-09-05

Redis实现延迟任务队列可以通过以下两种方式实现:

  1. 使用Sorted Set(有序集合)

Redis的Sorted Set是一种可以根据score进行排序的集合,我们可以把要执行的任务的ID放入Sorted Set中,并设定执行时间作为score。然后用一个循环来不断地轮询这个Sorted Set,一旦发现有任务到期,就执行之。




# 添加任务
def add_job(job_id, execute_time):
    redis_conn.zadd('jobs', {job_id: execute_time})
 
# 执行任务
def process_jobs():
    while True:
        # 获取当前时间
        now = time.time()
        # 获取所有score小于或等于当前时间的任务
        jobs = redis_conn.zrangebyscore('jobs', 0, now)
        for job in jobs:
            try:
                # 执行任务
                do_job(job)
                # 从集合中移除已经执行的任务
                redis_conn.zrem('jobs', job)
            except Exception as e:
                logger.exception(e)
        # 休眠一段时间后继续执行循环
        time.sleep(1)
  1. 使用Redis的"发布/订阅"模式

我们可以用Redis的"发布/订阅"模式来实现延迟任务队列。首先,我们定期检查是否有新的任务到期,如果有,我们就发布一个消息。然后我们需要一个订阅者在等待这个消息,一旦接收到这个消息,它就会执行任务。




import redis
import time
import threading
 
redis_conn = redis.StrictRedis()
 
def add_job(job_id, execute_time):
    # 发布任务
    redis_conn.set(job_id, execute_time)
    redis_conn.publish('new_job', job_id)
 
def process_jobs():
    pubsub = redis_conn.pubsub()
    pubsub.subscribe('new_job')
    for message in pubsub.listen():
        if message['type'] == 'message':
            job_id = message['data']
            try:
                # 执行任务
                do_job(job_id)
            except Exception as e:
                logger.exception(e)
 
# 启动订阅者
threading.Thread(target=process_jobs).start()
 
# 添加任务
add_job('job1', time.time() + 10)

以上两种方式都可以实现Redis延迟任务队列,但是需要注意的是,这两种方式都需要一直运行来处理任务,所以通常会和一些守护进程服务结合起来使用,如supervisor等。

2024-09-05



import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.SessionCallback;
import org.springframework.stereotype.Service;
 
import java.util.concurrent.TimeUnit;
 
@Service
public class RedisHyperLogLogService {
 
    @Autowired
    private RedisTemplate<String, Object> redisTemplate;
 
    public void addValueToLog(String key, String value) {
        redisTemplate.opsForHyperLogLog().add(key, value);
    }
 
    public long sizeOfLog(String key) {
        return redisTemplate.opsForHyperLogLog().size(key);
    }
 
    public void mergeLogs(String unionKey, String... sourceKeys) {
        redisTemplate.opsForHyperLogLog().union(unionKey, sourceKeys);
    }
 
    public void expireKey(final String key, long timeout, TimeUnit unit) {
        redisTemplate.execute((SessionCallback<Object>) connection -> {
            connection.expire(key.getBytes(), unit.toSeconds(timeout));
            return null;
        });
    }
}

这段代码提供了一个简单的服务类,用于操作Redis的HyperLogLog数据类型。它展示了如何添加元素到日志、计算日志大小、合并日志以及设置键的过期时间。这里使用了Spring Data Redis的RedisTemplate来执行这些操作,并且展示了如何在Spring Boot应用中注入和使用这个模板。

2024-09-05



import redis.clients.jedis.Jedis;
 
public class RedisFuzzySearch {
    private Jedis jedis;
 
    public RedisFuzzySearch() {
        this.jedis = new Jedis("localhost", 6379);
    }
 
    public List<String> fuzzySearch(String keyPattern, int pageNum, int pageSize) {
        // 使用SCAN命令配合MATCH和COUNT选项进行分页和模糊查询
        String cursor = "0";
        List<String> keys = new ArrayList<>();
        int count = pageSize; // 每页的数量
        String matchPattern = "*" + keyPattern + "*"; // 模糊匹配模式
 
        do {
            ScanParams scanParams = new ScanParams().match(matchPattern).count(count);
            ScanResult<String> scanResult = jedis.scan(cursor, scanParams);
            keys.addAll(scanResult.getResult());
            cursor = scanResult.getCursor();
        } while (!cursor.equals("0") && keys.size() < pageSize * pageNum);
 
        // 截取当页数据
        if (keys.size() > pageSize * (pageNum - 1)) {
            keys = keys.subList(pageSize * (pageNum - 1), Math.min(keys.size(), pageSize * pageNum));
        } else {
            keys = new ArrayList<>(); // 没有更多数据时返回空列表
        }
 
        return keys;
    }
 
    public static void main(String[] args) {
        RedisFuzzySearch searcher = new RedisFuzzySearch();
        List<String> keys = searcher.fuzzySearch("user:123", 1, 10); // 第1页,每页10个
        for (String key : keys) {
            System.out.println(key);
        }
    }
}

这段代码使用了Redis的SCAN命令进行分页和模糊查询,通过keyPattern参数接收查询条件,pageNumpageSize分别表示页码和每页的数量。代码中使用ScanParams设置了MATCHCOUNT选项,MATCH用于设置模糊匹配模式,COUNT用于指定每次扫描的数量。通过循环和cursor的管理,实现了分页效果。在main方法中提供了一个使用示例。

2024-09-05



import redis
 
# 连接到Redis服务器
redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)
 
# 使用List作为消息队列
def enqueue_task(queue_name, task):
    """将任务添加到指定的Redis队列中"""
    redis_client.lpush(queue_name, task)
 
def dequeue_task(queue_name):
    """从指定的Redis队列中移除并返回一个任务"""
    task = redis_client.brpop(queue_name, timeout=5)
    return task[1] if task else None
 
# 使用Pub/Sub进行消息传递
def publish_message(channel, message):
    """发布消息到指定的频道"""
    redis_client.publish(channel, message)
 
def subscribe_to_channel(channel):
    """订阅指定频道的消息"""
    pubsub = redis_client.pubsub()
    pubsub.subscribe(channel)
    for message in pubsub.listen():
        if message['type'] == 'message':
            yield message['data']
 
# 使用Streams作为消息队列和发布订阅的替代
def stream_enqueue_task(stream_name, task_id, task):
    """将任务作为消息添加到Stream中"""
    redis_client.xadd(stream_name, {'task': task}, id=task_id)
 
def stream_dequeue_task(stream_name):
    """从Stream中获取一个任务"""
    tasks = redis_client.xrange(stream_name, count=1)
    return tasks[0].decode('utf-8') if tasks else None
 
def stream_subscribe_to_tasks(stream_name):
    """订阅Stream中的消息"""
    for message in redis_client.xread({stream_name: 0}, count=1, block=5):
        yield message[1][0][1].decode('utf-8')

这段代码展示了如何使用Redis的List、Pub/Sub以及Streams数据类型来实现消息队列和消息传递的功能。代码中的enqueue_taskdequeue_task函数分别用于将任务推入和拉出List。publish_messagesubscribe_to_channel函数用于发布和订阅消息。对于Streams,stream_enqueue_taskstream_dequeue_task用于消息队列操作,而stream_subscribe_to_tasks用于消息订阅。这些函数可以帮助开发者理解如何在实际应用中使用Redis的不同特性。