diff options
Diffstat (limited to 'tests/qemu-iotests/iotests.py')
-rw-r--r-- | tests/qemu-iotests/iotests.py | 366 |
1 files changed, 219 insertions, 147 deletions
diff --git a/tests/qemu-iotests/iotests.py b/tests/qemu-iotests/iotests.py index 5f8c263d59..6c0e781af7 100644 --- a/tests/qemu-iotests/iotests.py +++ b/tests/qemu-iotests/iotests.py @@ -16,26 +16,39 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. # -import errno +import atexit +from collections import OrderedDict +import faulthandler +import io +import json +import logging import os import re +import signal +import struct import subprocess -import string -import unittest import sys -import struct -import json -import signal -import logging -import atexit -import io -from collections import OrderedDict -import faulthandler +from typing import (Any, Callable, Dict, Iterable, + List, Optional, Sequence, TypeVar) +import unittest +# pylint: disable=import-error, wrong-import-position sys.path.append(os.path.join(os.path.dirname(__file__), '..', '..', 'python')) from qemu import qtest -assert sys.version_info >= (3,6) +assert sys.version_info >= (3, 6) + +# Type Aliases +QMPResponse = Dict[str, Any] + + +# Use this logger for logging messages directly from the iotests module +logger = logging.getLogger('qemu.iotests') +logger.addHandler(logging.NullHandler()) + +# Use this logger for messages that ought to be used for diff output. +test_logger = logging.getLogger('qemu.iotests.diff_io') + faulthandler.enable() @@ -80,9 +93,11 @@ luks_default_key_secret_opt = 'key-secret=keysec0' def qemu_img(*args): '''Run qemu-img and return the exit code''' devnull = open('/dev/null', 'r+') - exitcode = subprocess.call(qemu_img_args + list(args), stdin=devnull, stdout=devnull) + exitcode = subprocess.call(qemu_img_args + list(args), + stdin=devnull, stdout=devnull) if exitcode < 0: - sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args)))) + sys.stderr.write('qemu-img received signal %i: %s\n' + % (-exitcode, ' '.join(qemu_img_args + list(args)))) return exitcode def ordered_qmp(qmsg, conv_keys=True): @@ -121,7 +136,8 @@ def qemu_img_verbose(*args): '''Run qemu-img without suppressing its output and return the exit code''' exitcode = subprocess.call(qemu_img_args + list(args)) if exitcode < 0: - sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args)))) + sys.stderr.write('qemu-img received signal %i: %s\n' + % (-exitcode, ' '.join(qemu_img_args + list(args)))) return exitcode def qemu_img_pipe(*args): @@ -132,7 +148,8 @@ def qemu_img_pipe(*args): universal_newlines=True) exitcode = subp.wait() if exitcode < 0: - sys.stderr.write('qemu-img received signal %i: %s\n' % (-exitcode, ' '.join(qemu_img_args + list(args)))) + sys.stderr.write('qemu-img received signal %i: %s\n' + % (-exitcode, ' '.join(qemu_img_args + list(args)))) return subp.communicate()[0] def qemu_img_log(*args): @@ -140,12 +157,12 @@ def qemu_img_log(*args): log(result, filters=[filter_testfiles]) return result -def img_info_log(filename, filter_path=None, imgopts=False, extra_args=[]): - args = [ 'info' ] +def img_info_log(filename, filter_path=None, imgopts=False, extra_args=()): + args = ['info'] if imgopts: args.append('--image-opts') else: - args += [ '-f', imgfmt ] + args += ['-f', imgfmt] args += extra_args args.append(filename) @@ -162,7 +179,8 @@ def qemu_io(*args): universal_newlines=True) exitcode = subp.wait() if exitcode < 0: - sys.stderr.write('qemu-io received signal %i: %s\n' % (-exitcode, ' '.join(args))) + sys.stderr.write('qemu-io received signal %i: %s\n' + % (-exitcode, ' '.join(args))) return subp.communicate()[0] def qemu_io_log(*args): @@ -224,7 +242,7 @@ class QemuIoInteractive: # quit command is in close(), '\n' is added automatically assert '\n' not in cmd cmd = cmd.strip() - assert cmd != 'q' and cmd != 'quit' + assert cmd not in ('q', 'quit') self._p.stdin.write(cmd + '\n') self._p.stdin.flush() return self._read_output() @@ -246,10 +264,8 @@ def qemu_nbd_early_pipe(*args): sys.stderr.write('qemu-nbd received signal %i: %s\n' % (-exitcode, ' '.join(qemu_nbd_args + ['--fork'] + list(args)))) - if exitcode == 0: - return exitcode, '' - else: - return exitcode, subp.communicate()[0] + + return exitcode, subp.communicate()[0] if exitcode else '' def qemu_nbd_popen(*args): '''Run qemu-nbd in daemon mode and return the parent's exit code''' @@ -286,10 +302,13 @@ win32_re = re.compile(r"\r") def filter_win32(msg): return win32_re.sub("", msg) -qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* \([0-9\/.inf]* [EPTGMKiBbytes]*\/sec and [0-9\/.inf]* ops\/sec\)") +qemu_io_re = re.compile(r"[0-9]* ops; [0-9\/:. sec]* " + r"\([0-9\/.inf]* [EPTGMKiBbytes]*\/sec " + r"and [0-9\/.inf]* ops\/sec\)") def filter_qemu_io(msg): msg = filter_win32(msg) - return qemu_io_re.sub("X ops; XX:XX:XX.X (XXX YYY/sec and XXX ops/sec)", msg) + return qemu_io_re.sub("X ops; XX:XX:XX.X " + "(XXX YYY/sec and XXX ops/sec)", msg) chown_re = re.compile(r"chown [0-9]+:[0-9]+") def filter_chown(msg): @@ -313,7 +332,7 @@ def filter_qmp(qmsg, filter_fn): items = qmsg.items() for k, v in items: - if isinstance(v, list) or isinstance(v, dict): + if isinstance(v, (dict, list)): qmsg[k] = filter_qmp(v, filter_fn) else: qmsg[k] = filter_fn(k, v) @@ -324,7 +343,7 @@ def filter_testfiles(msg): return msg.replace(prefix, 'TEST_DIR/PID-') def filter_qmp_testfiles(qmsg): - def _filter(key, value): + def _filter(_key, value): if is_str(value): return filter_testfiles(value) return value @@ -342,7 +361,9 @@ def filter_img_info(output, filename): line = filter_testfiles(line) line = line.replace(imgfmt, 'IMGFMT') line = re.sub('iters: [0-9]+', 'iters: XXX', line) - line = re.sub('uuid: [-a-f0-9]+', 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', line) + line = re.sub('uuid: [-a-f0-9]+', + 'uuid: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', + line) line = re.sub('cid: [0-9]+', 'cid: XXXXXXXXXX', line) lines.append(line) return '\n'.join(lines) @@ -351,36 +372,40 @@ def filter_imgfmt(msg): return msg.replace(imgfmt, 'IMGFMT') def filter_qmp_imgfmt(qmsg): - def _filter(key, value): + def _filter(_key, value): if is_str(value): return filter_imgfmt(value) return value return filter_qmp(qmsg, _filter) -def log(msg, filters=[], indent=None): - '''Logs either a string message or a JSON serializable message (like QMP). - If indent is provided, JSON serializable messages are pretty-printed.''' + +Msg = TypeVar('Msg', Dict[str, Any], List[Any], str) + +def log(msg: Msg, + filters: Iterable[Callable[[Msg], Msg]] = (), + indent: Optional[int] = None) -> None: + """ + Logs either a string message or a JSON serializable message (like QMP). + If indent is provided, JSON serializable messages are pretty-printed. + """ for flt in filters: msg = flt(msg) - if isinstance(msg, dict) or isinstance(msg, list): - # Python < 3.4 needs to know not to add whitespace when pretty-printing: - separators = (', ', ': ') if indent is None else (',', ': ') + if isinstance(msg, (dict, list)): # Don't sort if it's already sorted do_sort = not isinstance(msg, OrderedDict) - print(json.dumps(msg, sort_keys=do_sort, - indent=indent, separators=separators)) + test_logger.info(json.dumps(msg, sort_keys=do_sort, indent=indent)) else: - print(msg) + test_logger.info(msg) class Timeout: - def __init__(self, seconds, errmsg = "Timeout"): + def __init__(self, seconds, errmsg="Timeout"): self.seconds = seconds self.errmsg = errmsg def __enter__(self): signal.signal(signal.SIGALRM, self.timeout) signal.setitimer(signal.ITIMER_REAL, self.seconds) return self - def __exit__(self, type, value, traceback): + def __exit__(self, exc_type, value, traceback): signal.setitimer(signal.ITIMER_REAL, 0) return False def timeout(self, signum, frame): @@ -389,7 +414,7 @@ class Timeout: def file_pattern(name): return "{0}-{1}".format(os.getpid(), name) -class FilePaths(object): +class FilePaths: """ FilePaths is an auto-generated filename that cleans itself up. @@ -490,21 +515,21 @@ class VM(qtest.QEMUQtestMachine): self._args.append(opts) return self - def add_drive(self, path, opts='', interface='virtio', format=imgfmt): + def add_drive(self, path, opts='', interface='virtio', img_format=imgfmt): '''Add a virtio-blk drive to the VM''' options = ['if=%s' % interface, 'id=drive%d' % self._num_drives] if path is not None: options.append('file=%s' % path) - options.append('format=%s' % format) + options.append('format=%s' % img_format) options.append('cache=%s' % cachemode) options.append('aio=%s' % aiomode) if opts: options.append(opts) - if format == 'luks' and 'key-secret' not in opts: + if img_format == 'luks' and 'key-secret' not in opts: # default luks support if luks_default_secret_object not in self._args: self.add_object(luks_default_secret_object) @@ -529,30 +554,37 @@ class VM(qtest.QEMUQtestMachine): self._args.append(addr) return self - def pause_drive(self, drive, event=None): - '''Pause drive r/w operations''' + def hmp(self, command_line: str, use_log: bool = False) -> QMPResponse: + cmd = 'human-monitor-command' + kwargs = {'command-line': command_line} + if use_log: + return self.qmp_log(cmd, **kwargs) + else: + return self.qmp(cmd, **kwargs) + + def pause_drive(self, drive: str, event: Optional[str] = None) -> None: + """Pause drive r/w operations""" if not event: self.pause_drive(drive, "read_aio") self.pause_drive(drive, "write_aio") return - self.qmp('human-monitor-command', - command_line='qemu-io %s "break %s bp_%s"' % (drive, event, drive)) + self.hmp(f'qemu-io {drive} "break {event} bp_{drive}"') - def resume_drive(self, drive): - self.qmp('human-monitor-command', - command_line='qemu-io %s "remove_break bp_%s"' % (drive, drive)) + def resume_drive(self, drive: str) -> None: + """Resume drive r/w operations""" + self.hmp(f'qemu-io {drive} "remove_break bp_{drive}"') - def hmp_qemu_io(self, drive, cmd): - '''Write to a given drive using an HMP command''' - return self.qmp('human-monitor-command', - command_line='qemu-io %s "%s"' % (drive, cmd)) + def hmp_qemu_io(self, drive: str, cmd: str, + use_log: bool = False) -> QMPResponse: + """Write to a given drive using an HMP command""" + return self.hmp(f'qemu-io {drive} "{cmd}"', use_log=use_log) def flatten_qmp_object(self, obj, output=None, basestr=''): if output is None: output = dict() if isinstance(obj, list): - for i in range(len(obj)): - self.flatten_qmp_object(obj[i], output, basestr + str(i) + '.') + for i, item in enumerate(obj): + self.flatten_qmp_object(item, output, basestr + str(i) + '.') elif isinstance(obj, dict): for key in obj: self.flatten_qmp_object(obj[key], output, basestr + key + '.') @@ -573,7 +605,7 @@ class VM(qtest.QEMUQtestMachine): result.append(filter_qmp_event(ev)) return result - def qmp_log(self, cmd, filters=[], indent=None, **kwargs): + def qmp_log(self, cmd, filters=(), indent=None, **kwargs): full_cmd = OrderedDict(( ("execute", cmd), ("arguments", ordered_qmp(kwargs)) @@ -585,7 +617,7 @@ class VM(qtest.QEMUQtestMachine): # Returns None on success, and an error string on failure def run_job(self, job, auto_finalize=True, auto_dismiss=False, - pre_finalize=None, cancel=False, use_log=True, wait=60.0): + pre_finalize=None, cancel=False, wait=60.0): """ run_job moves a job from creation through to dismissal. @@ -598,7 +630,6 @@ class VM(qtest.QEMUQtestMachine): invoked prior to issuing job-finalize, if any. :param cancel: Bool. When true, cancels the job after the pre_finalize callback. - :param use_log: Bool. When false, does not log QMP messages. :param wait: Float. Timeout value specifying how long to wait for any event, in seconds. Defaults to 60.0. """ @@ -616,8 +647,7 @@ class VM(qtest.QEMUQtestMachine): while True: ev = filter_qmp_event(self.events_wait(events, timeout=wait)) if ev['event'] != 'JOB_STATUS_CHANGE': - if use_log: - log(ev) + log(ev) continue status = ev['data']['status'] if status == 'aborting': @@ -625,29 +655,18 @@ class VM(qtest.QEMUQtestMachine): for j in result['return']: if j['id'] == job: error = j['error'] - if use_log: - log('Job failed: %s' % (j['error'])) + log('Job failed: %s' % (j['error'])) elif status == 'ready': - if use_log: - self.qmp_log('job-complete', id=job) - else: - self.qmp('job-complete', id=job) + self.qmp_log('job-complete', id=job) elif status == 'pending' and not auto_finalize: if pre_finalize: pre_finalize() - if cancel and use_log: + if cancel: self.qmp_log('job-cancel', id=job) - elif cancel: - self.qmp('job-cancel', id=job) - elif use_log: - self.qmp_log('job-finalize', id=job) else: - self.qmp('job-finalize', id=job) + self.qmp_log('job-finalize', id=job) elif status == 'concluded' and not auto_dismiss: - if use_log: - self.qmp_log('job-dismiss', id=job) - else: - self.qmp('job-dismiss', id=job) + self.qmp_log('job-dismiss', id=job) elif status == 'null': return error @@ -710,9 +729,7 @@ class VM(qtest.QEMUQtestMachine): for bitmap in bitmaps[node_name]: if bitmap.get('name', '') == bitmap_name: - if recording is None: - return bitmap - elif bitmap.get('recording') == recording: + if recording is None or bitmap.get('recording') == recording: return bitmap return None @@ -763,12 +780,13 @@ class VM(qtest.QEMUQtestMachine): assert node is not None, 'Cannot follow path %s%s' % (root, path) try: - node_id = next(edge['child'] for edge in graph['edges'] \ - if edge['parent'] == node['id'] and - edge['name'] == child_name) + node_id = next(edge['child'] for edge in graph['edges'] + if (edge['parent'] == node['id'] and + edge['name'] == child_name)) + + node = next(node for node in graph['nodes'] + if node['id'] == node_id) - node = next(node for node in graph['nodes'] \ - if node['id'] == node_id) except StopIteration: node = None @@ -786,6 +804,12 @@ index_re = re.compile(r'([^\[]+)\[([^\]]+)\]') class QMPTestCase(unittest.TestCase): '''Abstract base class for QMP test cases''' + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + # Many users of this class set a VM property we rely on heavily + # in the methods below. + self.vm = None + def dictpath(self, d, path): '''Traverse a path in a nested dict''' for component in path.split('/'): @@ -795,16 +819,18 @@ class QMPTestCase(unittest.TestCase): idx = int(idx) if not isinstance(d, dict) or component not in d: - self.fail('failed path traversal for "%s" in "%s"' % (path, str(d))) + self.fail(f'failed path traversal for "{path}" in "{d}"') d = d[component] if m: if not isinstance(d, list): - self.fail('path component "%s" in "%s" is not a list in "%s"' % (component, path, str(d))) + self.fail(f'path component "{component}" in "{path}" ' + f'is not a list in "{d}"') try: d = d[idx] except IndexError: - self.fail('invalid index "%s" in path "%s" in "%s"' % (idx, path, str(d))) + self.fail(f'invalid index "{idx}" in path "{path}" ' + f'in "{d}"') return d def assert_qmp_absent(self, d, path): @@ -831,7 +857,7 @@ class QMPTestCase(unittest.TestCase): else: self.assertEqual(result, value, '"%s" is "%s", expected "%s"' - % (path, str(result), str(value))) + % (path, str(result), str(value))) def assert_no_active_block_jobs(self): result = self.vm.qmp('query-block-jobs') @@ -841,24 +867,27 @@ class QMPTestCase(unittest.TestCase): """Issue a query-named-block-nodes and assert node_name and/or file_name is present in the result""" def check_equal_or_none(a, b): - return a == None or b == None or a == b + return a is None or b is None or a == b assert node_name or file_name result = self.vm.qmp('query-named-block-nodes') for x in result["return"]: if check_equal_or_none(x.get("node-name"), node_name) and \ check_equal_or_none(x.get("file"), file_name): return - self.assertTrue(False, "Cannot find %s %s in result:\n%s" % \ - (node_name, file_name, result)) + self.fail("Cannot find %s %s in result:\n%s" % + (node_name, file_name, result)) def assert_json_filename_equal(self, json_filename, reference): '''Asserts that the given filename is a json: filename and that its content is equal to the given reference object''' self.assertEqual(json_filename[:5], 'json:') - self.assertEqual(self.vm.flatten_qmp_object(json.loads(json_filename[5:])), - self.vm.flatten_qmp_object(reference)) + self.assertEqual( + self.vm.flatten_qmp_object(json.loads(json_filename[5:])), + self.vm.flatten_qmp_object(reference) + ) - def cancel_and_wait(self, drive='drive0', force=False, resume=False, wait=60.0): + def cancel_and_wait(self, drive='drive0', force=False, + resume=False, wait=60.0): '''Cancel a block job and wait for it to finish, returning the event''' result = self.vm.qmp('block-job-cancel', device=drive, force=force) self.assert_qmp(result, 'return', {}) @@ -882,8 +911,8 @@ class QMPTestCase(unittest.TestCase): self.assert_no_active_block_jobs() return result - def wait_until_completed(self, drive='drive0', check_offset=True, wait=60.0, - error=None): + def wait_until_completed(self, drive='drive0', check_offset=True, + wait=60.0, error=None): '''Wait for a block job to finish, returning the event''' while True: for event in self.vm.get_qmp_events(wait=wait): @@ -898,13 +927,13 @@ class QMPTestCase(unittest.TestCase): self.assert_qmp(event, 'data/error', error) self.assert_no_active_block_jobs() return event - elif event['event'] == 'JOB_STATUS_CHANGE': + if event['event'] == 'JOB_STATUS_CHANGE': self.assert_qmp(event, 'data/id', drive) def wait_ready(self, drive='drive0'): - '''Wait until a block job BLOCK_JOB_READY event''' - f = {'data': {'type': 'mirror', 'device': drive } } - event = self.vm.event_wait(name='BLOCK_JOB_READY', match=f) + """Wait until a BLOCK_JOB_READY event, and return the event.""" + f = {'data': {'type': 'mirror', 'device': drive}} + return self.vm.event_wait(name='BLOCK_JOB_READY', match=f) def wait_ready_and_cancel(self, drive='drive0'): self.wait_ready(drive=drive) @@ -933,7 +962,7 @@ class QMPTestCase(unittest.TestCase): for job in result['return']: if job['device'] == job_id: found = True - if job['paused'] == True and job['busy'] == False: + if job['paused'] and not job['busy']: return job break assert found @@ -957,7 +986,7 @@ def notrun(reason): seq = os.path.basename(sys.argv[0]) open('%s/%s.notrun' % (output_dir, seq), 'w').write(reason + '\n') - print('%s not run: %s' % (seq, reason)) + logger.warning("%s not run: %s", seq, reason) sys.exit(0) def case_notrun(reason): @@ -972,7 +1001,8 @@ def case_notrun(reason): open('%s/%s.casenotrun' % (output_dir, seq), 'a').write( ' [case not run] ' + reason + '\n') -def verify_image_format(supported_fmts=[], unsupported_fmts=[]): +def _verify_image_format(supported_fmts: Sequence[str] = (), + unsupported_fmts: Sequence[str] = ()) -> None: assert not (supported_fmts and unsupported_fmts) if 'generic' in supported_fmts and \ @@ -986,7 +1016,8 @@ def verify_image_format(supported_fmts=[], unsupported_fmts=[]): if not_sup or (imgfmt in unsupported_fmts): notrun('not suitable for this image format: %s' % imgfmt) -def verify_protocol(supported=[], unsupported=[]): +def _verify_protocol(supported: Sequence[str] = (), + unsupported: Sequence[str] = ()) -> None: assert not (supported and unsupported) if 'generic' in supported: @@ -996,20 +1027,20 @@ def verify_protocol(supported=[], unsupported=[]): if not_sup or (imgproto in unsupported): notrun('not suitable for this protocol: %s' % imgproto) -def verify_platform(supported=None, unsupported=None): - if unsupported is not None: - if any((sys.platform.startswith(x) for x in unsupported)): - notrun('not suitable for this OS: %s' % sys.platform) +def _verify_platform(supported: Sequence[str] = (), + unsupported: Sequence[str] = ()) -> None: + if any((sys.platform.startswith(x) for x in unsupported)): + notrun('not suitable for this OS: %s' % sys.platform) - if supported is not None: + if supported: if not any((sys.platform.startswith(x) for x in supported)): notrun('not suitable for this OS: %s' % sys.platform) -def verify_cache_mode(supported_cache_modes=[]): +def _verify_cache_mode(supported_cache_modes: Sequence[str] = ()) -> None: if supported_cache_modes and (cachemode not in supported_cache_modes): notrun('not suitable for this cache mode: %s' % cachemode) -def verify_aio_mode(supported_aio_modes=[]): +def _verify_aio_mode(supported_aio_modes: Sequence[str] = ()): if supported_aio_modes and (aiomode not in supported_aio_modes): notrun('not suitable for this aio mode: %s' % aiomode) @@ -1022,16 +1053,19 @@ def verify_quorum(): notrun('quorum support missing') def qemu_pipe(*args): - '''Run qemu with an option to print something and exit (e.g. a help option), - and return its output''' + """ + Run qemu with an option to print something and exit (e.g. a help option). + + :return: QEMU's stdout output. + """ args = [qemu_prog] + qemu_opts + list(args) subp = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, universal_newlines=True) exitcode = subp.wait() if exitcode < 0: - sys.stderr.write('qemu received signal %i: %s\n' % (-exitcode, - ' '.join(args))) + sys.stderr.write('qemu received signal %i: %s\n' % + (-exitcode, ' '.join(args))) return subp.communicate()[0] def supported_formats(read_only=False): @@ -1049,7 +1083,7 @@ def supported_formats(read_only=False): return supported_formats.formats[read_only] -def skip_if_unsupported(required_formats=[], read_only=False): +def skip_if_unsupported(required_formats=(), read_only=False): '''Skip Test Decorator Runs the test if all the required formats are whitelisted''' def skip_test_decorator(func): @@ -1061,8 +1095,9 @@ def skip_if_unsupported(required_formats=[], read_only=False): usf_list = list(set(fmts) - set(supported_formats(read_only))) if usf_list: - test_case.case_skip('{}: formats {} are not whitelisted'.format( - test_case, usf_list)) + msg = f'{test_case}: formats {usf_list} are not whitelisted' + test_case.case_skip(msg) + return None else: return func(test_case, *args, **kwargs) return func_wrapper @@ -1074,11 +1109,23 @@ def skip_if_user_is_root(func): def func_wrapper(*args, **kwargs): if os.getuid() == 0: case_notrun('{}: cannot be run as root'.format(args[0])) + return None else: return func(*args, **kwargs) return func_wrapper -def execute_unittest(output, verbosity, debug): +def execute_unittest(debug=False): + """Executes unittests within the calling module.""" + + verbosity = 2 if debug else 1 + + if debug: + output = sys.stdout + else: + # We need to filter out the time taken from the output so that + # qemu-iotest can reliably diff the results against master output. + output = io.StringIO() + runner = unittest.TextTestRunner(stream=output, descriptions=True, verbosity=verbosity) try: @@ -1086,6 +1133,8 @@ def execute_unittest(output, verbosity, debug): # exception unittest.main(testRunner=runner) finally: + # We need to filter out the time taken from the output so that + # qemu-iotest can reliably diff the results against master output. if not debug: out = output.getvalue() out = re.sub(r'Ran (\d+) tests? in [\d.]+s', r'Ran \1 tests', out) @@ -1097,13 +1146,19 @@ def execute_unittest(output, verbosity, debug): sys.stderr.write(out) -def execute_test(test_function=None, - supported_fmts=[], - supported_platforms=None, - supported_cache_modes=[], supported_aio_modes={}, - unsupported_fmts=[], supported_protocols=[], - unsupported_protocols=[]): - """Run either unittest or script-style tests.""" +def execute_setup_common(supported_fmts: Sequence[str] = (), + supported_platforms: Sequence[str] = (), + supported_cache_modes: Sequence[str] = (), + supported_aio_modes: Sequence[str] = (), + unsupported_fmts: Sequence[str] = (), + supported_protocols: Sequence[str] = (), + unsupported_protocols: Sequence[str] = ()) -> bool: + """ + Perform necessary setup for either script-style or unittest-style tests. + + :return: Bool; Whether or not debug mode has been requested via the CLI. + """ + # Note: Python 3.6 and pylint do not like 'Collection' so use 'Sequence'. # We are using TEST_DIR and QEMU_DEFAULT_MACHINE as proxies to # indicate that we're not being run via "check". There may be @@ -1113,34 +1168,51 @@ def execute_test(test_function=None, sys.stderr.write('Please run this test via the "check" script\n') sys.exit(os.EX_USAGE) - debug = '-d' in sys.argv - verbosity = 1 - verify_image_format(supported_fmts, unsupported_fmts) - verify_protocol(supported_protocols, unsupported_protocols) - verify_platform(supported=supported_platforms) - verify_cache_mode(supported_cache_modes) - verify_aio_mode(supported_aio_modes) + _verify_image_format(supported_fmts, unsupported_fmts) + _verify_protocol(supported_protocols, unsupported_protocols) + _verify_platform(supported=supported_platforms) + _verify_cache_mode(supported_cache_modes) + _verify_aio_mode(supported_aio_modes) + debug = '-d' in sys.argv if debug: - output = sys.stdout - verbosity = 2 sys.argv.remove('-d') - else: - # We need to filter out the time taken from the output so that - # qemu-iotest can reliably diff the results against master output. - output = io.StringIO() - logging.basicConfig(level=(logging.DEBUG if debug else logging.WARN)) + logger.debug("iotests debugging messages active") + + return debug + +def execute_test(*args, test_function=None, **kwargs): + """Run either unittest or script-style tests.""" + debug = execute_setup_common(*args, **kwargs) if not test_function: - execute_unittest(output, verbosity, debug) + execute_unittest(debug) else: test_function() +def activate_logging(): + """Activate iotests.log() output to stdout for script-style tests.""" + handler = logging.StreamHandler(stream=sys.stdout) + formatter = logging.Formatter('%(message)s') + handler.setFormatter(formatter) + test_logger.addHandler(handler) + test_logger.setLevel(logging.INFO) + test_logger.propagate = False + +# This is called from script-style iotests without a single point of entry +def script_initialize(*args, **kwargs): + """Initialize script-style tests without running any tests.""" + activate_logging() + execute_setup_common(*args, **kwargs) + +# This is called from script-style iotests with a single point of entry def script_main(test_function, *args, **kwargs): """Run script-style tests outside of the unittest framework""" - execute_test(test_function, *args, **kwargs) + activate_logging() + execute_test(*args, test_function=test_function, **kwargs) +# This is called from unittest style iotests def main(*args, **kwargs): """Run tests using the unittest framework""" - execute_test(None, *args, **kwargs) + execute_test(*args, **kwargs) |