# -*- coding: utf-8 -*-
"""
Created on 2018-06-19 17:17
---------
@summary: Item manager. Buffers the items that are headed for the database and
          exports them from one place, so that multiple threads do not access
          the database at the same time.
---------
@author: Boris
@email: [email protected]
"""

import importlib
import threading
from queue import Queue

import feapder.setting as setting
import feapder.utils.tools as tools
from feapder.db.redisdb import RedisDB
from feapder.dedup import Dedup
from feapder.network.item import Item, UpdateItem
from feapder.pipelines import BasePipeline
from feapder.pipelines.mysql_pipeline import MysqlPipeline
from feapder.utils import metrics
from feapder.utils.log import log

MAX_ITEM_COUNT = 5000  # maximum number of entries held in the buffer queue
UPLOAD_BATCH_MAX_SIZE = 1000  # flush() exports after this many queue entries

MYSQL_PIPELINE_PATH = "feapder.pipelines.mysql_pipeline.MysqlPipeline"


def _import_class(class_path):
    """Import and return the object named by a dotted ``module.ClassName`` path."""
    module_name, class_name = class_path.rsplit(".", 1)
    return getattr(importlib.import_module(module_name), class_name)


class ItemBuffer(threading.Thread):
    """
    Thread that drains a queue of items / update items / callbacks / requests
    and exports them in batches through the configured pipelines.
    """

    # Shared by all instances; created lazily in __init__ when item filtering is on.
    dedup = None
    # Shared RedisDB connection, created on first access of ``redis_db``.
    __redis_db = None

    def __init__(self, redis_key, task_table=None):
        """
        @param redis_key: used to format the request / failed-items table names
        @param task_table: name of the task table; updates to it always go
                           through a MysqlPipeline (see __export_to_db)
        """
        # NOTE(review): ``_table_item`` is never assigned in this file, so this
        # guard looks always true — presumably a leftover re-init guard; confirm.
        if not hasattr(self, "_table_item"):
            super(ItemBuffer, self).__init__()

            self._thread_stop = False
            self._is_adding_to_db = False
            self._redis_key = redis_key
            self._task_table = task_table

            self._items_queue = Queue(maxsize=MAX_ITEM_COUNT)

            self._table_request = setting.TAB_REQUSETS.format(redis_key=redis_key)
            self._table_failed_items = setting.TAB_FAILED_ITEMS.format(
                redis_key=redis_key
            )

            self._item_tables = {
                # 'item_name': 'table_name'  # cache: item name -> table name
            }

            self._item_update_keys = {
                # 'table_name': ['id', 'name'...]  # cache: table name -> __update_key__
            }

            self._pipelines = self.load_pipelines()

            self._have_mysql_pipeline = MYSQL_PIPELINE_PATH in setting.ITEM_PIPELINES
            self._mysql_pipeline = None

            if setting.ITEM_FILTER_ENABLE and not self.__class__.dedup:
                self.__class__.dedup = Dedup(
                    to_md5=False, **setting.ITEM_FILTER_SETTING
                )

            # number of export retries
            self.export_retry_times = 0
            # number of failed exports. TODO: non-air spiders should count this in redis
            self.export_falied_times = 0

    @property
    def redis_db(self):
        """Return the class-wide RedisDB instance, creating it on first use."""
        if self.__class__.__redis_db is None:
            self.__class__.__redis_db = RedisDB()

        return self.__class__.__redis_db

    def load_pipelines(self):
        """
        Instantiate every pipeline listed in ``setting.ITEM_PIPELINES``.

        @return: list of pipeline instances, in configuration order
        @raise ValueError: if a pipeline is not a BasePipeline instance
        """
        pipelines = []
        for pipeline_path in setting.ITEM_PIPELINES:
            pipeline = _import_class(pipeline_path)()
            if not isinstance(pipeline, BasePipeline):
                raise ValueError(f"{pipeline_path} 需继承 feapder.pipelines.BasePipeline")
            pipelines.append(pipeline)

        return pipelines

    @property
    def mysql_pipeline(self):
        """Return the pipeline named by MYSQL_PIPELINE_PATH, created on first use."""
        if not self._mysql_pipeline:
            self._mysql_pipeline = _import_class(MYSQL_PIPELINE_PATH)()

        return self._mysql_pipeline

    def run(self):
        """Flush the queue about once per delay tick until stop() is called, then close."""
        self._thread_stop = False
        while not self._thread_stop:
            self.flush()
            tools.delay_time(1)

        self.close()

    def stop(self):
        """Ask the run() loop to exit."""
        self._thread_stop = True
        # Clears threading.Thread's private "started" event.
        # NOTE(review): presumably so that start() can be called again — confirm.
        self._started.clear()

    def put_item(self, item):
        """
        Queue one entry for export. Blocks while the queue is full.

        @param item: an Item / UpdateItem, a callable callback, or a request
        """
        if isinstance(item, Item):
            # callback before the item is stored
            item.pre_to_db()

        self._items_queue.put(item)

    def flush(self):
        """
        Drain the queue, sorting entries by kind, and export them in batches of
        at most UPLOAD_BATCH_MAX_SIZE entries. Any exception is logged, not raised.
        """
        try:
            items = []
            update_items = []
            requests = []
            callbacks = []
            items_fingerprints = []
            data_count = 0

            while not self._items_queue.empty():
                data = self._items_queue.get_nowait()
                data_count += 1

                # classify the entry
                if callable(data):
                    callbacks.append(data)

                elif isinstance(data, UpdateItem):
                    update_items.append(data)

                elif isinstance(data, Item):
                    items.append(data)
                    if setting.ITEM_FILTER_ENABLE:
                        items_fingerprints.append(data.fingerprint)

                else:  # request-redis
                    requests.append(data)

                if data_count >= UPLOAD_BATCH_MAX_SIZE:
                    self.__add_item_to_db(
                        items, update_items, requests, callbacks, items_fingerprints
                    )

                    # start a fresh batch
                    items = []
                    update_items = []
                    requests = []
                    callbacks = []
                    items_fingerprints = []
                    data_count = 0

            if data_count:
                self.__add_item_to_db(
                    items, update_items, requests, callbacks, items_fingerprints
                )

        except Exception as e:
            log.exception(e)

    def get_items_count(self):
        """Return the approximate number of entries waiting in the queue."""
        return self._items_queue.qsize()

    def is_adding_to_db(self):
        """Return True while a batch is being exported."""
        return self._is_adding_to_db

    def __dedup_items(self, items, items_fingerprints):
        """
        Drop the items whose fingerprint is already known to the dedup store.

        The consumed entries are removed from both input lists.

        @param items: items of the current batch
        @param items_fingerprints: fingerprints, parallel to ``items``
        @return: (items, items_fingerprints) after de-duplication
        """
        if not items:
            return items, items_fingerprints

        is_exists = self.__class__.dedup.get(items_fingerprints)
        is_exists = is_exists if isinstance(is_exists, list) else [is_exists]

        dedup_items = []
        dedup_items_fingerprints = []
        dup_items_count = 0

        for item, fingerprint, is_exist in zip(items, items_fingerprints, is_exists):
            if is_exist:
                dup_items_count += 1
            else:
                dedup_items.append(item)
                dedup_items_fingerprints.append(fingerprint)

        dedup_items_count = len(dedup_items)
        items_count = dedup_items_count + dup_items_count

        # Drain what was processed from the caller's lists in one step
        # (instead of an O(n) pop(0) per element).
        del items[:items_count]
        del items_fingerprints[:items_count]

        log.info(
            "待入库数据 {} 条， 重复 {} 条，实际待入库数据 {} 条".format(
                items_count, dup_items_count, dedup_items_count
            )
        )

        return dedup_items, dedup_items_fingerprints

    def __pick_items(self, items, is_update_item=False):
        """
        Group the items' data by table. ``items`` is emptied afterwards.

        @param items: Item / UpdateItem instances
        @param is_update_item: when True, also cache each table's update key
        @return: {'table_name': [data, ...]}
        """
        datas_dict = {
            # 'table_name': [{}, {}]
        }

        for item in items:
            # Resolve the table name through the cache first; on a miss read it
            # from the item and cache it to speed up the next lookup.
            item_name = item.item_name
            table_name = self._item_tables.get(item_name)
            if not table_name:
                table_name = item.table_name
                self._item_tables[item_name] = table_name

            datas_dict.setdefault(table_name, []).append(item.to_dict)

            if is_update_item and table_name not in self._item_update_keys:
                self._item_update_keys[table_name] = item.update_key

        items.clear()

        return datas_dict

    def __export_to_db(self, table, datas, is_update=False, update_keys=()):
        """
        Send one table's rows through every pipeline, stopping at the first failure.

        @param table: table name
        @param datas: list of row dicts
        @param is_update: call ``update_items`` instead of ``save_items``
        @param update_keys: forwarded to ``update_items``
        @return: True when every pipeline reported success
        """
        # emit metrics for the batch
        self.check_datas(table=table, datas=datas)

        for pipeline in self._pipelines:
            if is_update:
                # Task-table updates are only handled by MysqlPipeline instances.
                if table == self._task_table and not isinstance(
                    pipeline, MysqlPipeline
                ):
                    continue

                if not pipeline.update_items(table, datas, update_keys=update_keys):
                    log.error(
                        f"{pipeline.__class__.__name__} 更新数据失败. table: {table} items: {datas}"
                    )
                    return False

            else:
                if not pipeline.save_items(table, datas):
                    log.error(
                        f"{pipeline.__class__.__name__} 保存数据失败. table: {table} items: {datas}"
                    )
                    return False

        # For the task table, if no mysql pipeline is configured above, the task
        # still has to be updated through mysql.
        if not self._have_mysql_pipeline and is_update and table == self._task_table:
            if not self.mysql_pipeline.update_items(
                table, datas, update_keys=update_keys
            ):
                log.error(
                    f"{self.mysql_pipeline.__class__.__name__} 更新数据失败. table: {table} items: {datas}"
                )
                return False

        return True

    def __add_item_to_db(
        self, items, update_items, requests, callbacks, items_fingerprints
    ):
        """
        Export one batch.

        On success: run the callbacks, remove the finished requests from redis
        and record the fingerprints in the dedup store.
        On failure: keep the requests, count the failure, and once the retry
        limit is exceeded record the failed data in redis (non-air spiders).

        @param items: Item instances to save
        @param update_items: UpdateItem instances to update
        @param requests: finished requests to remove from the request table
        @param callbacks: callables to run after a successful export
        @param items_fingerprints: fingerprints, parallel to ``items``
        """
        export_success = True
        self._is_adding_to_db = True

        # try/finally: flush() swallows exceptions, so without it one failed
        # export would leave is_adding_to_db() stuck on True.
        try:
            # de-duplicate
            if setting.ITEM_FILTER_ENABLE:
                items, items_fingerprints = self.__dedup_items(
                    items, items_fingerprints
                )

            # group by table
            items_dict = self.__pick_items(items)
            update_items_dict = self.__pick_items(update_items, is_update_item=True)

            # batch insert
            failed_items = {"add": [], "update": [], "requests": []}
            while items_dict:
                table, datas = items_dict.popitem()

                log.debug(
                    """
                -------------- item 批量入库 --------------
                表名: %s
                datas: %s
                    """
                    % (table, tools.dumps_json(datas, indent=16))
                )

                if not self.__export_to_db(table, datas):
                    export_success = False
                    failed_items["add"].append({"table": table, "datas": datas})

            # batch update
            while update_items_dict:
                table, datas = update_items_dict.popitem()

                log.debug(
                    """
                -------------- item 批量更新 --------------
                表名: %s
                datas: %s
                    """
                    % (table, tools.dumps_json(datas, indent=16))
                )

                update_keys = self._item_update_keys.get(table)
                if not self.__export_to_db(
                    table, datas, is_update=True, update_keys=update_keys
                ):
                    export_success = False
                    failed_items["update"].append({"table": table, "datas": datas})

            if export_success:
                # run the callbacks; one failing callback must not stop the rest
                for callback in callbacks:
                    try:
                        callback()
                    except Exception as e:
                        log.exception(e)
                callbacks.clear()

                # remove the finished requests
                if requests:
                    self.redis_db.zrem(self._table_request, requests)

                # record the fingerprints in the dedup store
                if setting.ITEM_FILTER_ENABLE:
                    if items_fingerprints:
                        self.__class__.dedup.add(items_fingerprints, skip_check=True)
            else:
                failed_items["requests"] = requests

                if self.export_retry_times > setting.EXPORT_DATA_MAX_RETRY_TIMES:
                    if self._redis_key != "air_spider":
                        # record the failed data in redis
                        self.redis_db.sadd(self._table_failed_items, failed_items)

                        # remove the finished requests
                        if requests:
                            self.redis_db.zrem(self._table_request, requests)

                        log.error(
                            "入库超过最大重试次数，不再重试，数据记录到redis，items:\n {}".format(
                                tools.dumps_json(failed_items)
                            )
                        )
                    self.export_retry_times = 0

                else:
                    tip = ["入库不成功"]
                    if callbacks:
                        tip.append("不执行回调")
                    if requests:
                        tip.append("不删除任务")
                        exists = self.redis_db.zexists(self._table_request, requests)
                        # Re-score only the requests that are still queued, in a
                        # single call; requests that are gone are not re-added.
                        # NOTE(review): 300 is the score handed to zadd —
                        # presumably it delays the retry; confirm in RedisDB.
                        still_queued = [
                            request
                            for exist, request in zip(exists, requests)
                            if exist
                        ]
                        if still_queued:
                            self.redis_db.zadd(self._table_request, still_queued, 300)

                    if setting.ITEM_FILTER_ENABLE:
                        tip.append("数据不入去重库")

                    if self._redis_key != "air_spider":
                        tip.append("将自动重试")

                    tip.append("失败items:\n {}".format(tools.dumps_json(failed_items)))
                    log.error("，".join(tip))

                    self.export_falied_times += 1

                    if self._redis_key != "air_spider":
                        self.export_retry_times += 1

                if self.export_falied_times > setting.EXPORT_DATA_MAX_FAILED_TIMES:
                    # raise an alert
                    msg = "《{}》爬虫导出数据失败，失败次数：{}，请检查爬虫是否正常".format(
                        self._redis_key, self.export_falied_times
                    )
                    log.error(msg)
                    tools.send_msg(
                        msg=msg,
                        level="error",
                        message_prefix="《%s》爬虫导出数据失败" % (self._redis_key),
                    )
        finally:
            self._is_adding_to_db = False

    def check_datas(self, table, datas):
        """
        Emit metrics: the total row count and, for every key of every row,
        1 when the value is truthy and 0 otherwise.

        @param table: table name, used as the metric classifier
        @param datas: list of row dicts
        @return:
        """
        metrics.emit_counter("total count", len(datas), classify=table)
        for data in datas:
            for k, v in data.items():
                metrics.emit_counter(k, int(bool(v)), classify=table)

    def close(self):
        """Call close() on every pipeline; a failure is logged and the rest still run."""
        for pipeline in self._pipelines:
            try:
                pipeline.close()
            except Exception as e:
                # best-effort shutdown: report instead of silently swallowing
                log.exception(e)