# 在OpenResty中配置多级缓存
http {
# ...
upstream tomcat_server {
server 127.0.0.1:8080;
}
server {
listen 80;
# 配置本地缓存
location /local_cache/ {
# 设置本地缓存存储路径
proxy_cache_path /data/nginx/cache levels=1:2 keys_zone=my_cache:10m max_size=10g inactive=60m use_temp_path=off;
# 启用缓存
proxy_cache my_cache;
# 缓存有效期
proxy_cache_valid 200 1d;
# 代理设置
proxy_pass http://tomcat_server;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
# 设置缓存键
proxy_cache_key $uri$is_args$args;
}
# 配置Redis缓存
location /redis_cache/ {
# 设置Redis连接参数
set $redis_key $uri$is_args$args;
redis_pass 127.0.0.1:6379;
# 设置缓存查询失败时的回退处理
default_type text/plain;
error_page 404 = @tomcat;
# 从Redis缓存中获取数据
redis_code 200 "get $redis_key";
# 设置缓存有效期
redis_code 200 "expire $redis_key 1d";
}
# 请求未命中Redis缓存时,代理到Tomcat服务器
location @tomcat {
proxy_pass http://tomcat_server;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
# 将响应数据存储到Redis缓存中
proxy_store on;
proxy_store_access user:rw group:rw all:rw;
proxy_temp_path /data/nginx/temp;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_pass_request_body off;
proxy_pass_request_headers on;
proxy_connect_timeout 10s;
proxy_send_timeout 10s;
proxy_read_timeout 10s;
# 设置数据存储到Redis的键和过期时间
set $redis_key $uri$is_args$args;
set_by_lua_block $redis_expire $msec {
return tonumber(ngx.var.msec) + 86400000;
}
lua_shared_dict msec 1m;
lua_shared_dict redis_commands 1m;
content_by_lua_block {
local msec = ngx.shared.msec
redis-benchmark
是 Redis 官方提供的性能测试工具,用于测试 Redis 在特定条件下的性能。
基本使用方法如下:
redis-benchmark [option] [option value]
常用参数:
-h
指定服务器的 host 默认是 127.0.0.1-p
指定服务器端口 默认是 6379-s
指定服务器 socket-c
并发连接数 默认是 50-n
请求总数 默认是 100000-d
数据大小 默认是 3 bytes--csv
将结果以 CSV 格式输出--latency
测试延迟--timeout
超时时间 默认是 30s--idle
测试空闲数据库连接性能--dbnum
测试的数据库数量 默认是 16--acl
指定 ACL 规则--user
指定用户名--pass
指定密码- \`--ssl 使用 SSL 连接
--lua-script
指定 Lua 脚本路径--set-client-output-buffer-limit
设置客户端输出缓冲区大小- \`--no-loop 不循环执行测试
实例代码:
# 测试本地 Redis 默认设置
redis-benchmark
# 测试指定的 host 和 port
redis-benchmark -h localhost -p 6379
# 测试 10000 个请求
redis-benchmark -n 10000
# 使用 50 个并发连接
redis-benchmark -c 50
# 设置数据大小为 1KB
redis-benchmark -d 1024
# 将结果输出为 CSV 格式
redis-benchmark --csv
# 设置超时时间为 60 秒
redis-benchmark --timeout 60
# 测试 Lua 脚本性能
redis-benchmark --lua-script my_script.lua
以上命令行参数可以根据实际需要进行组合使用,以测试不同条件下的 Redis 性能。
# 安装ZanRedisDB之前,请确保已经安装了Redis和Jemalloc
# 安装Redis和Jemalloc可以使用以下命令
sudo apt-get install redis-server libjemalloc-dev
# 下载ZanRedisDB源码
git clone https://github.com/ZanKr/ZanRedisDB.git
cd ZanRedisDB
# 编译ZanRedisDB
make
# 安装ZanRedisDB
sudo make install
# 配置ZanRedisDB
# 修改配置文件 /path/to/ZanRedisDB/conf/redis.conf
# 比如设置最大内存为2GB
maxmemory 2gb
# 启动ZanRedisDB实例
# 使用自定义的配置文件启动
redis-server /path/to/ZanRedisDB/conf/redis.conf
# 注意:以上命令仅为示例,具体路径和配置需要根据实际环境进行调整。
这个示例展示了如何在类Unix系统中从源码安装ZanRedisDB,并进行基本的配置。在实际部署时,需要根据具体的操作系统和环境进行调整。
在Spring Boot中,可以使用spring-boot-starter-data-redis
库来实现Redis消息的订阅与分布。以下是一个简单的例子,展示如何使用Spring Data Redis实现发布/订阅模式。
首先,在pom.xml
中添加依赖:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
然后,配置Redis消息监听器容器:
@Configuration
public class RedisConfig {
@Autowired
private RedisConnectionFactory redisConnectionFactory;
@Bean
RedisMessageListenerContainer redisMessageListenerContainer() {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(redisConnectionFactory);
return container;
}
}
创建一个消息监听器来处理接收到的消息:
@Component
public class RedisMessageSubscriber implements MessageListener {
@Override
public void onMessage(Message message, byte[] pattern) {
String receivedMessage = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("Received Message: " + receivedMessage);
// 根据接收到的消息进行业务逻辑处理
}
}
最后,发布消息:
@Service
public class RedisMessagePublisher {
@Autowired
private StringRedisTemplate stringRedisTemplate;
public void publish(String channel, String message) {
stringRedisTemplate.convertAndSend(channel, message);
}
}
在上述代码中,我们定义了一个配置类RedisConfig
来创建RedisMessageListenerContainer
,并设置了连接工厂。我们还实现了一个RedisMessageSubscriber
类来监听消息,并在其中处理接收到的消息。RedisMessagePublisher
服务用于发布消息到指定的频道。
要订阅一个频道,你需要在RedisMessageSubscriber
中设置你感兴趣的频道,并将其注册到RedisMessageListenerContainer
。
@Component
public class RedisMessageSubscriber implements MessageListener {
@Autowired
private RedisMessageListenerContainer redisMessageListenerContainer;
@PostConstruct
public void subscribe()
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.serializer.StringRedisSerializer;
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory) {
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(redisConnectionFactory);
// 使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
jackson2JsonRedisSerializer.setObjectMapper(om);
// 配置template的序列化方式
template.setDefaultSerializer(jackson2JsonRedisSerializer);
return template;
}
@Bean
public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory redisConnectionFactory) {
StringRedisTemplate template = new StringRedisTemplate();
template.setConnectionFactory(redisConnectionFactory);
// 设置键的序列化方式
template.setKeySerializer(new StringRedisSerializer());
// 设置值的序列化方式
template.setValueSerializer(new StringRedisSerializer());
return template;
}
}
这段代码定义了两个Bean,一个用于序列化Object的RedisTemplate和一个用于操作字符串的StringRedisTemplate。通过自定义Jackson2JsonRedisSerializer来实现对象的序列化与反序列化,以便于存储复杂类型的数据。同时,它展示了如何设置RedisTemplate的默认序列化方式和键值的特定序列化方式。这是一个在Spring Boot项目中整合Redis的实践案例。
关于Redis分布式锁的误删问题,可以通过设置锁的过期时间和使用唯一的客户端标识来解决。
- 设置锁的过期时间:当获取锁时,设置一个合理的过期时间,以防止服务器宕机或者其他意外情况导致锁未能释放。
- 唯一客户端标识:为每个客户端生成一个唯一的标识,比如UUID,用于标识锁的拥有者。在释放锁时,只有拥有该标识的客户端才能释放锁。
原子性问题可以通过使用Lua脚本来保证。Lua脚本在Redis中是原子性的,可以确保包含在脚本中的多个命令一次性执行,不会被其他操作打断。
以下是一个简单的Lua脚本示例,用于解决原子性问题和误删问题:
-- 获取锁的Lua脚本
local lock_key = KEYS[1]
local lock_value = ARGV[1]
local expire_time = ARGV[2]
-- 尝试获取锁
if (redis.call('exists', lock_key) == 0) then
-- 锁不存在,设置锁并设置过期时间
redis.call('hset', lock_key, lock_value, 1)
redis.call('expire', lock_key, expire_time)
return true
elseif (redis.call('hexists', lock_key, lock_value) == 1) then
-- 已经拥有锁,可以重入
redis.call('hincrby', lock_key, lock_value, 1)
return true
else
-- 其他客户端拥有锁
return false
end
-- 释放锁的Lua脚本
local lock_key = KEYS[1]
local lock_value = ARGV[1]
-- 检查是否是锁的拥有者
local counter = redis.call('hget', lock_key, lock_value)
if (counter and counter > 0) then
-- 减少锁的计数
counter = redis.call('hincrby', lock_key, lock_value, -1)
if (counter == 0) then
-- 释放锁
redis.call('del', lock_key)
end
return true
else
-- 非法释放锁
r
Redis 的 Sorted Set(有序集合)是一种数据类型,它不仅存储元素,而且还将每个元素关联到一个浮点数的分数。在 Sorted Set 中,元素是唯一的,但分数可以重复。Sorted Set 中的元素按分数进行排序,分数可以升序或降序排序。
一、命令语法:
ZADD key [NX|XX] [CH] [INCR] score member [score member ...]
:将一个或多个成员元素及其分数值加入到有序集当中。ZCARD key
:获取有序集的成员数。ZCOUNT key min max
:计算在有序集中分数在 min 和 max 之间的成员的数量。ZINCRBY key increment member
:有序集成员的分数加上增量 increment。ZRANGE key start stop [WITHSCORES]
:通过索引区间返回有序集成员。ZRANGEBYSCORE key min max [WITHSCORES] [LIMIT offset count]
:根据分数返回有序集成员的列表。ZRANK key member
:返回有序集成员的排名。ZREM key member [member ...]
:移除有序集中的一个或多个成员。ZREVRANGE key start stop [WITHSCORES]
:有序集成员按分数从高到低排列。ZREVRANGEBYSCORE key max min [WITHSCORES] [LIMIT offset count]
:有序集成员按分数从高到低排列。ZREVRANK key member
:返回有序集成员的排名,成员按分数从高到低排列。ZSCORE key member
:获取有序集中成员的分数。ZUNIONSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [SUM|MIN|MAX]
:计算给定的一个或多个有序集的并集,并存储在新的有序集合 key 中。ZINTERSTORE destination numkeys key [key ...] [WEIGHTS weight [weight ...]] [SUM|MIN|MAX]
:计算给定的一个或多个有序集的交集,并存储在新的有序集合 key 中。
二、操作示例:
- 添加成员到 Sorted Set:
ZADD myzset 1 "one"
ZADD myzset 2 "two" 3 "three"
- 获取 Sorted Set 的成员数:
ZCARD myzset
- 获取 Sorted Set 指定分数范围内的成员:
ZRANGEBYSCORE myzset 1 3
- 获取 Sorted Set 的成员分数:
ZSCORE myzset "two"
- 移除 Sorted Set 的成员:
ZREM myzset "one"
三、命令返回值:
ZADD
:返回被成功添加的新成员的数量,不包括那些被更新的、已经存在的成员的分数。ZCARD
:返回有序集的成员数。ZCOUNT
:返回分数在 min 和 max 之间的成员的数量。ZINCRBY
:返回新的成员分数值。ZRANGE
和ZRANGEBYSCORE
:返回指定范围内的成员列表。ZRANK
和
import { Module, Global } from '@nestjs/common';
import { RedisModule } from 'nestjs-redis-module';
@Global() // 全局模块
@Module({
imports: [
RedisModule.register({
host: 'localhost',
port: 6379,
db: 0,
}),
],
exports: [RedisModule],
})
export class RedisConfigModule {}
// 在其他模块中使用
import { Module } from '@nestjs/common';
import { RedisConfigModule } from './redis-config.module';
@Module({
imports: [RedisConfigModule],
// 其他配置...
})
export class AnyModule {}
这段代码展示了如何在NestJS中设置一个全局的Redis配置模块,并在其他模块中导入它。这样做可以确保Redis客户端在整个应用程序中是共享的,并且可以在一个地方管理配置。
import redis
import time
import random
import uuid
# 假设已经有了Redis连接对象redis_conn
redis_conn = redis.StrictRedis(host='localhost', port=6379, db=0)
# 定义加锁和解锁的函数
def acquire_lock(lock_name, acquire_timeout=10, lock_timeout=10):
identifier = str(uuid.uuid4()) # 生成一个唯一的ID
end = time.time() + acquire_timeout
while time.time() < end:
if redis_conn.set(lock_name, identifier, ex=lock_timeout, nx=True):
# 如果成功设置了锁,并设置了过期时间,返回True
return identifier
time.sleep(0.001) # 避免无意的CPU使用
return False
def release_lock(lock_name, identifier):
# 使用Lua脚本来保证释放锁的操作的原子性
lua_script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
result = redis_conn.eval(lua_script, 1, lock_name, identifier)
return result == 1 # 如果成功删除了key,返回True
# 使用互斥锁处理缓存击穿问题
def get_data_with_lock(key, lock_name, acquire_timeout, lock_timeout, fetch_func):
identifier = acquire_lock(lock_name, acquire_timeout, lock_timeout)
if identifier:
try:
data = redis_conn.get(key)
if not data:
data = fetch_func() # 假设这是一个从数据库加载数据的函数
redis_conn.set(key, data)
finally:
release_lock(lock_name, identifier)
return data
else:
# 如果未能获得锁,则从Redis缓存中获取数据
return redis_conn.get(key)
# 示例:假设fetch_data是一个从数据库加载数据的函数
def fetch_data():
return "database data"
# 假设'my_key'是缓存数据的key,'my_lock'是用作互斥锁的key
cached_data = get_data_with_lock('my_key', 'my_lock', 10, 10, fetch_data)
print(cached_data)
这段代码首先定义了获取锁和释放锁的函数,然后定义了一个使用互斥锁处理缓存击穿问题的函数。在这个函数中,如果获取锁失败,则直接从Redis缓存中获取数据。这样既能保证大部分请求能够直接从缓存中获取数据,也能在需要时重新从数据源加载数据。
Redis内存碎片是指Redis在内存分配时由于频繁的动态内存分配造成的内存空间不连续,导致效率降低的情况。
清理内存碎片的方法:
- 手动重启Redis服务:这是最简单的方法,重启Redis会释放掉所有的内存碎片,并且让Redis重新分配内存。
- 使用
MEMORY PURGE
命令:在Redis 4.0及以上版本,可以使用MEMORY PURGE
命令来清理内存碎片。这个命令会尝试合并内存碎片,并且在完成后返回合并的内存大小。
redis-cli MEMORY PURGE
- 调整内存分配器:Redis支持多种内存分配器,如jemalloc, tcmalloc等。可以尝试更换这些分配器来减少内存碎片。
- 配置volatile-lru或allkeys-lru驱动的淘汰:通过配置Redis的淘汰策略为
volatile-lru
或allkeys-lru
,当内存不足时,Redis会根据最近最少使用算法(LRU)淘汰一部分key,减少内存碎片。
请注意,这些方法都有其适用场景,在生产环境中应谨慎使用,并在清理内存碎片之前备份好数据。