import datetime
import gzip
import os
import re
import shutil
import socket
import time
import warnings
import http.client as http_client
import urllib.error as urllib_error
import urllib.request as urllib_request
from logging import getLogger
from http.client import InvalidURL
from typing import Optional, List, Union
# Module-level logger used by the download helpers of this file.
logger = getLogger("sphinx-runpython")
class ReadUrlException(InvalidURL):
    """Raised by the helpers of this file when a url cannot be opened or fetched."""
class InternetException(IOError):
    """Raised when the content of a url cannot be retrieved."""
class FileException(OSError):
    """Raised when a string handed to a download helper is not a supported url."""
# Lower-case three-letter English month abbreviation -> month number (1-12).
MONTH_DATE = {
    abbreviation: number
    for number, abbreviation in enumerate(
        (
            "jan",
            "feb",
            "mar",
            "apr",
            "may",
            "jun",
            "jul",
            "aug",
            "sep",
            "oct",
            "nov",
            "dec",
        ),
        start=1,
    )
}
def _first_more_recent(f1, path: str) -> bool:
    """
    Checks if the first file (opened url)
    is more recent than the second file (path).

    :param f1: opened url (object returned by ``urlopen``), only the
        headers returned by ``f1.info()`` are read
    :param path: path name of the local file
    :return: True if the url is more recent than the local file or if the
        url exposes no usable ``Last-Modified`` header
    """
    # Function-scope import: the module does not import email.utils globally.
    from email.utils import parsedate_to_datetime

    # Header lookup on the message object is case-insensitive, which matters
    # because some handlers spell the header ``Last-modified``.
    stamp = f1.info().get("Last-Modified")
    if stamp is None:
        return True
    try:
        remote = parsedate_to_datetime(stamp)
    except (TypeError, ValueError):
        # Unparsable date: same answer as a missing header.
        return True
    if remote.tzinfo is None:
        # A naive result stands for a UTC time without explicit zone.
        remote = remote.replace(tzinfo=datetime.timezone.utc)

    # Both dates are compared in UTC. Comparing the GMT header to the local
    # time representation of the file would shift the answer by the
    # timezone offset of the machine.
    local = datetime.datetime.fromtimestamp(
        os.path.getmtime(path), tz=datetime.timezone.utc
    )
    return remote > local
def _get_file_url(url: str, path: Optional[str]) -> str:
    """
    Builds a filename knowing an url.

    Characters ``/ : . = ?`` of the url are replaced or removed so that the
    result is a flat file name. If the url ends with a known extension,
    the final dot is restored (``...-py`` becomes ``....py``).

    :param url: url
    :param path: where to download the file, it must be a string
        (``None`` raises a ``TypeError`` when concatenated)
    :return: filename
    """
    known_extensions = {
        "c",
        "cc",
        "cpp",
        "cu",
        "gif",
        "gz",
        "h",
        "hpp",
        "html",
        "png",
        "jpeg",
        "jpg",
        "pdf",
        "py",
        "tif",
        "txt",
        "zip",
    }
    # One pass over the url, equivalent to the chained replacements:
    # "/" -> "!", ":" removed, "." -> "-", "=" and "?" -> "_".
    flat = url.translate(
        str.maketrans({"/": "!", ":": None, ".": "-", "=": "_", "?": "_"})
    )
    path = path + "/" + flat

    # The former extension is what follows the last dash. No length filter
    # here: the set is the single source of truth, otherwise entries such
    # as "c", "h", "html" or "jpeg" could never match.
    stem, dash, ext = path.rpartition("-")
    ext = ext.lower()
    if dash and ext in known_extensions:
        path = stem + "." + ext
    return path
def read_url(url: str, encoding: Optional[str] = None) -> Union[bytes, str]:
    """
    Reads the content of a url.

    :param url: url
    :param encoding: if None, the result type is bytes, str otherwise
    :return: str (encoding is not None) or bytes
    :raises ReadUrlException: if the url cannot be opened or read,
        the original exception is attached as ``__cause__``
    """
    request = urllib_request.Request(url)
    try:
        with urllib_request.urlopen(request) as fu:
            content = fu.read()
    except Exception as e:
        import urllib.parse as urlparse

        res = urlparse.urlparse(url)
        # Explicit chaining, like every other ReadUrlException of this file.
        raise ReadUrlException(
            f"unable to open url '{url}' scheme: {res}\nexc: {e}"
        ) from e
    if encoding is None:
        return content
    return content.decode(encoding=encoding)
def download(url: str, path_download: str = ".", outfile: Optional[str] = None) -> str:
    """
    Downloads a small file.

    If *url* is an url, it downloads the file and returns the downloaded
    filename. If it has already been downloaded and the remote file is not
    more recent, it is not downloaded again. A url starting with
    ``file://`` is copied with :func:`shutil.copy`.

    While a download is in progress, an empty marker file
    ``<filename>.notyet`` exists next to the destination. If the marker is
    found at the next call, the function asks the server for the missing
    bytes only and falls back to a full download if the server does not
    answer with a partial content.

    :param url: url
    :param path_download: download the file here
    :param outfile: see below
    :return: the filename
    :raises FileException: if the url does not contain
        ``http://`` or ``https://`` or ``ftp://``
    :raises ReadUrlException: if the url cannot be fetched

    If *outfile* is None, the function will give a relative name
    based on the last part of the url.
    If *outfile* is "", the function will remove every weird character.
    If *outfile* is not null, the function will use it. It will be relative to
    the current folder and not *path_download*.
    """
    lurl = url.lower()
    if lurl.startswith("file://"):
        if outfile is None:
            last = os.path.split(url)[-1]
            if last.startswith("__cached__"):
                last = last[len("__cached__") :]
            dest = os.path.join(path_download, last)
        elif outfile == "":
            dest = _get_file_url(url, path_download)
        else:
            dest = outfile
        shutil.copy(url[7:], dest)
        return dest

    if not ("http://" in lurl or "https://" in lurl or "ftp://" in lurl):
        raise FileException(f"This url does not seem to be one {url!r}.")

    if outfile is None:
        dest = os.path.join(path_download, os.path.split(url)[-1])
    elif outfile == "":
        dest = _get_file_url(url, path_download)
    else:
        dest = outfile
    nyet = dest + ".notyet"

    if os.path.exists(dest) and not os.path.exists(nyet):
        # A complete copy exists: download again only if the url is newer.
        try:
            with urllib_request.urlopen(url) as f1:
                updated = _first_more_recent(f1, dest)
        except urllib_error.HTTPError as e:
            raise ReadUrlException(f"Unable to fetch '{url}'") from e
        except IOError as e:
            raise ReadUrlException(f"Unable to download '{url}'") from e
        if not updated:
            return dest
        logger.info("[download] downloading (updated) %r", url)
    else:
        logger.info("[download] downloading %r", url)

    # The payload read from the url is bytes, the file is always written in
    # binary mode whatever the extension is.
    logger.info("creating binary file %r", dest)

    # Bytes already on disk from an interrupted download (marker present).
    resume_from = 0
    if os.path.exists(nyet) and os.path.exists(dest):
        resume_from = os.stat(dest).st_size

    request = urllib_request.Request(url)
    if resume_from > 0:
        logger.info(
            "[download] resume downloading (stop at %d) from %r", resume_from, url
        )
        request.add_header("Range", "bytes=%d-" % resume_from)
    else:
        logger.info("[download] downloading %r", url)
    try:
        fu = urllib_request.urlopen(request)
    except urllib_error.HTTPError as e:
        raise ReadUrlException(f"Unable to fetch '{url}'") from e

    with fu:
        # Appending is only safe if the server really sent the missing part
        # (status 206). Otherwise the body is the whole file and the partial
        # copy must be overwritten.
        appending = resume_from > 0 and getattr(fu, "status", None) == 206
        with open(nyet, "w"):
            pass
        with open(dest, "ab" if appending else "wb") as f:
            size = 0
            c = fu.read(2**21)
            while len(c) > 0:
                size += len(c)
                logger.info("[download] size %d", size)
                f.write(c)
                f.flush()
                c = fu.read(2**21)
    logger.info("end downloading")
    # The marker is removed only after a complete download, a failure
    # leaves it in place so that the next call resumes.
    os.remove(nyet)
    return dest
def get_url_content_timeout(
    url: str,
    timeout: int = 10,
    output: Optional[str] = None,
    encoding: str = "utf-8",
    raise_exception: bool = True,
    chunk: Optional[int] = None,
) -> Optional[Union[bytes, str]]:
    """
    Downloads a file from internet (by default, it assumes
    it is text information, otherwise, encoding should be None).

    :param url: url
    :param timeout: in seconds, given to ``urlopen``, -1 for no timeout
    :param output: if not None, the content is stored in that file
    :param encoding: utf-8 by default, but if it is None,
        the returned information is binary
    :param raise_exception: True to raise an exception, False to send a
        warning and return None when the url cannot be retrieved
        (an unexpected exception is still raised after the warning)
    :param chunk: save data every chunk (only if output is not None),
        in that case the content is only written in *output*
    :return: content of the url, None if *chunk* is not None or
        if the download failed and *raise_exception* is False
    :raises InternetException: if the content cannot be retrieved and
        *raise_exception* is True

    If *chunk* is None and the downloaded data starts with the gzip magic
    number, it is decompressed.
    """
    # Function-scope import: the module does not import codecs globally.
    import codecs

    def save_content(content, append=False):
        "Writes *content* into *output*, in text mode if *encoding* is set."
        mode = "a" if append else "w"
        if encoding is not None:
            with open(output, mode, encoding=encoding) as f:
                f.write(content)
        else:
            with open(output, mode + "b") as f:
                f.write(content)

    def _local_loop(ur):
        "Streams *ur* into *output*, *chunk* bytes at a time."
        # An incremental decoder keeps the bytes of a character cut by a
        # chunk boundary until the next chunk completes it.
        decoder = (
            None if encoding is None else codecs.getincrementaldecoder(encoding)()
        )
        append = False
        size = 0
        while True:
            res = ur.read(chunk)
            if len(res) == 0:
                break
            size += len(res)
            logger.info("[get_url_content_timeout] downloaded %d bytes", size)
            if decoder is not None:
                res = decoder.decode(res)
            # The first chunk overwrites the file, the next ones are appended.
            save_content(res, append)
            append = True
        if decoder is not None:
            # Fails with UnicodeDecodeError if the stream ends in the middle
            # of a character.
            tail = decoder.decode(b"", final=True)
            if tail:
                save_content(tail, append)

    open_kwargs = {} if timeout == -1 else {"timeout": timeout}
    res = None
    try:
        if chunk is not None and output is None:
            raise ValueError("output cannot be None if chunk is not None")
        with urllib_request.urlopen(url, **open_kwargs) as ur:
            if chunk is not None:
                _local_loop(ur)
            else:
                res = ur.read()
    except (
        urllib_error.HTTPError,
        urllib_error.URLError,
        ConnectionRefusedError,
        socket.timeout,
        ConnectionResetError,
        http_client.BadStatusLine,
        http_client.IncompleteRead,
        ValueError,
        InvalidURL,
    ) as e:
        if raise_exception:
            raise InternetException(f"Unable to retrieve content url='{url}'") from e
        warnings.warn(
            f"Unable to retrieve content from '{url}' because of {e}", ResourceWarning
        )
        return None
    except Exception as e:
        if raise_exception:  # pragma: no cover
            raise InternetException(
                f"Unable to retrieve content, url='{url}', exc={e}"
            ) from e
        warnings.warn(
            f"Unable to retrieve content from '{url}' "
            f"because of unknown exception: {e}",
            ResourceWarning,
        )
        raise

    if chunk is not None:
        # Everything was already written in *output* by the loop.
        return None

    if len(res) >= 2 and res[:2] == b"\x1f\x8B":
        # gzip format
        res = gzip.decompress(res)

    if encoding is None:
        content = res
    else:
        try:
            content = res.decode(encoding)
        except UnicodeDecodeError as first:  # pragma: no cover
            # it tries different encoding
            failures = [first]
            othenc = ["iso-8859-1", "latin-1"]
            content = None
            for encode in othenc:
                try:
                    content = res.decode(encode)
                    break
                except UnicodeDecodeError as ee:
                    failures.append(ee)
            if content is None:
                mes = [f"Unable to parse text from '{url}'."]
                mes.append("tried:" + str([encoding] + othenc))
                mes.append("beginning:\n" + str([res])[:50])
                for failure in failures:
                    mes.append("Exception: " + str(failure))
                raise ValueError("\n".join(mes))

    if output is not None:
        save_content(content)
    return content
def download_requirejs(
to: str = ".",
location: str = "http://requirejs.org/docs/download.html",
clean: bool = True,
) -> List[str]:
"""
Downloads `require.js