forked from Boris-code/feapder
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrequest_buffer.py
More file actions
151 lines (119 loc) · 4.59 KB
/
request_buffer.py
File metadata and controls
151 lines (119 loc) · 4.59 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
# -*- coding: utf-8 -*-
"""
Created on 2018-06-19 17:17
---------
@summary: request 管理器, 负责缓冲添加到数据库中的request
---------
@author: Boris
@email: boris_liu@foxmail.com
"""
import collections
import threading
import feapder.setting as setting
import feapder.utils.tools as tools
from feapder.db.redisdb import RedisDB
from feapder.dedup import Dedup
from feapder.utils.log import log
MAX_URL_COUNT = 1000 # 缓存中最大request数
class RequestBuffer(threading.Thread):
    """Buffers requests in memory and batch-writes them to the Redis task queue.

    A background thread (``run``) flushes the buffer roughly once per second;
    ``put_request`` also flushes eagerly once more than MAX_URL_COUNT requests
    are pending. Besides new requests, the buffer collects finished requests
    that must be removed from the Redis priority queue, and bare callables
    that are executed after each batch insert.
    """

    # Class-level de-duplication filter shared by all instances; created
    # lazily on first construction when request filtering is enabled.
    dedup = None

    def __init__(self, redis_key):
        """
        @param redis_key: key fragment used to build the Redis table names
        """
        # __init__ may run more than once on the same object (singleton-style
        # usage elsewhere); only initialize on the first call.
        if not hasattr(self, "_requests_deque"):
            super(RequestBuffer, self).__init__()

            self._thread_stop = False
            self._is_adding_to_db = False

            self._requests_deque = collections.deque()
            self._del_requests_deque = collections.deque()
            self._db = RedisDB()

            self._table_request = setting.TAB_REQUSETS.format(redis_key=redis_key)
            self._table_failed_request = setting.TAB_FAILED_REQUSETS.format(
                redis_key=redis_key
            )

            if not self.__class__.dedup and setting.REQUEST_FILTER_ENABLE:
                # Default expire time is one month (per REQUEST_FILTER_SETTING).
                self.__class__.dedup = Dedup(
                    name=redis_key, to_md5=False, **setting.REQUEST_FILTER_SETTING
                )

    def run(self):
        """Worker loop: flush the buffer to Redis about once per second
        until ``stop`` is called. Errors are logged and the loop continues."""
        self._thread_stop = False
        while not self._thread_stop:
            try:
                self.__add_request_to_db()
            except Exception as e:
                log.exception(e)

            tools.delay_time(1)

    def stop(self):
        """Ask the worker loop to exit and mark the thread as not started
        so the same object can be ``start()``-ed again."""
        self._thread_stop = True
        # NOTE: clears threading.Thread's private _started event to allow a
        # restart of this thread object; relies on CPython internals.
        self._started.clear()

    def put_request(self, request):
        """Queue *request* (a Request object or a bare callable) for
        insertion; flush eagerly when the buffer grows past MAX_URL_COUNT."""
        self._requests_deque.append(request)

        if self.get_requests_count() > MAX_URL_COUNT:  # buffer full: flush now
            self.flush()

    def put_del_request(self, request):
        """Queue a finished request for removal from the Redis queue."""
        self._del_requests_deque.append(request)

    def put_failed_request(self, request, table=None):
        """Persist a failed request to the failed-request table (or *table*),
        keyed by its priority. Errors are logged, never raised."""
        try:
            request_dict = request.to_dict
            self._db.zadd(
                table or self._table_failed_request, request_dict, request.priority
            )
        except Exception as e:
            log.exception(e)

    def flush(self):
        """Synchronously flush the buffer to Redis, logging (not raising)
        any error so callers are never interrupted."""
        try:
            self.__add_request_to_db()
        except Exception as e:
            log.exception(e)

    def get_requests_count(self):
        """Return the number of requests currently buffered."""
        return len(self._requests_deque)

    def is_adding_to_db(self):
        """True while ``__add_request_to_db`` is draining the buffer."""
        return self._is_adding_to_db

    def __add_request_to_db(self):
        """Drain the buffers: insert pending requests into the Redis priority
        queue (deduplicating when enabled), run queued callbacks, and remove
        finished requests.

        Fix: ``_is_adding_to_db`` is now reset in a ``finally`` clause.
        Previously an exception raised by a Redis call left the flag stuck
        at True (``run``/``flush`` swallow the exception), so
        ``is_adding_to_db`` misreported the buffer state to its callers.
        """
        request_list = []
        prioritys = []
        callbacks = []

        try:
            while self._requests_deque:
                request = self._requests_deque.popleft()
                self._is_adding_to_db = True

                if callable(request):
                    # A bare callable queued via put_request: defer it until
                    # after the batch insert. Beware late-binding closures —
                    # bind loop variables as default args (def cb(x=x): ...)
                    # so the callback does not see the loop's final value.
                    callbacks.append(request)
                    continue

                priority = request.priority

                # Skip requests already seen when de-duplication is enabled.
                if (
                    request.filter_repeat
                    and setting.REQUEST_FILTER_ENABLE
                    and not self.__class__.dedup.add(request.fingerprint)
                ):
                    log.debug("request已存在 url = %s" % request.url)
                    continue
                else:
                    # NOTE(review): requests are serialized as str(dict);
                    # presumably the consumer parses this back — verify.
                    request_list.append(str(request.to_dict))
                    prioritys.append(priority)

                # Write in chunks so one huge batch never blocks Redis.
                if len(request_list) > MAX_URL_COUNT:
                    self._db.zadd(self._table_request, request_list, prioritys)
                    request_list = []
                    prioritys = []

            # Insert the remaining (final) batch.
            if request_list:
                self._db.zadd(self._table_request, request_list, prioritys)

            # Run deferred callbacks; one failing callback must not stop the rest.
            for callback in callbacks:
                try:
                    callback()
                except Exception as e:
                    log.exception(e)

            # Remove finished requests from the Redis queue.
            if self._del_requests_deque:
                request_done_list = []
                while self._del_requests_deque:
                    request_done_list.append(self._del_requests_deque.popleft())

                # Exclude requests that were just (re-)inserted in this pass,
                # otherwise a freshly added request could be deleted.
                request_done_list = list(set(request_done_list) - set(request_list))
                if request_done_list:
                    self._db.zrem(self._table_request, request_done_list)
        finally:
            # Always clear the flag, even if a Redis call above raised.
            self._is_adding_to_db = False