# coding:utf8
"""
Proxy pool
"""
import datetime
import json
import os
import random
import socket
import time
from urllib import parse

import redis
import requests

from feapder import setting
from feapder.utils import tools
from feapder.utils.log import log

# Create the local directory used to cache downloaded proxy files
proxy_path = os.path.join(os.path.dirname(__file__), "proxy_file")
os.makedirs(proxy_path, exist_ok=True)


def get_proxies_by_host(host, port):
    proxy_id = "{}:{}".format(host, port)
    return get_proxies_by_id(proxy_id)


def get_proxies_by_id(proxy_id):
    proxies = {
        "http": "http://{}".format(proxy_id),
        "https": "https://{}".format(proxy_id),
    }
    return proxies


def get_proxy_from_url(**kwargs):
    """
    Fetch proxies from the configured source URL(s)
    :param kwargs: must contain proxy_source_url (a string or a list of strings);
        http(s):// URLs are read with get_proxy_from_http, redis:// URLs with
        get_proxy_from_redis, and any other scheme is ignored
    :return: shuffled list of proxies dicts
    """
    proxy_source_url = kwargs.get("proxy_source_url", [])
    if not isinstance(proxy_source_url, list):
        proxy_source_url = [proxy_source_url]
    # Drop empty entries
    proxy_source_url = [x for x in proxy_source_url if x]
    if not proxy_source_url:
        raise ValueError("proxy_source_url is not specified: {}".format(proxy_source_url))

    kwargs = kwargs.copy()
    kwargs.pop("proxy_source_url")
    proxies_list = []
    for url in proxy_source_url:
        if url.startswith("http"):
            proxies_list.extend(get_proxy_from_http(url, **kwargs))
        elif url.startswith("redis"):
            proxies_list.extend(get_proxy_from_redis(url, **kwargs))

    if proxies_list:
        # Shuffle the order
        random.shuffle(proxies_list)
    return proxies_list


def get_proxy_from_http(proxy_source_url, **kwargs):
    """
    Fetch proxies from an HTTP address, caching the response in a local file
    :param proxy_source_url:
    :param kwargs: local_proxy_file_cache_timeout: cache lifetime in seconds
        (default 60); a falsy value forces a refresh on every call
    :return:
    """
    filename = tools.get_md5(proxy_source_url) + ".txt"
    abs_filename = os.path.join(proxy_path, filename)
    update_interval = kwargs.get("local_proxy_file_cache_timeout", 60)
    update_flag = 0
    if not update_interval:
        # Caching disabled: always refresh
        update_flag = 1
    elif not os.path.exists(abs_filename):
        # No cached file yet
        update_flag = 1
    elif time.time() - os.stat(abs_filename).st_mtime > update_interval:
        # Cached file is older than the update interval
        update_flag = 1

    if update_flag:
        response = requests.get(proxy_source_url, timeout=20)
        with open(abs_filename, "w") as f:
            f.write(response.text)
    return get_proxy_from_file(filename)


def get_proxy_from_file(filename, **kwargs):
    """
    Read proxies from a local file in proxy_path
        File format, one proxy per line:
        ip:port:https
        ip:port:http
        ip:port
        Any line may be prefixed with user:password@
    :param filename:
    :param kwargs:
    :return:
    """
    proxies_list = []
    with open(os.path.join(proxy_path, filename), "r") as f:
        lines = f.readlines()

    for line in lines:
        line = line.strip()
        if not line:
            continue
        # Split off the optional user:password@ prefix
        auth = ""
        if "@" in line:
            auth, line = line.split("@")
        # The rest is ip:port[:protocol]
        items = line.split(":")
        if len(items) < 2:
            continue

        ip, port, *protocol = items
        if not all([port, ip]):
            continue
        if auth:
            ip = "{}@{}".format(auth, ip)
        if not protocol:
            proxies = {
                "https": "https://%s:%s" % (ip, port),
                "http": "http://%s:%s" % (ip, port),
            }
        else:
            proxies = {protocol[0]: "%s://%s:%s" % (protocol[0], ip, port)}
        proxies_list.append(proxies)

    return proxies_list


def get_proxy_from_redis(proxy_source_url, **kwargs):
    """
    Fetch proxies from a Redis address
    @param proxy_source_url: redis://:passwd@host:port/db
        Redis storage layout: a zset whose members are ip:port
        and whose scores are timestamps
    @param kwargs:
        {"redis_proxies_key": "xxx"}
    @return: [{'http': 'http://xxx.xxx.xxx.xxx:xxx', 'https': 'https://xxx.xxx.xxx.xxx:xxx'}]
    """
    redis_conn = redis.StrictRedis.from_url(proxy_source_url)
    key = kwargs.get("redis_proxies_key")
    assert key, "redis_proxies_key must be specified to fetch proxies from Redis"
    proxies = redis_conn.zrange(key, 0, -1)
    proxies_list = []
    for proxy in proxies:
        proxy = proxy.decode()
        proxies_list.append(
            {"https": "https://%s" % proxy, "http": "http://%s" % proxy}
        )
    return proxies_list


def check_proxy(
    ip="",
    port="",
    proxies=None,
    type=0,
    timeout=5,
    logger=None,
    show_error_log=True,
    **kwargs,
):
    """
    Check whether a proxy works
    :param ip:
    :param port:
    :param type: 0: TCP connect with a socket (only when ip and port are given)
                 1: HTTP request through the proxy with requests
    :param timeout:
    :param logger:
    :return: 1 if the check passed, otherwise 0
    """
    if not logger:
        logger = log
    ok = 0
    if type == 0 and ip and port:
        # A successful socket connect does not guarantee the proxy is usable;
        # it can still fail with "Connection closed by foreign host."
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sk:
            sk.settimeout(timeout)
            try:
                # The check must run, otherwise the proxy would never be refreshed
                sk.connect((ip, int(port)))
                ok = 1
            except Exception as e:
                if show_error_log:
                    logger.debug("check proxy failed: {} {}:{}".format(e, ip, port))
            sk.close()
    else:
        if not proxies:
            proxies = {
                "http": "http://{}:{}".format(ip, port),
                "https": "https://{}:{}".format(ip, port),
            }
        try:
            r = requests.get(
                "http://www.baidu.com", proxies=proxies, timeout=timeout, stream=True
            )
            ok = 1
            r.close()
        except Exception as e:
            if show_error_log:
                logger.debug(
                    "check proxy failed: {} {}:{} {}".format(e, ip, port, proxies)
                )
    return ok


class ProxyItem(object):
    """A single proxy"""

    # Allowed proxy tags
    proxy_tag_list = (-1, 0, 1)

    def __init__(
        self,
        proxies=None,
        valid_timeout=20,
        check_interval=180,
        max_proxy_use_num=10000,
        delay=30,
        use_interval=None,
        **kwargs,
    ):
        """
        :param proxies: {"http": ..., "https": ...}
        :param valid_timeout: timeout in seconds for the validity check; a value <= 0
            skips the check and treats the proxy as valid (default 20)
        :param check_interval: minimum seconds between validity checks
        :param max_proxy_use_num: maximum number of uses; <= 0 means unlimited
        :param delay: seconds a proxy tagged 1 is held back before reuse
        :param use_interval: minimum seconds between uses; None (default) means no limit
        :param logger: accepted through kwargs but ignored; the module-level log is used
        :param kwargs:
        """
        # {"http": ..., "https": ...}
        self.proxies = proxies
        # Validity-check timeout in seconds
        self.valid_timeout = valid_timeout
        # Validity-check interval in seconds
        self.check_interval = check_interval

        # Flag: 0 normal, -1 discard, 1 use again later
        self.flag = 0
        # Time of the last flag change
        self.flag_ts = 0
        # Time of the last validity check (when the proxy was last known valid)
        self.update_ts = 0
        # Maximum number of uses
        self.max_proxy_use_num = max_proxy_use_num
        # Number of uses so far
        self.use_num = 0
        # Seconds a proxy tagged 1 is held back before reuse
        self.delay = delay
        # Minimum seconds between uses
        self.use_interval = use_interval
        # Time of the last use
        self.use_ts = 0

        self.proxy_args = self.parse_proxies(self.proxies)
        self.proxy_ip = self.proxy_args["ip"]
        self.proxy_port = self.proxy_args["port"]
        self.proxy_ip_port = "{}:{}".format(self.proxy_ip, self.proxy_port)
        if self.proxy_args["user"]:
            self.proxy_id = "{user}:{password}@{ip}:{port}".format(**self.proxy_args)
        else:
            self.proxy_id = self.proxy_ip_port

        # Logger
        self.logger = log

    def get_proxies(self):
        self.use_num += 1
        return self.proxies

    def is_delay(self):
        return self.flag == 1

    def is_valid(self, force=0, type=0):
        """
        Check whether the proxy can be used
            1: valid
            2: temporarily held back (delayed, or used too recently)
            0: invalid, remove it from the pool
        :param force: run the check even if the last one was within check_interval
        :param type: check method passed to check_proxy
        :return:
        """
        if self.use_num > self.max_proxy_use_num > 0:
            self.logger.debug(
                "Proxy reached its maximum use count: {} {}".format(
                    self.use_num, self.proxies
                )
            )
            return 0
        if self.flag == -1:
            self.logger.debug("Proxy tagged -1, discarded: %s" % self.proxies)
            return 0
        if self.delay > 0 and self.flag == 1:
            if time.time() - self.flag_ts < self.delay:
                self.logger.debug("Proxy tagged 1, delayed: %s" % self.proxies)
                return 2
            else:
                self.flag = 0
                self.logger.debug("Delayed proxy released: {}".format(self.proxies))
        if self.use_interval:
            if time.time() - self.use_ts < self.use_interval:
                return 2
        if not force:
            if time.time() - self.update_ts < self.check_interval:
                return 1
        if self.valid_timeout > 0:
            ok = check_proxy(
                proxies=self.proxies,
                type=type,
                timeout=self.valid_timeout,
                logger=self.logger,
            )
        else:
            ok = 1
        self.update_ts = time.time()
        return ok

    @classmethod
    def parse_proxies(cls, proxies):
        """
        Split a proxies dict (or its JSON string) into its components
        :param proxies: e.g. {"http": "http://user:password@ip:port"}
        :return: dict with protocol, ip, port, user, password and ip_port;
            empty if proxies is empty
        """
        if not proxies:
            return {}
        if isinstance(proxies, (str, bytes)):
            proxies = json.loads(proxies)
        protocol = list(proxies.keys())
        if not protocol:
            return {}
        _url = proxies.get(protocol[0])
        if not _url.startswith("http"):
            _url = "http://" + _url
        _url_parse = parse.urlparse(_url)
        netloc = _url_parse.netloc
        if "@" in netloc:
            netloc_auth, netloc_host = netloc.split("@")
        else:
            netloc_auth, netloc_host = "", netloc
        ip, *port = netloc_host.split(":")
        port = port[0] if port else "80"
        user, *password = netloc_auth.split(":")
        password = password[0] if password else ""
        return {
            "protocol": protocol,
            "ip": ip,
            "port": port,
            "user": user,
            "password": password,
            "ip_port": "{}:{}".format(ip, port),
        }


class ProxyPoolBase(object):
    def __init__(self, *args, **kwargs):
        pass

    def get(self, *args, **kwargs):
        raise NotImplementedError


class ProxyPool(ProxyPoolBase):
    """Proxy pool"""

    def __init__(self, **kwargs):
        """
        :param size: pool size; -1 means unlimited
        :param proxy_source_url: proxy source address; a list is also accepted
        :param proxy_instance: an instance that supplies proxies
        :param reset_interval: minimum interval between pool resets, in seconds
        :param reset_interval_max: maximum interval between pool resets, in seconds (default 180)
        :param check_valid: whether to check a proxy's validity when it is fetched
        :param local_proxy_file_cache_timeout: lifetime of locally cached proxy files, in seconds
        :param logger: logger; defaults to the module-level log
        :param kwargs: other parameters
        """
        kwargs.setdefault("size", -1)
        kwargs.setdefault("proxy_source_url", setting.PROXY_EXTRACT_API)

        super(ProxyPool, self).__init__(**kwargs)
        # Maximum queue length
        self.max_queue_size = kwargs.get("size", -1)
        # Actual number of proxies fetched from the sources
        self.real_max_proxy_count = 1000
        # Proxy source address(es), e.g. http://localhost/proxy.txt
        self.proxy_source_url = kwargs.get("proxy_source_url", [])
        if not isinstance(self.proxy_source_url, list):
            self.proxy_source_url = [self.proxy_source_url]
        # Drop empty entries and duplicates
        self.proxy_source_url = [x for x in self.proxy_source_url if x]
        self.proxy_source_url = list(set(self.proxy_source_url))
        kwargs.update({"proxy_source_url": self.proxy_source_url})
        # Logger setup
        self.logger = kwargs.get("logger") or log
        kwargs["logger"] = self.logger
        if not self.proxy_source_url:
            self.logger.warning("need set proxy_source_url or proxy_instance")

        # Minimum interval between pool resets
        self.reset_interval = kwargs.get("reset_interval", 5)
        # Force a reset after this long to bring in fresh proxies and avoid
        # reusing old proxies that have been banned
        self.reset_interval_max = kwargs.get("reset_interval_max", 180)
        # Whether to check proxy validity
        self.check_valid = kwargs.get("check_valid", True)

        # Proxy queue
        self.proxy_queue = None
        # {proxy_id: ProxyItem, ...}
        self.proxy_dict = {}
        # Invalid proxies: {proxy_id: datetime when marked invalid, ...}
        self.invalid_proxy_dict = {}

        self.kwargs = kwargs

        # Lock guarding pool resets
        self.reset_lock = None
        # Time of the last reset
        self.last_reset_time = 0
        # Number of resets skipped for coming too soon
        self.reset_fast_count = 0
        # Number of times get() still failed after 3 retries
        self.no_valid_proxy_times = 0

        # Time of the last get()
        self.last_get_ts = time.time()

        # Remember each ProxyItem's update_ts so that frequent resets do not
        # trigger redundant validity checks
        self.proxy_item_update_ts_dict = {}

        # Warning flag
        self.warn_flag = False

    def warn(self):
        if not self.warn_flag:
            for url in self.proxy_source_url:
                if "zhima" in url:
                    continue
            self.warn_flag = True
        return

    @property
    def queue_size(self):
        """
        Number of proxies currently in the pool
        :return:
        """
        return self.proxy_queue.qsize() if self.proxy_queue is not None else 0

    def clear(self):
        """
        Clear the pool's in-memory state
        :return:
        """
        self.proxy_queue = None
        # {proxy_id: ProxyItem, ...}
        self.proxy_dict = {}
        # Forget invalid-proxy records older than 10 minutes
        _limit = datetime.datetime.now() - datetime.timedelta(minutes=10)
        self.invalid_proxy_dict = {
            k: v for k, v in self.invalid_proxy_dict.items() if v > _limit
        }
        # Forget update_ts records older than 10 minutes (600 s)
        _limit = time.time() - 600
        self.proxy_item_update_ts_dict = {
            k: v for k, v in self.proxy_item_update_ts_dict.items() if v > _limit
        }
        return

    def get(self, retry: int = 0) -> dict:
        """
        Get a proxy from the pool
        :param retry: internal retry counter
        :return: a proxies dict, or None if no usable proxy was found after 3 attempts
        """
        retry += 1
        if retry > 3:
            self.no_valid_proxy_times += 1
            return None
        if time.time() - self.last_get_ts > 3 * 60:
            # No proxy requested for 3 minutes: reset the pool
            try:
                self.reset_proxy_pool()
            except Exception as e:
                self.logger.exception(e)
        # Record the time of this request
        self.last_get_ts = time.time()
        # self.warn()
        proxy_item = self.get_random_proxy()
        if proxy_item:
            if not self.check_valid:
                # Validity checking disabled: put the proxy back and return it
                proxies = proxy_item.get_proxies()
                self.put_proxy_item(proxy_item)
                return proxies
            else:
                is_valid = proxy_item.is_valid()
                if is_valid:
                    # Remember update_ts
                    self.proxy_item_update_ts_dict[
                        proxy_item.proxy_id
                    ] = proxy_item.update_ts
                    # Put the proxy back
                    proxies = proxy_item.get_proxies()
                    self.put_proxy_item(proxy_item)
                    if is_valid == 1:
                        if proxy_item.use_interval:
                            proxy_item.use_ts = time.time()
                        return proxies
                else:
                    # Drop the invalid proxy
                    self.proxy_dict.pop(proxy_item.proxy_id, "")
                    self.invalid_proxy_dict[
                        proxy_item.proxy_id
                    ] = datetime.datetime.now()
        else:
            try:
                self.reset_proxy_pool()
            except Exception as e:
                self.logger.exception(e)

        if self.no_valid_proxy_times >= 5:
            # Bug fix: when the spider has a single task left, only one thread is
            # checking proxies, and invalid proxies pile up over time, so it may
            # never get a proxy and the spider would never finish
            try:
                self.reset_proxy_pool()
            except Exception as e:
                self.logger.exception(e)

        return self.get(retry)

    get_proxy = get

    def get_random_proxy(self) -> ProxyItem:
        """
        Get a random proxy
        :return:
        """
        if self.proxy_queue is not None:
            if random.random() < 0.5:
                # Only check half of the time, since this is a hot path
                if time.time() - self.last_reset_time > self.reset_interval_max:
                    self.reset_proxy_pool(force=True)
                else:
                    min_q_size = (
                        min(self.max_queue_size / 2, self.real_max_proxy_count / 2)
                        if self.max_queue_size > 0
                        else self.real_max_proxy_count / 2
                    )
                    if self.proxy_queue.qsize() < min_q_size:
                        self.reset_proxy_pool()
            try:
                return self.proxy_queue.get_nowait()
            except Exception:
                pass
        return None

    def append_proxies(self, proxies_list: list) -> int:
        """
        Add proxies to the pool
        :param proxies_list:
        :return: number of proxies added
        """
        count = 0
        if not isinstance(proxies_list, list):
            proxies_list = [proxies_list]
        for proxies in proxies_list:
            if proxies:
                proxy_item = ProxyItem(proxies=proxies, **self.kwargs)
                # Skip proxies recently marked invalid (added 2018/12/18)
                if proxy_item.proxy_id in self.invalid_proxy_dict:
                    continue
                if proxy_item.proxy_id not in self.proxy_dict:
                    # Restore update_ts from an earlier check
                    if not proxy_item.update_ts:
                        proxy_item.update_ts = self.proxy_item_update_ts_dict.get(
                            proxy_item.proxy_id, 0
                        )
                    self.put_proxy_item(proxy_item)
                    self.proxy_dict[proxy_item.proxy_id] = proxy_item
                    count += 1
        return count

    def put_proxy_item(self, proxy_item: ProxyItem):
        """
        Add a ProxyItem to the pool
        :param proxy_item:
        :return:
        """
        return self.proxy_queue.put_nowait(proxy_item)

    def reset_proxy_pool(self, force: bool = False):
        """
        Reset the proxy pool
        :param force: whether to force a reset
        :return:
        """
        if not self.reset_lock:
            # Create the lock lazily, on first use: if threading were imported
            # before gevent's monkey patch, the patched RLock would not take effect
            import threading

            self.reset_lock = threading.RLock()
        with self.reset_lock:
            if (
                force
                or self.proxy_queue is None
                or (
                    self.max_queue_size > 0
                    and self.proxy_queue.qsize() < self.max_queue_size / 2
                )
                or (
                    self.max_queue_size < 0
                    and self.proxy_queue.qsize() < self.real_max_proxy_count / 2
                )
                or self.no_valid_proxy_times >= 5
            ):
                if time.time() - self.last_reset_time < self.reset_interval:
                    self.reset_fast_count += 1
                    if self.reset_fast_count % 10 == 0:
                        self.logger.debug(
                            "Proxy pool is being reset too often :) {}".format(
                                self.reset_fast_count
                            )
                        )
                        time.sleep(1)
                else:
                    self.clear()
                    if self.proxy_queue is None:
                        import queue

                        self.proxy_queue = queue.Queue()
                    # TODO: the fetched proxies may contain duplicates
                    proxies_list = get_proxy_from_url(**self.kwargs)
                    self.real_max_proxy_count = len(proxies_list)
                    if 0 < self.max_queue_size < self.real_max_proxy_count:
                        proxies_list = random.sample(proxies_list, self.max_queue_size)
                    _valid_count = self.append_proxies(proxies_list)
                    self.last_reset_time = time.time()
                    self.no_valid_proxy_times = 0
                    self.logger.debug(
                        "Proxy pool reset: fetched {}, added {}, invalid {}, current proxies {}".format(
                            len(proxies_list),
                            _valid_count,
                            len(self.invalid_proxy_dict),
                            len(self.proxy_dict),
                        )
                    )
        return

    def tag_proxy(self, proxies_list: list, flag: int, *, delay=30) -> bool:
        """
        Tag proxies
        :param proxies_list:
        :param flag:
                    -1: discard
                    1: delay use
        :param delay: delay in seconds
        :return:
        """
        if int(flag) not in ProxyItem.proxy_tag_list or not proxies_list:
            return False
        if not isinstance(proxies_list, list):
            proxies_list = [proxies_list]
        for proxies in proxies_list:
            if not proxies:
                continue
            proxy_id = ProxyItem(proxies).proxy_id
            if proxy_id not in self.proxy_dict:
                continue
            self.proxy_dict[proxy_id].flag = flag
            self.proxy_dict[proxy_id].flag_ts = time.time()
            self.proxy_dict[proxy_id].delay = delay

        return True

    def get_proxy_item(self, proxy_id="", proxies=None):
        """
        Look up a ProxyItem by proxy_id or by proxies dict
        :param proxy_id:
        :param proxies:
        :return: the ProxyItem, or None if it is not in the pool
        """
        if proxy_id:
            return self.proxy_dict.get(proxy_id)
        if proxies:
            proxy_id = ProxyItem(proxies).proxy_id
            return self.proxy_dict.get(proxy_id)
        return None

    def copy(self):
        return ProxyPool(**self.kwargs)

    def all(self) -> list:
        """
        Fetch every proxy from the configured sources. This re-queries
        proxy_source_url (subject to the local file cache) instead of
        reading the in-memory pool
        :return:
        """
        return get_proxy_from_url(**self.kwargs)