Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 54 additions & 32 deletions test/pyTest/capture_serial.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from serial import Serial
from threading import Thread, Event, Timer
import queue
import time
import time, os
from target_info import TargetInfo
from pty_forwarder import PtyForwarder
from os import symlink, unlink
Expand All @@ -11,7 +11,7 @@


class CaptureSerial(Serial):
""" Capture bytes received from the target
"""Capture bytes received from the target
and make it easy to check content against expected.

This derives from pyserial's Serial class and accepts
Expand All @@ -31,6 +31,7 @@ def __init__(self, *args, **kwargs):
if "name" in kwargs:
self._target_name = kwargs["name"]
del kwargs["name"]
self._on_line = kwargs.pop("on_line", None)
self._boot_started_bytes = None
self._boot_complete_bytes = None
self._max_boot_time = None
Expand All @@ -43,13 +44,17 @@ def __init__(self, *args, **kwargs):

self._pty_forwarder = None
self._pty_ext_link = None
if "port" in kwargs and \
kwargs["port"] == PtyForwarder.PTY_SLAVE_A_ALIAS:
if "port" in kwargs and kwargs["port"] == PtyForwarder.PTY_SLAVE_A_ALIAS:
self._pty_ext_link = kwargs["ext_link"]
del kwargs["ext_link"]
self._pty_forwarder = PtyForwarder()
self._pty_forwarder.start()
kwargs["port"] = self._pty_forwarder.slave_a_path
try:
if os.path.lexists(self._pty_ext_link):
unlink(self._pty_ext_link)
except Exception:
pass
symlink(self._pty_forwarder.slave_b_path, self._pty_ext_link)

self._write_byte_delay = None
Expand All @@ -59,8 +64,7 @@ def __init__(self, *args, **kwargs):

self._send_command_expected_bytes = None
if "send_command_expected" in kwargs:
self._send_command_expected_bytes = \
kwargs["send_command_expected"].encode()
self._send_command_expected_bytes = kwargs["send_command_expected"].encode()
del kwargs["send_command_expected"]

self._send_command_timeout = None
Expand Down Expand Up @@ -102,18 +106,16 @@ def write(self, data, write_byte_delay=None):
write_count = 0
for i in range(len(data)):
time.sleep(write_byte_delay)
ret = super().write(data[i:i+1])
ret = super().write(data[i : i + 1])
if isinstance(ret, int):
write_count = write_count + ret
return write_count
else:
return super().write(data)

def send_command(self, data,
expected=None,
timeout=None,
max_retries=None,
write_byte_delay=None):
def send_command(
self, data, expected=None, timeout=None, max_retries=None, write_byte_delay=None
):
"""Send a command and wait for expected bytes that confirm success.
Arguments passed override target configuration file settings.
"""
Expand All @@ -124,16 +126,17 @@ def send_command(self, data,
if max_retries is None:
max_retries = self._send_command_max_retries

if not isinstance(expected, (bytes, bytearray)) \
or not isinstance(timeout, (float, int)) \
or not isinstance(max_retries, int):
raise ValueError(
"Valid settings are required to use this method")
if (
not isinstance(expected, (bytes, bytearray))
or not isinstance(timeout, (float, int))
or not isinstance(max_retries, int)
):
raise ValueError("Valid settings are required to use this method")

success = False
self._command_successful_expected = expected
self._command_successful_event.clear()
for _ in range(max_retries+1):
for _ in range(max_retries + 1):
self.write(data, write_byte_delay=write_byte_delay)
success = self._command_successful_event.wait(timeout=timeout)
if success:
Expand All @@ -147,9 +150,14 @@ def clear(self):
self.reset_output_buffer()
self._received_lines_queue = queue.Queue()

def add_boot_info(self, started_str, complete_str, max_time,
wait_time_after_complete=0.0,
assume_booted_at_start=False):
def add_boot_info(
self,
started_str,
complete_str,
max_time,
wait_time_after_complete=0.0,
assume_booted_at_start=False,
):
"""Set config data used in monitoring boot status"""
self._boot_started_bytes = started_str.encode()
self._boot_complete_bytes = complete_str.encode()
Expand All @@ -173,7 +181,7 @@ def mark_not_booted(self):

def seconds_since_booted(self):
if self._booted_event.is_set():
return time.time()-self._last_booted_time
return time.time() - self._last_booted_time
return 0.0

def wait_for_boot_complete(self):
Expand All @@ -189,15 +197,18 @@ def wait_for_boot_complete(self):
return booted

def _check_line(self, line):
if self._command_successful_expected and \
self._command_successful_expected in line:
if (
self._command_successful_expected
and self._command_successful_expected in line
):
self._command_successful_event.set()
if self._boot_started_bytes and self._boot_started_bytes in line:
self.mark_not_booted()
elif self._boot_complete_bytes and self._boot_complete_bytes in line:
if self._wait_time_after_complete:
self._boot_complete_wait_timer = Timer(
self._wait_time_after_complete, self.mark_booted)
self._wait_time_after_complete, self.mark_booted
)
self._boot_complete_wait_timer.start()
else:
self.mark_booted()
Expand All @@ -208,10 +219,16 @@ def _read_thread_func(self):
try:
for b in super().read():
line.append(b)
if b in b'\n':
if b in b"\n":
# print(f"[{self._target_name}] In: {line.decode()}")
self._check_line(line)
self._received_lines_queue.put(line)
if self._on_line is not None:
try:
self._on_line(bytes(line))
except Exception:
pass

line = bytearray()
except Exception:
pass
Expand Down Expand Up @@ -296,12 +313,13 @@ def read_until(self, expected, timeout=None, match_all_expected=True):
while True:
try:
received_line = self._received_lines_queue.get(
block=True if timeout else False, timeout=timeout)
block=True if timeout else False, timeout=timeout
)
# print("Out:"+received_line.decode())
received_lines.append(received_line)
for e in expected_found_at_i:
if expected_found_at_i[e] is None and e in received_line:
expected_found_at_i[e] = len(received_lines)-1
expected_found_at_i[e] = len(received_lines) - 1
except queue.Empty:
break
if timeout and time.time() - start_time > timeout:
Expand All @@ -312,22 +330,26 @@ def read_until(self, expected, timeout=None, match_all_expected=True):
success = True
break
else:
if list(expected_found_at_i.values()).count(None) \
< len(expected_found_at_i):
if list(expected_found_at_i.values()).count(None) < len(
expected_found_at_i
):
success = True
break

return success, received_lines, expected_found_at_i


def start_capture_serial():
def start_capture_serial(on_line_factory=None):
"""Create a CaptureSerial object for each target"""
for name, target_info in TargetInfo.by_name.items():
serial_info = target_info.serial
serial_info["name"] = name
if serial_info["port"] == PtyForwarder.PTY_SLAVE_A_ALIAS:
serial_info["ext_link"] = target_info.pty_forwarder["ext_link"]
capture_serial = CaptureSerial(**serial_info)

on_line = on_line_factory(name) if on_line_factory else None
capture_serial = CaptureSerial(**serial_info, on_line=on_line)

if target_info.boot:
capture_serial.add_boot_info(**target_info.boot)
capture_serial_by_name[name] = capture_serial
Expand Down
30 changes: 22 additions & 8 deletions test/pyTest/conftest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import pytest
import pytest, os
from can.interfaces import socketcan
import isotp
from doipclient import DoIPClient
Expand All @@ -18,6 +18,9 @@
capture_serial_by_name,
CaptureSerial,
)
from serial_minilog import start_minilog, stop_minilog, on_line as minilog_on_line
import functools
from datetime import datetime


class TargetSession:
Expand Down Expand Up @@ -134,14 +137,25 @@ def once_per_pytest_run():
"""This is used once for the whole pytest run to provide opportunity
for setup before all tests and teardown after all tests.
"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
log_path = os.environ.get("SERIAL_LOG_PATH", f"artifacts/serial_{timestamp}.log")
if TargetInfo.by_name:
# Will be executed before the first test
start_per_run_processes()
start_capture_serial()

start_minilog(log_path)

def on_line_factory(target_name):
# Bind the target into the callback (CaptureSerial calls _on_line(self._target_name, bytes(line)))
return functools.partial(minilog_on_line, target_name)

start_capture_serial(on_line_factory=on_line_factory)

yield 1

if TargetInfo.by_name:
# Will be executed after the last test
close_capture_serial()
stop_minilog()
stop_all_processes()


Expand Down Expand Up @@ -211,7 +225,7 @@ def pytest_generate_tests(metafunc):
if "hw_tester" in metafunc.fixturenames:
need_hw_tester = True
fixture_names += ",hw_tester"

need_uds_transport = False
if "uds_transport" in metafunc.fixturenames:
need_uds_transport = True
Expand All @@ -225,7 +239,9 @@ def pytest_generate_tests(metafunc):
if need_hw_tester:
if target_info.hw_tester_serial:
# The target has hw_tester so this test can run
all_targets_fixture_args.append([name, target_info.hw_tester_serial])
all_targets_fixture_args.append(
[name, target_info.hw_tester_serial]
)

if need_uds_transport:
if target_info.socketcan:
Expand All @@ -235,9 +251,7 @@ def pytest_generate_tests(metafunc):
# Test UDS over Ethernet
all_targets_fixture_args.append([name, "eth"])

metafunc.parametrize(fixture_names,
all_targets_fixture_args,
indirect=True)
metafunc.parametrize(fixture_names, all_targets_fixture_args, indirect=True)


def pytest_runtest_setup(item):
Expand Down
46 changes: 46 additions & 0 deletions test/pyTest/serial_minilog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import os
import threading
from datetime import datetime

_log_file = None
_log_lock = threading.Lock()


def start_minilog(path: str):
global _log_file
os.makedirs(os.path.dirname(os.path.abspath(path)), exist_ok=True)
_log_file = open(path, "a", encoding="utf-8", buffering=1)
_log_file.write(
f"===== Serial capture started: {datetime.now().astimezone().isoformat(timespec='milliseconds')} =====\n"
)


def stop_minilog():
global _log_file
if _log_file:
_log_file.write(
f"===== Serial capture ended: {datetime.now().astimezone().isoformat(timespec='milliseconds')} =====\n"
)
_log_file.flush()
_log_file.close()
_log_file = None


def on_line(target_name: str, raw_line: bytes):
"""Callback to pass into CaptureSerial"""
if _log_file is None:
return
try:
text = (
raw_line.replace(b"\r\n", b"\n")
.replace(b"\r", b"\n")
.decode("utf-8", errors="replace")
)
if not text.endswith("\n"):
text += "\n"
line = f"{datetime.now().astimezone().isoformat(timespec='milliseconds')} {target_name} | {text}"
with _log_lock:
_log_file.write(line)
_log_file.flush()
except Exception:
pass