-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathutils.py
More file actions
243 lines (208 loc) · 6.87 KB
/
utils.py
File metadata and controls
243 lines (208 loc) · 6.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
from __future__ import annotations
import atexit
import contextlib
import datetime
import io
import logging
import os
import re
import sys
from types import TracebackType
from typing import Iterable, Iterator
from python_utils import types
from python_utils.converters import scale_1024
from python_utils.terminal import get_terminal_size
from python_utils.time import epoch, format_time, timedelta_to_seconds
from progressbar import base, env, terminal
if types.TYPE_CHECKING:
from .bar import ProgressBar, ProgressBarMixinBase
assert timedelta_to_seconds is not None
assert get_terminal_size is not None
assert format_time is not None
assert scale_1024 is not None
assert epoch is not None
StringT = types.TypeVar('StringT', bound=types.StringTypes)
def deltas_to_seconds(*deltas, default: types.Optional[types.Type[ValueError]]=ValueError) -> int | float | None:
"""
Convert timedeltas and seconds as int to seconds as float while coalescing.
>>> deltas_to_seconds(datetime.timedelta(seconds=1, milliseconds=234))
1.234
>>> deltas_to_seconds(123)
123.0
>>> deltas_to_seconds(1.234)
1.234
>>> deltas_to_seconds(None, 1.234)
1.234
>>> deltas_to_seconds(0, 1.234)
0.0
>>> deltas_to_seconds()
Traceback (most recent call last):
...
ValueError: No valid deltas passed to `deltas_to_seconds`
>>> deltas_to_seconds(None)
Traceback (most recent call last):
...
ValueError: No valid deltas passed to `deltas_to_seconds`
>>> deltas_to_seconds(default=0.0)
0.0
"""
if not deltas and default is ValueError:
raise ValueError('No valid deltas passed to `deltas_to_seconds`')
elif not deltas:
return default
for delta in deltas:
if delta is None:
continue
elif isinstance(delta, datetime.timedelta):
return timedelta_to_seconds(delta)
elif isinstance(delta, (int, float)):
return float(delta)
if default is ValueError:
raise ValueError('No valid deltas passed to `deltas_to_seconds`')
return default
def no_color(value: StringT) -> StringT:
"""
Return the `value` without ANSI escape codes.
>>> no_color(b'\x1b[1234]abc')
b'abc'
>>> str(no_color(u'\x1b[1234]abc'))
'abc'
>>> str(no_color('\x1b[1234]abc'))
'abc'
>>> no_color(123)
Traceback (most recent call last):
...
TypeError: `value` must be a string or bytes, got 123
"""
if not isinstance(value, (str, bytes)):
raise TypeError(f'`value` must be a string or bytes, got {value}')
# Pattern to match ANSI escape sequences
pattern = re.compile(rb'\x1b\[[^m]*m' if isinstance(value, bytes) else r'\x1b\[[^m]*m')
return types.cast(StringT, pattern.sub(b'' if isinstance(value, bytes) else '', value))
def len_color(value: types.StringTypes) -> int:
"""
Return the length of `value` without ANSI escape codes.
>>> len_color(b'\x1b[1234]abc')
3
>>> len_color(u'\x1b[1234]abc')
3
>>> len_color('\x1b[1234]abc')
3
"""
return len(no_color(value))
class WrappingIO:
buffer: io.StringIO
target: base.IO
capturing: bool
listeners: set
needs_clear: bool = False
def __init__(self, target: base.IO, capturing: bool=False, listeners: types.Optional[types.Set[ProgressBar]]=None) -> None:
self.buffer = io.StringIO()
self.target = target
self.capturing = capturing
self.listeners = listeners or set()
self.needs_clear = False
def __enter__(self) -> WrappingIO:
return self
def __next__(self) -> str:
return self.target.__next__()
def __iter__(self) -> Iterator[str]:
return self.target.__iter__()
def __exit__(self, __t: type[BaseException] | None, __value: BaseException | None, __traceback: TracebackType | None) -> None:
self.close()
def fileno(self) -> int:
"""Return the file descriptor of the target."""
return self.target.fileno()
def close(self) -> None:
"""Close the buffer and target."""
self.buffer.close()
if hasattr(self.target, 'close'):
self.target.close()
def flush(self) -> None:
"""Flush the buffer and target."""
self.buffer.flush()
if hasattr(self.target, 'flush'):
self.target.flush()
def isatty(self) -> bool:
"""Return True if the target is a terminal."""
return hasattr(self.target, 'isatty') and self.target.isatty()
class StreamWrapper:
"""Wrap stdout and stderr globally."""
stdout: base.TextIO | WrappingIO
stderr: base.TextIO | WrappingIO
original_excepthook: types.Callable[[types.Type[BaseException], BaseException, TracebackType | None], None]
wrapped_stdout: int = 0
wrapped_stderr: int = 0
wrapped_excepthook: int = 0
capturing: int = 0
listeners: set
def __init__(self):
self.stdout = self.original_stdout = sys.stdout
self.stderr = self.original_stderr = sys.stderr
self.original_excepthook = sys.excepthook
self.wrapped_stdout = 0
self.wrapped_stderr = 0
self.wrapped_excepthook = 0
self.capturing = 0
self.listeners = set()
if env.env_flag('WRAP_STDOUT', default=False):
self.wrap_stdout()
if env.env_flag('WRAP_STDERR', default=False):
self.wrap_stderr()
def flush(self) -> None:
"""Flush both stdout and stderr streams."""
if hasattr(self.stdout, 'flush'):
self.stdout.flush()
if hasattr(self.stderr, 'flush'):
self.stderr.flush()
class AttributeDict(dict):
"""
A dict that can be accessed with .attribute.
>>> attrs = AttributeDict(spam=123)
# Reading
>>> attrs['spam']
123
>>> attrs.spam
123
# Read after update using attribute
>>> attrs.spam = 456
>>> attrs['spam']
456
>>> attrs.spam
456
# Read after update using dict access
>>> attrs['spam'] = 123
>>> attrs['spam']
123
>>> attrs.spam
123
# Read after update using dict access
>>> del attrs.spam
>>> attrs['spam']
Traceback (most recent call last):
...
KeyError: 'spam'
>>> attrs.spam
Traceback (most recent call last):
...
AttributeError: No such attribute: spam
>>> del attrs.spam
Traceback (most recent call last):
...
AttributeError: No such attribute: spam
"""
def __getattr__(self, name: str) -> int:
if name in self:
return self[name]
else:
raise AttributeError(f'No such attribute: {name}')
def __setattr__(self, name: str, value: int) -> None:
self[name] = value
def __delattr__(self, name: str) -> None:
if name in self:
del self[name]
else:
raise AttributeError(f'No such attribute: {name}')
logger = logging.getLogger(__name__)
streams = StreamWrapper()
atexit.register(streams.flush)