学习小记-使用Redis的令牌桶算法实现分布式限流
import redis
import time
import random
# 连接Redis
redis_client = redis.StrictRedis(host='localhost', port=6379, db=0)
# 令牌桶限流的装饰器
def token_bucket_rate_throttle(key, rate):
# 计算时间窗口内允许的最大令牌数和时间窗口大小
tokens_per_second = rate
window_size = 1.0 / tokens_per_second
def middleware(func):
def inner(*args, **kwargs):
# 生成一个唯一的key
unique_key = key.format(**dict(args=args, kwargs=kwargs))
# 获取当前时间和令牌桶的容量
current_time = time.time()
last_request_time, _ = redis_client.hmget(unique_key, 't', 'c')
last_request_time = float(last_request_time) if last_request_time else 0
token_bucket_capacity = max(0, (current_time - last_request_time - window_size))
# 添加或更新请求记录
redis_client.hmset(unique_key, {
't': current_time,
'c': token_bucket_capacity
})
# 随机产生令牌
tokens_to_add = random.uniform(0, 1.0 / tokens_per_second)
current_tokens = min(token_bucket_capacity + tokens_to_add, window_size)
if current_tokens < 1:
return "Too many requests, please try again later"
# 调用原函数
return func(*args, **kwargs)
return inner
return middleware
# 使用装饰器
@token_bucket_rate_throttle('user-{}', rate=2) # 每秒不超过2个请求
def my_function_to_throttle(user_id):
print(f"Function called for user {user_id}")
return f"Success for user {user_id}"
# 测试函数
for i in range(10):
response = my_function_to_throttle(user_id=1)
print(response)
time.sleep(0.5)
这个代码实例使用了装饰器来实现令牌桶算法,并且可以限制特定用户的请求频率。在实际使用中,你可以将my_function_to_throttle
替换为你需要限流的函数,并且通过装饰器的参数来设置允许的最大请求频率。这个例子中,令牌桶的容量是固定的,但在实际应用中,可以根据需要动态调整。
评论已关闭