# See More  (page-navigation residue from the web export; not part of the module)

# -*- coding: utf-8 -*-
"""
Created on 2018-09-06 14:21
---------
@summary: utilities
---------
@author: Boris
@email: [email protected]
"""
import asyncio
import calendar
import codecs
import configparser  # reads configuration files
import datetime
import functools
import hashlib
import html
import json
import os
import pickle
import random
import re
import socket
import ssl
import string
import sys
import time
import traceback
import urllib
import urllib.parse
import uuid
import weakref
from functools import partial, wraps
from hashlib import md5
from pprint import pformat
from pprint import pprint
from urllib import request
from urllib.parse import urljoin

import execjs  # pip install PyExecJS
import redis
import requests
import six
from requests.cookies import RequestsCookieJar
from w3lib.url import canonicalize_url as _canonicalize_url

import feapder.setting as setting
from feapder.db.redisdb import RedisDB
from feapder.utils.email_sender import EmailSender
from feapder.utils.log import log

os.environ["EXECJS_RUNTIME"] = "Node"  # make execjs run JavaScript with node

# Disable SSL certificate verification globally.
# NOTE(review): this affects every HTTPS context created through the ssl
# default factory in the whole process - confirm this is still intended.
ssl._create_default_https_context = ssl._create_unverified_context

TIME_OUT = 30
TIMER_TIME = 5

redisdb = None


def get_redisdb():
    """Return the module-wide RedisDB instance, creating it on first use."""
    global redisdb
    if not redisdb:
        redisdb = RedisDB()
    return redisdb


# decorators
class Singleton(object):
    """Class decorator that builds the wrapped class once and reuses it.

    The constructor arguments of the first call are used; arguments of
    later calls are ignored because the cached instance is returned.
    """

    def __init__(self, cls):
        self._cls = cls
        self._instance = {}

    def __call__(self, *args, **kwargs):
        if self._cls not in self._instance:
            self._instance[self._cls] = self._cls(*args, **kwargs)
        return self._instance[self._cls]


def log_function_time(func):
    """Decorator that logs (at debug level) how long each call of func takes."""
    try:

        @functools.wraps(func)  # copy the original function's attributes
        def calculate_time(*args, **kw):
            began_time = time.time()
            callfunc = func(*args, **kw)
            end_time = time.time()
            log.debug(func.__name__ + " run time = " + str(end_time - began_time))
            return callfunc

        return calculate_time
    except Exception:  # was a bare except, which also caught SystemExit etc.
        log.debug("求取时间无效 因为函数参数不符")
        return func


def run_safe_model(module_name):
    """Decorator factory: log and swallow any exception raised by the function.

    @param module_name: prefix used in the error log line
    @return: a decorator; the decorated function returns None when the
             wrapped call raised
    """

    def inner_run_safe_model(func):
        try:

            @functools.wraps(func)  # copy the original function's attributes
            def run_func(*args, **kw):
                callfunc = None
                try:
                    callfunc = func(*args, **kw)
                except Exception as e:
                    log.error(module_name + ": " + func.__name__ + " - " + str(e))
                    traceback.print_exc()
                return callfunc

            return run_func
        except Exception as e:
            log.error(module_name + ": " + func.__name__ + " - " + str(e))
            traceback.print_exc()
            return func

    return inner_run_safe_model


def memoizemethod_noargs(method):
    """Decorator to cache the result of a method (without arguments) using a
    weak reference to its object
    """
    cache = weakref.WeakKeyDictionary()

    @functools.wraps(method)
    def new_method(self, *args, **kwargs):
        if self not in cache:
            cache[self] = method(self, *args, **kwargs)
        return cache[self]

    return new_method


########################【网页解析相关】###############################


# @log_function_time
def get_html_by_requests(
    url, headers=None, code="utf-8", data=None, proxies=None, with_response=False
):
    """Fetch url and return the response body as text.

    A POST is sent when data is truthy, otherwise a GET. Request errors are
    logged and an empty string is returned.

    @param url: target url
    @param headers: request headers
    @param code: encoding forced on the response; falsy keeps requests' guess
    @param data: POST body; when falsy a GET is issued
    @param proxies: requests-style proxies mapping (None means no explicit proxies)
    @param with_response: also return the (already closed) response object
    @return: text, or (text, response) when with_response is true
    """
    text = ""
    response = None
    try:
        if data:
            response = requests.post(
                url, headers=headers, timeout=TIME_OUT, data=data, proxies=proxies
            )
        else:
            response = requests.get(
                url, headers=headers, timeout=TIME_OUT, proxies=proxies
            )

        if code:
            response.encoding = code
        text = response.text

    except Exception as e:
        log.error(e)
    finally:
        # A Response is falsy for 4xx/5xx, so test identity rather than
        # truthiness or error responses are never closed.
        if response is not None:
            response.close()

    if with_response:
        return text, response
    return text


def get_json_by_requests(
    url,
    params=None,
    headers=None,
    data=None,
    proxies=None,
    with_response=False,
    cookies=None,
):
    """Fetch url and return the decoded JSON body.

    A POST is sent when data is truthy, otherwise a GET. Request or decoding
    errors are logged and an empty dict is returned.

    @param url: target url
    @param params: query-string parameters
    @param headers: request headers
    @param data: POST body; when falsy a GET is issued
    @param proxies: requests-style proxies mapping (None means no explicit proxies)
    @param with_response: also return the (already closed) response object
    @param cookies: cookies sent with the request
    @return: parsed json, or (parsed json, response) when with_response is true
    """
    result = {}
    response = None
    try:
        if data:
            response = requests.post(
                url,
                headers=headers,
                data=data,
                params=params,
                timeout=TIME_OUT,
                proxies=proxies,
                cookies=cookies,
            )
        else:
            response = requests.get(
                url,
                headers=headers,
                params=params,
                timeout=TIME_OUT,
                proxies=proxies,
                cookies=cookies,
            )
        response.encoding = "utf-8"
        result = response.json()
    except Exception as e:
        log.error(e)
    finally:
        # Identity check: a 4xx/5xx Response is falsy but still must be closed.
        if response is not None:
            response.close()

    if with_response:
        return result, response
    return result


def get_cookies(response):
    """Return the cookies of a requests response as a plain dict."""
    cookies = requests.utils.dict_from_cookiejar(response.cookies)
    return cookies


def get_cookies_from_str(cookie_str):
    """
    >>> get_cookies_from_str("key=value; key2=value2; key3=; key4=; ")
    {'key': 'value', 'key2': 'value2', 'key3': '', 'key4': ''}

    Args:
        cookie_str: key=value; key2=value2; key3=; key4=

    Returns:
        dict mapping cookie name to value; a segment without "=" is kept
        as a name with an empty value instead of raising ValueError
    """
    cookies = {}
    for cookie in cookie_str.split(";"):
        cookie = cookie.strip()
        if not cookie:
            continue
        key, _, value = cookie.partition("=")
        cookies[key.strip()] = value.strip()

    return cookies


def get_cookies_jar(cookies):
    """
    @summary: convert selenium-style cookies into a requests cookie jar
    usable as requests.get(xxx, cookies=jar)
    reference: https://www.cnblogs.com/small-bud/p/9064674.html
    ---------
    @param cookies: [{},{}] - each dict is read for "name" and "value"
    ---------
    @result: cookie jar
    """
    cookie_jar = RequestsCookieJar()
    for cookie in cookies:
        cookie_jar.set(cookie["name"], cookie["value"])

    return cookie_jar


def get_cookies_from_selenium_cookie(cookies):
    """
    @summary: convert selenium-style cookies into a plain dict
    usable as requests.get(xxx, cookies=cookie_dict)
    reference: https://www.cnblogs.com/small-bud/p/9064674.html
    ---------
    @param cookies: [{},{}] - entries without a truthy "name" are skipped
    ---------
    @result: dict of cookie name -> value
    """
    cookie_dict = {}
    for cookie in cookies:
        if cookie.get("name"):
            cookie_dict[cookie["name"]] = cookie["value"]

    return cookie_dict


def cookiesjar2str(cookies):
    """Serialise a cookie jar as "k=v; k2=v2; " (trailing "; " included)."""
    return "".join(
        f"{k}={v}; " for k, v in requests.utils.dict_from_cookiejar(cookies).items()
    )


def cookies2str(cookies):
    """Serialise a cookie dict as "k=v; k2=v2; " (trailing "; " included)."""
    return "".join(f"{k}={v}; " for k, v in cookies.items())


def get_urls(
    html,
    stop_urls=(
        "javascript",
        "+",
        ".css",
        ".js",
        ".rar",
        ".xls",
        ".exe",
        ".apk",
        ".doc",
        ".jpg",
        ".png",
        ".flv",
        ".mp4",
    ),
):
    """Collect the distinct link targets found in html, in first-seen order.

    NOTE(review): in the reviewed text everything after ``regex = r'`` was
    destroyed by HTML-tag stripping. The pattern and the filtering loop below
    are a reconstruction - verify against version control.

    @param html: page source
    @param stop_urls: a url containing any of these substrings is dropped
    @return: list of urls
    """
    # do not match urls such as javascript, + or #
    regex = r'<a.*?href.*?=.*?["|\'](.*?)["|\']'

    urls = get_info(html, regex)
    urls = list(dict.fromkeys(urls))
    if stop_urls:
        if isinstance(stop_urls, str):
            stop_urls = [stop_urls]
        urls = [
            url for url in urls if not any(stop_url in url for stop_url in stop_urls)
        ]
    return urls


# NOTE(review): the reviewed text jumps from the start of get_urls straight
# into the tail of get_info. Any definitions that originally sat between
# them were lost in extraction and are NOT recreated here - restore them
# from version control.

_regexs = {}  # NOTE(review): reconstructed cache of compiled patterns - confirm


def get_info(html, regexs, allow_repeat=True, fetch_one=False, split=None):
    """Return the matches of the first pattern in regexs that matches html.

    NOTE(review): only the part from the ``len(infos) > 0`` check onward
    survived in the reviewed text. The signature and the matching loop are
    reconstructed from the names that tail uses - verify against version
    control.

    @param html: text to search
    @param regexs: one pattern or a list of patterns, tried in order
    @param allow_repeat: keep duplicate matches; otherwise de-duplicate
                         preserving first-seen order
    @param fetch_one: return only the first match (its groups)
    @param split: when set, join the matches with this separator
    @return: fetch_one -> a single group, or a tuple of groups when the
             pattern has several, or "" when nothing matched;
             otherwise a list of matches, or a joined string when split is set
    """
    regexs = [regexs] if isinstance(regexs, str) else regexs

    infos = []
    for regex in regexs:
        if regex == "":
            continue

        if regex not in _regexs:
            _regexs[regex] = re.compile(regex, re.S)

        if fetch_one:
            infos = _regexs[regex].search(html)
            if infos:
                infos = infos.groups()
            else:
                continue
        else:
            infos = _regexs[regex].findall(str(html))

        if len(infos) > 0:
            # print(regex)
            break

    if fetch_one:
        infos = infos if infos else ("",)
        return infos if len(infos) > 1 else infos[0]

    if not allow_repeat:
        # order-preserving de-duplication without the quadratic list.index
        infos = list(dict.fromkeys(infos))
    return split.join(infos) if split else infos


def table_json(table, save_one_blank=True):
    """
    Convert a table into a dict; suited to tables where key and value sit
    side by side in the same row
    @param table: selector-wrapped node that supports xpath
    @param save_one_blank: collapse runs of spaces to one space (otherwise remove them)
    @return: dict of key cell text -> value cell text
    """
    data = {}

    trs = table.xpath(".//tr")
    for tr in trs:
        tds = tr.xpath("./td|./th")

        # cells are consumed pairwise; a trailing unpaired cell is ignored
        for i in range(0, len(tds) - 1, 2):
            key = tds[i].xpath("string(.)").extract_first(default="").strip()
            value = tds[i + 1].xpath("string(.)").extract_first(default="").strip()
            value = replace_str(value, "[\f\n\r\t\v]", "")
            value = replace_str(value, " +", " " if save_one_blank else "")

            if key:
                data[key] = value

    return data


def get_table_row_data(table):
    """
    Get the cell texts of every row of a table
    @param table: selector-wrapped node that supports xpath
    @return: [[],[]..]
    """
    datas = []
    for row in table.xpath(".//tr"):
        cols = row.xpath("./td|./th")
        datas.append(
            [col.xpath("string(.)").extract_first(default="").strip() for col in cols]
        )

    return datas


def rows2json(rows, keys=None):
    """
    Convert row data into a list of dicts
    @param rows: the data of every row
    @param keys: dict keys; when empty the first row of rows is used as keys
    @return: list of dicts; [] when rows is empty
    """
    if not keys:
        if not rows:
            return []  # previously raised IndexError on rows[0]
        keys, rows = rows[0], rows[1:]

    return [dict(zip(keys, values)) for values in rows]


def get_form_data(form):
    """
    Extract the data a form would submit
    :param form: selector-wrapped node that supports xpath
    :return: dict of input name -> value (value is None when the attribute is missing)
    """
    data = {}
    for input_node in form.xpath(".//input"):
        name = input_node.xpath("./@name").extract_first()
        value = input_node.xpath("./@value").extract_first()
        if name:
            data[name] = value

    return data


# does not work well on mac
# def get_domain(url):
#     domain = ''
#     try:
#         domain = get_tld(url)
#     except Exception as e:
#         log.debug(e)
#     return domain


def get_domain(url):
    """Return the network location (host[:port]) of url, or None if it has none.

    Uses urlsplit instead of urllib.parse.splittype/splithost, which have
    been deprecated since Python 3.8.
    """
    return urllib.parse.urlsplit(url).netloc or None


def get_index_url(url):
    """Return the scheme://host prefix of url (its first three "/" parts)."""
    return "/".join(url.split("/")[:3])


def get_ip(domain):
    """Resolve domain and return the first address reported by getaddrinfo."""
    ip = socket.getaddrinfo(domain, "http")[0][4][0]
    return ip


def get_localhost_ip():
    """
    Implemented with UDP: a UDP socket is "connected" so that the local IP
    ends up in the socket's own address, which is then read back.
    According to the original author no packet is actually sent, so nothing
    shows up in a packet capture.
    :return: local ip as a string
    """
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
        s.connect(("8.8.8.8", 80))
        return s.getsockname()[0]


def ip_to_num(ip):
    """Convert a dotted IPv4 address into its integer value."""
    import struct

    # "!I" reads the packed address in network byte order directly, which is
    # what the previous native-unpack + ntohl combination produced.
    return struct.unpack("!I", socket.inet_aton(str(ip)))[0]


def is_valid_proxy(proxy, check_url=None):
    """
    Check whether a proxy is usable
    @param proxy: xxx.xxx.xxx:xxx
    @param check_url: url of a target site to request through the proxy.
           Defaults to None, in which case only a socket connection to the
           proxy server is attempted; that cannot rule out
           "Connection closed by foreign host"
    @return: True / False
    """
    is_valid = False

    if check_url:
        proxies = {"http": f"http://{proxy}", "https": f"https://{proxy}"}
        headers = {
            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/69.0.3497.100 Safari/537.36"
        }
        response = None
        try:
            response = requests.get(
                check_url, headers=headers, proxies=proxies, stream=True, timeout=20
            )
            is_valid = True

        except Exception as e:
            log.error("check proxy failed: {} {}".format(e, proxy))

        finally:
            # Identity check: a 4xx/5xx Response is falsy, and with
            # stream=True an unclosed response keeps the connection open.
            if response is not None:
                response.close()

    else:
        ip, port = proxy.split(":")
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sk:
            sk.settimeout(7)
            try:
                sk.connect((ip, int(port)))  # is the proxy server listening?
                is_valid = True

            except Exception as e:
                log.error("check proxy failed: {} {}:{}".format(e, ip, port))

    return is_valid


def is_valid_url(url):
    """
    Check whether url looks valid
    :param url:
    :return: True when url starts with http(s):// followed by a word
             character, or with ftp://
    """
    return bool(re.match(r"(^https?:/{2}\w.+$)|(ftp://)", url))


def get_text(soup, *args):
    """Return soup.get_text(); on any error log it and return ""."""
    try:
        return soup.get_text()
    except Exception as e:
        log.error(e)
        return ""


def del_html_tag(content, except_line_break=False, save_img=False, white_replaced=""):
    """
    Remove html tags
    @param content: html content
    @param except_line_break: keep p tags
    @param save_img: keep images
    @param white_replaced: replacement for whitespace
    @return: cleaned text

    NOTE(review): HTML-tag stripping in the reviewed text removed every
    "<...>" sequence from this function's string literals, along with the
    statements between the first ``replace_str(content, "(?i)`` and the
    ``"/p")`` replacement. Each line marked "reconstructed" below must be
    checked against version control.
    """
    # reconstructed: drop script/style blocks, html comments and entities
    content = replace_str(content, "(?i)<script(.|\n)*?</script>")
    content = replace_str(content, "(?i)<style(.|\n)*?</style>")
    content = replace_str(content, "<!--(.|\n)*?-->")
    content = replace_str(content, "(?!&[a-z]+=)&[a-z]+;?")

    if except_line_break:
        # "/p" is a temporary placeholder so closing p tags survive the
        # tag-removal pass (the "</p>" literals are reconstructed)
        content = content.replace("</p>", "/p")
        content = replace_str(content, "<[^p].*?>")
        content = content.replace("/p", "</p>")
        content = replace_str(content, "[ \f\r\t\v]")

    elif save_img:
        # remove every tag except images ("<img.+?>" is reconstructed)
        content = replace_str(content, "(?!<img.+?>)<.+?>")
        content = replace_str(content, r"(?! +)\s+", "\n")  # keep spaces
        content = content.strip()

    else:
        content = replace_str(content, "<(.|\n)*?>")
        content = replace_str(content, r"\s", white_replaced)
        content = content.strip()

    return content


# NOTE(review): the reviewed text ends in the middle of the next definition:
#     def del_html_js_css(content): content = replace_str(content, "(?i)
# Its body is not visible, so it is deliberately not recreated here -
# restore it from version control.