Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions tests/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,18 @@ internal_bench/bytebuf:
1 tests performed (3 individual testcases)
```

## Serial reliability and performance test

Serial port reliability and performance can be tested using the `serial_test.py` script.
Pass the name of the port to test against, for example:

$ ./serial_test.py -t /dev/ttyACM0

If no port is specified then `/dev/ttyACM0` is used as the default.

The test will send data out to the target, and receive data from the target, in various
chunk sizes. The throughput of the serial connection will be reported for each sub-test.

## Test key/certificates

SSL/TLS tests in `multi_net` and `net_inet` use self-signed key/cert pairs
Expand Down
263 changes: 263 additions & 0 deletions tests/serial_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
#!/usr/bin/env python
#
# Performance and reliability test for serial port communication.
#
# Basic usage:
# serial_test.py [-t serial-device]
#
# The `serial-device` will default to /dev/ttyACM0.

import argparse
import serial
import sys
import time

run_tests_module = __import__("run-tests")

read_test_script = """
bin = True
try:
wr=__import__("pyb").USB_VCP(0).send
except:
import sys
if hasattr(sys.stdout,'buffer'):
wr=sys.stdout.buffer.write
else:
wr=sys.stdout.write
bin = False
b=bytearray(%u)
if bin:
wr('BIN')
for i in range(len(b)):
b[i] = i & 0xff
else:
wr('TXT')
for i in range(len(b)):
b[i] = 0x20 + (i & 0x3f)
for _ in range(%d):
wr(b)
"""


write_test_script_verified = """
import sys
try:
rd=__import__("pyb").USB_VCP(0).recv
except:
rd=sys.stdin.readinto
b=bytearray(%u)
for _ in range(%u):
n = rd(b)
fail = 0
for i in range(n):
if b[i] != 32 + (i & 0x3f):
fail += 1
if fail:
sys.stdout.write(b'ER%%05u' %% fail)
else:
sys.stdout.write(b'OK%%05u' %% n)
"""

write_test_script_unverified = """
import sys
try:
rd=__import__("pyb").USB_VCP(0).recv
except:
rd=sys.stdin.readinto
b=bytearray(%u)
for _ in range(%u):
n = rd(b)
if n != len(b):
sys.stdout.write(b'ER%%05u' %% n)
else:
sys.stdout.write(b'OK%%05u' %% n)
"""


class TestError(Exception):
pass


def drain_input(ser):
time.sleep(0.1)
while ser.inWaiting() > 0:
data = ser.read(ser.inWaiting())
time.sleep(0.1)


def send_script(ser, script):
chunk_size = 32
for i in range(0, len(script), chunk_size):
ser.write(script[i : i + chunk_size])
time.sleep(0.01)
ser.write(b"\x04") # eof
ser.flush()
response = ser.read(2)
if response != b"OK":
response += ser.read(ser.inWaiting())
raise TestError("could not send script", response)


def read_test(ser_repl, ser_data, bufsize, nbuf):
global test_passed

assert bufsize % 256 == 0 # for verify to work

# Load and run the read_test_script.
ser_repl.write(b"\x03\x01\x04") # break, raw-repl, soft-reboot
drain_input(ser_repl)
script = bytes(read_test_script % (bufsize, nbuf), "ascii")
send_script(ser_repl, script)

# Read from the device the type of data that it will send (BIN or TXT).
data_type = ser_data.read(3)

# Read data from the device, check it is correct, and measure throughput.
n = 0
last_byte = None
t_start = time.time()
remain = nbuf * bufsize
total_data = bytearray(remain)
while remain:
t0 = time.monotonic_ns()
while ser_data.inWaiting() == 0:
if time.monotonic_ns() - t0 > 1e9:
# timeout waiting for data from device
break
time.sleep(0.0001)
if not ser_data.inWaiting():
test_passed = False
print("ERROR: timeout waiting for data")
print(total_data[:n])
return 0
to_read = min(ser_data.inWaiting(), remain)
data = ser_data.read(to_read)
remain -= len(data)
print(f"{n} / {nbuf * bufsize}", end="\r")
total_data[n : n + len(data)] = data
n += len(data)
t_end = time.time()
for i in range(0, len(total_data)):
if data_type == b"BIN":
wanted = i & 0xFF
else:
wanted = 0x20 + (i & 0x3F)
if total_data[i] != wanted:
test_passed = False
print("ERROR: data mismatch:", i, wanted, total_data[i])
ser_repl.write(b"\x03") # break
t = t_end - t_start

# Print results.
print(
"DATA IN: bufsize=%u, read %u bytes in %.2f msec = %.2f kibytes/sec = %.2f MBits/sec"
% (bufsize, n, t * 1000, n / 1024 / t, n * 8 / 1000000 / t)
)

return n / t


def write_test(ser_repl, ser_data, bufsize, nbuf, verified):
global test_passed

# Load and run the write_test_script.
ser_repl.write(b"\x03\x01\x04") # break, raw-repl, soft-reboot
drain_input(ser_repl)
if verified:
script = write_test_script_verified
else:
script = write_test_script_unverified
script = bytes(script % (bufsize, nbuf), "ascii")
send_script(ser_repl, script)
drain_input(ser_repl)

# Write data to the device, check it is correct, and measure throughput.
n = 0
t_start = time.time()
buf = bytearray(bufsize)
for i in range(len(buf)):
buf[i] = 32 + (i & 0x3F) # don't want to send ctrl chars!
for i in range(nbuf):
ser_data.write(buf)
n += len(buf)
print(f"{n} / {nbuf * bufsize}", end="\r")
response = ser_repl.read(7)
if response != b"OK%05u" % bufsize:
test_passed = False
print("ERROR: bad response, expecting OK%05u, got %r" % (bufsize, response))
t_end = time.time()
ser_repl.write(b"\x03") # break
t = t_end - t_start

# Print results.
print(
"DATA OUT: verify=%d, bufsize=%u, wrote %u bytes in %.2f msec = %.2f kibytes/sec = %.2f MBits/sec"
% (verified, bufsize, n, t * 1000, n / 1024 / t, n * 8 / 1000000 / t)
)

return n / t


def do_test(dev_repl, dev_data=None, time_per_subtest=1):
if dev_data is None:
print("REPL and data on", dev_repl)
ser_repl = serial.Serial(dev_repl, baudrate=115200, timeout=1)
ser_data = ser_repl
else:
print("REPL on", dev_repl)
print("data on", dev_data)
ser_repl = serial.Serial(dev_repl, baudrate=115200, timeout=1)
ser_data = serial.Serial(dev_data, baudrate=115200, timeout=1)

for test_func, test_args, bufsize in (
(read_test, (), 256),
(write_test, (True,), 128),
(write_test, (False,), 128),
):
nbuf = 128
while bufsize <= 16384:
rate = test_func(ser_repl, ser_data, bufsize, nbuf, *test_args)
bufsize *= 2
if rate:
# Adjust the amount of data based on the rate, to keep each subtest
# at around time_per_subtest seconds long.
nbuf = max(min(128, int(rate * time_per_subtest / bufsize)), 1)

ser_repl.close()
ser_data.close()


def main():
global test_passed

cmd_parser = argparse.ArgumentParser(
description="Test performance and reliability of serial port communication.",
epilog=run_tests_module.test_instance_epilog,
formatter_class=argparse.RawTextHelpFormatter,
)
cmd_parser.add_argument(
"-t",
"--test-instance",
default="a0",
help="MicroPython instance to test",
)
cmd_parser.add_argument(
"--time-per-subtest", default="1", help="approximate time to take per subtest (in seconds)"
)
args = cmd_parser.parse_args()

dev_repl = run_tests_module.convert_device_shortcut_to_real_device(args.test_instance)

test_passed = True
try:
do_test(dev_repl, None, float(args.time_per_subtest))
except TestError as er:
test_passed = False
print("ERROR:", er)

if not test_passed:
sys.exit(1)


if __name__ == "__main__":
main()
Loading