# -*- coding: utf-8 -*-
"""
Created on 2018-06-19 17:17
---------
@summary: Request manager; buffers requests that are to be added to the database (Redis)
---------
@author: Boris
@email: [email protected]
"""

import collections
import threading

import feapder.setting as setting
import feapder.utils.tools as tools
from feapder.db.redisdb import RedisDB
from feapder.dedup import Dedup
from feapder.utils.log import log

MAX_URL_COUNT = 1000  # maximum number of requests held in the buffer


class RequestBuffer(threading.Thread):
    """Thread that buffers outgoing requests and persists them to Redis.

    Requests (or plain callables) are queued with ``put_request``. They are
    written to the request sorted set by the ``run`` loop, or synchronously
    by ``put_request`` once more than ``MAX_URL_COUNT`` items are pending.
    Requests queued with ``put_del_request`` are removed from the same
    sorted set during the next flush.
    """

    # De-duplicator shared by all instances (class attribute). It is created
    # on first construction when setting.REQUEST_FILTER_ENABLE is on.
    dedup = None

    def __init__(self, redis_key):
        # Initialise only once: calling __init__ again on an already
        # initialised instance must not reset its queues or Thread state.
        # NOTE(review): presumably paired with an instance-reuse/singleton
        # pattern elsewhere — confirm.
        if not hasattr(self, "_requests_deque"):
            super(RequestBuffer, self).__init__()

            self._thread_stop = False
            self._is_adding_to_db = False

            self._requests_deque = collections.deque()
            self._del_requests_deque = collections.deque()
            self._db = RedisDB()

            self._table_request = setting.TAB_REQUSETS.format(redis_key=redis_key)
            self._table_failed_request = setting.TAB_FAILED_REQUSETS.format(
                redis_key=redis_key
            )

            if not self.__class__.dedup and setting.REQUEST_FILTER_ENABLE:
                # Original note: "default expiry is one month". Presumably this
                # is governed by REQUEST_FILTER_SETTING / Dedup — confirm there.
                self.__class__.dedup = Dedup(
                    name=redis_key, to_md5=False, **setting.REQUEST_FILTER_SETTING
                )

    def run(self):
        """Thread body: flush the buffer repeatedly until ``stop()`` is called.

        Exceptions from a flush are logged and the loop continues.
        ``tools.delay_time(1)`` is called between iterations (presumably a
        one-second pause).
        """
        self._thread_stop = False
        while not self._thread_stop:
            try:
                self.__add_request_to_db()
            except Exception as e:
                log.exception(e)

            tools.delay_time(1)

    def stop(self):
        """Ask the ``run`` loop to exit after its current iteration."""
        self._thread_stop = True
        # Clears threading.Thread's private "started" event. This presumably
        # allows start() to be called on this object again. It relies on
        # CPython threading internals.
        self._started.clear()

    def put_request(self, request):
        """Queue a request, or a callable to be run after the next DB write.

        Once more than MAX_URL_COUNT items are pending, the buffer is flushed
        synchronously in the caller's thread.
        """
        self._requests_deque.append(request)

        if self.get_requests_count() > MAX_URL_COUNT:  # over the limit: flush now
            self.flush()

    def put_del_request(self, request):
        """Queue a finished request for removal from the request table."""
        self._del_requests_deque.append(request)

    def put_failed_request(self, request, table=None):
        """Write a failed request straight to the failed-request table.

        Args:
            request: request object exposing ``to_dict`` and ``priority``.
            table: target sorted-set name; defaults to this spider's
                failed-request table.

        Errors are logged, not raised.
        """
        try:
            request_dict = request.to_dict
            # NOTE(review): this passes the dict itself, whereas
            # __add_request_to_db stores str(request.to_dict). If RedisDB.zadd
            # uses a single value as a mapping key, a dict is unhashable and
            # this would always fail (and only be logged) — confirm against
            # RedisDB.zadd.
            self._db.zadd(
                table or self._table_failed_request, request_dict, request.priority
            )
        except Exception as e:
            log.exception(e)

    def flush(self):
        """Synchronously write buffered requests to the DB; errors are logged."""
        try:
            self.__add_request_to_db()
        except Exception as e:
            log.exception(e)

    def get_requests_count(self):
        """Return the number of items waiting in the request buffer."""
        return len(self._requests_deque)

    def is_adding_to_db(self):
        """Return True while buffered requests are being written to the DB."""
        return self._is_adding_to_db

    def __add_request_to_db(self):
        """Drain both queues: insert requests, run callbacks, delete finished ones.

        Steps:
        - Skip requests with ``filter_repeat`` set when the shared
          de-duplicator reports them as already seen.
        - Insert the remaining requests into the request sorted set in batches.
        - Run queued callables after all inserts.
        - Remove requests queued via ``put_del_request``, except those
          (re-)added during this same call.
        """
        request_list = []
        prioritys = []
        callbacks = []
        # Every serialized request written during this call, across all
        # batches. Used below so that a just-added request is never deleted.
        added_requests = set()

        try:
            while self._requests_deque:
                request = self._requests_deque.popleft()
                self._is_adding_to_db = True

                if callable(request):
                    # A callback function. Beware of closures: bind loop
                    # variables as default arguments, e.g.
                    # def test(xxx=xxx):
                    #     # TODO business logic using xxx
                    # otherwise xxx ends up as its value from the last loop
                    # iteration.
                    callbacks.append(request)
                    continue

                priority = request.priority

                # Skip the request if de-duplication applies and it was
                # already seen.
                if (
                    request.filter_repeat
                    and setting.REQUEST_FILTER_ENABLE
                    and not self.__class__.dedup.add(request.fingerprint)
                ):
                    log.debug("request已存在 url = %s" % request.url)
                    continue

                request_str = str(request.to_dict)
                request_list.append(request_str)
                prioritys.append(priority)
                added_requests.add(request_str)

                if len(request_list) > MAX_URL_COUNT:
                    self._db.zadd(self._table_request, request_list, prioritys)
                    request_list = []
                    prioritys = []

            # Store the final (partial) batch.
            if request_list:
                self._db.zadd(self._table_request, request_list, prioritys)

            # Run callbacks; one failing callback must not stop the rest.
            for callback in callbacks:
                try:
                    callback()
                except Exception as e:
                    log.exception(e)

            # Delete finished requests.
            if self._del_requests_deque:
                request_done_list = []
                while self._del_requests_deque:
                    request_done_list.append(self._del_requests_deque.popleft())

                # Exclude requests added in this call, otherwise a request we
                # just wrote could be deleted again. Compare against every
                # batch, not only the last one (request_list is reset after
                # each mid-loop batch write).
                request_done_list = list(set(request_done_list) - added_requests)

                if request_done_list:
                    self._db.zrem(self._table_request, request_done_list)
        finally:
            # Always clear the flag, even when a Redis call raised, so that
            # is_adding_to_db() cannot get stuck at True.
            self._is_adding_to_db = False