forked from Boris-code/feapder
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcollector.py
More file actions
176 lines (145 loc) · 5.25 KB
/
collector.py
File metadata and controls
176 lines (145 loc) · 5.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
# -*- coding: utf-8 -*-
"""
Created on 2016-12-23 11:24
---------
@summary: request 管理
---------
@author: Boris
@email: boris_liu@foxmail.com
"""
import collections
import threading
import time
import feapder.setting as setting
import feapder.utils.tools as tools
from feapder.db.redisdb import RedisDB
from feapder.network.request import Request
from feapder.utils.log import log
class Collector(threading.Thread):
    """Background thread that pulls pending requests from Redis into memory.

    Each loop iteration reports this node's heartbeat to the spider-status
    sorted set, then (if the local buffer is not already full) fetches a
    batch of serialized requests from the request sorted set, deserializes
    them and appends them to an in-memory deque that consumers drain via
    :meth:`get_requests`.
    """

    def __init__(self, redis_key):
        """Set up Redis table names, tuning values and the local buffer.

        @param redis_key: value substituted into ``setting.TAB_REQUSETS`` and
            ``setting.TAB_SPIDER_STATUS`` to build the table names.
        """
        super().__init__()
        self._db = RedisDB()

        self._thread_stop = False

        # Local buffer of {"request_obj": ..., "request_redis": ...} dicts.
        self._todo_requests = collections.deque()

        self._tab_requests = setting.TAB_REQUSETS.format(redis_key=redis_key)
        self._tab_spider_status = setting.TAB_SPIDER_STATUS.format(redis_key=redis_key)

        # Identifier of this node in the heartbeat table: "<ip>-<start time>".
        self._spider_mark = tools.get_localhost_ip() + f"-{time.time()}"

        self._interval = setting.COLLECTOR_SLEEP_TIME
        self._request_count = setting.COLLECTOR_TASK_COUNT

        self._is_collector_task = False
        self._first_get_task = True

        self.__delete_dead_node()

    def run(self):
        """Loop until :meth:`stop` is called: heartbeat, fetch, sleep."""
        self._thread_stop = False
        while not self._thread_stop:
            try:
                self.__report_node_heartbeat()
                self.__input_data()
            except Exception as e:
                # Keep the collector alive; a failed round is retried after
                # the sleep below.
                log.exception(e)

            self._is_collector_task = False

            time.sleep(self._interval)

    def stop(self):
        """Ask the run loop to exit after its current iteration."""
        self._thread_stop = True
        # NOTE(review): clears threading.Thread's private "started" event,
        # presumably so this object can be start()ed again - relies on a
        # CPython implementation detail; confirm before changing.
        self._started.clear()

    def __input_data(self):
        """Fetch one batch of requests from Redis into the local buffer.

        Does nothing when the buffer already holds at least
        ``self._request_count`` items.
        """
        current_timestamp = tools.get_current_timestamp()
        if len(self._todo_requests) >= self._request_count:
            return

        # Default batch size; may be lowered below.
        request_count = self._request_count

        # Number of nodes that reported a heartbeat recently.
        spider_count = self._db.zget_count(
            self._tab_spider_status,
            priority_min=current_timestamp - (self._interval + 10),
            priority_max=current_timestamp,
        )

        # Share the work between the live nodes.
        if spider_count:
            # Number of tasks in the request table.
            task_count = self._db.zget_count(self._tab_requests)
            # Dynamic batch size = task count / live node count + 1.
            request_count = task_count // spider_count + 1

        # Never take more than the configured batch size.
        request_count = min(request_count, self._request_count)

        if not request_count:
            return

        # No other node is alive and this is the first fetch: reset tasks
        # that look lost (score inside the lost-timeout window) to score 300.
        if self._first_get_task and spider_count <= 1:
            datas = self._db.zrangebyscore_set_score(
                self._tab_requests,
                priority_min=current_timestamp,
                priority_max=current_timestamp + setting.REQUEST_LOST_TIMEOUT,
                score=300,
                count=None,
            )
            self._first_get_task = False
            lose_count = len(datas)
            if lose_count:
                log.info("重置丢失任务完毕,共{}条".format(lose_count))

        # Take tasks whose score is not later than the current timestamp and
        # move their score to current_timestamp + REQUEST_LOST_TIMEOUT.
        requests_list = self._db.zrangebyscore_set_score(
            self._tab_requests,
            priority_min="-inf",
            priority_max=current_timestamp,
            score=current_timestamp + setting.REQUEST_LOST_TIMEOUT,
            count=request_count,
        )

        if requests_list:
            self._is_collector_task = True
            # Store the requests in the local buffer.
            self.__put_requests(requests_list)

    def __report_node_heartbeat(self):
        """Report this node's heartbeat so tasks can be shared out evenly."""
        self._db.zadd(
            self._tab_spider_status, self._spider_mark, tools.get_current_timestamp()
        )

    def __delete_dead_node(self):
        """Delete the status entries of nodes without a recent heartbeat."""
        self._db.zremrangebyscore(
            self._tab_spider_status,
            "-inf",
            tools.get_current_timestamp() - (self._interval + 10),
        )

    def __put_requests(self, requests_list):
        """Deserialize raw requests and append them to the local buffer.

        Items that fail to deserialize are logged and skipped.

        @param requests_list: iterable of serialized requests as read from
            the request table.
        """
        for request in requests_list:
            try:
                # SECURITY NOTE(review): eval() executes whatever expression
                # is stored in Redis. Anyone able to write to the request
                # table can run code in this process. Left unchanged because
                # the serialization format is not visible here; consider
                # ast.literal_eval or JSON if the payload allows it.
                request_dict = {
                    "request_obj": Request.from_dict(eval(request)),
                    "request_redis": request,
                }
            except Exception as e:
                log.exception("\nerror %s\nrequest %s\n" % (e, request))
                continue

            self._todo_requests.append(request_dict)

    def get_requests(self, count):
        """Pop up to ``count`` buffered requests, oldest first.

        @param count: maximum number of requests wanted; zero or a negative
            value yields an empty list.
        @result: list of buffered request dicts, possibly shorter than
            ``count`` when fewer are available.
        """
        todo = self._todo_requests
        requests = []
        # range() is empty for count <= 0, so a negative count can no longer
        # drain the buffer and crash on an empty deque.
        for _ in range(min(count, len(todo))):
            try:
                requests.append(todo.popleft())
            except IndexError:
                # The buffer shrank since len() was read (e.g. another
                # caller popped concurrently); return what we got.
                break

        return requests

    def get_requests_count(self):
        """Return the buffered request count, else the Redis task count, else 0."""
        return len(self._todo_requests) or self._db.zget_count(self._tab_requests) or 0

    def is_collector_task(self):
        """Return True while the current loop iteration has fetched tasks."""
        return self._is_collector_task