2024-08-28



import redis
 
# 连接到 Redis
redis_host = 'localhost'
redis_port = 6379
redis_db = 0
r = redis.StrictRedis(host=redis_host, port=redis_port, db=redis_db)
 
# 使用 Redis 列表实现消息队列
def enqueue_task(task):
    """将任务添加到 Redis 列表中"""
    r.lpush('tasks', task)
 
def process_tasks():
    """处理 Redis 列表中的任务"""
    while True:
        # 从列表右端弹出任务
        task = r.brpop('tasks', timeout=5)
        if task:
            # 假设我们这里有一个处理任务的函数
            handle_task(task[1])  # task[1] 是任务内容
 
def handle_task(task):
    """处理任务的函数"""
    print(f"处理任务: {task}")
    # 这里可以添加处理任务的代码
 
# 生产者:添加任务到消息队列
enqueue_task('task1')
enqueue_task('task2')
 
# 消费者:处理消息队列中的任务
process_tasks()

这个简单的例子展示了如何使用 Redis 的列表数据结构来实现一个消息队列,并且如何使用 Python 代码来处理这些任务。这个例子只是用于演示目的,并且不是生产环境中的代码。在实际应用中,你需要考虑更多的因素,例如错误处理、优先级队列、可靠的消息传递等。

2024-08-28

在实现分布式锁和消息队列时,可以使用Redis和RabbitMQ。以下是两种场景的简化示例代码:

分布式锁的实现:

使用Redis实现分布式锁:




import redis
import uuid
 
def acquire_lock(conn, lock_name):
    identifier = str(uuid.uuid4())
    lock_key = 'lock:' + lock_name
    end = time.time() + 10  # 10秒后自动释放锁
 
    while time.time() < end:
        if conn.setnx(lock_key, identifier):  # 尝试获取锁
            conn.expire(lock_key, 10)  # 设置锁的过期时间
            return identifier
        time.sleep(0.001)
 
    return False
 
def release_lock(conn, lock_name, identifier):
    lock_key = 'lock:' + lock_name
 
    with conn.pipeline() as pipe:
        while True:
            try:
                pipe.watch(lock_key)
                if pipe.get(lock_key) == identifier:
                    pipe.multi()
                    pipe.delete(lock_key)
                    pipe.execute()
                    return True
                pipe.unwatch()
                break
            except redis.exceptions.WatchError:
                pass
    return False

消息队列的实现:

使用RabbitMQ实现消息队列:




import pika
 
def setup_queue(queue_name):
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
 
    channel.queue_declare(queue=queue_name, durable=True)
    return connection, channel
 
def publish_message(queue_name, message):
    connection, channel = setup_queue(queue_name)
 
    channel.basic_publish(
        exchange='',
        routing_key=queue_name,
        body=message,
        properties=pika.BasicProperties(
            delivery_mode=2,  # 使消息持久化
        ),
    )
    print(" [x] Sent %r" % message)
    connection.close()
 
def consume_message(queue_name, on_message_callback):
    connection, channel = setup_queue(queue_name)
 
    def callback(ch, method, properties, body):
        on_message_callback(body)
 
    channel.basic_consume(
        queue=queue_name, on_message_callback=callback, auto_ack=True
    )
 
    print(" [*] Waiting for messages. To exit press CTRL+C")
    channel.start_consuming()

使用这两段代码,你可以实现分布式锁和消息队列的功能。在实际应用中,你需要根据具体需求调整超时时间、重试策略、持久化设置等参数。

2024-08-28

以下是使用Jedis实现Redis分布式锁的三种方法:

  1. 使用SETNX命令和EXPIRE命令



public boolean lockWithSetNxAndExpire(String lockKey, String requestId, int expireTime) {
    Long result = jedis.setnx(lockKey, requestId);
    if (result == 1) {
        jedis.expire(lockKey, expireTime);
        return true;
    }
    return false;
}
  1. 使用SET命令的EX和NX选项



public boolean lockWithSetExAndNx(String lockKey, String requestId, int expireTime) {
    String result = jedis.set(lockKey, requestId, "NX", "EX", expireTime);
    return "OK".equals(result);
}
  1. 使用Lua脚本



public boolean lockWithLuaScript(String lockKey, String requestId, int expireTime) {
    String script = "if redis.call('setnx', KEYS[1], ARGV[1]) == 1 then " +
                    "redis.call('expire', KEYS[1], ARGV[2]) return 1 else return 0 end";
    Object result = jedis.eval(script, Collections.singletonList(lockKey),
                               Collections.singletonList(requestId),
                               Collections.singletonList(String.valueOf(expireTime)));
    return (Long) result == 1L;
}

以上三种方法均可实现Redis分布式锁,但第三种方法相较于其他两种在解决竞争问题上更加可靠,因为它是一个原子操作,可以防止因为客户端延迟或其他问题导致的锁无限期持有问题。

2024-08-28

Redis 本身不直接支持分布式事务,但是可以通过 Redis 的 Lua 脚本以及分布式锁机制来实现类似事务的行为。

以下是一个使用 Redis 分布式锁和 Lua 脚本实现分布式事务的简单例子:




-- 使用分布式锁来保证事务的原子性
local lock_key = "my_lock"
local lock_value = "unique_value"
local unlock_script = redis.call("GET", lock_key)
if lock_value == unlock_key then
    -- 执行事务操作
    redis.call("DEL", lock_key)
    return true
else
    return false
end

在实际应用中,你需要先尝试获取锁,然后执行 Lua 脚本:




import redis
 
# 连接 Redis
r = redis.Redis(host='localhost', port=6379, db=0)
 
# 尝试获取锁
lock_key = "my_lock"
lock_value = "unique_value"
result = r.set(lock_key, lock_value, ex=5, nx=True)  # 设置锁,有效期5秒,如果锁不存在才设置
 
if result:
    try:
        # 执行事务
        # ...
 
        # 执行 Lua 脚本
        unlock_script = """
            if redis.call("GET", KEYS[1]) == ARGV[1] then
                return redis.call("DEL", KEYS[1])
            else
                return 0
            end
        """
        res = r.eval(unlock_script, 1, lock_key, lock_value)
 
        # 检查解锁是否成功
        if res == 1:
            print("事务和解锁成功")
        else:
            print("解锁失败")
    except Exception as e:
        print("事务执行异常:", e)
        # 如果发生异常,可以考虑不释放锁,避免其他线程获取到锁后产生数据不一致
    finally:
        # 不在 finally 中释放锁,避免业务异常时解锁失败
        pass
else:
    print("无法获取锁")

请注意,这个例子仅用于演示目的,实际应用中需要考虑更多的边界情况,例如锁的自动续期、异常处理、锁的可重入性等。

2024-08-28

点赞业务涉及到数据的存储和读取,对于点赞数量较大的情况,需要考虑使用不同的数据库系统来优化读写性能。

MySQL:适合存储用户点赞和取消点赞的历史记录,以及点赞和取消点赞的频率等统计信息。




CREATE TABLE `user_likes` (
  `user_id` INT NOT NULL,
  `item_id` INT NOT NULL,
  `like_status` ENUM('like', 'unlike') NOT NULL,
  `created_at` DATETIME NOT NULL,
  PRIMARY KEY (`user_id`, `item_id`)
);

Redis:适合存储用户的点赞状态,点赞频率,以及需要实时更新并且访问频繁的点赞信息。




# 设置用户点赞状态
redis.set(f"user:{user_id}:item:{item_id}:like", "1")
 
# 获取用户点赞状态
like_status = redis.get(f"user:{user_id}:item:{item_id}:like")
 
# 点赞数+1
redis.incr(f"item:{item_id}:like_count")
 
# 获取点赞数
like_count = redis.get(f"item:{item_id}:like_count")

MongoDB:适合存储大量点赞历史数据,以及点赞频率等信息,不适合存储点赞状态。




from pymongo import MongoClient
 
# 连接MongoDB
client = MongoClient('mongodb://localhost:27017/')
db = client['mydatabase']
collection = db['user_likes']
 
# 插入点赞记录
collection.insert_one({
  'user_id': user_id,
  'item_id': item_id,
  'like_status': 'like',
  'created_at': datetime.now()
})
 
# 获取用户点赞状态
like_status = collection.find_one({'user_id': user_id, 'item_id': item_id})

在实际应用中,可以根据业务需求和数据访问模式选择合适的数据库,并通过合理的缓存策略结合Redis等内存数据库来提升点赞业务的响应速度。

2024-08-28

在Redis中实现限流通常使用Lua脚本与Redis提供的数据结构和命令结合使用。以下是一个简单的示例,展示了如何使用Lua脚本和Redis的INCR命令来实现一个简单的限流器:




-- Lua脚本实现限流
-- KEYS[1] 是用来存储计数器的键
-- ARGV[1] 是时间窗口的大小(例如,60表示60秒)
-- ARGV[2] 是允许的最大请求数(例如,100表示每秒最多100个请求)
 
-- 将key的值增加1
redis.call('INCR', KEYS[1])
 
-- 如果是第一次执行,初始化为0
if redis.call('EXISTS', KEYS[1]) == 0 then
    redis.call('EXPIRE', KEYS[1], ARGV[1])
end
 
-- 获取当前计数
local count = redis.call('GET', KEYS[1])
 
-- 如果计数超过了限制,返回0,表示被限流
if count and tonumber(count) > tonumber(ARGV[2]) then
    return 0
else
    -- 如果没有超过限制,返回1,表示通过限流
    return 1
end

在实际应用中,你可以通过Redis的EVAL命令来运行这个Lua脚本。例如,如果你想要设置一个限制每秒钟不超过100个请求的限流器,可以这样使用:




redis-cli EVAL "$(cat script.lua)" 1 mykey:limit 60 100

这里mykey:limit是用来存储计数器的键,60是时间窗口,100是允许的最大请求数。根据实际情况调整这些参数。脚本会返回1如果请求被允许,返回0如果请求被限流。

2024-08-28

Redis在项目中有许多使用场景,以下是一些常见的使用场景:

  1. 缓存:Redis提供了键过期功能,可以用来控制缓存的生命周期。



import redis
r = redis.Redis(host='localhost', port=6379, db=0)
r.setex("key", 10, "value")  # 设置键的过期时间为10秒
  1. 会话管理:可以使用Redis来存储用户会话,从而在整个应用程序中进行访问。



import redis
r = redis.Redis(host='localhost', port=6379, db=0)
r.set("session:123", "value")
  1. 队列:Redis提供了列表和发布/订阅功能,可以用作消息队列。



import redis
r = redis.Redis(host='localhost', port=6379, db=0)
r.lpush("queue", "item")  # 入队
item = r.brpop("queue", 1)  # 出队
  1. 排行榜:可以使用Redis的有序集合来实现排行榜。



import redis
r = redis.Redis(host='localhost', port=6379, db=0)
r.zadd("leaderboard", {name: score})
  1. 分布式锁:可以使用Redis的SETNX命令实现分布式锁。



import redis
r = redis.Redis(host='localhost', port=6379, db=0)
if r.setnx("lock:key", "value"):
    # 获取锁成功
    pass
    # 执行操作
    r.delete("lock:key")  # 释放锁
  1. 计数器:可以使用Redis的INCR命令来实现计数器。



import redis
r = redis.Redis(host='localhost', port=6379, db=0)
r.incr("counter:key")
  1. 搜索:使用Redis的集合类型可以创建一个简单的搜索功能。



import redis
r = redis.Redis(host='localhost', port=6379, db=0)
r.sadd("search:key", "member1")
r.sinter("search:key")
  1. 缓存数据库查询结果:可以将数据库查询结果存储在Redis中,以减少数据库的负载。



import redis
r = redis.Redis(host='localhost', port=6379, db=0)
r.set("db:select:key", "value")
  1. 分布式Session存储:可以使用Redis存储Web应用的用户会话。
  2. 分布式Bitarray:可以使用Redis的String或Bitmap类型来实现分布式的布隆过滤器。
  3. 分布式信号量:可以使用Redis的有限长度列表来实现分布式信号量。
  4. 分布式请求限流:可以使用Redis的计数器来实现分布式请求限流。
  5. 分布式队列:可以使用Redis的列表或发布/订阅来实现分布式队列。
  6. 分布式锁:可以使用Redis的SETNX命令来实现分布式锁。
  7. 分布式全局唯一ID:可以使用Redis的INCR命令来生成全局唯一ID。
  8. 分布式排行榜:可以使用Redis的有序集合来实现分布式排行榜。
  9. 时间线等活动数据
2024-08-28



# 更新WSL
wsl --update
 
# 安装Redis
# 1. 下载Redis压缩包
wget http://download.redis.io/releases/redis-6.2.6.tar.gz
 
# 2. 解压压缩包
tar xzf redis-6.2.6.tar.gz
 
# 3. 编译Redis
cd redis-6.2.6
make
 
# 4. 安装Redis
sudo make install
 
# 5. 运行Redis服务器
redis-server
 
# 6. 测试Redis是否正常工作
# 新开一个WSL窗口或者连接到WSL实例
redis-cli ping
 
# 如果返回PONG,则表示Redis已成功安装并运行。

这段代码提供了在Windows WSL环境中安装Redis的简化版本。它首先更新WSL,然后下载并解压Redis压缩包,接着编译和安装Redis,最后启动Redis服务器并测试其是否正常工作。

2024-08-28



import redis
 
# 连接Redis
r = redis.Redis(host='localhost', port=6379, db=0)
 
# 添加元素到有序集合,如果成功返回1,如果已存在且分数不变返回0,如果分数更新返回2
# 这里的分数score可以是整数或浮点数,元素member必须是字符串
result = r.zadd('myzset', {'member1': 1, 'member2': 2})
print(f"添加元素的结果: {result}")  # 输出添加元素的结果
 
# 如果需要更新分数,可以使用`XX`选项
# 这里的`XX`选项确保只有当元素存在时才更新其分数
result_with_xx = r.zadd('myzset', {'member1': 5, 'member2': 10}, xx=True)
print(f"使用'XX'选项更新分数的结果: {result_with_xx}")  # 输出使用'XX'选项更新分数的结果
 
# 查看有序集合的元素
members = r.zrange('myzset', 0, -1, withscores=True)
print(f"有序集合的元素: {members}")  # 输出有序集合的元素

这段代码演示了如何使用zadd方法来添加元素到Redis的有序集合中。它首先连接到Redis,然后添加两个成员到有序集合,并打印出操作结果。接着,使用XX选项来更新已存在成员的分数,并再次打印结果。最后,它检索并打印出有序集合中的所有成员及其分数。

2024-08-28

在Spring Boot中配置RedisTemplateRedisson用于分布式锁的示例代码如下:

  1. 添加依赖到pom.xml



<!-- Redis -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- Redisson -->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson-spring-boot-starter</artifactId>
    <version>3.16.2</version> <!-- 请使用最新版本 -->
</dependency>
  1. 配置RedisTemplateRedissonClient



@Configuration
public class RedisConfig {
 
    @Bean
    public RedisTemplate<String, Object> redisTemplate(LettuceConnectionFactory lettuceConnectionFactory) {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new GenericJackson2JsonRedisSerializer());
        template.setConnectionFactory(lettuceConnectionFactory);
        return template;
    }
 
    @Bean
    public RedissonClient redissonClient(LettuceConnectionFactory lettuceConnectionFactory) {
        // 使用Lettuce客户端连接
        RedissonClient redissonClient = Redisson.create(Config.fromJSON(
                "{\"singleServerConfig\":{\"address\":\"redis://127.0.0.1:6379\",\"clientType\":\"LETTUCE\"}}"
        ));
        return redissonClient;
    }
}
  1. 使用RedissonClient获取分布式锁:



@Service
public class DistributedLockService {
 
    @Autowired
    private RedissonClient redissonClient;
 
    public void lockWithRedisson() {
        // 获取锁对象
        RLock lock = redissonClient.getLock("myLock");
        try {
            // 尝试加锁,最多等待100秒,锁定后最多持有锁10秒
            boolean isLocked = lock.tryLock(100, 10, TimeUnit.SECONDS);
            if (isLocked) {
                // 业务逻辑
                System.out.println("Lock acquired");
            } else {
                System.out.println("Lock not acquired");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            // 释放锁
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }
}

在这个例子中,我们配置了RedisTemplateRedissonClientRedisTemplate用于非锁相关的操作,而RedissonClient用于获取分布式锁对象。在DistributedLockService服务中,我们使用RedissonClient获取一个锁对象,并尝试加锁。加锁成功后执行业务逻辑,执行完毕后释放锁。