# -*- coding: utf-8 -*-
"""
Created on 2017-01-03 16:06
---------
@summary: parser control class
---------
@author: Boris
@email: [email protected]
"""
import random
import threading
import time
from collections.abc import Iterable
import feapder.setting as setting
import feapder.utils.tools as tools
from feapder.buffer.item_buffer import ItemBuffer
from feapder.db.memory_db import MemoryDB
from feapder.network.item import Item
from feapder.network.request import Request
from feapder.utils import metrics
from feapder.utils.log import log
class PaserControl(threading.Thread):
DOWNLOAD_EXCEPTION = "download_exception"
DOWNLOAD_SUCCESS = "download_success"
DOWNLOAD_TOTAL = "download_total"
PAESERS_EXCEPTION = "parser_exception"
is_show_tip = False
# Periodically tally completed and failed task counts; raise an alert if failed / completed > 0.5
_success_task_count = 0
_failed_task_count = 0
def __init__(self, collector, redis_key, request_buffer, item_buffer):
super(PaserControl, self).__init__()
self._parsers = []
self._collector = collector
self._redis_key = redis_key
self._request_buffer = request_buffer
self._item_buffer = item_buffer
self._thread_stop = False
self._wait_task_time = 0
def run(self):
    """
    Thread entry point: poll the collector for requests and process them.

    Loops until ``self._thread_stop`` becomes true. Each pass asks the
    collector for up to ``setting.SPIDER_TASK_COUNT`` requests and hands
    them to ``deal_requests``. When nothing is returned, the thread sleeps
    for one second, adds that second to ``self._wait_task_time`` and sets
    ``self.is_show_tip`` (the flag reported by ``is_not_task``).

    Any exception raised while fetching or handling requests is logged and
    followed by a 3-second pause, so a failure does not end the loop.
    """
    self._thread_stop = False

    while not self._thread_stop:
        try:
            requests = self._collector.get_requests(setting.SPIDER_TASK_COUNT)
            if not requests:
                # Emit the "waiting for tasks" tip once per idle period
                # rather than once per second.
                if not self.is_show_tip:
                    log.debug("parser 等待任务...")
                    self.is_show_tip = True

                time.sleep(1)
                self._wait_task_time += 1
                continue

            # Work arrived: clear the idle flag before processing.
            self.is_show_tip = False
            self.deal_requests(requests)

        except Exception as e:
            log.exception(e)
            time.sleep(3)
def is_not_task(self):
return self.is_show_tip
@classmethod
def get_task_status_count(cls):
return cls._failed_task_count, cls._success_task_count
def deal_requests(self, requests):
for request in requests:
response = None
request_redis = request["request_redis"]
request = request["request_obj"]
del_request_redis_after_item_to_db = False
del_request_redis_after_request_to_db = False
for parser in self._parsers:
if parser.name == request.parser_name:
used_download_midware_enable = False
try:
# Record the documents that need to be downloaded
self.record_download_status(
PaserControl.DOWNLOAD_TOTAL, parser.name
)
# Parse the request
if request.auto_request:
request_temp = None
response = None
# Download middleware
if request.download_midware:
if isinstance(request.download_midware, (list, tuple)):
request_temp = request
for download_midware in request.download_midware:
download_midware = (
download_midware
if callable(download_midware)
else tools.get_method(
parser, download_midware
)
)
request_temp = download_midware(request_temp)
else:
download_midware = (
request.download_midware
if callable(request.download_midware)
else tools.get_method(
parser, request.download_midware
)
)
request_temp = download_midware(request)
elif request.download_midware != False:
request_temp = parser.download_midware(request)
# Send the request
if request_temp:
if (
isinstance(request_temp, (tuple, list))
and len(request_temp) == 2
):
request_temp, response = request_temp
if not isinstance(request_temp, Request):
raise Exception(
"download_midware need return a request, but received type: {}".format(
type(request_temp)
)
)
used_download_midware_enable = True
if not response:
response = (
request_temp.get_response()
if not setting.RESPONSE_CACHED_USED
else request_temp.get_response_from_cached(
save_cached=False
)
)
else:
response = (
request.get_response()
if not setting.RESPONSE_CACHED_USED
else request.get_response_from_cached(
save_cached=False
)
)
if response == None:
raise Exception(
"è¿æ¥è¶
æ¶ url: %s" % (request.url or request_temp.url)
)
else:
response = None
# Validate
if parser.validate(request, response) == False:
continue
if request.callback: # 妿æparserçåè°å½æ°ï¼åç¨åè°å¤ç
callback_parser = (
request.callback
if callable(request.callback)
else tools.get_method(parser, request.callback)
)
results = callback_parser(request, response)
else: # å¦åé»è®¤ç¨parserå¤ç
results = parser.parse(request, response)
if results and not isinstance(results, Iterable):
raise Exception(
"%s.%sè¿åå¼å¿
é¡»å¯è¿ä»£"
% (parser.name, request.callback or "parse")
)
# Tracks what kind of object the previous result was
result_type = 0 # 0\1\2 (åå§å¼\request\item)
# Determine here whether the result is a request or an item
for result in results or []:
if isinstance(result, Request):
result_type = 1
# Assign parser_name to the request
result.parser_name = result.parser_name or parser.name
# Determine whether the callback is synchronous or asynchronous
if result.request_sync: # 忥
request_dict = {
"request_obj": result,
"request_redis": None,
}
requests.append(request_dict)
else: # 弿¥
# Store next_request in the database
self._request_buffer.put_request(result)
del_request_redis_after_request_to_db = True
elif isinstance(result, Item):
result_type = 2
# Store the item in the database
self._item_buffer.put_item(result)
# The request currently being processed needs to be deleted
del_request_redis_after_item_to_db = True
elif callable(result): # resultä¸ºå¯æ§è¡çæ å彿°
if (
result_type == 2
): # item ç callbackï¼bufferéçitemåå
¥åºååæ§è¡
self._item_buffer.put_item(result)
del_request_redis_after_item_to_db = True
else: # result_type == 1: # request ç callbackï¼bufferéçrequeståå
¥åºååæ§è¡ãå¯è½æçparserç´æ¥è¿åcallback
self._request_buffer.put_request(result)
del_request_redis_after_request_to_db = True
elif result is not None:
function_name = "{}.{}".format(
parser.name,
(
request.callback
and callable(request.callback)
and getattr(request.callback, "__name__")
or request.callback
)
or "parse",
)
raise TypeError(
f"{function_name} result expect RequestãItem or callback, bug get type: {type(result)}"
)
except Exception as e:
exception_type = (
str(type(e)).replace("