forked from Boris-code/feapder
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathredis_lock.py
More file actions
115 lines (99 loc) · 3.35 KB
/
redis_lock.py
File metadata and controls
115 lines (99 loc) · 3.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
# -*- coding: utf-8 -*-
"""
Created on 2019/11/5 5:25 PM
---------
@summary: Redis-backed expiring lock (RedisLock), usable as a context manager
---------
@author: Boris
@email: boris_liu@foxmail.com
"""
import threading
import time
from feapder.db.redisdb import RedisDB
from feapder.utils.log import log
class RedisLock:
    """Expiring lock stored under a Redis key, usable as a context manager.

    Entering the ``with`` block tries to take the lock and, when that
    succeeds, starts a daemon thread that keeps pushing the key's expiry
    forward until the block exits or ``lock_timeout`` is exceeded.
    Acquisition may fail, so callers must check ``locked`` inside the block.
    """

    # Lazily created default client, shared by every instance that was not
    # given its own ``redis_cli``.
    redis_cli = None

    def __init__(self, key, redis_cli=None, wait_timeout=0, lock_timeout=86400):
        """
        Redis lock with expiry.

        :param key: name of the lock; it is stored in Redis as redis_lock:[key]
        :param redis_cli: redis client object; when omitted, a shared default
            client is created on first use via RedisDB().get_redis_obj()
        :param wait_timeout: seconds to keep retrying the acquisition; 0 means
            do not wait - a single attempt is made and it may fail
        :param lock_timeout: maximum seconds the lock is kept alive by the
            renewal thread; 0 means no limit (held until released or until
            the process exits unexpectedly). Defaults to one day.

        Usage example:
            with RedisLock(key="test") as _lock:
                if _lock.locked:
                    # the lock was obtained
                    # do somethings
        """
        # Backs the ``stop_prolong_life`` property, so it must exist before
        # that property is assigned below.
        self._stop_event = threading.Event()
        self._prolong_thread = None

        self.redis_conn = redis_cli
        self.lock_key = "redis_lock:{}".format(key)
        # Upper bound on how long the renewal thread keeps the lock alive
        self.lock_timeout = lock_timeout
        # How long acquire() keeps retrying
        self.wait_timeout = wait_timeout
        self.locked = False
        self.stop_prolong_life = False

    @property
    def redis_conn(self):
        """Client used by this lock: its own one if given, else the shared default."""
        if self._redis_cli is not None:
            return self._redis_cli

        cls = self.__class__
        if not cls.redis_cli:
            cls.redis_cli = RedisDB().get_redis_obj()
        return cls.redis_cli

    @redis_conn.setter
    def redis_conn(self, cli):
        # Kept per instance: writing to the class attribute here would let one
        # lock silently re-point (or reset to None) the client of every other
        # lock in the process.
        self._redis_cli = cli

    @property
    def stop_prolong_life(self):
        """True once the renewal thread has been asked to stop."""
        return self._stop_event.is_set()

    @stop_prolong_life.setter
    def stop_prolong_life(self, value):
        if value:
            self._stop_event.set()
        else:
            self._stop_event.clear()

    def __enter__(self):
        if not self.locked:
            self.acquire()
            # Renew the expiry only for a lock we actually hold; otherwise the
            # thread would extend (and eventually delete) somebody else's key.
            if self.locked:
                # Clear a stop request left over from a previous ``with`` so
                # the same instance can be reused.
                self.stop_prolong_life = False
                renewer = threading.Thread(target=self.prolong_life, daemon=True)
                self._prolong_thread = renewer
                renewer.start()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.stop_prolong_life = True
        # Wait for the renewal thread to finish before deleting the key, so it
        # cannot touch the key after the lock has been given up. The wait is
        # short because the thread's sleeps are interrupted by the stop event.
        renewer, self._prolong_thread = self._prolong_thread, None
        if renewer is not None:
            renewer.join()
        self.release()

    def __repr__(self):
        return "<RedisLock: {} >".format(self.lock_key)

    def acquire(self):
        """Try to take the lock, retrying for up to ``wait_timeout`` seconds.

        The outcome is reported through ``self.locked``; nothing is returned.
        The key is created with a 5 second expiry, so outside of a ``with``
        block (where the renewal thread runs) the lock lapses after 5 seconds.
        """
        start = time.time()
        while True:
            # SET with nx=True only succeeds when the key does not exist yet
            if self.redis_conn.set(self.lock_key, time.time(), nx=True, ex=5):
                self.locked = True
                break

            if self.wait_timeout > 0:
                if time.time() - start > self.wait_timeout:
                    log.info("加锁失败")
                    break
            else:
                # wait_timeout == 0: single attempt, give up immediately
                break

            log.debug("等待加锁: {} wait:{}".format(self, time.time() - start))
            # Poll less often when the caller is prepared to wait a long time
            if self.wait_timeout > 10:
                time.sleep(5)
            else:
                time.sleep(1)
        return

    def release(self):
        """Delete the lock key if this instance believes it holds the lock.

        NOTE(review): the key is deleted without checking that it is still
        the one this instance created; if it expired and was re-acquired by
        another holder in the meantime, that holder's key is removed.
        """
        if self.locked:
            self.redis_conn.delete(self.lock_key)
            self.locked = False
        return

    def prolong_life(self):
        """
        Keep extending the lock key's expiry until asked to stop.

        Runs in the background thread started by ``__enter__``. Stops when
        ``stop_prolong_life`` is set, or - if ``lock_timeout`` is non-zero -
        once the accumulated hold time exceeds it, in which case the key is
        deleted and ``locked`` is reset.
        :return:
        """
        spend_time = 0
        while not self.stop_prolong_life:
            expire = self.redis_conn.ttl(self.lock_key)
            if expire < 0:  # key does not exist
                self._stop_event.wait(1)
                continue

            self.redis_conn.expire(self.lock_key, expire + 5)  # extend by 5 seconds
            # Sleep for the previous TTL, i.e. wake up about 5 seconds before
            # the new expiry; returns early (True) when a stop is requested.
            if self._stop_event.wait(expire):
                break

            spend_time += expire
            if self.lock_timeout and spend_time > self.lock_timeout:
                log.info("锁超时,释放")
                self.redis_conn.delete(self.lock_key)
                # The lock is gone; make sure a later release() does not
                # delete a key that someone else may have acquired since.
                self.locked = False
                break