nixos/test: Use retry() in all looping functions that need timeouts
This commit is contained in:
parent
39bf8a7b28
commit
e05ae69904
|
@ -312,8 +312,13 @@ class Machine:
|
|||
self.monitor.send(message)
|
||||
return self.wait_for_monitor_prompt()
|
||||
|
||||
def wait_for_unit(self, unit: str, user: Optional[str] = None) -> bool:
|
||||
while True:
|
||||
def wait_for_unit(self, unit: str, user: Optional[str] = None) -> None:
|
||||
"""Wait for a systemd unit to get into "active" state.
|
||||
Throws exceptions on "failed" and "inactive" states as well as
|
||||
after timing out.
|
||||
"""
|
||||
|
||||
def check_active(_: Any) -> bool:
|
||||
info = self.get_unit_info(unit, user)
|
||||
state = info["ActiveState"]
|
||||
if state == "failed":
|
||||
|
@ -329,8 +334,10 @@ class Machine:
|
|||
'unit "{}" is inactive and there ' "are no pending jobs"
|
||||
).format(unit)
|
||||
)
|
||||
if state == "active":
|
||||
return True
|
||||
|
||||
return state == "active"
|
||||
|
||||
retry(check_active)
|
||||
|
||||
def get_unit_info(self, unit: str, user: Optional[str] = None) -> Dict[str, str]:
|
||||
status, lines = self.systemctl('--no-pager show "{}"'.format(unit), user)
|
||||
|
@ -421,18 +428,34 @@ class Machine:
|
|||
)
|
||||
|
||||
def wait_until_succeeds(self, command: str) -> str:
|
||||
"""Wait until a command returns success and return its output.
|
||||
Throws an exception on timeout.
|
||||
"""
|
||||
output = ""
|
||||
|
||||
def check_success(_: Any) -> bool:
|
||||
nonlocal output
|
||||
status, output = self.execute(command)
|
||||
return status == 0
|
||||
|
||||
with self.nested("waiting for success: {}".format(command)):
|
||||
while True:
|
||||
status, output = self.execute(command)
|
||||
if status == 0:
|
||||
return output
|
||||
retry(check_success)
|
||||
return output
|
||||
|
||||
def wait_until_fails(self, command: str) -> str:
|
||||
"""Wait until a command returns failure.
|
||||
Throws an exception on timeout.
|
||||
"""
|
||||
output = ""
|
||||
|
||||
def check_failure(_: Any) -> bool:
|
||||
nonlocal output
|
||||
status, output = self.execute(command)
|
||||
return status != 0
|
||||
|
||||
with self.nested("waiting for failure: {}".format(command)):
|
||||
while True:
|
||||
status, output = self.execute(command)
|
||||
if status != 0:
|
||||
return output
|
||||
retry(check_failure)
|
||||
return output
|
||||
|
||||
def wait_for_shutdown(self) -> None:
|
||||
if not self.booted:
|
||||
|
@ -453,25 +476,38 @@ class Machine:
|
|||
)
|
||||
return output
|
||||
|
||||
def wait_until_tty_matches(self, tty: str, regexp: str) -> bool:
|
||||
def wait_until_tty_matches(self, tty: str, regexp: str) -> None:
|
||||
"""Wait until the visible output on the chosen TTY matches regular
|
||||
expression. Throws an exception on timeout.
|
||||
"""
|
||||
matcher = re.compile(regexp)
|
||||
|
||||
def tty_matches(last: bool) -> bool:
|
||||
text = self.get_tty_text(tty)
|
||||
if last:
|
||||
self.log(
|
||||
f"Last chance to match /{regexp}/ on TTY{tty}, "
|
||||
f"which currently contains: {text}"
|
||||
)
|
||||
return len(matcher.findall(text)) > 0
|
||||
|
||||
with self.nested("waiting for {} to appear on tty {}".format(regexp, tty)):
|
||||
while True:
|
||||
text = self.get_tty_text(tty)
|
||||
if len(matcher.findall(text)) > 0:
|
||||
return True
|
||||
retry(tty_matches)
|
||||
|
||||
def send_chars(self, chars: List[str]) -> None:
|
||||
with self.nested("sending keys ‘{}‘".format(chars)):
|
||||
for char in chars:
|
||||
self.send_key(char)
|
||||
|
||||
def wait_for_file(self, filename: str) -> bool:
|
||||
def wait_for_file(self, filename: str) -> None:
|
||||
"""Waits until the file exists in machine's file system."""
|
||||
|
||||
def check_file(_: Any) -> bool:
|
||||
status, _ = self.execute("test -e {}".format(filename))
|
||||
return status == 0
|
||||
|
||||
with self.nested("waiting for file ‘{}‘".format(filename)):
|
||||
while True:
|
||||
status, _ = self.execute("test -e {}".format(filename))
|
||||
if status == 0:
|
||||
return True
|
||||
retry(check_file)
|
||||
|
||||
def wait_for_open_port(self, port: int) -> None:
|
||||
def port_is_open(_: Any) -> bool:
|
||||
|
@ -494,8 +530,8 @@ class Machine:
|
|||
def stop_job(self, jobname: str, user: Optional[str] = None) -> Tuple[int, str]:
|
||||
return self.systemctl("stop {}".format(jobname), user)
|
||||
|
||||
def wait_for_job(self, jobname: str) -> bool:
|
||||
return self.wait_for_unit(jobname)
|
||||
def wait_for_job(self, jobname: str) -> None:
|
||||
self.wait_for_unit(jobname)
|
||||
|
||||
def connect(self) -> None:
|
||||
if self.connected:
|
||||
|
@ -700,18 +736,20 @@ class Machine:
|
|||
"""Wait until it is possible to connect to the X server. Note that
|
||||
testing the existence of /tmp/.X11-unix/X0 is insufficient.
|
||||
"""
|
||||
|
||||
def check_x(_: Any) -> bool:
|
||||
cmd = (
|
||||
"journalctl -b SYSLOG_IDENTIFIER=systemd | "
|
||||
+ 'grep "Reached target Current graphical"'
|
||||
)
|
||||
status, _ = self.execute(cmd)
|
||||
if status != 0:
|
||||
return False
|
||||
status, _ = self.execute("[ -e /tmp/.X11-unix/X0 ]")
|
||||
return status == 0
|
||||
|
||||
with self.nested("waiting for the X11 server"):
|
||||
while True:
|
||||
cmd = (
|
||||
"journalctl -b SYSLOG_IDENTIFIER=systemd | "
|
||||
+ 'grep "Reached target Current graphical"'
|
||||
)
|
||||
status, _ = self.execute(cmd)
|
||||
if status != 0:
|
||||
continue
|
||||
status, _ = self.execute("[ -e /tmp/.X11-unix/X0 ]")
|
||||
if status == 0:
|
||||
return
|
||||
retry(check_x)
|
||||
|
||||
def get_window_names(self) -> List[str]:
|
||||
return self.succeed(
|
||||
|
|
Loading…
Reference in New Issue