import redis
import time
import uuid
class RedLock:
def __init__(self, connections):
# 连接列表,按顺序排列
self.connections = connections
self.quorum = len(connections) / 2 + 1
def lock(self, resource, ttl):
# 生成唯一的锁标识符
token = str(uuid.uuid4())
# 获取当前时间戳(毫秒)
now = int(time.time() * 1000)
# 锁到期时间戳
valid_until = now + ttl
locks = {}
for conn in self.connections:
# 尝试在每个 Redis 实例上获取锁
identifier = conn.set(resource, token, nx=True, px=ttl)
if identifier:
locks[conn] = (identifier, valid_until)
# 如果已经获得足够数量的锁,则返回 RedLock 实例
if len(locks) >= self.quorum:
return RedLockManager(locks, resource, token, valid_until)
else:
# 如果未能获得足够的锁,则释放已获得的锁并返回 None
self.unlock(locks)
return None
def unlock(self, locks):
# 释放所有已获得的锁
for conn, (identifier, valid_until) in locks.items():
with conn.pipeline() as pipe:
while True:
try:
pipe.watch(resource)
if pipe.get(resource) == identifier:
pipe.multi()
pipe.delete(resource)
pipe.execute()
break
pipe.unwatch()
break
except redis.exceptions.WatchError:
pass
class RedLockManager:
def __init__(self, locks, resource, token, valid_until):
self.locks = locks
self.resource = resource
self.token = token
self.valid_until = valid_until
def is_valid(self):
# 检查锁是否仍然有效
return int(time.time() * 1000) < self.valid_until
def break_lock(self):
# 强制释放锁,不管它是否过期
self.unlock(self.locks)
def unlock(self, locks):
# 释放所有已获得的锁
RedLock.unlock(locks)
# 使用示例
# 假设有三个 Redis 实例的连接对象
redis_connections = [redis_client1, redis_client2, redis_client3]
# 初始化 RedLock
red_lock = RedLock(redis_connections)
# 尝试获取锁
lock = red_lock.lock("my_resource", 5000)
if lock:
try:
# 执行需要互斥访问的代码
pass
finally:
# 释放锁
lock.unlock(lock.
评论已关闭