3
0
Fork 0
forked from mirrors/nixpkgs

nixos/test-driver: switch to pythons' logging lib

- Less code
- more thread-safe according to @flokli
This commit is contained in:
Jörg Thalheim 2020-08-25 09:33:09 +01:00
parent c1667f85bb
commit 392415c285
No known key found for this signature in database
GPG key ID: 003F2096411B5F92
2 changed files with 128 additions and 215 deletions

View file

@ -1,17 +1,15 @@
#! /somewhere/python3
from contextlib import contextmanager, _GeneratorContextManager
from queue import Queue, Empty
from contextlib import contextmanager
from typing import Tuple, Any, Callable, Dict, Iterator, Optional, List
from xml.sax.saxutils import XMLGenerator
import queue
import io
import _thread
import argparse
import atexit
import base64
import codecs
import os
import pathlib
import logging
import ptpython.repl
import pty
import re
@ -22,8 +20,6 @@ import subprocess
import sys
import tempfile
import time
import traceback
import unicodedata
CHAR_TO_KEY = {
"A": "shift-a",
@ -88,13 +84,17 @@ CHAR_TO_KEY = {
")": "shift-0x0B",
}
# Forward references
log: "Logger"
# Forward reference
machines: "List[Machine]"
logging.basicConfig(format="%(message)s")
logger = logging.getLogger("test-driver")
logger.setLevel(logging.INFO)
def eprint(*args: object, **kwargs: Any) -> None:
print(*args, file=sys.stderr, **kwargs)
class MachineLogAdapter(logging.LoggerAdapter):
def process(self, msg: str, kwargs: Any) -> Tuple[str, Any]:
return f"{self.extra['machine']}: {msg}", kwargs
def make_command(args: list) -> str:
@ -102,8 +102,7 @@ def make_command(args: list) -> str:
def create_vlan(vlan_nr: str) -> Tuple[str, str, "subprocess.Popen[bytes]", Any]:
global log
log.log("starting VDE switch for network {}".format(vlan_nr))
logger.info(f"starting VDE switch for network {vlan_nr}")
vde_socket = tempfile.mkdtemp(
prefix="nixos-test-vde-", suffix="-vde{}.ctl".format(vlan_nr)
)
@ -142,70 +141,6 @@ def retry(fn: Callable) -> None:
raise Exception("action timed out")
class Logger:
def __init__(self) -> None:
self.logfile = os.environ.get("LOGFILE", "/dev/null")
self.logfile_handle = codecs.open(self.logfile, "wb")
self.xml = XMLGenerator(self.logfile_handle, encoding="utf-8")
self.queue: "Queue[Dict[str, str]]" = Queue()
self.xml.startDocument()
self.xml.startElement("logfile", attrs={})
def close(self) -> None:
self.xml.endElement("logfile")
self.xml.endDocument()
self.logfile_handle.close()
def sanitise(self, message: str) -> str:
return "".join(ch for ch in message if unicodedata.category(ch)[0] != "C")
def maybe_prefix(self, message: str, attributes: Dict[str, str]) -> str:
if "machine" in attributes:
return "{}: {}".format(attributes["machine"], message)
return message
def log_line(self, message: str, attributes: Dict[str, str]) -> None:
self.xml.startElement("line", attributes)
self.xml.characters(message)
self.xml.endElement("line")
def log(self, message: str, attributes: Dict[str, str] = {}) -> None:
eprint(self.maybe_prefix(message, attributes))
self.drain_log_queue()
self.log_line(message, attributes)
def enqueue(self, message: Dict[str, str]) -> None:
self.queue.put(message)
def drain_log_queue(self) -> None:
try:
while True:
item = self.queue.get_nowait()
attributes = {"machine": item["machine"], "type": "serial"}
self.log_line(self.sanitise(item["msg"]), attributes)
except Empty:
pass
@contextmanager
def nested(self, message: str, attributes: Dict[str, str] = {}) -> Iterator[None]:
eprint(self.maybe_prefix(message, attributes))
self.xml.startElement("nest", attrs={})
self.xml.startElement("head", attributes)
self.xml.characters(message)
self.xml.endElement("head")
tic = time.time()
self.drain_log_queue()
yield
self.drain_log_queue()
toc = time.time()
self.log("({:.2f} seconds)".format(toc - tic))
self.xml.endElement("nest")
class Machine:
def __init__(self, args: Dict[str, Any]) -> None:
if "name" in args:
@ -235,8 +170,8 @@ class Machine:
self.pid: Optional[int] = None
self.socket = None
self.monitor: Optional[socket.socket] = None
self.logger: Logger = args["log"]
self.allow_reboot = args.get("allowReboot", False)
self.logger = MachineLogAdapter(logger, extra=dict(machine=self.name))
@staticmethod
def create_startcommand(args: Dict[str, str]) -> str:
@ -292,14 +227,6 @@ class Machine:
def is_up(self) -> bool:
return self.booted and self.connected
def log(self, msg: str) -> None:
self.logger.log(msg, {"machine": self.name})
def nested(self, msg: str, attrs: Dict[str, str] = {}) -> _GeneratorContextManager:
my_attrs = {"machine": self.name}
my_attrs.update(attrs)
return self.logger.nested(msg, my_attrs)
def wait_for_monitor_prompt(self) -> str:
assert self.monitor is not None
answer = ""
@ -314,7 +241,7 @@ class Machine:
def send_monitor_command(self, command: str) -> str:
message = ("{}\n".format(command)).encode()
self.log("sending monitor command: {}".format(command))
self.logger.info(f"sending monitor command: {command}")
assert self.monitor is not None
self.monitor.send(message)
return self.wait_for_monitor_prompt()
@ -381,16 +308,16 @@ class Machine:
return self.execute("systemctl {}".format(q))
def require_unit_state(self, unit: str, require_state: str = "active") -> None:
with self.nested(
"checking if unit {} has reached state '{}'".format(unit, require_state)
):
info = self.get_unit_info(unit)
state = info["ActiveState"]
if state != require_state:
raise Exception(
"Expected unit {} to to be in state ".format(unit)
+ "'{}' but it is in state {}".format(require_state, state)
)
self.logger.info(
f"checking if unit {unit} has reached state '{require_state}'"
)
info = self.get_unit_info(unit)
state = info["ActiveState"]
if state != require_state:
raise Exception(
"Expected unit {} to to be in state ".format(unit)
+ "'{}' but it is in state {}".format(require_state, state)
)
def execute(self, command: str) -> Tuple[int, str]:
self.connect()
@ -414,27 +341,25 @@ class Machine:
"""Execute each command and check that it succeeds."""
output = ""
for command in commands:
with self.nested("must succeed: {}".format(command)):
(status, out) = self.execute(command)
if status != 0:
self.log("output: {}".format(out))
raise Exception(
"command `{}` failed (exit code {})".format(command, status)
)
output += out
self.logger.info(f"must succeed: {command}")
(status, out) = self.execute(command)
if status != 0:
self.logger.info(f"output: {out}")
raise Exception(
"command `{}` failed (exit code {})".format(command, status)
)
output += out
return output
def fail(self, *commands: str) -> str:
"""Execute each command and check that it fails."""
output = ""
for command in commands:
with self.nested("must fail: {}".format(command)):
(status, out) = self.execute(command)
if status == 0:
raise Exception(
"command `{}` unexpectedly succeeded".format(command)
)
output += out
self.logger.info(f"must fail: {command}")
(status, out) = self.execute(command)
if status == 0:
raise Exception("command `{}` unexpectedly succeeded".format(command))
output += out
return output
def wait_until_succeeds(self, command: str) -> str:
@ -448,9 +373,9 @@ class Machine:
status, output = self.execute(command)
return status == 0
with self.nested("waiting for success: {}".format(command)):
retry(check_success)
return output
self.logger.info(f"waiting for success: {command}")
retry(check_success)
return output
def wait_until_fails(self, command: str) -> str:
"""Wait until a command returns failure.
@ -463,21 +388,21 @@ class Machine:
status, output = self.execute(command)
return status != 0
with self.nested("waiting for failure: {}".format(command)):
retry(check_failure)
return output
self.logger.info(f"waiting for failure: {command}")
retry(check_failure)
return output
def wait_for_shutdown(self) -> None:
if not self.booted:
return
with self.nested("waiting for the VM to power off"):
sys.stdout.flush()
self.process.wait()
self.logger.info("waiting for the VM to power off")
sys.stdout.flush()
self.process.wait()
self.pid = None
self.booted = False
self.connected = False
self.pid = None
self.booted = False
self.connected = False
def get_tty_text(self, tty: str) -> str:
status, output = self.execute(
@ -495,19 +420,19 @@ class Machine:
def tty_matches(last: bool) -> bool:
text = self.get_tty_text(tty)
if last:
self.log(
self.logger.info(
f"Last chance to match /{regexp}/ on TTY{tty}, "
f"which currently contains: {text}"
)
return len(matcher.findall(text)) > 0
with self.nested("waiting for {} to appear on tty {}".format(regexp, tty)):
retry(tty_matches)
self.logger.info(f"waiting for {regexp} to appear on tty {tty}")
retry(tty_matches)
def send_chars(self, chars: List[str]) -> None:
with self.nested("sending keys {}".format(chars)):
for char in chars:
self.send_key(char)
self.logger.info(f"sending keys {chars}")
for char in chars:
self.send_key(char)
def wait_for_file(self, filename: str) -> None:
"""Waits until the file exists in machine's file system."""
@ -516,16 +441,16 @@ class Machine:
status, _ = self.execute("test -e {}".format(filename))
return status == 0
with self.nested("waiting for file {}".format(filename)):
retry(check_file)
self.logger.info(f"waiting for file {filename}")
retry(check_file)
def wait_for_open_port(self, port: int) -> None:
def port_is_open(_: Any) -> bool:
status, _ = self.execute("nc -z localhost {}".format(port))
return status == 0
with self.nested("waiting for TCP port {}".format(port)):
retry(port_is_open)
self.logger.info(f"waiting for TCP port {port}")
retry(port_is_open)
def wait_for_closed_port(self, port: int) -> None:
def port_is_closed(_: Any) -> bool:
@ -547,17 +472,17 @@ class Machine:
if self.connected:
return
with self.nested("waiting for the VM to finish booting"):
self.start()
self.logger.info("waiting for the VM to finish booting")
self.start()
tic = time.time()
self.shell.recv(1024)
# TODO: Timeout
toc = time.time()
tic = time.time()
self.shell.recv(1024)
# TODO: Timeout
toc = time.time()
self.log("connected to guest root shell")
self.log("(connecting took {:.2f} seconds)".format(toc - tic))
self.connected = True
self.logger.info("connected to guest root shell")
self.logger.info(f"(connecting took {toc - tic:.2f} seconds)")
self.connected = True
def screenshot(self, filename: str) -> None:
out_dir = os.environ.get("out", os.getcwd())
@ -566,15 +491,12 @@ class Machine:
filename = os.path.join(out_dir, "{}.png".format(filename))
tmp = "{}.ppm".format(filename)
with self.nested(
"making screenshot {}".format(filename),
{"image": os.path.basename(filename)},
):
self.send_monitor_command("screendump {}".format(tmp))
ret = subprocess.run("pnmtopng {} > {}".format(tmp, filename), shell=True)
os.unlink(tmp)
if ret.returncode != 0:
raise Exception("Cannot convert screenshot")
self.logger.info(f"making screenshot {filename}")
self.send_monitor_command("screendump {}".format(tmp))
ret = subprocess.run("pnmtopng {} > {}".format(tmp, filename), shell=True)
os.unlink(tmp)
if ret.returncode != 0:
raise Exception("Cannot convert screenshot")
def copy_from_host_via_shell(self, source: str, target: str) -> None:
"""Copy a file from the host into the guest by piping it over the
@ -650,20 +572,18 @@ class Machine:
tess_args = "-c debug_file=/dev/null --psm 11 --oem 2"
with self.nested("performing optical character recognition"):
with tempfile.NamedTemporaryFile() as tmpin:
self.send_monitor_command("screendump {}".format(tmpin.name))
self.logger.info("performing optical character recognition")
with tempfile.NamedTemporaryFile() as tmpin:
self.send_monitor_command("screendump {}".format(tmpin.name))
cmd = "convert {} {} tiff:- | tesseract - - {}".format(
magick_args, tmpin.name, tess_args
)
ret = subprocess.run(cmd, shell=True, capture_output=True)
if ret.returncode != 0:
raise Exception(
"OCR failed with exit code {}".format(ret.returncode)
)
cmd = "convert {} {} tiff:- | tesseract - - {}".format(
magick_args, tmpin.name, tess_args
)
ret = subprocess.run(cmd, shell=True, capture_output=True)
if ret.returncode != 0:
raise Exception("OCR failed with exit code {}".format(ret.returncode))
return ret.stdout.decode("utf-8")
return ret.stdout.decode("utf-8")
def wait_for_text(self, regex: str) -> None:
def screen_matches(last: bool) -> bool:
@ -671,15 +591,15 @@ class Machine:
matches = re.search(regex, text) is not None
if last and not matches:
self.log("Last OCR attempt failed. Text was: {}".format(text))
self.logger.info(f"Last OCR attempt failed. Text was: {text}")
return matches
with self.nested("waiting for {} to appear on screen".format(regex)):
retry(screen_matches)
self.logger.info(f"waiting for {regex} to appear on screen")
retry(screen_matches)
def wait_for_console_text(self, regex: str) -> None:
self.log("waiting for {} to appear on console".format(regex))
self.logger.info(f"waiting for {regex} to appear on console")
# Buffer the console output, this is needed
# to match multiline regexes.
console = io.StringIO()
@ -702,7 +622,7 @@ class Machine:
if self.booted:
return
self.log("starting vm")
self.logger.info("starting vm")
def create_socket(path: str) -> socket.socket:
if os.path.exists(path):
@ -759,7 +679,7 @@ class Machine:
# Store last serial console lines for use
# of wait_for_console_text
self.last_lines: Queue = Queue()
self.last_lines: queue.Queue = queue.Queue()
def process_serial_output() -> None:
assert self.process.stdout is not None
@ -767,8 +687,7 @@ class Machine:
# Ignore undecodable bytes that may occur in boot menus
line = _line.decode(errors="ignore").replace("\r", "").rstrip()
self.last_lines.put(line)
eprint("{} # {}".format(self.name, line))
self.logger.enqueue({"msg": line, "machine": self.name})
self.logger.info(line)
_thread.start_new_thread(process_serial_output, ())
@ -777,10 +696,10 @@ class Machine:
self.pid = self.process.pid
self.booted = True
self.log("QEMU running (pid {})".format(self.pid))
self.logger.info(f"QEMU running (pid {self.pid})")
def cleanup_statedir(self) -> None:
self.log("delete the VM state directory")
self.logger.info("delete the VM state directory")
if os.path.isfile(self.state_dir):
shutil.rmtree(self.state_dir)
@ -795,7 +714,7 @@ class Machine:
if not self.booted:
return
self.log("forced crash")
self.logger.info("forced crash")
self.send_monitor_command("quit")
self.wait_for_shutdown()
@ -815,8 +734,8 @@ class Machine:
status, _ = self.execute("[ -e /tmp/.X11-unix/X0 ]")
return status == 0
with self.nested("waiting for the X11 server"):
retry(check_x)
self.logger.info("waiting for the X11 server")
retry(check_x)
def get_window_names(self) -> List[str]:
return self.succeed(
@ -829,15 +748,14 @@ class Machine:
def window_is_visible(last_try: bool) -> bool:
names = self.get_window_names()
if last_try:
self.log(
"Last chance to match {} on the window list,".format(regexp)
+ " which currently contains: "
+ ", ".join(names)
self.logger.info(
f"Last chance to match {regexp} on the window list, "
+ f"which currently contains: {', '.join(names)}"
)
return any(pattern.search(name) for name in names)
with self.nested("Waiting for a window to appear"):
retry(window_is_visible)
self.logger.info("Waiting for a window to appear")
retry(window_is_visible)
def sleep(self, secs: int) -> None:
time.sleep(secs)
@ -865,23 +783,22 @@ class Machine:
def create_machine(args: Dict[str, Any]) -> Machine:
global log
args["log"] = log
args["redirectSerial"] = os.environ.get("USE_SERIAL", "0") == "1"
return Machine(args)
def start_all() -> None:
global machines
with log.nested("starting all VMs"):
for machine in machines:
machine.start()
logger.info("starting all VMs")
for machine in machines:
machine.start()
def join_all() -> None:
global machines
with log.nested("waiting for all VMs to finish"):
for machine in machines:
machine.wait_for_shutdown()
logger.info("waiting for all VMs to finish")
for machine in machines:
machine.wait_for_shutdown()
def test_script() -> None:
@ -892,13 +809,12 @@ def run_tests() -> None:
global machines
tests = os.environ.get("tests", None)
if tests is not None:
with log.nested("running the VM test script"):
try:
exec(tests, globals())
except Exception as e:
eprint("error: ")
traceback.print_exc()
sys.exit(1)
logger.info("running the VM test script")
try:
exec(tests, globals())
except Exception:
logging.exception("error:")
sys.exit(1)
else:
ptpython.repl.embed(locals(), globals())
@ -911,13 +827,13 @@ def run_tests() -> None:
@contextmanager
def subtest(name: str) -> Iterator[None]:
with log.nested(name):
try:
yield
return True
except Exception as e:
log.log(f'Test "{name}" failed with error: "{e}"')
raise e
logger.info(name)
try:
yield
return True
except Exception as e:
logger.info(f'Test "{name}" failed with error: "{e}"')
raise e
return False
@ -933,8 +849,6 @@ def main() -> None:
)
(cli_args, vm_scripts) = arg_parser.parse_known_args()
log = Logger()
vlan_nrs = list(dict.fromkeys(os.environ.get("VLANS", "").split()))
vde_sockets = [create_vlan(v) for v in vlan_nrs]
for nr, vde_socket, _, _ in vde_sockets:
@ -952,15 +866,14 @@ def main() -> None:
@atexit.register
def clean_up() -> None:
with log.nested("cleaning up"):
for machine in machines:
if machine.pid is None:
continue
log.log("killing {} (pid {})".format(machine.name, machine.pid))
machine.process.kill()
for _, _, process, _ in vde_sockets:
process.terminate()
log.close()
logger.info("cleaning up")
for machine in machines:
if machine.pid is None:
continue
logger.info(f"killing {machine.name} (pid {machine.pid})")
machine.process.kill()
for _, _, process, _ in vde_sockets:
process.terminate()
tic = time.time()
run_tests()

View file

@ -62,7 +62,7 @@ rec {
''
mkdir -p $out
LOGFILE=/dev/null tests='exec(os.environ["testScript"])' ${driver}/bin/nixos-test-driver
tests='exec(os.environ["testScript"])' ${driver}/bin/nixos-test-driver
for i in */xchg/coverage-data; do
mkdir -p $out/coverage-data