forked from hardbyte/python-can
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathplayer.py
More file actions
111 lines (89 loc) · 2.9 KB
/
player.py
File metadata and controls
111 lines (89 loc) · 2.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
"""
Replays CAN traffic saved with can.logger back
to a CAN bus.
Similar to canplayer in the can-utils package.
"""
import argparse
import errno
import sys
from datetime import datetime
from typing import Iterable, cast
from can import LogReader, Message, MessageSync
from .logger import _create_base_argument_parser, _create_bus, _parse_additional_config
def _build_replay_parser() -> argparse.ArgumentParser:
    """Build the command-line parser used by :func:`main`.

    The shared options contributed by ``_create_base_argument_parser`` are
    registered first, followed by the replay-specific options defined below.
    """
    replay_parser = argparse.ArgumentParser(description="Replay CAN traffic.")
    _create_base_argument_parser(replay_parser)

    add = replay_parser.add_argument

    # NOTE(review): the value lands in ``log_file`` but main() never reads it.
    add(
        "-f",
        "--file_name",
        dest="log_file",
        default=None,
        help="Path and base log filename, for supported types see can.LogReader.",
    )

    # Counting starts from the default of 2, so a single -v yields 3.
    # The second line of the help literal is intentionally flush-left so the
    # string value stays exactly as it was.
    add(
        "-v",
        dest="verbosity",
        action="count",
        default=2,
        help="""Also print can frames to stdout.
You can add several of these to enable debugging""",
    )

    # store_false: ``timestamps`` defaults to True and the flag switches it off.
    add(
        "--ignore-timestamps",
        dest="timestamps",
        action="store_false",
        help="""Ignore timestamps (send all frames immediately with minimum gap between frames)""",
    )

    add(
        "--error-frames",
        action="store_true",
        help="Also send error frames to the interface.",
    )

    add(
        "-g",
        "--gap",
        type=float,
        default=0.0001,
        help="<s> minimum time between replayed frames",
    )

    add(
        "-s",
        "--skip",
        type=float,
        default=60 * 60 * 24,
        help="<s> skip gaps greater than 's' seconds",
    )

    add(
        "infile",
        metavar="input-file",
        type=str,
        help="The file to replay. For supported types see can.LogReader.",
    )

    return replay_parser


def main() -> None:
    """Replay a recorded CAN log file onto a CAN bus.

    Parses the command line, opens the bus and the log reader, wraps the
    reader in a ``MessageSync`` configured from ``--ignore-timestamps``,
    ``--gap`` and ``--skip``, and sends every yielded message to the bus.
    Error frames are only sent when ``--error-frames`` is given, and each
    sent message is also printed once the verbosity count reaches 3.
    A ``KeyboardInterrupt`` during the replay ends it silently.

    Raises:
        SystemExit: with ``errno.EINVAL`` when the program is started without
            any command-line arguments; the help text is written to stderr
            first.
    """
    replay_parser = _build_replay_parser()

    # No arguments at all: show the usage text instead of an argparse error.
    if not sys.argv[1:]:
        replay_parser.print_help(sys.stderr)
        raise SystemExit(errno.EINVAL)

    # Unrecognised arguments are not rejected; together with ``extra_args``
    # they are turned into keyword options for the bus and the reader.
    namespace, leftover_args = replay_parser.parse_known_args()
    extra_options = _parse_additional_config([*namespace.extra_args, *leftover_args])

    verbosity_level = namespace.verbosity
    send_error_frames = namespace.error_frames

    with _create_bus(namespace, **extra_options) as bus:
        with LogReader(namespace.infile, **extra_options) as reader:
            paced_messages = MessageSync(
                cast(Iterable[Message], reader),
                timestamps=namespace.timestamps,
                gap=namespace.gap,
                skip=namespace.skip,
            )

            print(f"Can LogReader (Started on {datetime.now()})")

            try:
                for frame in paced_messages:
                    # Error frames are dropped unless explicitly requested.
                    if not frame.is_error_frame or send_error_frames:
                        if verbosity_level >= 3:
                            print(frame)
                        bus.send(frame)
            except KeyboardInterrupt:
                # Ctrl-C is the normal way to stop a replay; exit quietly.
                pass
# Run the replay command-line tool only when this module is executed as a
# script, not when it is imported.
if __name__ == "__main__":
    main()