forked from Boris-code/feapder
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathspider.py
More file actions
437 lines (355 loc) · 14.6 KB
/
Copy pathspider.py
File metadata and controls
437 lines (355 loc) · 14.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
# -*- coding: utf-8 -*-
"""
Created on 2020/4/22 12:05 AM
---------
@summary:
---------
@author: Boris
@email: [email protected]
"""
import time
import warnings
from collections.abc import Iterable
import feapder.setting as setting
import feapder.utils.tools as tools
from feapder.core.base_parser import BaseParser
from feapder.core.scheduler import Scheduler
from feapder.db.redisdb import RedisDB
from feapder.network.item import Item
from feapder.network.request import Request
from feapder.utils.log import log
# Dotted path string naming the console pipeline class; DebugSpider uses it as
# the sole ITEM_PIPELINES entry in its debug settings.
CONSOLE_PIPELINE_PATH = "feapder.pipelines.console_pipeline.ConsolePipeline"
class Spider(
    BaseParser, Scheduler
):  # threading defines ``name``; BaseParser must be inherited first, otherwise its ``name`` is overridden by the one from threading.Thread (the base class of Scheduler)
    """
    @summary: Simplifies building a spider
    ---------
    """

    def __init__(
        self,
        redis_key=None,
        min_task_count=1,
        check_task_interval=5,
        thread_count=None,
        begin_callback=None,
        end_callback=None,
        delete_keys=(),
        keep_alive=None,
        auto_start_requests=None,
        batch_interval=0,
        wait_lock=True,
        **kwargs
    ):
        """
        @summary: Spider
        ---------
        @param redis_key: key prefix under which tasks and other data are stored in redis
        @param min_task_count: minimum number of tasks in the task queue; new tasks are
            only added when the queue holds fewer than this. Default 1. Takes effect in
            start_monitor_task mode
        @param check_task_interval: interval between checks for remaining tasks; default 5 seconds
        @param thread_count: number of threads; defaults to the value in the settings file
        @param begin_callback: callback invoked when the spider begins
        @param end_callback: callback invoked when the spider ends
        @param delete_keys: keys deleted when the spider starts; type: tuple/bool/string.
            Regex is supported; commonly used to clear the task queue, otherwise a restart
            resumes from where the previous run stopped
        @param keep_alive: whether the spider stays resident
        @param auto_start_requests: whether the spider adds tasks automatically
        @param batch_interval: crawl interval in days, default 0. When started repeatedly,
            the spider only runs if the time elapsed since the first crawl finished
            exceeds this interval
        @param wait_lock: whether to wait for the lock when dispatching tasks. Without
            waiting, several processes may dispatch the same tasks at once, so set this
            to True in a distributed environment
        ---------
        @result:
        """
        super(Spider, self).__init__(
            redis_key=redis_key,
            thread_count=thread_count,
            begin_callback=begin_callback,
            end_callback=end_callback,
            delete_keys=delete_keys,
            keep_alive=keep_alive,
            auto_start_requests=auto_start_requests,
            batch_interval=batch_interval,
            wait_lock=wait_lock,
            **kwargs
        )

        self._min_task_count = min_task_count
        self._check_task_interval = check_task_interval

        # True once the latest distribute_task() call queued at least one Request
        self._is_distributed_task = False
        # True once the "no task added" message has been logged, to avoid repeating it
        self._is_show_not_task = False

    def start_monitor_task(self, *args, **kws):
        """
        @summary: Dispatch tasks whenever the redis request table runs low
        ---------
        @param args: forwarded to distribute_task (and on to each parser's start_requests)
        @param kws: forwarded to distribute_task (and on to each parser's start_requests)
        ---------
        @result: None. Loops until stopped when keep_alive is set, otherwise runs one pass
        """
        if not self.is_reach_next_spider_time():
            return

        self._auto_start_requests = False
        redisdb = RedisDB()

        if not self._parsers:  # not in add_parser mode
            self._parsers.append(self)

        while True:
            try:
                # check whether redis still holds pending tasks
                tab_requests = setting.TAB_REQUSETS.format(redis_key=self._redis_key)
                todo_task_count = redisdb.zget_count(tab_requests)

                if todo_task_count < self._min_task_count:  # add tasks
                    # make start requests
                    self.distribute_task(*args, **kws)
                else:
                    log.info("redis 中尚有%s条积压任务,暂时不派发新任务" % todo_task_count)

            except Exception as e:
                # best effort: log and retry on the next pass rather than abort the monitor
                log.exception(e)

            if not self._keep_alive:
                break

            time.sleep(self._check_task_interval)

    def distribute_task(self, *args, **kws):
        """
        @summary: Dispatch tasks and push the returned requests into the buffers
        ---------
        @param args: forwarded to each parser's start_requests
        @param kws: forwarded to each parser's start_requests
        ---------
        @result: None
        @raise Exception: start_requests returned a truthy non-iterable value
        @raise TypeError: start_requests yielded something that is not a Request,
            an Item or a callable
        """
        self._is_distributed_task = False

        for parser in self._parsers:
            requests = parser.start_requests(*args, **kws)
            if requests and not isinstance(requests, Iterable):
                raise Exception("%s.%s返回值必须可迭代" % (parser.name, "start_requests"))

            # 1 => last yielded object was a Request, 2 => it was an Item.
            # A callable is queued in the same buffer as the object yielded before it.
            result_type = 1
            for request in requests or []:
                if isinstance(request, Request):
                    request.parser_name = request.parser_name or parser.name
                    self._request_buffer.put_request(request)

                    self._is_distributed_task = True
                    result_type = 1

                elif isinstance(request, Item):
                    self._item_buffer.put_item(request)
                    result_type = 2

                elif callable(request):  # a callable may be a function that updates the database
                    if result_type == 1:
                        self._request_buffer.put_request(request)
                    else:
                        self._item_buffer.put_item(request)
                else:
                    raise TypeError(
                        "start_requests yield result type error, expect Request、Item、callback func, but got type: {}".format(
                            type(request)
                        )
                    )

            self._request_buffer.flush()
            self._item_buffer.flush()

        if self._is_distributed_task:  # only announce the spider start when tasks were added
            # begin
            self.spider_begin()
            self.record_spider_state(
                spider_type=1,
                state=0,
                batch_date=tools.get_current_date(),
                spider_start_time=tools.get_current_date(),
                batch_interval=self._batch_interval,
            )

            # reset the "no task message already shown" flag
            self._is_show_not_task = False

        elif not self._is_show_not_task:  # no task, and the no-task message was not sent yet
            # send the no-task message
            msg = "《%s》start_requests无任务添加" % (self._spider_name)
            log.info(msg)

            # self.send_msg(msg)

            self._is_show_not_task = True

    def run(self):
        """
        @summary: Start the spider and poll its state once per second
        ---------
        @result: None. Returns once all threads are done unless keep_alive is set
        """
        if not self.is_reach_next_spider_time():
            return

        if not self._parsers:  # not in add_parser mode
            self._parsers.append(self)

        self._start()

        while True:
            try:
                if self.all_thread_is_done():
                    if not self._is_notify_end:
                        self.spider_end()  # one round finished
                        self.record_spider_state(
                            spider_type=1,
                            state=1,
                            spider_end_time=tools.get_current_date(),
                            batch_interval=self._batch_interval,
                        )

                        self._is_notify_end = True

                    if not self._keep_alive:
                        self._stop_all_thread()
                        break

                else:
                    self._is_notify_end = False

                self.check_task_status()
            except Exception as e:
                log.exception(e)

            tools.delay_time(1)  # check the spider state once per second

    @classmethod
    def to_DebugSpider(cls, *args, **kwargs):
        """
        @summary: Build a DebugSpider instance that inherits from this spider class
        ---------
        @param args: forwarded to DebugSpider
        @param kwargs: forwarded to DebugSpider
        ---------
        @result: DebugSpider instance
        """
        # Make DebugSpider inherit from cls. NOTE: this rebinds the bases and name of
        # the module-level DebugSpider class itself, so the change is global.
        DebugSpider.__bases__ = (cls,)
        DebugSpider.__name__ = cls.__name__
        return DebugSpider(*args, **kwargs)
class DebugSpider(Spider):
    """
    Debug spider: runs a single supplied request without recording spider state
    or caching responses.
    """

    # Settings merged into __custom_setting__ when a debug spider is created
    __debug_custom_setting__ = dict(
        COLLECTOR_SLEEP_TIME=1,
        COLLECTOR_TASK_COUNT=1,
        # SPIDER
        SPIDER_THREAD_COUNT=1,
        SPIDER_SLEEP_TIME=0,
        SPIDER_TASK_COUNT=1,
        SPIDER_MAX_RETRY_TIMES=10,
        REQUEST_LOST_TIMEOUT=600,  # 10 minutes
        PROXY_ENABLE=False,
        RETRY_FAILED_REQUESTS=False,
        # save failed requests
        SAVE_FAILED_REQUEST=False,
        # filtering
        ITEM_FILTER_ENABLE=False,
        REQUEST_FILTER_ENABLE=False,
        OSS_UPLOAD_TABLES=(),
        DELETE_KEYS=True,
        ITEM_PIPELINES=[CONSOLE_PIPELINE_PATH],
    )

    def __init__(self, request=None, request_dict=None, *args, **kwargs):
        """
        @param request: Request object
        @param request_dict: request as a dict. Supply either request or request_dict
        @param kwargs: forwarded to Spider. ``redis_key`` must be passed as a keyword
            argument; "_debug" is appended to it
        @raise Exception: neither request nor request_dict was supplied
        """
        warnings.warn(
            "您正处于debug模式下,该模式下不会更新任务状态及数据入库,仅用于调试。正式发布前请更改为正常模式", category=Warning
        )

        if not request and not request_dict:
            raise Exception("request 与 request_dict 不能同时为null")

        # Use a separate "<redis_key>_debug" key so debug runs work on their own redis keys
        kwargs["redis_key"] = kwargs["redis_key"] + "_debug"
        # NOTE(review): updates the class-level dict in place; if __custom_setting__ is
        # inherited rather than defined on the spider class, other classes sharing that
        # dict see the debug values too - confirm against BaseParser.
        self.__class__.__custom_setting__.update(
            self.__class__.__debug_custom_setting__
        )

        super(DebugSpider, self).__init__(*args, **kwargs)

        self._request = request or Request.from_dict(request_dict)

    def save_cached(self, request, response, table):
        """No-op in debug mode."""
        pass

    def delete_tables(self, delete_tables_list):
        """
        @summary: Clear the redis tables matching the given key patterns
        ---------
        @param delete_tables_list: bool, a single pattern, or a list/tuple of patterns.
            Any bool, and the pattern "*", are replaced by ``<redis_key>*``
        ---------
        @result: None
        """
        if isinstance(delete_tables_list, bool):
            delete_tables_list = [self._redis_key + "*"]
        elif not isinstance(delete_tables_list, (list, tuple)):
            delete_tables_list = [delete_tables_list]

        redis = RedisDB()
        for delete_tab in delete_tables_list:
            if delete_tab == "*":
                # never wipe every key: restrict the wildcard to this spider's prefix
                delete_tab = self._redis_key + "*"

            tables = redis.getkeys(delete_tab)
            for table in tables:
                log.debug("正在清理表 %s" % table)
                redis.clear(table)

    def __start_requests(self):
        """Yield the single request under debug."""
        yield self._request

    def _buffer_debug_results(self, parser):
        """
        @summary: Queue what the parser's debug start requests yield, then flush both buffers
        ---------
        @param parser: parser to take the debug request from
        ---------
        @result: True if at least one Request was queued, else False
        @raise Exception: the parser returned a truthy non-iterable value
        @raise TypeError: a yielded object is not a Request, an Item or a callable
        """
        # Name-mangled to parser._DebugSpider__start_requests, so this only works
        # for parsers that are DebugSpider instances.
        results = parser.__start_requests()
        if results and not isinstance(results, Iterable):
            raise Exception("%s.%s返回值必须可迭代" % (parser.name, "start_requests"))

        has_request = False
        # 1 => last yielded object was a Request, 2 => it was an Item.
        # A callable is queued in the same buffer as the object yielded before it.
        result_type = 1
        for result in results or []:
            if isinstance(result, Request):
                result.parser_name = result.parser_name or parser.name
                self._request_buffer.put_request(result)
                has_request = True
                result_type = 1

            elif isinstance(result, Item):
                self._item_buffer.put_item(result)
                result_type = 2

            elif callable(result):  # a callable may be a function that updates the database
                if result_type == 1:
                    self._request_buffer.put_request(result)
                else:
                    self._item_buffer.put_item(result)

            else:
                # surface a wrongly typed debug request instead of silently dropping it
                raise TypeError(
                    "start_requests yield result type error, expect Request、Item、callback func, but got type: {}".format(
                        type(result)
                    )
                )

        self._request_buffer.flush()
        self._item_buffer.flush()
        return has_request

    def distribute_task(self, *args, **kws):
        """
        @summary: Dispatch the debug request and push it into the buffers
        ---------
        @param args: accepted for compatibility with Spider.distribute_task; ignored
        @param kws: accepted for compatibility with Spider.distribute_task; ignored
        ---------
        @result: None
        """
        self._is_distributed_task = False

        for parser in self._parsers:
            if self._buffer_debug_results(parser):
                self._is_distributed_task = True

        if self._is_distributed_task:  # only announce the spider start when tasks were added
            # begin
            self.spider_begin()
            self.record_spider_state(
                spider_type=1,
                state=0,
                batch_date=tools.get_current_date(),
                spider_start_time=tools.get_current_date(),
                batch_interval=self._batch_interval,
            )

            # reset the "no task message already shown" flag
            self._is_show_not_task = False

        elif not self._is_show_not_task:  # no task, and the no-task message was not sent yet
            # send the no-task message
            msg = "《%s》start_requests无任务添加" % (self._spider_name)
            log.info(msg)

            # self.send_msg(msg)

            self._is_show_not_task = True

    def record_spider_state(
        self,
        spider_type,
        state,
        batch_date=None,
        spider_start_time=None,
        spider_end_time=None,
        batch_interval=None,
    ):
        """No-op in debug mode: spider state is not recorded."""
        pass

    def _start(self):
        """
        @summary: Queue the debug request, then start the collector, the parser
            controls and both buffers
        ---------
        @result: None
        """
        # run the parsers' start requests
        self.spider_begin()  # for a spider that never auto-stops this can only run once

        for parser in self._parsers:
            # add the requests to the request queue, which persists them in one place
            self._buffer_debug_results(parser)

        # start the collector
        self._collector.start()

        # start the parser controls
        for _ in range(self._thread_count):
            parser_control = self._parser_control_obj(
                self._collector,
                self._redis_key,
                self._request_buffer,
                self._item_buffer,
            )

            for parser in self._parsers:
                parser_control.add_parser(parser)

            parser_control.start()
            self._parser_controls.append(parser_control)

        # start the request buffer
        self._request_buffer.start()

        # start the item buffer
        self._item_buffer.start()

    def run(self):
        """
        @summary: Run the debug request to completion, then clear this spider's redis keys
        ---------
        @result: None
        """
        if not self._parsers:  # not in add_parser mode
            self._parsers.append(self)

        self._start()

        while True:
            try:
                if self.all_thread_is_done():
                    self._stop_all_thread()
                    break
            except Exception as e:
                log.exception(e)

            tools.delay_time(1)  # check the spider state once per second

        # remove every key created under the "<redis_key>_debug" prefix
        self.delete_tables([self._redis_key + "*"])