-
Notifications
You must be signed in to change notification settings - Fork 542
Expand file tree
/
Copy pathbase_parser.py
More file actions
231 lines (192 loc) · 6.35 KB
/
base_parser.py
File metadata and controls
231 lines (192 loc) · 6.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
# -*- coding: utf-8 -*-
"""
Created on 2018-07-25 11:41:57
---------
@summary: parser 的基类
---------
@author: Boris
@email: boris_liu@foxmail.com
"""
import os
import feapder.utils.tools as tools
from feapder.db.mysqldb import MysqlDB
from feapder.network.item import UpdateItem
from feapder.utils.log import log
from feapder.network.request import Request
from feapder.network.response import Response
from feapder.utils.perfect_dict import PerfectDict
class BaseParser:
    """
    @summary: base class of all parsers. Every hook below is a no-op that
              returns None; subclasses override the ones they need.
    ---------
    """

    def start_requests(self):
        """
        @summary: hook that produces the initial (seed) requests
        ---------
        ---------
        @result: yield Request()
        """

    def download_midware(self, request: Request):
        """
        @summary: download middleware. It may modify request parameters, or
                  perform a custom download and then return request, response
        ---------
        @param request:
        ---------
        @result: return request / request, response
        """

    def validate(self, request: Request, response: Response):
        """
        @summary: validation hook, used to check whether the response is correct.
                  If the hook raises an exception, the request is retried.
                  If it returns True or None, the parse callback is entered.
                  If it returns False, the current request is discarded.
                  request.callback_name can be used to tell callbacks apart and
                  apply different validation logic per callback.
        ---------
        @param request:
        @param response:
        ---------
        @result: True / None / False
        """

    def parse(self, request: Request, response: Response):
        """
        @summary: default parse callback
        ---------
        @param request:
        @param response:
        ---------
        @result:
        """

    def exception_request(self, request: Request, response: Response, e: Exception):
        """
        @summary: receives a request whose download or parsing raised an exception
        ---------
        @param request:
        @param response:
        @param e: the exception
        ---------
        @result: request / callback / None (the return value must be iterable)
        """

    def failed_request(self, request: Request, response: Response, e: Exception):
        """
        @summary: receives a request that exceeded the maximum retry count.
                  A modified request may be returned. If no request is returned,
                  the incoming request is put into the redis failed table as-is;
                  otherwise the modified request is put into the failed table.
        ---------
        @param request:
        @param response:
        @param e: the exception
        ---------
        @result: request / item / callback / None (the return value must be iterable)
        """

    def start_callback(self):
        """
        @summary: callback invoked when the program starts
        ---------
        ---------
        @result: None
        """

    def end_callback(self):
        """
        @summary: callback invoked when the program ends
        ---------
        ---------
        @result: None
        """

    @property
    def name(self):
        """Name of the concrete parser class (subclass name when subclassed)."""
        return self.__class__.__name__

    def close(self):
        """Release hook; does nothing by default."""
class TaskParser(BaseParser):
    """
    @summary: parser base class for spiders whose tasks are stored in a MySQL task table
    ---------
    """

    def __init__(self, task_table, task_state, mysqldb=None):
        """
        @summary:
        ---------
        @param task_table: the task table in MySQL
        @param task_state: name of the state column of the task table
        @param mysqldb: MysqlDB instance to use; a new MysqlDB() is created
                        when a falsy value (e.g. None) is given
        ---------
        """
        if not mysqldb:
            mysqldb = MysqlDB()
        self._mysqldb = mysqldb
        self._task_state = task_state
        self._task_table = task_table

    def add_task(self):
        """
        @summary: add tasks. Called on every start_monitor launch, before init_task
        ---------
        ---------
        @result:
        """

    def start_requests(self, task: PerfectDict):
        """
        @summary: produce the requests for one task
        ---------
        @param task: task information (annotated as PerfectDict)
        ---------
        @result:
        """

    def update_task_state(self, task_id, state=1, **kwargs):
        """
        @summary: update a task's state in the task table. Task code must call
                  this explicitly once a task is done. May be overridden.
                  Invoke it as: yield lambda : self.update_task_state(task_id, state)
        ---------
        @param task_id: id of the row to update
        @param state: value written to the state column
        @param kwargs: extra columns to update
        ---------
        @result: None; success is logged at debug level, failure at error level
        """
        # The id is written back too, followed by the state column.
        kwargs.update({"id": task_id, self._task_state: state})
        # NOTE(review): task_id is interpolated into the WHERE clause without
        # quoting/escaping; this assumes ids are trusted numeric values — confirm.
        where = "id = {}".format(task_id)
        sql = tools.make_update_sql(self._task_table, kwargs, condition=where)

        succeeded = self._mysqldb.update(sql)
        if not succeeded:
            log.error("置任务%s状态失败 sql=%s" % (task_id, sql))
        else:
            log.debug("置任务%s状态成功" % task_id)

    # NOTE: this alias is bound to TaskParser.update_task_state when the class is
    # created, so a subclass overriding update_task_state does not change what
    # update_task calls.
    update_task = update_task_state

    def update_task_batch(self, task_id, state=1, **kwargs):
        """
        Build an UpdateItem that updates a task in batch mode.
        When called from several places, the updated fields must be identical.
        Note: it must be written as `yield update_task_batch(...)`, otherwise
        nothing is updated.
        @param task_id: id of the task row
        @param state: value for the state column
        @param kwargs: extra columns to update
        @return: UpdateItem targeting the task table
        """
        kwargs.update({"id": task_id, self._task_state: state})
        item = UpdateItem(**kwargs)
        item.table_name = self._task_table
        item.name_underline = self._task_table + "_item"
        return item
class BatchParser(TaskParser):
    """
    @summary: template for batch spiders
    ---------
    """

    def __init__(
        self, task_table, batch_record_table, task_state, date_format, mysqldb=None
    ):
        """
        @summary:
        ---------
        @param task_table: the task table in MySQL
        @param batch_record_table: the batch record table in MySQL
        @param task_state: name of the state column of the task table
        @param date_format: format of the batch date (assumed strftime-style;
                            the %M -> %i translation below relies on that)
        @param mysqldb: MysqlDB instance to use; forwarded to TaskParser
        ---------
        """
        super(BatchParser, self).__init__(
            task_table=task_table, task_state=task_state, mysqldb=mysqldb
        )
        self._batch_record_table = batch_record_table  # batch record table in MySQL
        self._date_format = date_format  # batch date format

    @staticmethod
    def _to_mysql_date_format(date_format):
        """
        @summary: translate a strftime-style format into MySQL DATE_FORMAT syntax.
                  Only the minute directive differs: strftime %M is MySQL %i
                  (MySQL's %M is the month name). "%%" escapes are kept intact.
        ---------
        @param date_format: strftime-style format string
        ---------
        @result: format string usable in MySQL date_format()
        """
        import re

        # Scanning "%x" pairs left to right keeps "%%M" (a literal "%M") untouched.
        return re.sub(
            r"%(.)",
            lambda m: "%i" if m.group(1) == "M" else m.group(0),
            date_format,
        )

    @property
    def batch_date(self):
        """
        @summary: get the current batch date. The value is cached in
                  os.environ["batch_date"]; when absent, the latest row (highest
                  id) of the batch record table is read and cached.
        ---------
        ---------
        @result: batch date string formatted with date_format
        """
        batch_date = os.environ.get("batch_date")
        if not batch_date:
            sql = 'select date_format(batch_date, "{date_format}") from {batch_record_table} order by id desc limit 1'.format(
                date_format=self._to_mysql_date_format(self._date_format),
                batch_record_table=self._batch_record_table,
            )
            # Use the injected/shared connection instead of opening a new MysqlDB().
            batch_info = self._mysqldb.find(sql)  # e.g. (('2018-08-19',),)
            # A NULL batch_date comes back as None, which os.environ cannot store;
            # treat it the same as "no batch recorded yet".
            batch_date = batch_info[0][0] if batch_info else None
            if not batch_date:
                log.error("需先运行 start_monitor_task()")
                # Exit status 137; a parent reading the raw wait status (e.g.
                # os.system) sees 137 << 8 == 35072, which the original comment
                # says lets the spider manager restart the process.
                os._exit(137)
            os.environ["batch_date"] = batch_date
        return batch_date