-
-
Notifications
You must be signed in to change notification settings - Fork 23
Expand file tree
/
Copy pathextract.py
More file actions
297 lines (249 loc) · 10.3 KB
/
extract.py
File metadata and controls
297 lines (249 loc) · 10.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# ScanCode is a trademark of nexB Inc.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/nexB/extractcode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#
import logging
import traceback
from collections import namedtuple
from functools import partial
from os.path import abspath
from os.path import expanduser
from os.path import join
from commoncode import fileutils
from commoncode import ignore
import extractcode # NOQA
import extractcode.archive
# Module-level logger used by all extraction functions below.
logger = logging.getLogger(__name__)
# Set to True to emit verbose debug traces to stdout during extraction.
TRACE = False
if TRACE:
    import sys
    logging.basicConfig(stream=sys.stdout)
    logger.setLevel(logging.DEBUG)
"""
Extract archives and compressed files recursively to get the file content
available for further processing. This is the high-level extraction entry point.
This is NOT a general purpose un-archiver. The code tries hard to do the right
thing, BUT the extracted files are not meant to be something that can be
faithfully re-archived to get an equivalent archive. The purpose instead is to
extract the content of the archives as faithfully and safely as possible to make
this content available for scanning: some paths may be altered. Some files may
be altered or skipped entirely.
In particular:
- Permissions and owners stored in archives are ignored entirely: The extracted
content is always owned and readable by the user who ran the extraction.
- Special files are never extracted (such as FIFO, character devices, etc)
- Symlinks may be replaced by plain file copies as if they were regular files.
Hardlinks may be recreated as regular files, not as hardlinks to the original
files.
- Files and directories may be renamed when their name is a duplicate. A
 name may be considered a duplicate ignoring upper and lower case mixes even
 on case-sensitive file systems. In particular when an archive contains the
 same file path several times, every path will be extracted with a different
 file name, even though using a regular tool for extraction would have
 overwritten previous paths with the last path.
- Paths may be converted to a safe ASCII alternative that is portable across
OSes.
- Symlinks, relative paths and absolute paths pointing outside of the archive
 are replaced and renamed in such a way that all the extracted content of an
 archive exists under a single target extraction directory. This process
 includes eventually creating "synthetic" or dummy paths that did not exist in
 the original archive.
"""
"""
An ExtractEvent contains data about an archive extraction progress:
- `source` is the location of the archive being extracted
- `target` is the target location where things are extracted
- `done` is a boolean set to True when the extraction is done (even if failed).
- `warnings` is a mapping of extracted paths to a list of warning messages.
- `errors` is a list of error messages.
"""
ExtractEvent = namedtuple('ExtractEvent', 'source target done warnings errors')
def extract(
    location,
    kinds=extractcode.default_kinds,
    recurse=False,
    replace_originals=False,
    ignore_pattern=(),
):
    """
    Walk and extract any archives found at ``location`` (either a file or
    directory). Extract only archives of a kind listed in the ``kinds`` kind
    tuple.

    Return an iterable of ExtractEvent for each extracted archive. This can be
    used to track extraction progress:

    - one event is emitted just before extracting an archive. The ExtractEvent
      warnings and errors are empty. The "done" flag is False.

    - one event is emitted right after extracting an archive. The ExtractEvent
      warnings and errors contains warnings and errors if any. The "done" flag
      is True.

    If ``recurse`` is True, extract recursively archives nested inside other
    archives. If ``recurse`` is false, then do not extract further an already
    extracted archive identified by the corresponding extract suffix location.

    If ``replace_originals`` is True, the extracted archives are replaced by the
    extracted content, only if the extraction was successful.

    ``ignore_pattern`` is a list of glob patterns to ignore.

    Note that while the original filesystem is walked top-down, breadth-first,
    if ``recurse`` and a nested archive is found, it is extracted first
    recursively and at full depth-first before resuming the filesystem walk.
    """
    extract_events = extract_files(
        location=location,
        kinds=kinds,
        recurse=recurse,
        ignore_pattern=ignore_pattern,
    )

    # Only track processed events when we will need them to replace originals.
    processed_events = []
    processed_events_append = processed_events.append
    for event in extract_events:
        yield event
        if replace_originals:
            processed_events_append(event)

    # move files around when done, unless there are errors
    if replace_originals:
        # Process in reverse so nested extractions are replaced before their
        # parents.
        for xevent in reversed(processed_events):
            # BUGFIX: check this event's own warnings/errors. The previous
            # code tested the leaked loop variable ``event`` (i.e. only the
            # LAST event seen), which could delete and replace an original
            # archive whose own extraction had failed, and raised NameError
            # when no event was emitted at all.
            if xevent.done and not (xevent.warnings or xevent.errors):
                source = xevent.source
                target = xevent.target
                if TRACE:
                    logger.debug(
                        f'extract:replace_originals: replacing '
                        f'{source!r} by {target!r}'
                    )
                fileutils.delete(source)
                fileutils.copytree(target, source)
                fileutils.delete(target)
def extract_files(
    location,
    kinds=extractcode.default_kinds,
    recurse=False,
    ignore_pattern=(),
):
    """
    Extract the files found at ``location`` and yield ExtractEvents.

    Extract only archives of a kind listed in the ``kinds`` kind tuple.

    If ``recurse`` is True, extract recursively archives nested inside other
    archives. If ``recurse`` is false, then do not extract further an already
    extracted archive identified by the corresponding extract suffix location.

    ``ignore_pattern`` is a list of glob patterns to ignore.
    """
    # Skip the default ignorable paths (VCS dirs and the like) while walking.
    ignored = partial(ignore.is_ignored, ignores=ignore.default_ignores, unignores={})
    if TRACE:
        logger.debug('extract:start: %(location)r recurse: %(recurse)r\n' % locals())

    abs_location = abspath(expanduser(location))
    for top, dirs, files in fileutils.walk(abs_location, ignored):
        if TRACE:
            # BUGFIX: the format string read "r(files)r" (missing the leading
            # %) so the files list was never interpolated in the trace.
            logger.debug(
                'extract:walk: top: %(top)r dirs: %(dirs)r files: %(files)r' % locals())

        if not recurse:
            # Prune already-extracted "-extract" directories in place so the
            # walk does not descend into previously extracted content.
            if TRACE:
                drs = set(dirs)
            for d in dirs[:]:
                if extractcode.is_extraction_path(d):
                    dirs.remove(d)
            if TRACE:
                rd = repr(drs.symmetric_difference(set(dirs)))
                logger.debug(f'extract:walk: not recurse: removed dirs: {rd}')

        for f in files:
            loc = join(top, f)
            if not recurse and extractcode.is_extraction_path(loc):
                if TRACE:
                    logger.debug(
                        'extract:walk not recurse: skipped file: %(loc)r' % locals())
                continue

            if not extractcode.archive.should_extract(
                location=loc,
                kinds=kinds,
                ignore_pattern=ignore_pattern
            ):
                if TRACE:
                    logger.debug(
                        'extract:walk: skipped file: not should_extract: %(loc)r' % locals())
                continue

            target = join(abspath(top), extractcode.get_extraction_path(loc))
            if TRACE:
                logger.debug('extract:target: %(target)r' % locals())

            # extract proper
            for xevent in extract_file(
                location=loc,
                target=target,
                kinds=kinds,
            ):
                if TRACE:
                    logger.debug('extract:walk:extraction event: %(xevent)r' % locals())
                yield xevent

            if recurse:
                # Depth-first: fully extract the nested content before
                # resuming the breadth-first walk of the original tree.
                if TRACE:
                    logger.debug('extract:walk: recursing on target: %(target)r' % locals())
                for xevent in extract(
                    location=target,
                    kinds=kinds,
                    recurse=recurse,
                    ignore_pattern=ignore_pattern,
                ):
                    if TRACE:
                        logger.debug('extract:walk:recurse:extraction event: %(xevent)r' % locals())
                    yield xevent
def extract_file(
    location,
    target,
    kinds=extractcode.default_kinds,
    verbose=False,
    *args,
    **kwargs,
):
    """
    Extract a single archive file at ``location`` to the ``target`` directory if
    this file is of a kind supported in the ``kinds`` kind tuple. Yield
    ExtractEvents. Does not extract recursively.
    """
    extractor = extractcode.archive.get_extractor(
        location=location,
        kinds=kinds,
    )

    if TRACE:
        emodule = getattr(extractor, '__module__', '')
        ename = getattr(extractor, '__name__', '')
        logger.debug(
            f'extract_file: extractor: for: {location} with kinds: '
            f'{kinds}: {emodule}.{ename}'
        )

    # No extractor: this is not an archive of a requested kind. Yield nothing.
    if not extractor:
        return

    # First event: signal that extraction is starting for this archive.
    yield ExtractEvent(
        source=location,
        target=target,
        done=False,
        warnings=[],
        errors=[],
    )

    warnings = []
    errors = []
    try:
        # Extract first to a temp directory: if there is an error, the
        # extracted files will not be moved to the target.
        tmp_tgt = fileutils.get_temp_dir(prefix='extractcode-extract-')
        abs_location = abspath(expanduser(location))
        warnings.extend(extractor(abs_location, tmp_tgt) or [])
        fileutils.copytree(tmp_tgt, target)
        fileutils.delete(tmp_tgt)
    except Exception as e:
        errors = [str(e).strip(' \'"')]
        if verbose:
            errors.append(traceback.format_exc())
        if TRACE:
            tb = traceback.format_exc()
            logger.debug(
                f'extract_file: ERROR: {location}: {errors}\n{e}\n{tb}')
    finally:
        # Final event: extraction is done, successfully or not.
        yield ExtractEvent(
            source=location,
            target=target,
            done=True,
            warnings=warnings,
            errors=errors,
        )