qutebrowser/tests/end2end/fixtures/testprocess.py

503 lines
17 KiB
Python

# SPDX-FileCopyrightText: Florian Bruhin (The Compiler) <mail@qutebrowser.org>
#
# SPDX-License-Identifier: GPL-3.0-or-later
"""Base class for a subprocess run for tests."""
import re
import time
import warnings
import dataclasses
import pytest
import pytestqt.wait_signal
from qutebrowser.qt.core import (pyqtSlot, pyqtSignal, QProcess, QObject,
QElapsedTimer, QProcessEnvironment)
from qutebrowser.qt.test import QSignalSpy
from helpers import testutils
from qutebrowser.utils import utils as quteutils
class InvalidLine(Exception):

    """Signals that the process printed a line which could not be parsed."""
class ProcessExited(Exception):

    """Signals that the child process has exited."""
class WaitForTimeout(Exception):

    """Signals that wait_for did not see the expected message in time."""
class BlacklistedMessageError(Exception):

    """Signals that ensure_not_logged found a matching message."""
@dataclasses.dataclass
class Line:

    """A single line of output emitted by the process.

    Attributes:
        data: The raw data handed to the constructor.
        waited_for: Whether Process.wait_for already matched this line.
    """

    data: str
    waited_for: bool = False
def _render_log(data, *, verbose, threshold=100):
    """Shorten the given log without -v and convert to a string.

    The log is never shortened if verbose is set, if it contains an
    exception/traceback, or when running on CI.

    Args:
        data: An iterable of log entries, each gets converted via str().
        verbose: If truthy, never shorten the log.
        threshold: The maximum number of trailing lines to keep when the log
                   gets shortened. Negative values are treated as 0.

    Return:
        The (possibly shortened) log lines joined by newlines.
    """
    lines = [str(entry) for entry in data]
    is_exception = any(
        'Traceback (most recent call last):' in line or
        'Uncaught exception' in line
        for line in lines)
    may_shorten = not (verbose or is_exception or testutils.ON_CI)

    suppressed = len(lines) - max(threshold, 0)
    if suppressed > 0 and may_shorten:
        msg = '[{} lines suppressed, use -v to show]'.format(suppressed)
        # Slice from an explicit start index: lines[-threshold:] would return
        # the *whole* list for threshold == 0, contradicting the message.
        lines = [msg] + lines[suppressed:]

    if testutils.ON_CI:
        lines = ([testutils.gha_group_begin('Log')] + lines +
                 [testutils.gha_group_end()])

    return '\n'.join(lines)
@pytest.hookimpl(hookwrapper=True)
def pytest_runtest_makereport(item, call):
    """Attach qutebrowser/server log sections to the report of a failed test.

    Only the 'call' and 'teardown' phases are considered, and nothing is added
    for passed tests or when the output was already printed live.
    """
    outcome = yield
    if call.when not in ('call', 'teardown'):
        return

    report = outcome.get_result()
    if report.passed:
        return

    sections = []
    process_log = getattr(item, '_quteproc_log', None)
    if process_log is not None:
        sections.append(("qutebrowser output", process_log))
    sections.extend((f"{name} output", content)
                    for name, content in getattr(item, '_server_logs', []))

    longrepr = report.longrepr
    if not hasattr(longrepr, 'addsection'):
        # In some conditions (on macOS and Windows it seems), report.longrepr
        # is actually a tuple. This is handled similarly in pytest-qt too.
        return

    config = item.config
    if config.getoption('--capture') == 'no':
        # The log was already printed live.
        return

    verbose = config.getoption('--verbose')
    for title, log in sections:
        longrepr.addsection(title, _render_log(log, verbose=verbose))
class Process(QObject):

    """Abstraction over a running test subprocess process.

    Reads the log from the process' stderr (the read channel configured in
    __init__) and parses it line by line.

    Attributes:
        _invalid: A list of lines which could not be parsed.
        _data: A list of parsed lines.
        _started: Whether the process was ever started.
        proc: The QProcess for the underlying process.
        exit_expected: Whether the process is expected to quit.
                       None if it was not started at all yet.
        request: The request object for the current test.
        captured_log: The lines logged via _log for the current test.

    Class attributes:
        KEYS: The attribute names which may be passed to wait_for as keyword
              arguments.

    Signals:
        ready: Emitted when the server finished starting up.
        new_data: Emitted when a new line was parsed.
    """

    ready = pyqtSignal()
    new_data = pyqtSignal(object)
    KEYS = ['data']

    def __init__(self, request, parent=None):
        super().__init__(parent)
        self.request = request
        self.captured_log = []
        self._started = False
        self._invalid = []
        self._data = []
        self.proc = QProcess()
        self.proc.setReadChannel(QProcess.ProcessChannel.StandardError)
        self.exit_expected = None  # Not started at all yet

    def _log(self, line):
        """Add the given line to the captured log output.

        With --capture=no, the line is additionally printed right away.
        """
        if self.request.config.getoption('--capture') == 'no':
            print(line)
        self.captured_log.append(line)

    def log_summary(self, text):
        """Log the given line as summary/title."""
        text = '\n{line} {text} {line}\n'.format(line='='*30, text=text)
        self._log(text)

    def _parse_line(self, line):
        """Parse the given line from the log.

        Needs to be implemented by subclasses.

        Return:
            The parsed line object, or None if the line should be ignored.

        Raises:
            InvalidLine: If the line could not be parsed.
        """
        raise NotImplementedError

    def _executable_args(self):
        """Get the executable and necessary arguments as a tuple."""
        raise NotImplementedError

    def _default_args(self):
        """Get the default arguments to use if none were passed to start()."""
        raise NotImplementedError

    def _get_data(self):
        """Get the parsed data for this test.

        Also waits for up to 0.5s to make sure any new data is received.

        Subprocesses are expected to alias this to a public method with a
        better name.
        """
        self.proc.waitForReadyRead(500)
        self.read_log()
        return self._data

    def _wait_signal(self, signal, timeout=5000, raising=True):
        """Wait for a signal to be emitted.

        Should be used as a context manager.

        Args:
            signal: The signal to wait for.
            timeout: How long to wait, in milliseconds.
            raising: Passed on to pytest-qt's SignalBlocker.

        Return:
            The pytest-qt SignalBlocker connected to the signal.
        """
        blocker = pytestqt.wait_signal.SignalBlocker(timeout=timeout,
                                                     raising=raising)
        blocker.connect(signal)
        return blocker

    @pyqtSlot()
    def read_log(self):
        """Read all complete lines from the process' read channel.

        Parsed lines are stored in _data and announced via new_data, lines
        raising InvalidLine are collected in _invalid.
        """
        if not hasattr(self, 'proc'):
            # I have no idea how this happens, but it does...
            return
        while self.proc.canReadLine():
            line = self.proc.readLine()
            line = bytes(line).decode('utf-8', errors='ignore').rstrip('\r\n')

            try:
                parsed = self._parse_line(line)
            except InvalidLine:
                self._invalid.append(line)
                self._log("INVALID: {}".format(line))
                continue

            if parsed is None:
                # Ignored lines are only logged once something invalid was
                # seen during this test.
                if self._invalid:
                    self._log("IGNORED: {}".format(line))
            else:
                self._data.append(parsed)
                self.new_data.emit(parsed)

    def start(self, args=None, *, env=None):
        """Start the process and wait until it started.

        Args:
            args: The arguments to pass to the process, in addition to the
                  ones from _executable_args(). If None, _default_args() is
                  used.
            env: A mapping of additional environment variables.

        Raises:
            ProcessExited: If the process quit unexpectedly while starting.
            WaitForTimeout: If the ready signal was not emitted in time.
        """
        self._start(args, env=env)
        self._started = True
        verbose = self.request.config.getoption('--verbose')

        # Number of one-second attempts to wait for the ready signal.
        timeout = 60 if testutils.ON_CI else 20
        for _ in range(timeout):
            with self._wait_signal(self.ready, timeout=1000,
                                   raising=False) as blocker:
                pass

            if not self.is_running():
                if self.exit_expected:
                    return
                # _start ensures it actually started, but it might quit shortly
                # afterwards
                raise ProcessExited('\n' + _render_log(self.captured_log,
                                                       verbose=verbose))

            if blocker.signal_triggered:
                self._after_start()
                return

        raise WaitForTimeout("Timed out while waiting for process start.\n" +
                             _render_log(self.captured_log, verbose=verbose))

    def _start(self, args, env):
        """Actually start the process.

        Args:
            args: See start().
            env: See start(). The variables are added on top of the system
                 environment.
        """
        executable, exec_args = self._executable_args()
        if args is None:
            args = self._default_args()

        procenv = QProcessEnvironment.systemEnvironment()
        if env is not None:
            for k, v in env.items():
                procenv.insert(k, v)

        # NOTE(review): this connects again on every (re)start. Additional
        # read_log calls find nothing left to read, so this looks harmless -
        # confirm before changing it.
        self.proc.readyRead.connect(self.read_log)
        self.proc.setProcessEnvironment(procenv)
        self.proc.start(executable, exec_args + args)
        ok = self.proc.waitForStarted()
        assert ok, "Process failed to start"
        assert self.is_running(), "Process is not running after starting"

    def _after_start(self):
        """Do things which should be done immediately after starting."""

    def before_test(self):
        """Restart process before a test if it exited before."""
        self._invalid = []
        if not self.is_running():
            self.start()

    def after_test(self):
        """Clean up data after each test.

        Also checks self._invalid so the test counts as failed if there were
        unexpected output lines earlier.

        Raises:
            InvalidLine: If unparsable lines were seen during the test.
            ProcessExited: If the process was started but quit unexpectedly.
        """
        __tracebackhide__ = lambda e: e.errisinstance(ProcessExited)
        self.captured_log = []
        if self._invalid:
            # Wait for a bit so the full error has a chance to arrive
            time.sleep(1)
            # Exit the process to make sure we're in a defined state again
            self.terminate()
            self.clear_data()
            raise InvalidLine('\n' + '\n'.join(self._invalid))

        self.clear_data()
        if not self.is_running() and not self.exit_expected and self._started:
            raise ProcessExited
        self.exit_expected = False

    def clear_data(self):
        """Clear the collected data."""
        self._data.clear()

    def terminate(self):
        """Clean up and shut down the process.

        The process is killed right away on Windows and terminated elsewhere.
        If it did not finish after 5s, a warning is emitted and it gets
        killed.
        """
        if not self.is_running():
            return

        if quteutils.is_windows:
            self.proc.kill()
        else:
            self.proc.terminate()

        ok = self.proc.waitForFinished(5000)
        if not ok:
            cmdline = ' '.join([self.proc.program()] + self.proc.arguments())
            warnings.warn(f"Test process {cmdline} with PID {self.proc.processId()} "
                          "failed to terminate!")
            self.proc.kill()
            self.proc.waitForFinished()

    def is_running(self):
        """Check if the process is currently running."""
        return self.proc.state() == QProcess.ProcessState.Running

    def _match_data(self, value, expected):
        """Helper for wait_for to match a given value.

        The behavior of this method is slightly different depending on the
        types of the filtered values:

        - If expected is None, the filter always matches.
        - If the expected value is a compiled regex, it is used for matching
          (via search).
        - If the value is a string or bytes object, the expected value is
          treated as a glob pattern (with only * active).
        - If the value is any other type, == is used.

        Return:
            A bool
        """
        if expected is None:
            return True
        elif isinstance(expected, re.Pattern):
            # search() returns a match object or None, but we promise a bool.
            return bool(expected.search(value))
        elif isinstance(value, (bytes, str)):
            return testutils.pattern_match(pattern=expected, value=value)
        else:
            return value == expected

    def _line_matches(self, line, kwargs):
        """Check whether the line's attributes match all patterns in kwargs."""
        return all(self._match_data(getattr(line, key), expected)
                   for key, expected in kwargs.items())

    def _wait_for_existing(self, override_waited_for, after, **kwargs):
        """Check if there are any line in the history for wait_for.

        Args:
            override_waited_for: If set, also consider lines which were
                                 already returned by an earlier wait_for.
            after: If not None, only consider lines which are not older than
                   the given line (compared via timestamp and msecs).
            **kwargs: The attribute patterns to match, see wait_for.

        Return: either the found line or None.
        """
        for line in self._data:
            if not self._line_matches(line, kwargs):
                continue
            if line.waited_for and not override_waited_for:
                continue
            if after is not None and ((line.timestamp, line.msecs) <
                                      (after.timestamp, after.msecs)):
                continue

            # If we waited for this line, chances are we don't mean the
            # same thing the next time we use wait_for and it matches
            # this line again.
            line.waited_for = True
            self._log("\n----> Already found {!r} in the log: {}".format(
                kwargs.get('message', 'line'), line))
            return line
        return None

    def _wait_for_new(self, timeout, do_skip, **kwargs):
        """Wait for a log message which doesn't exist yet.

        Called via wait_for.

        Args:
            timeout: How long to wait overall, in milliseconds.
            do_skip: If set, call pytest.skip on a timeout instead of raising.
            **kwargs: The attribute patterns to match, see wait_for.

        Return:
            The matched line.

        Raises:
            WaitForTimeout: If no matching line arrived in time (and do_skip
                            is not set).
        """
        __tracebackhide__ = lambda e: e.errisinstance(WaitForTimeout)
        message = kwargs.get('message', None)
        if message is not None:
            elided = quteutils.elide(repr(message), 100)
            self._log("\n----> Waiting for {} in the log".format(elided))

        spy = QSignalSpy(self.new_data)
        elapsed_timer = QElapsedTimer()
        elapsed_timer.start()

        while True:
            # Skip if there are pending messages causing a skip
            self._maybe_skip()

            # Only wait for what is left of the timeout. Waiting for the full
            # timeout again after every non-matching line would allow the
            # overall wait to take almost twice as long as requested.
            remaining = timeout - elapsed_timer.elapsed()
            got_signal = remaining > 0 and spy.wait(remaining)

            if not got_signal or elapsed_timer.hasExpired(timeout):
                msg = "Timed out after {}ms waiting for {!r}.".format(
                    timeout, kwargs)
                if do_skip:
                    pytest.skip(msg)
                else:
                    raise WaitForTimeout(msg)

            match = self._wait_for_match(spy, kwargs)
            if match is not None:
                if message is not None:
                    self._log(f"----> found it: {match.formatted_str()}")
                return match

        raise quteutils.Unreachable

    def _wait_for_match(self, spy, kwargs):
        """Try matching the kwargs with the given QSignalSpy.

        Return:
            The first line recorded by the spy which matches, or None.
        """
        for args in spy:
            assert len(args) == 1, args
            line = args[0]
            if self._line_matches(line, kwargs):
                # If we waited for this line, chances are we don't mean the
                # same thing the next time we use wait_for and it matches
                # this line again.
                line.waited_for = True
                return line
        return None

    def _maybe_skip(self):
        """Can be overridden by subclasses to skip on certain log lines.

        We can't run pytest.skip directly while parsing the log, as that would
        lead to a pytest.skip.Exception error in a virtual Qt method, which
        means pytest-qt fails the test.

        Instead, we check for skip messages periodically in
        QuteProc._maybe_skip, and call _maybe_skip after every parsed message
        in wait_for (where it's most likely that new messages arrive).
        """

    def wait_for(self, timeout=None, *, override_waited_for=False,
                 do_skip=False, divisor=1, after=None, **kwargs):
        """Wait until a given value is found in the data.

        Keyword arguments to this function get interpreted as attributes of the
        searched data. Every given argument is treated as a pattern which
        the attribute has to match against.

        Args:
            timeout: How long to wait for the message, in milliseconds.
                     If None, 2000 is used with do_skip, 15000 on CI and 5000
                     otherwise.
            override_waited_for: If set, gets triggered by previous messages
                                 again.
            do_skip: If set, call pytest.skip on a timeout.
            divisor: A factor to decrease the timeout by.
            after: If it's an existing line, ensure it's after the given one.

        Return:
            The matched line.

        Raises:
            TypeError: If no keyword arguments were given.
            WaitForTimeout: If the message was not found in time (and do_skip
                            is not set).
        """
        __tracebackhide__ = lambda e: e.errisinstance(WaitForTimeout)

        if timeout is None:
            if do_skip:
                timeout = 2000
            elif testutils.ON_CI:
                timeout = 15000
            else:
                timeout = 5000

        timeout //= divisor

        if not kwargs:
            raise TypeError("No keyword arguments given!")
        for key in kwargs:
            assert key in self.KEYS, key

        existing = self._wait_for_existing(override_waited_for, after,
                                           **kwargs)
        if existing is not None:
            return existing
        else:
            return self._wait_for_new(timeout=timeout, do_skip=do_skip,
                                      **kwargs)

    def ensure_not_logged(self, delay=500, **kwargs):
        """Make sure the data matching the given arguments is not logged.

        If nothing is found in the log, we wait for delay ms to make sure
        nothing arrives.

        Raises:
            BlacklistedMessageError: If a matching line was found.
        """
        __tracebackhide__ = lambda e: e.errisinstance(BlacklistedMessageError)
        try:
            line = self.wait_for(timeout=delay, override_waited_for=True,
                                 **kwargs)
        except WaitForTimeout:
            return
        else:
            raise BlacklistedMessageError(line)

    def wait_for_quit(self):
        """Wait until the process has quit.

        Marks the exit as expected and waits up to 15s for the process to
        finish.
        """
        self.exit_expected = True
        with self._wait_signal(self.proc.finished, timeout=15000):
            pass
        assert not self.is_running()