import redis
import time
import random
# 连接Redis
r = redis.Redis(host='localhost', port=6379, db=0)
# 令牌桶算法实现分布式限流
class RateLimiter(object):
def __init__(self, rate, burst=10):
self.rate = rate
self.burst = burst
self.tokens_key = 'tokens'
self.timestamp_key = 'timestamp'
self.fill_rate = rate / burst
def _get_tokens(self):
timestamp = r.get(self.timestamp_key)
if timestamp is None:
r.set(self.tokens_key, self.burst)
r.set(self.timestamp_key, time.time())
return self.burst
else:
tokens = r.get(self.tokens_key)
if tokens is None:
r.set(self.tokens_key, self.burst)
r.set(self.timestamp_key, time.time())
return self.burst
else:
return int(tokens)
def _reduce_tokens(self, cost):
tokens = self._get_tokens()
if tokens >= cost:
r.decrby(self.tokens_key, cost)
return True
else:
return False
def _fill_token(self):
timestamp = r.get(self.timestamp_key)
if timestamp is not None:
elapsed = time.time() - float(timestamp)
if elapsed > 0:
time_to_wait = self.fill_rate * elapsed
time.sleep(time_to_wait)
r.incrbyfloat(self.tokens_key, self.fill_rate * elapsed)
r.set(self.timestamp_key, time.time())
def allowed(self, cost=1):
self._fill_token()
return self._reduce_tokens(cost)
# 使用示例
limiter = RateLimiter(rate=5, burst=10) # 每秒5个请求,初始令牌桶容量10
# 模拟请求
for i in range(20):
if limiter.allowed():
print(f"Request {i} is allowed!")
time.sleep(random.uniform(0, 1)) # 模拟请求处理时间
else:
print(f"Request {i} is denied!")
这段代码实现了基于Redis的令牌桶算法分布式限流器。它首先连接到Redis,然后定义了一个RateLimiter
类,用于初始化限流器并实现相关的方法。allowed
方法检查是否有足够的令牌来处理请求,如果有,则处理请求并减少令牌数量;如果没有,则拒绝请求。代码还包括了令牌填充的逻辑,确保在超出 burst 限制后能够按照固定的速率进行令牌填充。最后,提供了使用限流器的模拟请求示例。