-
Notifications
You must be signed in to change notification settings - Fork 18
Expand file tree
/
Copy pathBatchDelayQueue.py
More file actions
57 lines (41 loc) · 1.19 KB
/
BatchDelayQueue.py
File metadata and controls
57 lines (41 loc) · 1.19 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
"""
Delay queue: buffers added objects and hands them to a callback in batches at a fixed interval.
"""
import threading
import time
class BatchQueue:
    """Thread-safe buffer that accumulates objects and drains them in one batch.

    Appending and draining are serialised by a lock so that an object added
    while another thread is draining is never lost: it lands either in the
    batch being returned or in the fresh buffer, never in a discarded list.
    """

    def __init__(self):
        """Create an empty buffer."""
        self.buf = []
        # Guards every read/write of ``self.buf``.
        self._lock = threading.Lock()

    def add_one(self, obj):
        """Append ``obj`` to the pending batch."""
        with self._lock:
            self.buf.append(obj)

    def get_all(self):
        """Return every pending object, in insertion order, and reset the buffer.

        Returns:
            A list owned by the caller; it is empty when nothing was added
            since the previous call.
        """
        with self._lock:
            # Swap instead of copy-then-clear: the old list is handed to the
            # caller and a new one takes its place while the lock is held.
            drained, self.buf = self.buf, []
        return drained
class DelayQueue:
    """Collect objects and deliver them to a callback in periodic batches.

    Every ``interval`` seconds a ``threading.Timer`` calls ``process``, which
    drains everything added since the previous tick and, when the batch is
    non-empty, passes it to ``process_func``.
    """

    def __init__(self, interval, process_func):
        """Create the queue and arm the first timer.

        Args:
            interval: Seconds between two flushes (handed to ``threading.Timer``).
            process_func: Callable invoked with the list of drained objects.
                It is not called for an interval in which nothing was added.
        """
        self.queue = BatchQueue()
        self.interval = interval
        self.process_func = process_func
        self._stopped = False
        self._timer = None
        # Start the timer.
        self._schedule()

    def add(self, obj):
        """Queue ``obj`` for delivery with the next batch."""
        self.queue.add_one(obj)

    def process(self):
        """Flush the pending batch to ``process_func`` and re-arm the timer.

        An exception raised by ``process_func`` still propagates to the
        caller (the timer thread), but the next tick is scheduled regardless.
        """
        try:
            buf = self.queue.get_all()
            if buf:
                self.process_func(buf)
        finally:
            # A Timer fires only once, so it must be re-armed after every
            # tick. Doing it in ``finally`` keeps the queue running even when
            # the callback fails.
            self._schedule()

    def stop(self):
        """Stop scheduling further flushes and cancel the pending timer.

        A flush that is already running is not interrupted, and objects still
        buffered are left in ``self.queue``.
        """
        self._stopped = True
        timer = self._timer
        if timer is not None:
            timer.cancel()

    def _schedule(self):
        """Arm a one-shot timer for the next ``process`` call unless stopped."""
        if self._stopped:
            return
        timer = threading.Timer(self.interval, self.process)
        self._timer = timer
        timer.start()
if __name__ == '__main__':
    # Demo: three producer threads feed a single DelayQueue whose callback
    # prints each batch; the queue is created with a 5-second interval.

    def process(batch):
        """Print one flushed batch."""
        print(batch)

    def thread_fun(target_queue):
        """Add the integers 0..999 to ``target_queue``, sleeping 10 ms after each."""
        for number in range(1000):
            target_queue.add(number)
            time.sleep(0.01)

    dq = DelayQueue(5, process)

    # Start the three producers one after another.
    for _ in range(3):
        producer = threading.Thread(target=thread_fun, args=(dq,))
        producer.start()

    # Block the main thread until the user presses Enter.
    input()