forked from aboutcode-org/commoncode
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathresource.py
More file actions
2110 lines (1751 loc) · 72.2 KB
/
resource.py
File metadata and controls
2110 lines (1751 loc) · 72.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#
# Copyright (c) nexB Inc. and others. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
# See http://www.apache.org/licenses/LICENSE-2.0 for the license text.
# See https://github.com/nexB/commoncode for support or download.
# See https://aboutcode.org for more information about nexB OSS projects.
#
import json
import os
import sys
import traceback
from collections import deque
from functools import partial
from hashlib import md5
from operator import itemgetter
from os import walk as os_walk
from os.path import abspath
from os.path import exists
from os.path import expanduser
from os.path import isfile
from os.path import join
from os.path import normpath
from posixpath import join as posixpath_join
from posixpath import normpath as posixpath_normpath
from posixpath import dirname as posixpath_parent
import attr
try:
from scancode_config import scancode_temp_dir as temp_dir
except ImportError:
# always have something there.
import tempfile
temp_dir = tempfile.mkdtemp(prefix='scancode-resource-cache')
from commoncode import ignore
from commoncode.datautils import List
from commoncode.datautils import Mapping
from commoncode.datautils import String
from commoncode.filetype import is_file as filetype_is_file
from commoncode.filetype import is_special
from commoncode.fileutils import as_posixpath
from commoncode.fileutils import create_dir
from commoncode.fileutils import delete
from commoncode.fileutils import file_name
from commoncode.fileutils import parent_directory
from commoncode.fileutils import splitext_name
"""
This module provides Codebase and Resource objects as an abstraction for files
and directories used throughout ScanCode. ScanCode deals with a lot of these as
they are the basic unit of processing.
A Codebase is a tree of Resource. A Resource represents a file or directory and
holds essential file information as attributes. At runtime, scan data is added
as attributes to a Resource. Resources are kept in memory or saved on disk.
This module handles all the details of walking files, path handling and caching.
"""
# Tracing flags: flip to True to get debug traces on stdout.
TRACE = False
TRACE_DEEP = False


def logger_debug(*args):
    """
    Do nothing: this is the default no-op tracer used when tracing is off.
    """
    pass


if TRACE or TRACE_DEEP:
    import logging

    logger = logging.getLogger(__name__)
    logging.basicConfig(stream=sys.stdout)
    logger.setLevel(logging.DEBUG)

    def logger_debug(*args):
        """
        Log ``args`` at the DEBUG level as a single space-separated line.
        Strings are logged as-is and any other object is logged as its repr.
        """
        # A conditional expression is used rather than ``cond and a or b``
        # so that an empty string argument is kept as-is instead of being
        # turned into its repr.
        return logger.debug(' '.join(a if isinstance(a, str) else repr(a) for a in args))
class ResourceNotInCache(Exception):
    """
    Raised when a Resource expected in the on-disk cache has no cache file.
    """
class UnknownResource(Exception):
    """
    Error for a Resource that is not known.
    NOTE(review): meaning inferred from the name only; confirm against callers.
    """
def skip_ignored(location):
    """
    Tell whether the file or directory at ``location`` must be skipped.

    A location is skipped when ``is_special`` reports it as a special file or
    when it matches the VCS ignore patterns of ``ignore.ignores_VCS``.
    """

    def matches_vcs_ignores(loc):
        return ignore.is_ignored(loc, ignores=ignore.ignores_VCS)

    if TRACE_DEEP:
        logger_debug()
        logger_debug(
            'Codebase.populate: walk: ignored loc:',
            location,
            'ignored:',
            matches_vcs_ignores(location),
            'is_special:',
            is_special(location),
        )

    special = is_special(location)
    if special:
        return special
    return matches_vcs_ignores(location)
def depth_walk(
    root_location,
    max_depth,
    skip_ignored=skip_ignored,
    error_handler=lambda error=None: None,
):
    """
    Yield a (top, dirs, files) tuple at each step of walking the ``root_location``
    directory recursively up to ``max_depth`` path segments extending from the
    ``root_location``. The behaviour is similar to ``os.walk``.

    Arguments:
    - root_location: Absolute, normalized path for the directory to be walked
    - max_depth: positive integer for fixed depth limit. 0 for no limit.
    - skip_ignored: Callback function that takes a location as argument and
      returns a boolean indicating whether to ignore files in that location.
    - error_handler: Error handler callback passed to ``os.walk`` as
      ``onerror``: it is called with the error as its single argument. No
      action is taken by default.

    Raise an Exception if ``max_depth`` is negative.
    """
    if max_depth < 0:
        raise Exception("ERROR: `max_depth` must be a positive integer or 0.")

    # Find root directory depth using path separator's count
    root_dir_depth = root_location.count(os.path.sep)

    for top, dirs, files in os_walk(root_location, topdown=True, onerror=error_handler):
        # The depth only matters when it is limited (non-zero)
        too_deep = False
        if max_depth:
            current_depth = top.count(os.path.sep) - root_dir_depth
            too_deep = current_depth >= max_depth

        if skip_ignored(top) or too_deep:
            # we clear out `dirs` and `files` in place to prevent `os_walk`
            # from visiting the files and subdirectories of directories we
            # are ignoring or that are beyond the specified nesting level
            dirs[:] = []
            files[:] = []
            continue

        yield top, dirs, files
@attr.s(slots=True)
class Header(object):
    """
    Represent a Codebase header. Each tool that transforms the codebase
    should create a Header and append it to the Codebase.headers list.
    """

    tool_name = String(help='Name of the tool used such as scancode-toolkit.')
    tool_version = String(default='', help='Tool version used such as v1.2.3.')
    options = Mapping(help='Mapping of key/values describing the options used with this tool.')
    notice = String(default='', help='Notice text for this tool.')
    start_timestamp = String(help='Start timestamp for this header.')
    end_timestamp = String(help='End timestamp for this header.')
    output_format_version = String(help='Version for the output data format, such as v1.1 .')
    duration = String(help='Scan duration in seconds.')
    message = String(help='Message text.')
    errors = List(help='List of error messages.')
    warnings = List(help='List of warning messages.')
    extra_data = Mapping(help='Mapping of extra key/values for this tool.')

    def to_dict(self):
        """
        Return a plain dict of this Header attributes.
        """
        return attr.asdict(self, dict_factory=dict)

    @classmethod
    def from_dict(cls, **kwargs):
        """
        Return a Header object deserialized from a `kwargs` mapping of
        key/values. Unknown attributes are ignored.
        """
        # The known fields are looked up on ``cls`` rather than on ``Header``
        # so that the fields declared by a subclass are kept too.
        known_attributes = set(attr.fields_dict(cls))
        known_kwargs = {k: v for k, v in kwargs.items() if k in known_attributes}
        return cls(**known_kwargs)
def ignore_nothing(resource, codebase):
    """
    Always return False, meaning that ``resource`` is never ignored.

    This is the default callable for the ``ignored`` argument of the Codebase
    and Resource walk: such a callable returns True when a ``resource`` of a
    ``codebase`` should be ignored.
    """
    return False
class Codebase:
"""
Represent a codebase being scanned. A Codebase is a list of Resources.
"""
# we do not really need slots but this is a way to ensure we have tight
# control on object attributes
__slots__ = (
'max_depth',
'location',
'has_single_resource',
'resource_attributes',
'resource_class',
'root',
'is_file',
'temp_dir',
'resources_by_path',
'resources_count',
'paths',
'max_in_memory',
'all_in_memory',
'all_on_disk',
'cache_dir',
'headers',
'current_header',
'codebase_attributes',
'attributes',
'counters',
'timings',
'errors',
)
# the value returned if the resource is cached
CACHED_RESOURCE = 1
def __init__(
    self,
    location,
    resource_attributes=None,
    codebase_attributes=None,
    temp_dir=temp_dir,
    max_in_memory=10000,
    max_depth=0,
    paths=tuple(),
    *args,
    **kwargs,
):
    """
    Initialize a new codebase rooted at the ``location`` existing file or
    directory.

    Arguments:
    - ``location``: path to an existing file or directory. It is expanded,
      normalized and made absolute.
    - ``resource_attributes``: an ordered mapping of attr Resource attributes
      such as plugin-provided attributes: these are added to a Resource
      sub-class crafted for this codebase.
    - ``codebase_attributes``: an ordered mapping of attr Codebase attributes
      such as plugin-provided attributes: these are added to a
      CodebaseAttributes sub-class crafted for this codebase.
    - ``temp_dir``: the base temporary directory to use to cache resources on
      disk and other temporary files.
    - ``max_in_memory``: the maximum number of Resource instances to keep in
      memory. Beyond this number, Resource are saved on disk instead. -1 means
      no memory is used and 0 means unlimited memory is used.
    - ``max_depth``: the maximum depth of subdirectories to descend below and
      including `location`.
    - ``paths``: an optional list of path strings relative to the root
      ``location`` such that joining the root ``location`` and such a path
      is the ``location`` of this path. If provided, the codebase will
      contain only these paths and no other path.
    """
    self.max_depth = max_depth

    # Base Resource class for now: the actual sub-class configured with
    # attributes is built when populating.
    self.resource_class = Resource
    self.resource_attributes = resource_attributes if resource_attributes else {}
    self.codebase_attributes = codebase_attributes if codebase_attributes else {}

    # Resolve the root location to an absolute path without trailing slash
    root_location = abspath(normpath(expanduser(os.fsdecode(location))))
    root_location = root_location.rstrip('/\\')

    # TODO: what if is_special(location)???
    assert exists(root_location)
    self.location = root_location

    root_is_file = filetype_is_file(root_location)
    self.is_file = root_is_file

    # True if this codebase root is a file or an empty directory.
    self.has_single_resource = bool(root_is_file or not os.listdir(root_location))

    # Set up caching, summary, timing, and error info
    self._setup_essentials(temp_dir, max_in_memory)

    # The paths must be ready before populating as they drive it
    self.paths = self._prepare_clean_paths(paths)
    self._populate()
def _prepare_clean_paths(self, paths=tuple()):
    """
    Return a new sorted list of cleaned ``paths``, possibly empty.
    Empty paths are dropped and each remaining path is normalized with
    ``clean_path``.
    """
    cleaned = [clean_path(p) for p in (paths or []) if p]
    # sorting on the list of path segments puts a directory before the paths
    # it contains (e.g. essentially a topo sort)
    cleaned.sort(key=lambda p: p.split('/'))
    return cleaned
def _setup_essentials(self, temp_dir=temp_dir, max_in_memory=10000):
    """
    Initialize the resources, caching and bookkeeping attributes of this
    Codebase.

    `temp_dir` is the base temporary directory to use to cache resources on
    disk and other temporary files.

    `max_in_memory` is the maximum number of Resource instances to keep in
    memory. Beyond this number, Resource are saved on disk instead. -1 means
    no memory is used and 0 means unlimited memory is used.
    """
    # Resources
    # The root resource is never cached on disk.
    self.root = None
    # {path: Resource} mapping: the key data structure of a Codebase where
    # all resources MUST exist. The value is CACHED_RESOURCE for a resource
    # cached on disk.
    self.resources_by_path = {}
    self.resources_count = 0

    # Caching
    # Directory used for caching and other temp files.
    self.temp_dir = temp_dir
    # Once the count of in-memory Resources reaches this number, the next
    # Resource instances are saved to disk and re-loaded when needed.
    self.max_in_memory = max_in_memory
    # True to use only memory
    self.all_in_memory = max_in_memory == 0
    # True to use only disk
    self.all_on_disk = max_in_memory == -1

    # Directory of the on-disk cache: there is none when all is in memory.
    self.cache_dir = None
    if not self.all_in_memory:
        # this is unique to this codebase instance
        self.cache_dir = get_codebase_cache_dir(temp_dir=temp_dir)

    # Extra and misc attributes
    # List of Header records for this codebase.
    self.headers = []
    self.current_header = None
    # Scan counters at the codebase level such as the number of files and
    # directories.
    self.counters = {}
    # Timings for scan stages as {stage: time in seconds as float}.
    self.timings = {}
    # Error strings from collecting the codebase details (such as an
    # unreadable file).
    self.errors = []
def _get_resource_cache_location(self, path, create_dirs=False):
    """
    Return the cache file location to use for the Resource at ``path``, or
    None if this codebase has no cache directory. ``path`` is either a path
    string or a Resource. Create the parent directory of the cache file if
    ``create_dirs`` is True and it does not exist yet.
    """
    cache_root = self.cache_dir
    if not cache_root:
        return

    resource_path = path.path if isinstance(path, Resource) else path
    resource_path = clean_path(resource_path)

    # The cache file is named with an md5 of the path to avoid overly long
    # file names; the last two characters of the digest name its directory.
    digest = str(md5(resource_path.encode('utf-8')).hexdigest())
    bucket_dir = join(cache_root, digest[-2:])
    if create_dirs and not exists(bucket_dir):
        create_dir(bucket_dir)
    return join(bucket_dir, digest)
def _collect_codebase_attributes(self, *args, **kwargs):
"""
Return a mapping of CodebaseAttributes fields to use with this Codebase
"""
return self.codebase_attributes
def _build_resource_class(self, *args, **kwargs):
    """
    Return a new slotted ``ScannedResource`` sub-class of Resource to use
    with this Codebase, configured with the ``resource_attributes`` if any.
    The extra arguments are accepted and unused.
    """
    plugin_attributes = self.resource_attributes or {}
    return attr.make_class(
        name='ScannedResource',
        attrs=plugin_attributes,
        slots=True,
        bases=(Resource,),
    )
# TODO: add populate progress manager!!!
def _populate(self):
    """
    Populate this codebase with Resource objects.

    The codebase attributes object and the actual sub-class of Resource used
    in this codebase are created as a side effect, then the root Resource
    is created. The other Resources are created either from the ``paths`` of
    this codebase if any, or by walking its ``location`` otherwise. Nothing
    else is created for a codebase with a single resource.
    """
    # Start from a clean list of headers
    self.headers = []

    # Codebase-level attributes, configured with plugin attributes if
    # present: build a class for these and instantiate it.
    self.codebase_attributes = self._collect_codebase_attributes()
    attributes_class = _CodebaseAttributes.from_attributes(
        attributes=self.codebase_attributes
    )
    self.attributes = attributes_class()

    # Resource sub-class, configured with plugin attributes if present
    self.resource_class = self._build_resource_class()

    # The root is always created first
    root = self._create_root_resource()
    if TRACE:
        logger_debug('Codebase.populate: root:', root)

    if self.has_single_resource:
        # a single file or a single childless directory has nothing else
        return

    if not self.paths:
        return self._create_resources_from_root(root=root)
    return self._create_resources_from_paths(root=root, paths=self.paths)
def _create_resources_from_paths(self, root, paths):
    """
    Create the Resources for each of the ``paths`` under the ``root``
    Resource, creating any missing ancestor directory Resource on the way.

    Each path is joined to the parent directory of the ``root`` location to
    find it on disk.

    !!!NOTE: WE DO NOT skip_ignored in this case!!!!!

    Raise an Exception if a path is missing on disk: the error message is
    appended to the codebase ``errors`` before raising.
    """
    # with paths, we iterate the provided paths rather than walking the disk
    base_location = parent_directory(root.location)

    # track resources parents by path during construction to avoid
    # recreating all ancestor directories
    parents_by_path = {root.path: root}

    for path in paths:
        res_loc = join(base_location, path)
        if not exists(res_loc):
            msg = f'ERROR: cannot populate codebase: path: {path!r} not found in {res_loc!r}'
            self.errors.append(msg)
            raise Exception(path, res_loc)

        # create all parents. The last parent is the one we want to use
        parent = root
        if TRACE:
            logger_debug('Codebase._create_resources_from_paths: parent', parent)

        for parent_path in get_ancestor_paths(path, include_self=False):
            if TRACE:
                logger_debug(
                    f' Codebase._create_resources_from_paths: parent_path: {parent_path!r}'
                )
            if not parent_path:
                continue

            newpar = parents_by_path.get(parent_path)
            if TRACE:
                logger_debug(' Codebase._create_resources_from_paths: newpar', repr(newpar))

            if not newpar:
                newpar = self._get_or_create_resource(
                    name=file_name(parent_path),
                    parent=parent,
                    path=parent_path,
                    is_file=False,
                )
                if not newpar:
                    raise Exception(
                        f'ERROR: Codebase._create_resources_from_paths: cannot create parent for: {parent_path!r}'
                    )

            # Always descend to this ancestor, whether it was just created
            # or already tracked, so that the next segment gets the correct
            # parent.
            parent = newpar
            parents_by_path[parent_path] = parent

            if TRACE:
                logger_debug(
                    ' Codebase._create_resources_from_paths:',
                    f'created newpar: {newpar!r}',
                )

        res = self._get_or_create_resource(
            name=file_name(path),
            parent=parent,
            path=path,
            is_file=isfile(res_loc),
        )
        if TRACE:
            logger_debug('Codebase._create_resources_from_paths: resource', res)
def _create_resources_from_root(self, root):
    """
    Create the Resources of this codebase by walking the location of the
    ``root`` Resource top-down, limited to the ``max_depth`` of this codebase.
    Errors reported by the walk are appended to the codebase ``errors``.
    """
    # Directory Resources not walked yet, keyed by location. An entry is
    # removed when its directory is walked, which comes after its parent
    # directory since we walk topdown.
    dirs_awaiting_walk = {root.location: root}

    def record_walk_error(_error):
        """os.walk error handler"""
        self.errors.append(
            f'ERROR: cannot populate codebase: {_error}\n{traceback.format_exc()}'
        )

    walker = depth_walk(
        root_location=root.location,
        max_depth=self.max_depth,
        error_handler=record_walk_error,
    )

    # Walk over the directory and build the resource tree
    for top, dirs, files in walker:
        parent = dirs_awaiting_walk.pop(top)
        children = self._create_resources(
            parent=parent,
            top=top,
            dirs=dirs,
            files=files,
        )
        for created in children:
            # on the plain, bare FS, files cannot be parents
            if created.is_file:
                continue
            dirs_awaiting_walk[created.location] = created
def _create_resources(self, parent, top, dirs, files, skip_ignored=skip_ignored):
    """
    Create and yield the children Resources of a ``parent`` Resource for the
    ``dirs`` and ``files`` names found in the ``top`` directory location.

    Directories come first, then files. Each group is sorted in place by
    lowercase name, then name. A name is skipped if ``skip_ignored`` returns
    True for its location.
    """

    def by_lowercase_then_name(name):
        return name.lower(), name

    for names, is_file in ((dirs, False), (files, True)):
        names.sort(key=by_lowercase_then_name)
        for name in names:
            if skip_ignored(join(top, name)):
                continue
            res = self._get_or_create_resource(
                name=name,
                parent=parent,
                is_file=is_file,
            )
            if TRACE:
                logger_debug('Codebase.create_resources:', res)
            yield res
def _create_root_resource(self):
    """
    Create, register and return the root Resource of this codebase.
    Raise a TypeError if this codebase already has a root.
    """
    # a root cannot be recreated once it exists
    if self.root:
        raise TypeError('Root resource already exists and cannot be recreated')

    root_location = self.location
    root_name = file_name(root_location)

    # do not strip root for codebase with a single Resource.
    root_path = Resource.build_path(root_location=root_location, location=root_location)
    if TRACE:
        logger_debug(f' Codebase._create_root_resource: {root_path} is_file: {self.is_file}')
        logger_debug()

    root = self.resource_class(
        name=root_name,
        location=root_location,
        # the root is never cached on disk
        cache_location=None,
        path=root_path,
        is_root=True,
        is_file=self.is_file,
    )

    self.resources_by_path[root_path] = root
    self.resources_count += 1
    self.root = root
    return root
def _get_or_create_resource(
    self,
    name,
    parent,
    is_file=False,
    path=None,
):
    """
    Return the codebase Resource named ``name`` that is a child of the
    ``parent`` Resource. Return the existing Resource if there is one at this
    path. Otherwise, create a new Resource, add its name to the children of
    its ``parent`` and save both.

    The path is built from the ``parent`` path and ``name`` unless ``path``
    is provided. Raise a TypeError if there is no ``parent``.
    """
    if not parent:
        raise TypeError(
            f'Cannot create resource without parent: name: {name!r}, path: {path!r}'
        )

    # If the codebase is virtual, we are provided with the path
    path = clean_path(path or posixpath_join(parent.path, name))

    existing = self.get_resource(path)
    if existing:
        if TRACE:
            logger_debug(' Codebase._get_or_create_resource: path already exists:', path)
        return existing

    cache_location = None
    if self._use_disk_cache_for_resource():
        cache_location = self._get_resource_cache_location(path=path, create_dirs=True)

    # NOTE: If the codebase is virtual, then there is no location
    parent_location = parent.location
    location = join(parent_location, name) if parent_location else None

    if TRACE:
        logger_debug(
            f' Codebase._get_or_create_resource: with path: {path}\n'
            f' name={name}, is_file={is_file}'
        )

    child = self.resource_class(
        name=name,
        location=location,
        path=path,
        cache_location=cache_location,
        is_file=is_file,
    )
    self.resources_count += 1

    # the parent is saved too as its children names changed
    parent.children_names.append(name)
    self.save_resource(parent)
    self.save_resource(child)
    return child
def get_or_create_current_header(self):
"""
Return the current Header. Create it if it does not exists and store
it in the headers.
"""
if not self.current_header:
self.current_header = Header()
self.headers.append(self.current_header)
return self.current_header
def get_files_count(self):
"""
Return the final files counts for the codebase.
"""
return self.counters.get('final:files_count', 0)
def add_files_count_to_current_header(self):
"""
Add the final files counts for the codebase to the current header.
Return the files_count.
"""
files_count = self.get_files_count()
current_header = self.get_or_create_current_header()
current_header.extra_data['files_count'] = files_count
return files_count
def get_headers(self):
"""
Return a serialized headers composed only of native Python objects
suitable for use in outputs.
"""
return [le.to_dict() for le in (self.headers or [])]
def exists(self, resource):
"""
Return True if the Resource path exists in the codebase.
"""
return resource and resource.path in self.resources_by_path
def _use_disk_cache_for_resource(self):
    """
    Return True if a Resource should be cached on-disk or False if it should
    be kept in-memory, based on the caching mode of this codebase and its
    current count of resources.
    """
    if self.all_on_disk:
        use_disk_cache = True
    elif self.all_in_memory:
        use_disk_cache = False
    else:
        # mixed case where some are in memory and some on disk: the disk is
        # used once the maximum count of in-memory resources is reached
        use_disk_cache = self.resources_count >= self.max_in_memory

    if TRACE:
        logger_debug(
            f' Codebase._use_disk_cache_for_resource mode: {use_disk_cache} '
            f'on_disk: {self.all_on_disk} '
            f'in_mem: {self.all_in_memory} '
            f'max_in_mem: {self.max_in_memory}'
        )
    return use_disk_cache
def _exists_in_memory(self, path):
    """
    Return True if there is a Resource object in memory for ``path`` in
    this codebase.
    """
    in_memory = self.resources_by_path.get(clean_path(path))
    return isinstance(in_memory, Resource)
def _exists_on_disk(self, path):
    """
    Return True if Resource `path` exists in the codebase disk cache.
    Return False if it is in memory, if this codebase has no disk cache or
    if there is no cache file for this path.
    """
    path = clean_path(path)
    if self._exists_in_memory(path):
        return False
    cache_location = self._get_resource_cache_location(path, create_dirs=False)
    # always return a boolean, including when there is no cache location
    return bool(cache_location) and exists(cache_location)
########### FIXME: the PATH SHOULD NOT INCLUDE THE ROOT NAME
def get_resource(self, path):
    """
    Return the Resource with `path` or None if it does not exists.
    The ``path`` must be a string relative to the root (and including the
    root name as its first segment).

    The returned Resource is either loaded from the disk cache or is a copy
    of the Resource kept in memory.
    """
    assert isinstance(path, str), f'Invalid path: {path!r} is not a string.'
    path = clean_path(path)

    if TRACE:
        msg = [' Codebase.get_resource:', 'path:', path]
        if path and path in self.resources_by_path:
            msg.extend(['exists on disk:', self._exists_on_disk(path)])
            msg.extend(['exists in memo:', self._exists_in_memory(path)])
        else:
            msg.append('not in resources!')
        logger_debug(*msg)

    # Codebase.CACHED_RESOURCE is a marker for a resource that exists only
    # on disk and must be loaded from the disk cache. This is different from
    # None which means that the resource is missing.
    found = self.resources_by_path.get(path)
    if found is None:
        res = None
    elif found is Codebase.CACHED_RESOURCE:
        res = self._load_resource(path)
    elif isinstance(found, Resource):
        res = attr.evolve(found)
    else:
        # this should never happen
        raise Exception(f'get_resource: Internal error when getting {path!r}')

    if TRACE:
        logger_debug(' Resource:', res)
    return res
def save_resource(self, resource):
    """
    Save the `resource` Resource in this codebase, either in memory or in
    the disk cache. Do nothing if there is no ``resource``.

    The root is always kept in memory. Another Resource with a
    ``cache_location`` is dumped to disk and only a marker is kept in memory.
    """
    if not resource:
        return

    path = clean_path(resource.path)
    if TRACE:
        logger_debug(' Codebase.save_resource:', resource)

    if resource.is_root:
        self.root = resource
        self.resources_by_path[path] = resource
        return

    if resource.cache_location:
        self._dump_resource(resource)
        self.resources_by_path[path] = Codebase.CACHED_RESOURCE
        return

    self.resources_by_path[path] = resource
def _dump_resource(self, resource):
"""
Dump a Resource to the disk cache.
"""
cache_location = resource.cache_location
if not cache_location:
raise TypeError(
'Resource cannot be dumped to disk and is used only' f'in memory: {resource}'
)
# TODO: consider messagepack or protobuf for compact/faster processing?
with open(cache_location, 'w') as cached:
cached.write(json.dumps(resource.serialize(), check_circular=False))
# TODO: consider adding a small LRU cache in front of this for perf?
def _load_resource(self, path):
    """
    Return a Resource with ``path`` loaded from the disk cache.

    Raise a ResourceNotInCache if there is no cache file for this ``path``.
    Raise an Exception reporting the cache file content if the Resource
    cannot be created from this file.
    """
    path = clean_path(path)
    cache_location = self._get_resource_cache_location(path, create_dirs=False)

    if TRACE:
        logger_debug(
            ' Codebase._load_resource: exists:',
            exists(cache_location),
            'cache_location:',
            cache_location,
        )

    if not exists(cache_location):
        raise ResourceNotInCache(f'Failed to load Resource: {path} from {cache_location!r}')

    # TODO: consider messagepack or protobuf for compact/faster processing
    try:
        with open(cache_location, 'rb') as cache_file:
            # TODO: Use custom json encoder to encode JSON list as a tuple
            # TODO: Consider using simplejson
            resource_data = json.load(cache_file)
            return self.resource_class(**resource_data)
    except Exception as e:
        # read the raw cached content again to report it in the error
        with open(cache_location, 'rb') as cache_file:
            raw_content = cache_file.read()
        msg = (
            f'ERROR: failed to load resource from cached location: {cache_location} '
            'with content:\n\n' + repr(raw_content) + '\n\n' + traceback.format_exc()
        )
        raise Exception(msg) from e
def _remove_resource(self, resource):
    """
    Remove the ``resource`` Resource object from this codebase.
    Does not remove children.
    Raise a TypeError if ``resource`` is the root.
    """
    if resource.is_root:
        raise TypeError(f'Cannot remove the root resource from codebase: {resource!r}')

    # Only the in-memory entry is removed here. The disk cache is cleared
    # on exit.
    resources = self.resources_by_path
    resources.pop(resource.path, None)

    if TRACE:
        logger_debug('Codebase._remove_resource:', resource)
def remove_resource(self, resource):
    """
    Remove the `resource` Resource object and all its descendants from the
    codebase, and remove its name from the children of its parent.
    Return a set of the ``location`` of each removed Resource.
    Raise a TypeError if ``resource`` is the root.
    """
    if TRACE:
        logger_debug('Codebase.remove_resource')
        logger_debug(' resource', resource)

    if resource.is_root:
        raise TypeError(f'Cannot remove the root resource from codebase: {resource!r}')

    removed_paths = set()

    # Descendants are removed bottom up to avoid out-of-order access to
    # removed resources.
    for descendant in resource.walk(self, topdown=False):
        self._remove_resource(descendant)
        removed_paths.add(descendant.location)

    # detach the resource from its parent
    parent = resource.parent(self)
    if TRACE:
        logger_debug(' parent', parent)
    parent.children_names.remove(resource.name)
    parent.save(self)

    # and finally remove the resource proper
    self._remove_resource(resource)
    removed_paths.add(resource.location)

    return removed_paths
def walk(self, topdown=True, skip_root=False, ignored=ignore_nothing):
    """
    Yield all resources for this Codebase walking its resource tree. Walk
    the tree top-down, depth-first if ``topdown`` is True, otherwise walk
    bottom-up.

    Each level is sorted by children with this sort order: resource without-
    children first, then resource with-children and each group sorted by
    case-insensitive name.

    If ``skip_root`` is True, the root resource is not returned unless this
    is a codebase with a single resource.

    ``ignored`` is a callable that accepts two arguments, ``resource`` and
    ``codebase``, and returns True if ``resource`` should be ignored.
    Nothing is yielded if the root itself is ignored.
    """
    root = self.root
    if ignored(resource=root, codebase=self):
        return

    # work on a copy of the root: a single copy is enough
    root = attr.evolve(root)

    # include root if no children (e.g. codebase with a single resource)
    if self.has_single_resource or (skip_root and not root.has_children()):
        skip_root = False

    if topdown and not skip_root:
        yield root

    yield from root.walk(self, topdown=topdown, ignored=ignored)

    if not topdown and not skip_root:
        yield root
def __iter__(self):
    """
    Return an iterator over all the resources of this Codebase, walked
    with the default arguments of ``walk``.
    """
    return self.walk()
def walk_filtered(self, topdown=True, skip_root=False):
    """
    Yield the Resources of this Codebase as ``walk`` does, leaving out any
    Resource with its `is_filtered` flag set to True.
    """
    walked = self.walk(topdown=topdown, skip_root=skip_root)
    yield from (res for res in walked if not res.is_filtered)
def compute_counts(self, skip_root=False, skip_filtered=False):
    """
    Compute, update and save the counts of every resource.
    Return a tuple of top level counters for this codebase as:
        (files_count, dirs_count, size_count).

    The counts are computed differently based on these flags:
    - If ``skip_root`` is True, the root resource is not included in counts
      unless it is a file.
    - If ``skip_filtered`` is True, resources with ``is_filtered`` set to True
      are not included in counts.
    """
    self.update_counts(skip_filtered=skip_filtered)

    root = self.root
    files_count, dirs_count, size_count = (
        root.files_count,
        root.dirs_count,
        root.size_count,
    )

    root_is_left_out = (skip_root and not root.is_file) or (
        skip_filtered and root.is_filtered
    )
    if not root_is_left_out:
        # the counts of the root cover its descendants only: add the root
        if root.is_file:
            files_count += 1
        else:
            dirs_count += 1
        size_count += root.size or 0

    return files_count, dirs_count, size_count
def update_counts(self, skip_filtered=False):
"""
Update files_count, dirs_count and size_count attributes of each
Resource in this codebase based on the current Resource data.
If ``skip_filtered`` is True, resources with ``is_filtered`` set to True are