summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--python/.gitignore5
-rw-r--r--python/Makefile9
-rw-r--r--python/Pipfile.lock28
-rw-r--r--python/avocado.cfg3
-rw-r--r--python/qemu/aqmp/__init__.py59
-rw-r--r--python/qemu/aqmp/aqmp_tui.py652
-rw-r--r--python/qemu/aqmp/error.py50
-rw-r--r--python/qemu/aqmp/events.py706
-rw-r--r--python/qemu/aqmp/message.py209
-rw-r--r--python/qemu/aqmp/models.py133
-rw-r--r--python/qemu/aqmp/protocol.py902
-rw-r--r--python/qemu/aqmp/py.typed0
-rw-r--r--python/qemu/aqmp/qmp_client.py621
-rw-r--r--python/qemu/aqmp/util.py217
-rw-r--r--python/setup.cfg43
-rw-r--r--python/tests/protocol.py583
16 files changed, 4214 insertions, 6 deletions
diff --git a/python/.gitignore b/python/.gitignore
index c8b0e67fe6..904f324bb1 100644
--- a/python/.gitignore
+++ b/python/.gitignore
@@ -15,3 +15,8 @@ qemu.egg-info/
.venv/
.tox/
.dev-venv/
+
+# Coverage.py reports
+.coverage
+.coverage.*
+htmlcov/
diff --git a/python/Makefile b/python/Makefile
index fe27a3e12e..3334311362 100644
--- a/python/Makefile
+++ b/python/Makefile
@@ -92,6 +92,13 @@ check:
check-tox:
@tox $(QEMU_TOX_EXTRA_ARGS)
+.PHONY: check-coverage
+check-coverage:
+ @coverage run -m avocado --config avocado.cfg run tests/*.py
+ @coverage combine
+ @coverage html
+ @coverage report
+
.PHONY: clean
clean:
python3 setup.py clean --all
@@ -100,3 +107,5 @@ clean:
.PHONY: distclean
distclean: clean
rm -rf qemu.egg-info/ .venv/ .tox/ $(QEMU_VENV_DIR) dist/
+ rm -f .coverage .coverage.*
+ rm -rf htmlcov/
diff --git a/python/Pipfile.lock b/python/Pipfile.lock
index 8ab41a3f60..d2a7dbd88b 100644
--- a/python/Pipfile.lock
+++ b/python/Pipfile.lock
@@ -1,7 +1,7 @@
{
"_meta": {
"hash": {
- "sha256": "eff562a688ebc6f3ffe67494dbb804b883e2159ad81c4d55d96da9f7aec13e91"
+ "sha256": "784b327272db32403d5a488507853b5afba850ba26a5948e5b6a90c1baef2d9c"
},
"pipfile-spec": 6,
"requires": {
@@ -39,11 +39,11 @@
},
"avocado-framework": {
"hashes": [
- "sha256:3fca7226d7d164f124af8a741e7fa658ff4345a0738ddc32907631fd688b38ed",
- "sha256:48ac254c0ae2ef0c0ceeb38e3d3df0388718eda8f48b3ab55b30b252839f42b1"
+ "sha256:244cb569f8eb4e50a22ac82e1a2b2bba2458999f4281efbe2651bd415d59c65b",
+ "sha256:6f15998b67ecd0e7dde790c4de4dd249d6df52dfe6d5cc4e2dd6596df51c3583"
],
"index": "pypi",
- "version": "==87.0"
+ "version": "==90.0"
},
"distlib": {
"hashes": [
@@ -200,6 +200,14 @@
],
"version": "==2.0.0"
},
+ "pygments": {
+ "hashes": [
+ "sha256:a18f47b506a429f6f4b9df81bb02beab9ca21d0a5fee38ed15aef65f0545519f",
+ "sha256:d66e804411278594d764fc69ec36ec13d9ae9147193a1740cd34d272ca383b8e"
+ ],
+ "markers": "python_version >= '3.5'",
+ "version": "==2.9.0"
+ },
"pylint": {
"hashes": [
"sha256:082a6d461b54f90eea49ca90fff4ee8b6e45e8029e5dbd72f6107ef84f3779c0",
@@ -289,6 +297,18 @@
"markers": "python_version < '3.8'",
"version": "==3.10.0.0"
},
+ "urwid": {
+ "hashes": [
+ "sha256:588bee9c1cb208d0906a9f73c613d2bd32c3ed3702012f51efe318a3f2127eae"
+ ],
+ "version": "==2.1.2"
+ },
+ "urwid-readline": {
+ "hashes": [
+ "sha256:018020cbc864bb5ed87be17dc26b069eae2755cb29f3a9c569aac3bded1efaf4"
+ ],
+ "version": "==0.13"
+ },
"virtualenv": {
"hashes": [
"sha256:14fdf849f80dbb29a4eb6caa9875d476ee2a5cf76a5f5415fa2f1606010ab467",
diff --git a/python/avocado.cfg b/python/avocado.cfg
index 10dc6fb605..c7722e7ecd 100644
--- a/python/avocado.cfg
+++ b/python/avocado.cfg
@@ -1,3 +1,6 @@
+[run]
+test_runner = runner
+
[simpletests]
# Don't show stdout/stderr in the test *summary*
status.failure_fields = ['status']
diff --git a/python/qemu/aqmp/__init__.py b/python/qemu/aqmp/__init__.py
new file mode 100644
index 0000000000..ab1782999c
--- /dev/null
+++ b/python/qemu/aqmp/__init__.py
@@ -0,0 +1,59 @@
+"""
+QEMU Monitor Protocol (QMP) development library & tooling.
+
+This package provides a fairly low-level class for communicating
+asynchronously with QMP protocol servers, as implemented by QEMU, the
+QEMU Guest Agent, and the QEMU Storage Daemon.
+
+`QMPClient` provides the main functionality of this package. All errors
+raised by this library dervive from `AQMPError`, see `aqmp.error` for
+additional detail. See `aqmp.events` for an in-depth tutorial on
+managing QMP events.
+"""
+
+# Copyright (C) 2020, 2021 John Snow for Red Hat, Inc.
+#
+# Authors:
+# John Snow <jsnow@redhat.com>
+#
+# Based on earlier work by Luiz Capitulino <lcapitulino@redhat.com>.
+#
+# This work is licensed under the terms of the GNU GPL, version 2. See
+# the COPYING file in the top-level directory.
+
+import warnings
+
+from .error import AQMPError
+from .events import EventListener
+from .message import Message
+from .protocol import ConnectError, Runstate, StateError
+from .qmp_client import ExecInterruptedError, ExecuteError, QMPClient
+
+
+_WMSG = """
+
+The Asynchronous QMP library is currently in development and its API
+should be considered highly fluid and subject to change. It should
+not be used by any other scripts checked into the QEMU tree.
+
+Proceed with caution!
+"""
+
+warnings.warn(_WMSG, FutureWarning)
+
+
+# The order of these fields impact the Sphinx documentation order.
+__all__ = (
+ # Classes, most to least important
+ 'QMPClient',
+ 'Message',
+ 'EventListener',
+ 'Runstate',
+
+ # Exceptions, most generic to most explicit
+ 'AQMPError',
+ 'StateError',
+ 'ConnectError',
+ 'ExecuteError',
+ 'ExecInterruptedError',
+)
diff --git a/python/qemu/aqmp/aqmp_tui.py b/python/qemu/aqmp/aqmp_tui.py
new file mode 100644
index 0000000000..a2929f771c
--- /dev/null
+++ b/python/qemu/aqmp/aqmp_tui.py
@@ -0,0 +1,652 @@
+# Copyright (c) 2021
+#
+# Authors:
+# Niteesh Babu G S <niteesh.gs@gmail.com>
+#
+# This work is licensed under the terms of the GNU GPL, version 2 or
+# later. See the COPYING file in the top-level directory.
+"""
+AQMP TUI
+
+AQMP TUI is an asynchronous interface built on top the of the AQMP library.
+It is the successor of QMP-shell and is bought-in as a replacement for it.
+
+Example Usage: aqmp-tui <SOCKET | TCP IP:PORT>
+Full Usage: aqmp-tui --help
+"""
+
+import argparse
+import asyncio
+import json
+import logging
+from logging import Handler, LogRecord
+import signal
+from typing import (
+ List,
+ Optional,
+ Tuple,
+ Type,
+ Union,
+ cast,
+)
+
+from pygments import lexers
+from pygments import token as Token
+import urwid
+import urwid_readline
+
+from ..qmp import QEMUMonitorProtocol, QMPBadPortError
+from .error import ProtocolError
+from .message import DeserializationError, Message, UnexpectedTypeError
+from .protocol import ConnectError, Runstate
+from .qmp_client import ExecInterruptedError, QMPClient
+from .util import create_task, pretty_traceback
+
+
+# The name of the signal that is used to update the history list
+UPDATE_MSG: str = 'UPDATE_MSG'
+
+
+palette = [
+ (Token.Punctuation, '', '', '', 'h15,bold', 'g7'),
+ (Token.Text, '', '', '', '', 'g7'),
+ (Token.Name.Tag, '', '', '', 'bold,#f88', 'g7'),
+ (Token.Literal.Number.Integer, '', '', '', '#fa0', 'g7'),
+ (Token.Literal.String.Double, '', '', '', '#6f6', 'g7'),
+ (Token.Keyword.Constant, '', '', '', '#6af', 'g7'),
+ ('DEBUG', '', '', '', '#ddf', 'g7'),
+ ('INFO', '', '', '', 'g100', 'g7'),
+ ('WARNING', '', '', '', '#ff6', 'g7'),
+ ('ERROR', '', '', '', '#a00', 'g7'),
+ ('CRITICAL', '', '', '', '#a00', 'g7'),
+ ('background', '', 'black', '', '', 'g7'),
+]
+
+
+def format_json(msg: str) -> str:
+ """
+ Formats valid/invalid multi-line JSON message into a single-line message.
+
+ Formatting is first tried using the standard json module. If that fails
+ due to an decoding error then a simple string manipulation is done to
+ achieve a single line JSON string.
+
+ Converting into single line is more asthetically pleasing when looking
+ along with error messages.
+
+ Eg:
+ Input:
+ [ 1,
+ true,
+ 3 ]
+ The above input is not a valid QMP message and produces the following error
+ "QMP message is not a JSON object."
+ When displaying this in TUI in multiline mode we get
+
+ [ 1,
+ true,
+ 3 ]: QMP message is not a JSON object.
+
+ whereas in singleline mode we get the following
+
+ [1, true, 3]: QMP message is not a JSON object.
+
+ The single line mode is more asthetically pleasing.
+
+ :param msg:
+ The message to formatted into single line.
+
+ :return: Formatted singleline message.
+ """
+ try:
+ msg = json.loads(msg)
+ return str(json.dumps(msg))
+ except json.decoder.JSONDecodeError:
+ msg = msg.replace('\n', '')
+ words = msg.split(' ')
+ words = list(filter(None, words))
+ return ' '.join(words)
+
+
+def has_handler_type(logger: logging.Logger,
+ handler_type: Type[Handler]) -> bool:
+ """
+ The Logger class has no interface to check if a certain type of handler is
+ installed or not. So we provide an interface to do so.
+
+ :param logger:
+ Logger object
+ :param handler_type:
+ The type of the handler to be checked.
+
+ :return: returns True if handler of type `handler_type`.
+ """
+ for handler in logger.handlers:
+ if isinstance(handler, handler_type):
+ return True
+ return False
+
+
+class App(QMPClient):
+ """
+ Implements the AQMP TUI.
+
+ Initializes the widgets and starts the urwid event loop.
+
+ :param address:
+ Address of the server to connect to.
+ :param num_retries:
+ The number of times to retry before stopping to reconnect.
+ :param retry_delay:
+ The delay(sec) before each retry
+ """
+ def __init__(self, address: Union[str, Tuple[str, int]], num_retries: int,
+ retry_delay: Optional[int]) -> None:
+ urwid.register_signal(type(self), UPDATE_MSG)
+ self.window = Window(self)
+ self.address = address
+ self.aloop: Optional[asyncio.AbstractEventLoop] = None
+ self.num_retries = num_retries
+ self.retry_delay = retry_delay if retry_delay else 2
+ self.retry: bool = False
+ self.exiting: bool = False
+ super().__init__()
+
+ def add_to_history(self, msg: str, level: Optional[str] = None) -> None:
+ """
+ Appends the msg to the history list.
+
+ :param msg:
+ The raw message to be appended in string type.
+ """
+ urwid.emit_signal(self, UPDATE_MSG, msg, level)
+
+ def _cb_outbound(self, msg: Message) -> Message:
+ """
+ Callback: outbound message hook.
+
+ Appends the outgoing messages to the history box.
+
+ :param msg: raw outbound message.
+ :return: final outbound message.
+ """
+ str_msg = str(msg)
+
+ if not has_handler_type(logging.getLogger(), TUILogHandler):
+ logging.debug('Request: %s', str_msg)
+ self.add_to_history('<-- ' + str_msg)
+ return msg
+
+ def _cb_inbound(self, msg: Message) -> Message:
+ """
+ Callback: outbound message hook.
+
+ Appends the incoming messages to the history box.
+
+ :param msg: raw inbound message.
+ :return: final inbound message.
+ """
+ str_msg = str(msg)
+
+ if not has_handler_type(logging.getLogger(), TUILogHandler):
+ logging.debug('Request: %s', str_msg)
+ self.add_to_history('--> ' + str_msg)
+ return msg
+
+ async def _send_to_server(self, msg: Message) -> None:
+ """
+ This coroutine sends the message to the server.
+ The message has to be pre-validated.
+
+ :param msg:
+ Pre-validated message to be to sent to the server.
+
+ :raise Exception: When an unhandled exception is caught.
+ """
+ try:
+ await self._raw(msg, assign_id='id' not in msg)
+ except ExecInterruptedError as err:
+ logging.info('Error server disconnected before reply %s', str(err))
+ self.add_to_history('Server disconnected before reply', 'ERROR')
+ except Exception as err:
+ logging.error('Exception from _send_to_server: %s', str(err))
+ raise err
+
+ def cb_send_to_server(self, raw_msg: str) -> None:
+ """
+ Validates and sends the message to the server.
+ The raw string message is first converted into a Message object
+ and is then sent to the server.
+
+ :param raw_msg:
+ The raw string message to be sent to the server.
+
+ :raise Exception: When an unhandled exception is caught.
+ """
+ try:
+ msg = Message(bytes(raw_msg, encoding='utf-8'))
+ create_task(self._send_to_server(msg))
+ except (DeserializationError, UnexpectedTypeError) as err:
+ raw_msg = format_json(raw_msg)
+ logging.info('Invalid message: %s', err.error_message)
+ self.add_to_history(f'{raw_msg}: {err.error_message}', 'ERROR')
+
+ def unhandled_input(self, key: str) -> None:
+ """
+ Handle's keys which haven't been handled by the child widgets.
+
+ :param key:
+ Unhandled key
+ """
+ if key == 'esc':
+ self.kill_app()
+
+ def kill_app(self) -> None:
+ """
+ Initiates killing of app. A bridge between asynchronous and synchronous
+ code.
+ """
+ create_task(self._kill_app())
+
+ async def _kill_app(self) -> None:
+ """
+ This coroutine initiates the actual disconnect process and calls
+ urwid.ExitMainLoop() to kill the TUI.
+
+ :raise Exception: When an unhandled exception is caught.
+ """
+ self.exiting = True
+ await self.disconnect()
+ logging.debug('Disconnect finished. Exiting app')
+ raise urwid.ExitMainLoop()
+
+ async def disconnect(self) -> None:
+ """
+ Overrides the disconnect method to handle the errors locally.
+ """
+ try:
+ await super().disconnect()
+ except (OSError, EOFError) as err:
+ logging.info('disconnect: %s', str(err))
+ self.retry = True
+ except ProtocolError as err:
+ logging.info('disconnect: %s', str(err))
+ except Exception as err:
+ logging.error('disconnect: Unhandled exception %s', str(err))
+ raise err
+
+ def _set_status(self, msg: str) -> None:
+ """
+ Sets the message as the status.
+
+ :param msg:
+ The message to be displayed in the status bar.
+ """
+ self.window.footer.set_text(msg)
+
+ def _get_formatted_address(self) -> str:
+ """
+ Returns a formatted version of the server's address.
+
+ :return: formatted address
+ """
+ if isinstance(self.address, tuple):
+ host, port = self.address
+ addr = f'{host}:{port}'
+ else:
+ addr = f'{self.address}'
+ return addr
+
+ async def _initiate_connection(self) -> Optional[ConnectError]:
+ """
+ Tries connecting to a server a number of times with a delay between
+ each try. If all retries failed then return the error faced during
+ the last retry.
+
+ :return: Error faced during last retry.
+ """
+ current_retries = 0
+ err = None
+
+ # initial try
+ await self.connect_server()
+ while self.retry and current_retries < self.num_retries:
+ logging.info('Connection Failed, retrying in %d', self.retry_delay)
+ status = f'[Retry #{current_retries} ({self.retry_delay}s)]'
+ self._set_status(status)
+
+ await asyncio.sleep(self.retry_delay)
+
+ err = await self.connect_server()
+ current_retries += 1
+ # If all retries failed report the last error
+ if err:
+ logging.info('All retries failed: %s', err)
+ return err
+ return None
+
+ async def manage_connection(self) -> None:
+ """
+ Manage the connection based on the current run state.
+
+ A reconnect is issued when the current state is IDLE and the number
+ of retries is not exhausted.
+ A disconnect is issued when the current state is DISCONNECTING.
+ """
+ while not self.exiting:
+ if self.runstate == Runstate.IDLE:
+ err = await self._initiate_connection()
+ # If retry is still true then, we have exhausted all our tries.
+ if err:
+ self._set_status(f'[Error: {err.error_message}]')
+ else:
+ addr = self._get_formatted_address()
+ self._set_status(f'[Connected {addr}]')
+ elif self.runstate == Runstate.DISCONNECTING:
+ self._set_status('[Disconnected]')
+ await self.disconnect()
+ # check if a retry is needed
+ if self.runstate == Runstate.IDLE:
+ continue
+ await self.runstate_changed()
+
+ async def connect_server(self) -> Optional[ConnectError]:
+ """
+ Initiates a connection to the server at address `self.address`
+ and in case of a failure, sets the status to the respective error.
+ """
+ try:
+ await self.connect(self.address)
+ self.retry = False
+ except ConnectError as err:
+ logging.info('connect_server: ConnectError %s', str(err))
+ self.retry = True
+ return err
+ return None
+
+ def run(self, debug: bool = False) -> None:
+ """
+ Starts the long running co-routines and the urwid event loop.
+
+ :param debug:
+ Enables/Disables asyncio event loop debugging
+ """
+ screen = urwid.raw_display.Screen()
+ screen.set_terminal_properties(256)
+
+ self.aloop = asyncio.get_event_loop()
+ self.aloop.set_debug(debug)
+
+ # Gracefully handle SIGTERM and SIGINT signals
+ cancel_signals = [signal.SIGTERM, signal.SIGINT]
+ for sig in cancel_signals:
+ self.aloop.add_signal_handler(sig, self.kill_app)
+
+ event_loop = urwid.AsyncioEventLoop(loop=self.aloop)
+ main_loop = urwid.MainLoop(urwid.AttrMap(self.window, 'background'),
+ unhandled_input=self.unhandled_input,
+ screen=screen,
+ palette=palette,
+ handle_mouse=True,
+ event_loop=event_loop)
+
+ create_task(self.manage_connection(), self.aloop)
+ try:
+ main_loop.run()
+ except Exception as err:
+ logging.error('%s\n%s\n', str(err), pretty_traceback())
+ raise err
+
+
+class StatusBar(urwid.Text):
+ """
+ A simple statusbar modelled using the Text widget. The status can be
+ set using the set_text function. All text set is aligned to right.
+
+ :param text: Initial text to be displayed. Default is empty str.
+ """
+ def __init__(self, text: str = ''):
+ super().__init__(text, align='right')
+
+
+class Editor(urwid_readline.ReadlineEdit):
+ """
+ A simple editor modelled using the urwid_readline.ReadlineEdit widget.
+ Mimcs GNU readline shortcuts and provides history support.
+
+ The readline shortcuts can be found below:
+ https://github.com/rr-/urwid_readline#features
+
+ Along with the readline features, this editor also has support for
+ history. Pressing the 'up'/'down' switches between the prev/next messages
+ available in the history.
+
+ Currently there is no support to save the history to a file. The history of
+ previous commands is lost on exit.
+
+ :param parent: Reference to the TUI object.
+ """
+ def __init__(self, parent: App) -> None:
+ super().__init__(caption='> ', multiline=True)
+ self.parent = parent
+ self.history: List[str] = []
+ self.last_index: int = -1
+ self.show_history: bool = False
+
+ def keypress(self, size: Tuple[int, int], key: str) -> Optional[str]:
+ """
+ Handles the keypress on this widget.
+
+ :param size:
+ The current size of the widget.
+ :param key:
+ The key to be handled.
+
+ :return: Unhandled key if any.
+ """
+ msg = self.get_edit_text()
+ if key == 'up' and not msg:
+ # Show the history when 'up arrow' is pressed with no input text.
+ # NOTE: The show_history logic is necessary because in 'multiline'
+ # mode (which we use) 'up arrow' is used to move between lines.
+ if not self.history:
+ return None
+ self.show_history = True
+ last_msg = self.history[self.last_index]
+ self.set_edit_text(last_msg)
+ self.edit_pos = len(last_msg)
+ elif key == 'up' and self.show_history:
+ self.last_index = max(self.last_index - 1, -len(self.history))
+ self.set_edit_text(self.history[self.last_index])
+ self.edit_pos = len(self.history[self.last_index])
+ elif key == 'down' and self.show_history:
+ if self.last_index == -1:
+ self.set_edit_text('')
+ self.show_history = False
+ else:
+ self.last_index += 1
+ self.set_edit_text(self.history[self.last_index])
+ self.edit_pos = len(self.history[self.last_index])
+ elif key == 'meta enter':
+ # When using multiline, enter inserts a new line into the editor
+ # send the input to the server on alt + enter
+ self.parent.cb_send_to_server(msg)
+ self.history.append(msg)
+ self.set_edit_text('')
+ self.last_index = -1
+ self.show_history = False
+ else:
+ self.show_history = False
+ self.last_index = -1
+ return cast(Optional[str], super().keypress(size, key))
+ return None
+
+
+class EditorWidget(urwid.Filler):
+ """
+ Wrapper around the editor widget.
+
+ The Editor is a flow widget and has to wrapped inside a box widget.
+ This class wraps the Editor inside filler widget.
+
+ :param parent: Reference to the TUI object.
+ """
+ def __init__(self, parent: App) -> None:
+ super().__init__(Editor(parent), valign='top')
+
+
+class HistoryBox(urwid.ListBox):
+ """
+ This widget is modelled using the ListBox widget, contains the list of
+ all messages both QMP messages and log messsages to be shown in the TUI.
+
+ The messages are urwid.Text widgets. On every append of a message, the
+ focus is shifted to the last appended message.
+
+ :param parent: Reference to the TUI object.
+ """
+ def __init__(self, parent: App) -> None:
+ self.parent = parent
+ self.history = urwid.SimpleFocusListWalker([])
+ super().__init__(self.history)
+
+ def add_to_history(self,
+ history: Union[str, List[Tuple[str, str]]]) -> None:
+ """
+ Appends a message to the list and set the focus to the last appended
+ message.
+
+ :param history:
+ The history item(message/event) to be appended to the list.
+ """
+ self.history.append(urwid.Text(history))
+ self.history.set_focus(len(self.history) - 1)
+
+ def mouse_event(self, size: Tuple[int, int], _event: str, button: float,
+ _x: int, _y: int, focus: bool) -> None:
+ # Unfortunately there are no urwid constants that represent the mouse
+ # events.
+ if button == 4: # Scroll up event
+ super().keypress(size, 'up')
+ elif button == 5: # Scroll down event
+ super().keypress(size, 'down')
+
+
+class HistoryWindow(urwid.Frame):
+ """
+ This window composes the HistoryBox and EditorWidget in a horizontal split.
+ By default the first focus is given to the history box.
+
+ :param parent: Reference to the TUI object.
+ """
+ def __init__(self, parent: App) -> None:
+ self.parent = parent
+ self.editor_widget = EditorWidget(parent)
+ self.editor = urwid.LineBox(self.editor_widget)
+ self.history = HistoryBox(parent)
+ self.body = urwid.Pile([('weight', 80, self.history),
+ ('weight', 20, self.editor)])
+ super().__init__(self.body)
+ urwid.connect_signal(self.parent, UPDATE_MSG, self.cb_add_to_history)
+
+ def cb_add_to_history(self, msg: str, level: Optional[str] = None) -> None:
+ """
+ Appends a message to the history box
+
+ :param msg:
+ The message to be appended to the history box.
+ :param level:
+ The log level of the message, if it is a log message.
+ """
+ formatted = []
+ if level:
+ msg = f'[{level}]: {msg}'
+ formatted.append((level, msg))
+ else:
+ lexer = lexers.JsonLexer() # pylint: disable=no-member
+ for token in lexer.get_tokens(msg):
+ formatted.append(token)
+ self.history.add_to_history(formatted)
+
+
+class Window(urwid.Frame):
+ """
+ This window is the top most widget of the TUI and will contain other
+ windows. Each child of this widget is responsible for displaying a specific
+ functionality.
+
+ :param parent: Reference to the TUI object.
+ """
+ def __init__(self, parent: App) -> None:
+ self.parent = parent
+ footer = StatusBar()
+ body = HistoryWindow(parent)
+ super().__init__(body, footer=footer)
+
+
+class TUILogHandler(Handler):
+ """
+ This handler routes all the log messages to the TUI screen.
+ It is installed to the root logger to so that the log message from all
+ libraries begin used is routed to the screen.
+
+ :param tui: Reference to the TUI object.
+ """
+ def __init__(self, tui: App) -> None:
+ super().__init__()
+ self.tui = tui
+
+ def emit(self, record: LogRecord) -> None:
+ """
+ Emits a record to the TUI screen.
+
+ Appends the log message to the TUI screen
+ """
+ level = record.levelname
+ msg = record.getMessage()
+ self.tui.add_to_history(msg, level)
+
+
+def main() -> None:
+ """
+ Driver of the whole script, parses arguments, initialize the TUI and
+ the logger.
+ """
+ parser = argparse.ArgumentParser(description='AQMP TUI')
+ parser.add_argument('qmp_server', help='Address of the QMP server. '
+ 'Format <UNIX socket path | TCP addr:port>')
+ parser.add_argument('--num-retries', type=int, default=10,
+ help='Number of times to reconnect before giving up.')
+ parser.add_argument('--retry-delay', type=int,
+ help='Time(s) to wait before next retry. '
+ 'Default action is to wait 2s between each retry.')
+ parser.add_argument('--log-file', help='The Log file name')
+ parser.add_argument('--log-level', default='WARNING',
+ help='Log level <CRITICAL|ERROR|WARNING|INFO|DEBUG|>')
+ parser.add_argument('--asyncio-debug', action='store_true',
+ help='Enable debug mode for asyncio loop. '
+ 'Generates lot of output, makes TUI unusable when '
+ 'logs are logged in the TUI. '
+ 'Use only when logging to a file.')
+ args = parser.parse_args()
+
+ try:
+ address = QEMUMonitorProtocol.parse_address(args.qmp_server)
+ except QMPBadPortError as err:
+ parser.error(str(err))
+
+ app = App(address, args.num_retries, args.retry_delay)
+
+ root_logger = logging.getLogger()
+ root_logger.setLevel(logging.getLevelName(args.log_level))
+
+ if args.log_file:
+ root_logger.addHandler(logging.FileHandler(args.log_file))
+ else:
+ root_logger.addHandler(TUILogHandler(app))
+
+ app.run(args.asyncio_debug)
+
+
+if __name__ == '__main__':
+ main()
diff --git a/python/qemu/aqmp/error.py b/python/qemu/aqmp/error.py
new file mode 100644
index 0000000000..781f49b008
--- /dev/null
+++ b/python/qemu/aqmp/error.py
@@ -0,0 +1,50 @@
+"""
+AQMP Error Classes
+
+This package seeks to provide semantic error classes that are intended
+to be used directly by clients when they would like to handle particular
+semantic failures (e.g. "failed to connect") without needing to know the
+enumeration of possible reasons for that failure.
+
+AQMPError serves as the ancestor for all exceptions raised by this
+package, and is suitable for use in handling semantic errors from this
+library. In most cases, individual public methods will attempt to catch
+and re-encapsulate various exceptions to provide a semantic
+error-handling interface.
+
+.. admonition:: AQMP Exception Hierarchy Reference
+
+ | `Exception`
+ | +-- `AQMPError`
+ | +-- `ConnectError`
+ | +-- `StateError`
+ | +-- `ExecInterruptedError`
+ | +-- `ExecuteError`
+ | +-- `ListenerError`
+ | +-- `ProtocolError`
+ | +-- `DeserializationError`
+ | +-- `UnexpectedTypeError`
+ | +-- `ServerParseError`
+ | +-- `BadReplyError`
+ | +-- `GreetingError`
+ | +-- `NegotiationError`
+"""
+
+
+class AQMPError(Exception):
+ """Abstract error class for all errors originating from this package."""
+
+
+class ProtocolError(AQMPError):
+ """
+ Abstract error class for protocol failures.
+
+ Semantically, these errors are generally the fault of either the
+ protocol server or as a result of a bug in this library.
+
+ :param error_message: Human-readable string describing the error.
+ """
+ def __init__(self, error_message: str):
+ super().__init__(error_message)
+ #: Human-readable error message, without any prefix.
+ self.error_message: str = error_message
diff --git a/python/qemu/aqmp/events.py b/python/qemu/aqmp/events.py
new file mode 100644
index 0000000000..fb81d21610
--- /dev/null
+++ b/python/qemu/aqmp/events.py
@@ -0,0 +1,706 @@
+"""
+AQMP Events and EventListeners
+
+Asynchronous QMP uses `EventListener` objects to listen for events. An
+`EventListener` is a FIFO event queue that can be pre-filtered to listen
+for only specific events. Each `EventListener` instance receives its own
+copy of events that it hears, so events may be consumed without fear or
+worry for depriving other listeners of events they need to hear.
+
+
+EventListener Tutorial
+----------------------
+
+In all of the following examples, we assume that we have a `QMPClient`
+instantiated named ``qmp`` that is already connected.
+
+
+`listener()` context blocks with one name
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The most basic usage is by using the `listener()` context manager to
+construct them:
+
+.. code:: python
+
+ with qmp.listener('STOP') as listener:
+ await qmp.execute('stop')
+ await listener.get()
+
+The listener is active only for the duration of the ‘with’ block. This
+instance listens only for ‘STOP’ events.
+
+
+`listener()` context blocks with two or more names
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Multiple events can be selected for by providing any ``Iterable[str]``:
+
+.. code:: python
+
+ with qmp.listener(('STOP', 'RESUME')) as listener:
+ await qmp.execute('stop')
+ event = await listener.get()
+ assert event['event'] == 'STOP'
+
+ await qmp.execute('cont')
+ event = await listener.get()
+ assert event['event'] == 'RESUME'
+
+
+`listener()` context blocks with no names
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+By omitting names entirely, you can listen to ALL events.
+
+.. code:: python
+
+ with qmp.listener() as listener:
+ await qmp.execute('stop')
+ event = await listener.get()
+ assert event['event'] == 'STOP'
+
+This isn’t a very good use case for this feature: In a non-trivial
+running system, we may not know what event will arrive next. Grabbing
+the top of a FIFO queue returning multiple kinds of events may be prone
+to error.
+
+
+Using async iterators to retrieve events
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+If you’d like to simply watch what events happen to arrive, you can use
+the listener as an async iterator:
+
+.. code:: python
+
+ with qmp.listener() as listener:
+ async for event in listener:
+ print(f"Event arrived: {event['event']}")
+
+This is analogous to the following code:
+
+.. code:: python
+
+ with qmp.listener() as listener:
+ while True:
+ event = listener.get()
+ print(f"Event arrived: {event['event']}")
+
+This event stream will never end, so these blocks will never terminate.
+
+
+Using asyncio.Task to concurrently retrieve events
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Since a listener’s event stream will never terminate, it is not likely
+useful to use that form in a script. For longer-running clients, we can
+create event handlers by using `asyncio.Task` to create concurrent
+coroutines:
+
+.. code:: python
+
+ async def print_events(listener):
+ try:
+ async for event in listener:
+ print(f"Event arrived: {event['event']}")
+ except asyncio.CancelledError:
+ return
+
+ with qmp.listener() as listener:
+ task = asyncio.Task(print_events(listener))
+ await qmp.execute('stop')
+ await qmp.execute('cont')
+ task.cancel()
+ await task
+
+However, there is no guarantee that these events will be received by the
+time we leave this context block. Once the context block is exited, the
+listener will cease to hear any new events, and becomes inert.
+
+Be mindful of the timing: the above example will *probably*– but does
+not *guarantee*– that both STOP/RESUMED events will be printed. The
+example below outlines how to use listeners outside of a context block.
+
+
+Using `register_listener()` and `remove_listener()`
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+To create a listener with a longer lifetime, beyond the scope of a
+single block, create a listener and then call `register_listener()`:
+
+.. code:: python
+
+ class MyClient:
+ def __init__(self, qmp):
+ self.qmp = qmp
+ self.listener = EventListener()
+
+ async def print_events(self):
+ try:
+ async for event in self.listener:
+ print(f"Event arrived: {event['event']}")
+ except asyncio.CancelledError:
+ return
+
+ async def run(self):
+ self.task = asyncio.Task(self.print_events)
+ self.qmp.register_listener(self.listener)
+ await qmp.execute('stop')
+ await qmp.execute('cont')
+
+ async def stop(self):
+ self.task.cancel()
+ await self.task
+ self.qmp.remove_listener(self.listener)
+
+The listener can be deactivated by using `remove_listener()`. When it is
+removed, any possible pending events are cleared and it can be
+re-registered at a later time.
+
+
+Using the built-in all events listener
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The `QMPClient` object creates its own default listener named
+:py:obj:`~Events.events` that can be used for the same purpose without
+having to create your own:
+
+.. code:: python
+
+ async def print_events(listener):
+ try:
+ async for event in listener:
+ print(f"Event arrived: {event['event']}")
+ except asyncio.CancelledError:
+ return
+
+ task = asyncio.Task(print_events(qmp.events))
+
+ await qmp.execute('stop')
+ await qmp.execute('cont')
+
+ task.cancel()
+ await task
+
+
+Using both .get() and async iterators
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The async iterator and `get()` methods pull events from the same FIFO
+queue. If you mix the usage of both, be aware: Events are emitted
+precisely once per listener.
+
+If multiple contexts try to pull events from the same listener instance,
+events are still emitted only precisely once.
+
+This restriction can be lifted by creating additional listeners.
+
+
+Creating multiple listeners
+~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Additional `EventListener` objects can be created at-will. Each one
+receives its own copy of events, with separate FIFO event queues.
+
+.. code:: python
+
+ my_listener = EventListener()
+ qmp.register_listener(my_listener)
+
+ await qmp.execute('stop')
+ copy1 = await my_listener.get()
+ copy2 = await qmp.events.get()
+
+ assert copy1 == copy2
+
+In this example, we await an event from both a user-created
+`EventListener` and the built-in events listener. Both receive the same
+event.
+
+
+Clearing listeners
+~~~~~~~~~~~~~~~~~~
+
+`EventListener` objects can be cleared, clearing all events seen thus far:
+
+.. code:: python
+
+ await qmp.execute('stop')
+ qmp.events.clear()
+ await qmp.execute('cont')
+ event = await qmp.events.get()
+ assert event['event'] == 'RESUME'
+
+`EventListener` objects are FIFO queues. If events are not consumed,
+they will remain in the queue until they are witnessed or discarded via
+`clear()`. FIFO queues will be drained automatically upon leaving a
+context block, or when calling `remove_listener()`.
+
+
+Accessing listener history
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+`EventListener` objects record their history. Even after being cleared,
+you can obtain a record of all events seen so far:
+
+.. code:: python
+
+ await qmp.execute('stop')
+ await qmp.execute('cont')
+ qmp.events.clear()
+
+ assert len(qmp.events.history) == 2
+ assert qmp.events.history[0]['event'] == 'STOP'
+ assert qmp.events.history[1]['event'] == 'RESUME'
+
+The history is updated immediately and does not require the event to be
+witnessed first.
+
+
+Using event filters
+~~~~~~~~~~~~~~~~~~~
+
+`EventListener` objects can be given complex filtering criteria if names
+are not sufficient:
+
+.. code:: python
+
+ def job1_filter(event) -> bool:
+ event_data = event.get('data', {})
+ event_job_id = event_data.get('id')
+ return event_job_id == "job1"
+
+ with qmp.listener('JOB_STATUS_CHANGE', job1_filter) as listener:
+ await qmp.execute('blockdev-backup', arguments={'job-id': 'job1', ...})
+ async for event in listener:
+ if event['data']['status'] == 'concluded':
+ break
+
+These filters might be most useful when parameterized. `EventListener`
+objects expect a function that takes only a single argument (the raw
+event, as a `Message`) and returns a bool; True if the event should be
+accepted into the stream. You can create a function that adapts this
+signature to accept configuration parameters:
+
+.. code:: python
+
+ def job_filter(job_id: str) -> EventFilter:
+ def filter(event: Message) -> bool:
+ return event['data']['id'] == job_id
+ return filter
+
+ with qmp.listener('JOB_STATUS_CHANGE', job_filter('job2')) as listener:
+ await qmp.execute('blockdev-backup', arguments={'job-id': 'job2', ...})
+ async for event in listener:
+ if event['data']['status'] == 'concluded':
+ break
+
+
+Activating an existing listener with `listen()`
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Listeners with complex, long configurations can also be created manually
+and activated temporarily by using `listen()` instead of `listener()`:
+
+.. code:: python
+
+ listener = EventListener(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED',
+ 'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY',
+ 'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'))
+
+ with qmp.listen(listener):
+ await qmp.execute('blockdev-backup', arguments={'job-id': 'job3', ...})
+ async for event in listener:
+ print(event)
+ if event['event'] == 'BLOCK_JOB_COMPLETED':
+ break
+
+Any events that are not witnessed by the time the block is left will be
+cleared from the queue; entering the block is an implicit
+`register_listener()` and leaving the block is an implicit
+`remove_listener()`.
+
+
+Activating multiple existing listeners with `listen()`
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+While `listener()` is only capable of creating a single listener,
+`listen()` is capable of activating multiple listeners simultaneously:
+
+.. code:: python
+
+ def job_filter(job_id: str) -> EventFilter:
+ def filter(event: Message) -> bool:
+ return event['data']['id'] == job_id
+ return filter
+
+ jobA = EventListener('JOB_STATUS_CHANGE', job_filter('jobA'))
+ jobB = EventListener('JOB_STATUS_CHANGE', job_filter('jobB'))
+
+ with qmp.listen(jobA, jobB):
+ qmp.execute('blockdev-create', arguments={'job-id': 'jobA', ...})
+ qmp.execute('blockdev-create', arguments={'job-id': 'jobB', ...})
+
+ async for event in jobA.get():
+ if event['data']['status'] == 'concluded':
+ break
+ async for event in jobB.get():
+ if event['data']['status'] == 'concluded':
+ break
+
+
+Extending the `EventListener` class
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+In the case that a more specialized `EventListener` is desired to
+provide either more functionality or more compact syntax for specialized
+cases, it can be extended.
+
+One of the key methods to extend or override is
+:py:meth:`~EventListener.accept()`. The default implementation checks an
+incoming message for:
+
+1. A qualifying name, if any :py:obj:`~EventListener.names` were
+ specified at initialization time
+2. That :py:obj:`~EventListener.event_filter()` returns True.
+
+This can be modified however you see fit to change the criteria for
+inclusion in the stream.
+
+For convenience, a ``JobListener`` class could be created that simply
+bakes in configuration so it does not need to be repeated:
+
+.. code:: python
+
+ class JobListener(EventListener):
+ def __init__(self, job_id: str):
+ super().__init__(('BLOCK_JOB_COMPLETED', 'BLOCK_JOB_CANCELLED',
+ 'BLOCK_JOB_ERROR', 'BLOCK_JOB_READY',
+ 'BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'))
+ self.job_id = job_id
+
+ def accept(self, event) -> bool:
+ if not super().accept(event):
+ return False
+ if event['event'] in ('BLOCK_JOB_PENDING', 'JOB_STATUS_CHANGE'):
+ return event['data']['id'] == job_id
+ return event['data']['device'] == job_id
+
+From here on out, you can conjure up a custom-purpose listener that
+listens only for job-related events for a specific job-id easily:
+
+.. code:: python
+
+ listener = JobListener('job4')
+ with qmp.listener(listener):
+ await qmp.execute('blockdev-backup', arguments={'job-id': 'job4', ...})
+ async for event in listener:
+ print(event)
+ if event['event'] == 'BLOCK_JOB_COMPLETED':
+ break
+
+
+Experimental Interfaces & Design Issues
+---------------------------------------
+
+These interfaces are not ones I am sure I will keep or otherwise modify
+heavily.
+
+qmp.listener()’s type signature
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+`listener()` does not return anything, because it was assumed the caller
+already had a handle to the listener. However, for
+``qmp.listener(EventListener())`` forms, the caller will not have saved
+a handle to the listener.
+
+Because this function can accept *many* listeners, I found it hard to
+accurately type in a way where it could be used in both “one” or “many”
+forms conveniently and in a statically type-safe manner.
+
+Ultimately, I removed the return altogether, but perhaps with more time
+I can work out a way to re-add it.
+
+
+API Reference
+-------------
+
+"""
+
+import asyncio
+from contextlib import contextmanager
+import logging
+from typing import (
+ AsyncIterator,
+ Callable,
+ Iterable,
+ Iterator,
+ List,
+ Optional,
+ Set,
+ Tuple,
+ Union,
+)
+
+from .error import AQMPError
+from .message import Message
+
+
+EventNames = Union[str, Iterable[str], None]
+EventFilter = Callable[[Message], bool]
+
+
+class ListenerError(AQMPError):
+ """
+ Generic error class for `EventListener`-related problems.
+ """
+
+
+class EventListener:
+ """
+ Selectively listens for events with runtime configurable filtering.
+
+ This class is designed to be directly usable for the most common cases,
+ but it can be extended to provide more rigorous control.
+
+ :param names:
+ One or more names of events to listen for.
+ When not provided, listen for ALL events.
+ :param event_filter:
+ An optional event filtering function.
+ When names are also provided, this acts as a secondary filter.
+
+ When ``names`` and ``event_filter`` are both provided, the names
+ will be filtered first, and then the filter function will be called
+ second. The event filter function can assume that the format of the
+ event is a known format.
+ """
+ def __init__(
+ self,
+ names: EventNames = None,
+ event_filter: Optional[EventFilter] = None,
+ ):
+ # Queue of 'heard' events yet to be witnessed by a caller.
+ self._queue: 'asyncio.Queue[Message]' = asyncio.Queue()
+
+ # Intended as a historical record, NOT a processing queue or backlog.
+ self._history: List[Message] = []
+
+ #: Primary event filter, based on one or more event names.
+ self.names: Set[str] = set()
+ if isinstance(names, str):
+ self.names.add(names)
+ elif names is not None:
+ self.names.update(names)
+
+ #: Optional, secondary event filter.
+ self.event_filter: Optional[EventFilter] = event_filter
+
+ @property
+ def history(self) -> Tuple[Message, ...]:
+ """
+ A read-only history of all events seen so far.
+
+ This represents *every* event, including those not yet witnessed
+ via `get()` or ``async for``. It persists between `clear()`
+ calls and is immutable.
+ """
+ return tuple(self._history)
+
+ def accept(self, event: Message) -> bool:
+ """
+ Determine if this listener accepts this event.
+
+ This method determines which events will appear in the stream.
+ The default implementation simply checks the event against the
+ list of names and the event_filter to decide if this
+ `EventListener` accepts a given event. It can be
+ overridden/extended to provide custom listener behavior.
+
+ User code is not expected to need to invoke this method.
+
+ :param event: The event under consideration.
+ :return: `True`, if this listener accepts this event.
+ """
+ name_ok = (not self.names) or (event['event'] in self.names)
+ return name_ok and (
+ (not self.event_filter) or self.event_filter(event)
+ )
+
+ async def put(self, event: Message) -> None:
+ """
+ Conditionally put a new event into the FIFO queue.
+
+ This method is not designed to be invoked from user code, and it
+ should not need to be overridden. It is a public interface so
+ that `QMPClient` has an interface by which it can inform
+ registered listeners of new events.
+
+ The event will be put into the queue if
+ :py:meth:`~EventListener.accept()` returns `True`.
+
+ :param event: The new event to put into the FIFO queue.
+ """
+ if not self.accept(event):
+ return
+
+ self._history.append(event)
+ await self._queue.put(event)
+
+ async def get(self) -> Message:
+ """
+ Wait for the very next event in this stream.
+
+ If one is already available, return that one.
+ """
+ return await self._queue.get()
+
+ def clear(self) -> None:
+ """
+ Clear this listener of all pending events.
+
+ Called when an `EventListener` is being unregistered, this clears the
+ pending FIFO queue synchronously. It can be also be used to
+ manually clear any pending events, if desired.
+
+ .. warning::
+ Take care when discarding events. Cleared events will be
+ silently tossed on the floor. All events that were ever
+ accepted by this listener are visible in `history()`.
+ """
+ while True:
+ try:
+ self._queue.get_nowait()
+ except asyncio.QueueEmpty:
+ break
+
+ def __aiter__(self) -> AsyncIterator[Message]:
+ return self
+
+ async def __anext__(self) -> Message:
+ """
+ Enables the `EventListener` to function as an async iterator.
+
+ It may be used like this:
+
+ .. code:: python
+
+ async for event in listener:
+ print(event)
+
+ These iterators will never terminate of their own accord; you
+ must provide break conditions or otherwise prepare to run them
+ in an `asyncio.Task` that can be cancelled.
+ """
+ return await self.get()
+
+
+class Events:
+ """
+ Events is a mix-in class that adds event functionality to the QMP class.
+
+ It's designed specifically as a mix-in for `QMPClient`, and it
+ relies upon the class it is being mixed into having a 'logger'
+ property.
+ """
+ def __init__(self) -> None:
+ self._listeners: List[EventListener] = []
+
+ #: Default, all-events `EventListener`.
+ self.events: EventListener = EventListener()
+ self.register_listener(self.events)
+
+ # Parent class needs to have a logger
+ self.logger: logging.Logger
+
+ async def _event_dispatch(self, msg: Message) -> None:
+ """
+ Given a new event, propagate it to all of the active listeners.
+
+ :param msg: The event to propagate.
+ """
+ for listener in self._listeners:
+ await listener.put(msg)
+
+ def register_listener(self, listener: EventListener) -> None:
+ """
+ Register and activate an `EventListener`.
+
+ :param listener: The listener to activate.
+ :raise ListenerError: If the given listener is already registered.
+ """
+ if listener in self._listeners:
+ raise ListenerError("Attempted to re-register existing listener")
+ self.logger.debug("Registering %s.", str(listener))
+ self._listeners.append(listener)
+
+ def remove_listener(self, listener: EventListener) -> None:
+ """
+ Unregister and deactivate an `EventListener`.
+
+ The removed listener will have its pending events cleared via
+ `clear()`. The listener can be re-registered later when
+ desired.
+
+ :param listener: The listener to deactivate.
+ :raise ListenerError: If the given listener is not registered.
+ """
+ if listener == self.events:
+ raise ListenerError("Cannot remove the default listener.")
+ self.logger.debug("Removing %s.", str(listener))
+ listener.clear()
+ self._listeners.remove(listener)
+
+ @contextmanager
+ def listen(self, *listeners: EventListener) -> Iterator[None]:
+ r"""
+ Context manager: Temporarily listen with an `EventListener`.
+
+ Accepts one or more `EventListener` objects and registers them,
+ activating them for the duration of the context block.
+
+ `EventListener` objects will have any pending events in their
+ FIFO queue cleared upon exiting the context block, when they are
+ deactivated.
+
+ :param \*listeners: One or more EventListeners to activate.
+ :raise ListenerError: If the given listener(s) are already active.
+ """
+ _added = []
+
+ try:
+ for listener in listeners:
+ self.register_listener(listener)
+ _added.append(listener)
+
+ yield
+
+ finally:
+ for listener in _added:
+ self.remove_listener(listener)
+
+ @contextmanager
+ def listener(
+ self,
+ names: EventNames = (),
+ event_filter: Optional[EventFilter] = None
+ ) -> Iterator[EventListener]:
+ """
+ Context manager: Temporarily listen with a new `EventListener`.
+
+ Creates an `EventListener` object and registers it, activating
+ it for the duration of the context block.
+
+ :param names:
+ One or more names of events to listen for.
+ When not provided, listen for ALL events.
+ :param event_filter:
+ An optional event filtering function.
+ When names are also provided, this acts as a secondary filter.
+
+ :return: The newly created and active `EventListener`.
+ """
+ listener = EventListener(names, event_filter)
+ with self.listen(listener):
+ yield listener
diff --git a/python/qemu/aqmp/message.py b/python/qemu/aqmp/message.py
new file mode 100644
index 0000000000..f76ccc9074
--- /dev/null
+++ b/python/qemu/aqmp/message.py
@@ -0,0 +1,209 @@
+"""
+QMP Message Format
+
+This module provides the `Message` class, which represents a single QMP
+message sent to or from the server.
+"""
+
+import json
+from json import JSONDecodeError
+from typing import (
+ Dict,
+ Iterator,
+ Mapping,
+ MutableMapping,
+ Optional,
+ Union,
+)
+
+from .error import ProtocolError
+
+
+class Message(MutableMapping[str, object]):
+ """
+ Represents a single QMP protocol message.
+
+ QMP uses JSON objects as its basic communicative unit; so this
+ Python object is a :py:obj:`~collections.abc.MutableMapping`. It may
+ be instantiated from either another mapping (like a `dict`), or from
+ raw `bytes` that still need to be deserialized.
+
+ Once instantiated, it may be treated like any other MutableMapping::
+
+ >>> msg = Message(b'{"hello": "world"}')
+ >>> assert msg['hello'] == 'world'
+ >>> msg['id'] = 'foobar'
+ >>> print(msg)
+ {
+ "hello": "world",
+ "id": "foobar"
+ }
+
+ It can be converted to `bytes`::
+
+ >>> msg = Message({"hello": "world"})
+ >>> print(bytes(msg))
+ b'{"hello":"world","id":"foobar"}'
+
+ Or back into a garden-variety `dict`::
+
+ >>> dict(msg)
+ {'hello': 'world'}
+
+
+ :param value: Initial value, if any.
+ :param eager:
+ When `True`, attempt to serialize or deserialize the initial value
+ immediately, so that conversion exceptions are raised during
+ the call to ``__init__()``.
+ """
+ # pylint: disable=too-many-ancestors
+
+ def __init__(self,
+ value: Union[bytes, Mapping[str, object]] = b'{}', *,
+ eager: bool = True):
+ self._data: Optional[bytes] = None
+ self._obj: Optional[Dict[str, object]] = None
+
+ if isinstance(value, bytes):
+ self._data = value
+ if eager:
+ self._obj = self._deserialize(self._data)
+ else:
+ self._obj = dict(value)
+ if eager:
+ self._data = self._serialize(self._obj)
+
+ # Methods necessary to implement the MutableMapping interface, see:
+ # https://docs.python.org/3/library/collections.abc.html#collections.abc.MutableMapping
+
+ # We get pop, popitem, clear, update, setdefault, __contains__,
+ # keys, items, values, get, __eq__ and __ne__ for free.
+
+ def __getitem__(self, key: str) -> object:
+ return self._object[key]
+
+ def __setitem__(self, key: str, value: object) -> None:
+ self._object[key] = value
+ self._data = None
+
+ def __delitem__(self, key: str) -> None:
+ del self._object[key]
+ self._data = None
+
+ def __iter__(self) -> Iterator[str]:
+ return iter(self._object)
+
+ def __len__(self) -> int:
+ return len(self._object)
+
+ # Dunder methods not related to MutableMapping:
+
+ def __repr__(self) -> str:
+ if self._obj is not None:
+ return f"Message({self._object!r})"
+ return f"Message({bytes(self)!r})"
+
+ def __str__(self) -> str:
+ """Pretty-printed representation of this QMP message."""
+ return json.dumps(self._object, indent=2)
+
+ def __bytes__(self) -> bytes:
+ """bytes representing this QMP message."""
+ if self._data is None:
+ self._data = self._serialize(self._obj or {})
+ return self._data
+
+ # Conversion Methods
+
+ @property
+ def _object(self) -> Dict[str, object]:
+ """
+ A `dict` representing this QMP message.
+
+ Generated on-demand, if required. This property is private
+ because it returns an object that could be used to invalidate
+ the internal state of the `Message` object.
+ """
+ if self._obj is None:
+ self._obj = self._deserialize(self._data or b'{}')
+ return self._obj
+
+ @classmethod
+ def _serialize(cls, value: object) -> bytes:
+ """
+ Serialize a JSON object as `bytes`.
+
+ :raise ValueError: When the object cannot be serialized.
+ :raise TypeError: When the object cannot be serialized.
+
+ :return: `bytes` ready to be sent over the wire.
+ """
+ return json.dumps(value, separators=(',', ':')).encode('utf-8')
+
+ @classmethod
+ def _deserialize(cls, data: bytes) -> Dict[str, object]:
+ """
+ Deserialize JSON `bytes` into a native Python `dict`.
+
+ :raise DeserializationError:
+ If JSON deserialization fails for any reason.
+ :raise UnexpectedTypeError:
+ If the data does not represent a JSON object.
+
+ :return: A `dict` representing this QMP message.
+ """
+ try:
+ obj = json.loads(data)
+ except JSONDecodeError as err:
+ emsg = "Failed to deserialize QMP message."
+ raise DeserializationError(emsg, data) from err
+ if not isinstance(obj, dict):
+ raise UnexpectedTypeError(
+ "QMP message is not a JSON object.",
+ obj
+ )
+ return obj
+
+
+class DeserializationError(ProtocolError):
+ """
+ A QMP message was not understood as JSON.
+
+ When this Exception is raised, ``__cause__`` will be set to the
+ `json.JSONDecodeError` Exception, which can be interrogated for
+ further details.
+
+ :param error_message: Human-readable string describing the error.
+ :param raw: The raw `bytes` that prompted the failure.
+ """
+ def __init__(self, error_message: str, raw: bytes):
+ super().__init__(error_message)
+ #: The raw `bytes` that were not understood as JSON.
+ self.raw: bytes = raw
+
+ def __str__(self) -> str:
+ return "\n".join([
+ super().__str__(),
+ f" raw bytes were: {str(self.raw)}",
+ ])
+
+
+class UnexpectedTypeError(ProtocolError):
+ """
+ A QMP message was JSON, but not a JSON object.
+
+ :param error_message: Human-readable string describing the error.
+ :param value: The deserialized JSON value that wasn't an object.
+ """
+ def __init__(self, error_message: str, value: object):
+ super().__init__(error_message)
+ #: The JSON value that was expected to be an object.
+ self.value: object = value
+
+ def __str__(self) -> str:
+ strval = json.dumps(self.value, indent=2)
+ return "\n".join([
+ super().__str__(),
+ f" json value was: {strval}",
+ ])
diff --git a/python/qemu/aqmp/models.py b/python/qemu/aqmp/models.py
new file mode 100644
index 0000000000..24c94123ac
--- /dev/null
+++ b/python/qemu/aqmp/models.py
@@ -0,0 +1,133 @@
+"""
+QMP Data Models
+
+This module provides simplistic data classes that represent the few
+structures that the QMP spec mandates; they are used to verify incoming
+data to make sure it conforms to spec.
+"""
+# pylint: disable=too-few-public-methods
+
+from collections import abc
+from typing import (
+ Any,
+ Mapping,
+ Optional,
+ Sequence,
+)
+
+
+class Model:
+ """
+ Abstract data model, representing some QMP object of some kind.
+
+ :param raw: The raw object to be validated.
+ :raise KeyError: If any required fields are absent.
+ :raise TypeError: If any required fields have the wrong type.
+ """
+ def __init__(self, raw: Mapping[str, Any]):
+ self._raw = raw
+
+ def _check_key(self, key: str) -> None:
+ if key not in self._raw:
+ raise KeyError(f"'{self._name}' object requires '{key}' member")
+
+ def _check_value(self, key: str, type_: type, typestr: str) -> None:
+ assert key in self._raw
+ if not isinstance(self._raw[key], type_):
+ raise TypeError(
+ f"'{self._name}' member '{key}' must be a {typestr}"
+ )
+
+ def _check_member(self, key: str, type_: type, typestr: str) -> None:
+ self._check_key(key)
+ self._check_value(key, type_, typestr)
+
+ @property
+ def _name(self) -> str:
+ return type(self).__name__
+
+ def __repr__(self) -> str:
+ return f"{self._name}({self._raw!r})"
+
+
+class Greeting(Model):
+ """
+ Defined in qmp-spec.txt, section 2.2, "Server Greeting".
+
+ :param raw: The raw Greeting object.
+ :raise KeyError: If any required fields are absent.
+ :raise TypeError: If any required fields have the wrong type.
+ """
+ def __init__(self, raw: Mapping[str, Any]):
+ super().__init__(raw)
+ #: 'QMP' member
+ self.QMP: QMPGreeting # pylint: disable=invalid-name
+
+ self._check_member('QMP', abc.Mapping, "JSON object")
+ self.QMP = QMPGreeting(self._raw['QMP'])
+
+
+class QMPGreeting(Model):
+ """
+ Defined in qmp-spec.txt, section 2.2, "Server Greeting".
+
+ :param raw: The raw QMPGreeting object.
+ :raise KeyError: If any required fields are absent.
+ :raise TypeError: If any required fields have the wrong type.
+ """
+ def __init__(self, raw: Mapping[str, Any]):
+ super().__init__(raw)
+ #: 'version' member
+ self.version: Mapping[str, object]
+ #: 'capabilities' member
+ self.capabilities: Sequence[object]
+
+ self._check_member('version', abc.Mapping, "JSON object")
+ self.version = self._raw['version']
+
+ self._check_member('capabilities', abc.Sequence, "JSON array")
+ self.capabilities = self._raw['capabilities']
+
+
+class ErrorResponse(Model):
+ """
+ Defined in qmp-spec.txt, section 2.4.2, "error".
+
+ :param raw: The raw ErrorResponse object.
+ :raise KeyError: If any required fields are absent.
+ :raise TypeError: If any required fields have the wrong type.
+ """
+ def __init__(self, raw: Mapping[str, Any]):
+ super().__init__(raw)
+ #: 'error' member
+ self.error: ErrorInfo
+ #: 'id' member
+ self.id: Optional[object] = None # pylint: disable=invalid-name
+
+ self._check_member('error', abc.Mapping, "JSON object")
+ self.error = ErrorInfo(self._raw['error'])
+
+ if 'id' in raw:
+ self.id = raw['id']
+
+
+class ErrorInfo(Model):
+ """
+ Defined in qmp-spec.txt, section 2.4.2, "error".
+
+ :param raw: The raw ErrorInfo object.
+ :raise KeyError: If any required fields are absent.
+ :raise TypeError: If any required fields have the wrong type.
+ """
+ def __init__(self, raw: Mapping[str, Any]):
+ super().__init__(raw)
+ #: 'class' member, with an underscore to avoid conflicts in Python.
+ self.class_: str
+ #: 'desc' member
+ self.desc: str
+
+ self._check_member('class', str, "string")
+ self.class_ = self._raw['class']
+
+ self._check_member('desc', str, "string")
+ self.desc = self._raw['desc']
diff --git a/python/qemu/aqmp/protocol.py b/python/qemu/aqmp/protocol.py
new file mode 100644
index 0000000000..32e78749c1
--- /dev/null
+++ b/python/qemu/aqmp/protocol.py
@@ -0,0 +1,902 @@
+"""
+Generic Asynchronous Message-based Protocol Support
+
+This module provides a generic framework for sending and receiving
+messages over an asyncio stream. `AsyncProtocol` is an abstract class
+that implements the core mechanisms of a simple send/receive protocol,
+and is designed to be extended.
+
+In this package, it is used as the implementation for the `QMPClient`
+class.
+"""
+
+import asyncio
+from asyncio import StreamReader, StreamWriter
+from enum import Enum
+from functools import wraps
+import logging
+from ssl import SSLContext
+from typing import (
+ Any,
+ Awaitable,
+ Callable,
+ Generic,
+ List,
+ Optional,
+ Tuple,
+ TypeVar,
+ Union,
+ cast,
+)
+
+from .error import AQMPError
+from .util import (
+ bottom_half,
+ create_task,
+ exception_summary,
+ flush,
+ is_closing,
+ pretty_traceback,
+ upper_half,
+ wait_closed,
+)
+
+
+T = TypeVar('T')
+_TaskFN = Callable[[], Awaitable[None]] # aka ``async def func() -> None``
+_FutureT = TypeVar('_FutureT', bound=Optional['asyncio.Future[Any]'])
+
+
+class Runstate(Enum):
+ """Protocol session runstate."""
+
+ #: Fully quiesced and disconnected.
+ IDLE = 0
+ #: In the process of connecting or establishing a session.
+ CONNECTING = 1
+ #: Fully connected and active session.
+ RUNNING = 2
+ #: In the process of disconnecting.
+ #: Runstate may be returned to `IDLE` by calling `disconnect()`.
+ DISCONNECTING = 3
+
+
+class ConnectError(AQMPError):
+ """
+ Raised when the initial connection process has failed.
+
+ This Exception always wraps a "root cause" exception that can be
+ interrogated for additional information.
+
+ :param error_message: Human-readable string describing the error.
+ :param exc: The root-cause exception.
+ """
+ def __init__(self, error_message: str, exc: Exception):
+ super().__init__(error_message)
+ #: Human-readable error string
+ self.error_message: str = error_message
+ #: Wrapped root cause exception
+ self.exc: Exception = exc
+
+ def __str__(self) -> str:
+ return f"{self.error_message}: {self.exc!s}"
+
+
+class StateError(AQMPError):
+ """
+ An API command (connect, execute, etc) was issued at an inappropriate time.
+
+ This error is raised when a command like
+ :py:meth:`~AsyncProtocol.connect()` is issued at an inappropriate
+ time.
+
+ :param error_message: Human-readable string describing the state violation.
+ :param state: The actual `Runstate` seen at the time of the violation.
+ :param required: The `Runstate` required to process this command.
+ """
+ def __init__(self, error_message: str,
+ state: Runstate, required: Runstate):
+ super().__init__(error_message)
+ self.error_message = error_message
+ self.state = state
+ self.required = required
+
+
+F = TypeVar('F', bound=Callable[..., Any]) # pylint: disable=invalid-name
+
+
+# Don't Panic.
+def require(required_state: Runstate) -> Callable[[F], F]:
+ """
+ Decorator: protect a method so it can only be run in a certain `Runstate`.
+
+ :param required_state: The `Runstate` required to invoke this method.
+ :raise StateError: When the required `Runstate` is not met.
+ """
+ def _decorator(func: F) -> F:
+ # _decorator is the decorator that is built by calling the
+ # require() decorator factory; e.g.:
+ #
+ # @require(Runstate.IDLE) def foo(): ...
+ # will replace 'foo' with the result of '_decorator(foo)'.
+
+ @wraps(func)
+ def _wrapper(proto: 'AsyncProtocol[Any]',
+ *args: Any, **kwargs: Any) -> Any:
+ # _wrapper is the function that gets executed prior to the
+ # decorated method.
+
+ name = type(proto).__name__
+
+ if proto.runstate != required_state:
+ if proto.runstate == Runstate.CONNECTING:
+ emsg = f"{name} is currently connecting."
+ elif proto.runstate == Runstate.DISCONNECTING:
+ emsg = (f"{name} is disconnecting."
+ " Call disconnect() to return to IDLE state.")
+ elif proto.runstate == Runstate.RUNNING:
+ emsg = f"{name} is already connected and running."
+ elif proto.runstate == Runstate.IDLE:
+ emsg = f"{name} is disconnected and idle."
+ else:
+ assert False
+ raise StateError(emsg, proto.runstate, required_state)
+ # No StateError, so call the wrapped method.
+ return func(proto, *args, **kwargs)
+
+ # Return the decorated method;
+ # Transforming Func to Decorated[Func].
+ return cast(F, _wrapper)
+
+ # Return the decorator instance from the decorator factory. Phew!
+ return _decorator
+
+
+class AsyncProtocol(Generic[T]):
+ """
+ AsyncProtocol implements a generic async message-based protocol.
+
+ This protocol assumes the basic unit of information transfer between
+ client and server is a "message", the details of which are left up
+ to the implementation. It assumes the sending and receiving of these
+ messages is full-duplex and not necessarily correlated; i.e. it
+ supports asynchronous inbound messages.
+
+ It is designed to be extended by a specific protocol which provides
+ the implementations for how to read and send messages. These must be
+ defined in `_do_recv()` and `_do_send()`, respectively.
+
+ Other callbacks have a default implementation, but are intended to be
+ either extended or overridden:
+
+ - `_establish_session`:
+ The base implementation starts the reader/writer tasks.
+ A protocol implementation can override this call, inserting
+ actions to be taken prior to starting the reader/writer tasks
+ before the super() call; actions needing to occur afterwards
+ can be written after the super() call.
+ - `_on_message`:
+ Actions to be performed when a message is received.
+ - `_cb_outbound`:
+ Logging/Filtering hook for all outbound messages.
+ - `_cb_inbound`:
+ Logging/Filtering hook for all inbound messages.
+ This hook runs *before* `_on_message()`.
+
+ :param name:
+ Name used for logging messages, if any. By default, messages
+ will log to 'qemu.aqmp.protocol', but each individual connection
+ can be given its own logger by giving it a name; messages will
+ then log to 'qemu.aqmp.protocol.${name}'.
+ """
+ # pylint: disable=too-many-instance-attributes
+
+ #: Logger object for debugging messages from this connection.
+ logger = logging.getLogger(__name__)
+
+ # Maximum allowable size of read buffer
+ _limit = (64 * 1024)
+
+ # -------------------------
+ # Section: Public interface
+ # -------------------------
+
+ def __init__(self, name: Optional[str] = None) -> None:
+ #: The nickname for this connection, if any.
+ self.name: Optional[str] = name
+ if self.name is not None:
+ self.logger = self.logger.getChild(self.name)
+
+ # stream I/O
+ self._reader: Optional[StreamReader] = None
+ self._writer: Optional[StreamWriter] = None
+
+ # Outbound Message queue
+ self._outgoing: asyncio.Queue[T]
+
+ # Special, long-running tasks:
+ self._reader_task: Optional[asyncio.Future[None]] = None
+ self._writer_task: Optional[asyncio.Future[None]] = None
+
+ # Aggregate of the above two tasks, used for Exception management.
+ self._bh_tasks: Optional[asyncio.Future[Tuple[None, None]]] = None
+
+ #: Disconnect task. The disconnect implementation runs in a task
+ #: so that asynchronous disconnects (initiated by the
+ #: reader/writer) are allowed to wait for the reader/writers to
+ #: exit.
+ self._dc_task: Optional[asyncio.Future[None]] = None
+
+ self._runstate = Runstate.IDLE
+ self._runstate_changed: Optional[asyncio.Event] = None
+
+ def __repr__(self) -> str:
+ cls_name = type(self).__name__
+ tokens = []
+ if self.name is not None:
+ tokens.append(f"name={self.name!r}")
+ tokens.append(f"runstate={self.runstate.name}")
+ return f"<{cls_name} {' '.join(tokens)}>"
+
+ @property # @upper_half
+ def runstate(self) -> Runstate:
+ """The current `Runstate` of the connection."""
+ return self._runstate
+
+ @upper_half
+ async def runstate_changed(self) -> Runstate:
+ """
+ Wait for the `runstate` to change, then return that runstate.
+ """
+ await self._runstate_event.wait()
+ return self.runstate
+
+ @upper_half
+ @require(Runstate.IDLE)
+ async def accept(self, address: Union[str, Tuple[str, int]],
+ ssl: Optional[SSLContext] = None) -> None:
+ """
+ Accept a connection and begin processing message queues.
+
+ If this call fails, `runstate` is guaranteed to be set back to `IDLE`.
+
+ :param address:
+ Address to listen to; UNIX socket path or TCP address/port.
+ :param ssl: SSL context to use, if any.
+
+ :raise StateError: When the `Runstate` is not `IDLE`.
+ :raise ConnectError: If a connection could not be accepted.
+ """
+ await self._new_session(address, ssl, accept=True)
+
+ @upper_half
+ @require(Runstate.IDLE)
+ async def connect(self, address: Union[str, Tuple[str, int]],
+ ssl: Optional[SSLContext] = None) -> None:
+ """
+ Connect to the server and begin processing message queues.
+
+ If this call fails, `runstate` is guaranteed to be set back to `IDLE`.
+
+ :param address:
+ Address to connect to; UNIX socket path or TCP address/port.
+ :param ssl: SSL context to use, if any.
+
+ :raise StateError: When the `Runstate` is not `IDLE`.
+ :raise ConnectError: If a connection cannot be made to the server.
+ """
+ await self._new_session(address, ssl)
+
+ @upper_half
+ async def disconnect(self) -> None:
+ """
+ Disconnect and wait for all tasks to fully stop.
+
+ If there was an exception that caused the reader/writers to
+ terminate prematurely, it will be raised here.
+
+ :raise Exception: When the reader or writer terminate unexpectedly.
+ """
+ self.logger.debug("disconnect() called.")
+ self._schedule_disconnect()
+ await self._wait_disconnect()
+
+ # --------------------------
+ # Section: Session machinery
+ # --------------------------
+
+ @property
+ def _runstate_event(self) -> asyncio.Event:
+ # asyncio.Event() objects should not be created prior to entrance into
+ # an event loop, so we can ensure we create it in the correct context.
+ # Create it on-demand *only* at the behest of an 'async def' method.
+ if not self._runstate_changed:
+ self._runstate_changed = asyncio.Event()
+ return self._runstate_changed
+
+ @upper_half
+ @bottom_half
+ def _set_state(self, state: Runstate) -> None:
+ """
+ Change the `Runstate` of the protocol connection.
+
+ Signals the `runstate_changed` event.
+ """
+ if state == self._runstate:
+ return
+
+ self.logger.debug("Transitioning from '%s' to '%s'.",
+ str(self._runstate), str(state))
+ self._runstate = state
+ self._runstate_event.set()
+ self._runstate_event.clear()
+
+ @upper_half
+ async def _new_session(self,
+ address: Union[str, Tuple[str, int]],
+ ssl: Optional[SSLContext] = None,
+ accept: bool = False) -> None:
+ """
+ Establish a new connection and initialize the session.
+
+ Connect or accept a new connection, then begin the protocol
+ session machinery. If this call fails, `runstate` is guaranteed
+ to be set back to `IDLE`.
+
+ :param address:
+ Address to connect to/listen on;
+ UNIX socket path or TCP address/port.
+ :param ssl: SSL context to use, if any.
+ :param accept: Accept a connection instead of connecting when `True`.
+
+ :raise ConnectError:
+ When a connection or session cannot be established.
+
+ This exception will wrap a more concrete one. In most cases,
+ the wrapped exception will be `OSError` or `EOFError`. If a
+ protocol-level failure occurs while establishing a new
+ session, the wrapped error may also be an `AQMPError`.
+ """
+ assert self.runstate == Runstate.IDLE
+
+ try:
+ phase = "connection"
+ await self._establish_connection(address, ssl, accept)
+
+ phase = "session"
+ await self._establish_session()
+
+ except BaseException as err:
+ emsg = f"Failed to establish {phase}"
+ self.logger.error("%s: %s", emsg, exception_summary(err))
+ self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
+ try:
+ # Reset from CONNECTING back to IDLE.
+ await self.disconnect()
+ except:
+ emsg = "Unexpected bottom half exception"
+ self.logger.critical("%s:\n%s\n", emsg, pretty_traceback())
+ raise
+
+ # NB: CancelledError is not a BaseException before Python 3.8
+ if isinstance(err, asyncio.CancelledError):
+ raise
+
+ if isinstance(err, Exception):
+ raise ConnectError(emsg, err) from err
+
+ # Raise BaseExceptions un-wrapped, they're more important.
+ raise
+
+ assert self.runstate == Runstate.RUNNING
+
+ @upper_half
+ async def _establish_connection(
+ self,
+ address: Union[str, Tuple[str, int]],
+ ssl: Optional[SSLContext] = None,
+ accept: bool = False
+ ) -> None:
+ """
+ Establish a new connection.
+
+ :param address:
+ Address to connect to/listen on;
+ UNIX socket path or TCP address/port.
+ :param ssl: SSL context to use, if any.
+ :param accept: Accept a connection instead of connecting when `True`.
+ """
+ assert self.runstate == Runstate.IDLE
+ self._set_state(Runstate.CONNECTING)
+
+ # Allow runstate watchers to witness 'CONNECTING' state; some
+ # failures in the streaming layer are synchronous and will not
+ # otherwise yield.
+ await asyncio.sleep(0)
+
+ if accept:
+ await self._do_accept(address, ssl)
+ else:
+ await self._do_connect(address, ssl)
+
+ @upper_half
+ async def _do_accept(self, address: Union[str, Tuple[str, int]],
+ ssl: Optional[SSLContext] = None) -> None:
+ """
+ Acting as the transport server, accept a single connection.
+
+ :param address:
+ Address to listen on; UNIX socket path or TCP address/port.
+ :param ssl: SSL context to use, if any.
+
+ :raise OSError: For stream-related errors.
+ """
+ self.logger.debug("Awaiting connection on %s ...", address)
+ connected = asyncio.Event()
+ server: Optional[asyncio.AbstractServer] = None
+
+ async def _client_connected_cb(reader: asyncio.StreamReader,
+ writer: asyncio.StreamWriter) -> None:
+ """Used to accept a single incoming connection, see below."""
+ nonlocal server
+ nonlocal connected
+
+ # A connection has been accepted; stop listening for new ones.
+ assert server is not None
+ server.close()
+ await server.wait_closed()
+ server = None
+
+ # Register this client as being connected
+ self._reader, self._writer = (reader, writer)
+
+ # Signal back: We've accepted a client!
+ connected.set()
+
+ if isinstance(address, tuple):
+ coro = asyncio.start_server(
+ _client_connected_cb,
+ host=address[0],
+ port=address[1],
+ ssl=ssl,
+ backlog=1,
+ limit=self._limit,
+ )
+ else:
+ coro = asyncio.start_unix_server(
+ _client_connected_cb,
+ path=address,
+ ssl=ssl,
+ backlog=1,
+ limit=self._limit,
+ )
+
+ server = await coro # Starts listening
+ await connected.wait() # Waits for the callback to fire (and finish)
+ assert server is None
+
+ self.logger.debug("Connection accepted.")
+
+ @upper_half
+ async def _do_connect(self, address: Union[str, Tuple[str, int]],
+ ssl: Optional[SSLContext] = None) -> None:
+ """
+ Acting as the transport client, initiate a connection to a server.
+
+ :param address:
+ Address to connect to; UNIX socket path or TCP address/port.
+ :param ssl: SSL context to use, if any.
+
+ :raise OSError: For stream-related errors.
+ """
+ self.logger.debug("Connecting to %s ...", address)
+
+ if isinstance(address, tuple):
+ connect = asyncio.open_connection(
+ address[0],
+ address[1],
+ ssl=ssl,
+ limit=self._limit,
+ )
+ else:
+ connect = asyncio.open_unix_connection(
+ path=address,
+ ssl=ssl,
+ limit=self._limit,
+ )
+ self._reader, self._writer = await connect
+
+ self.logger.debug("Connected.")
+
+ @upper_half
+ async def _establish_session(self) -> None:
+ """
+ Establish a new session.
+
+ Starts the readers/writer tasks; subclasses may perform their
+ own negotiations here. The Runstate will be RUNNING upon
+ successful conclusion.
+ """
+ assert self.runstate == Runstate.CONNECTING
+
+ self._outgoing = asyncio.Queue()
+
+ reader_coro = self._bh_loop_forever(self._bh_recv_message, 'Reader')
+ writer_coro = self._bh_loop_forever(self._bh_send_message, 'Writer')
+
+ self._reader_task = create_task(reader_coro)
+ self._writer_task = create_task(writer_coro)
+
+ self._bh_tasks = asyncio.gather(
+ self._reader_task,
+ self._writer_task,
+ )
+
+ self._set_state(Runstate.RUNNING)
+ await asyncio.sleep(0) # Allow runstate_event to process
+
+ @upper_half
+ @bottom_half
+ def _schedule_disconnect(self) -> None:
+ """
+ Initiate a disconnect; idempotent.
+
+ This method is used both in the upper-half as a direct
+ consequence of `disconnect()`, and in the bottom-half in the
+ case of unhandled exceptions in the reader/writer tasks.
+
+ It can be invoked no matter what the `runstate` is.
+ """
+ if not self._dc_task:
+ self._set_state(Runstate.DISCONNECTING)
+ self.logger.debug("Scheduling disconnect.")
+ self._dc_task = create_task(self._bh_disconnect())
+
+ @upper_half
+ async def _wait_disconnect(self) -> None:
+ """
+ Waits for a previously scheduled disconnect to finish.
+
+ This method will gather any bottom half exceptions and re-raise
+ the one that occurred first; presuming it to be the root cause
+ of any subsequent Exceptions. It is intended to be used in the
+ upper half of the call chain.
+
+ :raise Exception:
+ Arbitrary exception re-raised on behalf of the reader/writer.
+ """
+ assert self.runstate == Runstate.DISCONNECTING
+ assert self._dc_task
+
+ aws: List[Awaitable[object]] = [self._dc_task]
+ if self._bh_tasks:
+ aws.insert(0, self._bh_tasks)
+ all_defined_tasks = asyncio.gather(*aws)
+
+ # Ensure disconnect is done; Exception (if any) is not raised here:
+ await asyncio.wait((self._dc_task,))
+
+ try:
+ await all_defined_tasks # Raise Exceptions from the bottom half.
+ finally:
+ self._cleanup()
+ self._set_state(Runstate.IDLE)
+
+ @upper_half
+ def _cleanup(self) -> None:
+ """
+ Fully reset this object to a clean state and return to `IDLE`.
+ """
+ def _paranoid_task_erase(task: _FutureT) -> Optional[_FutureT]:
+ # Help to erase a task, ENSURING it is fully quiesced first.
+ assert (task is None) or task.done()
+ return None if (task and task.done()) else task
+
+ assert self.runstate == Runstate.DISCONNECTING
+ self._dc_task = _paranoid_task_erase(self._dc_task)
+ self._reader_task = _paranoid_task_erase(self._reader_task)
+ self._writer_task = _paranoid_task_erase(self._writer_task)
+ self._bh_tasks = _paranoid_task_erase(self._bh_tasks)
+
+ self._reader = None
+ self._writer = None
+
+ # NB: _runstate_changed cannot be cleared because we still need it to
+ # send the final runstate changed event ...!
+
+ # ----------------------------
+ # Section: Bottom Half methods
+ # ----------------------------
+
+ @bottom_half
+ async def _bh_disconnect(self) -> None:
+ """
+ Disconnect and cancel all outstanding tasks.
+
+ It is designed to be called from its task context,
+ :py:obj:`~AsyncProtocol._dc_task`. By running in its own task,
+ it is free to wait on any pending actions that may still need to
+ occur in either the reader or writer tasks.
+ """
+ assert self.runstate == Runstate.DISCONNECTING
+
+ def _done(task: Optional['asyncio.Future[Any]']) -> bool:
+ return task is not None and task.done()
+
+ # NB: We can't rely on _bh_tasks being done() here, it may not
+ # yet have had a chance to run and gather itself.
+ tasks = tuple(filter(None, (self._writer_task, self._reader_task)))
+ error_pathway = _done(self._reader_task) or _done(self._writer_task)
+
+ try:
+ # Try to flush the writer, if possible:
+ if not error_pathway:
+ await self._bh_flush_writer()
+ except BaseException as err:
+ error_pathway = True
+ emsg = "Failed to flush the writer"
+ self.logger.error("%s: %s", emsg, exception_summary(err))
+ self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
+ raise
+ finally:
+ # Cancel any still-running tasks:
+ if self._writer_task is not None and not self._writer_task.done():
+ self.logger.debug("Cancelling writer task.")
+ self._writer_task.cancel()
+ if self._reader_task is not None and not self._reader_task.done():
+ self.logger.debug("Cancelling reader task.")
+ self._reader_task.cancel()
+
+ # Close out the tasks entirely (Won't raise):
+ if tasks:
+ self.logger.debug("Waiting for tasks to complete ...")
+ await asyncio.wait(tasks)
+
+ # Lastly, close the stream itself. (May raise):
+ await self._bh_close_stream(error_pathway)
+ self.logger.debug("Disconnected.")
+
+ @bottom_half
+ async def _bh_flush_writer(self) -> None:
+ if not self._writer_task:
+ return
+
+ self.logger.debug("Draining the outbound queue ...")
+ await self._outgoing.join()
+ if self._writer is not None:
+ self.logger.debug("Flushing the StreamWriter ...")
+ await flush(self._writer)
+
+ @bottom_half
+ async def _bh_close_stream(self, error_pathway: bool = False) -> None:
+ # NB: Closing the writer also implcitly closes the reader.
+ if not self._writer:
+ return
+
+ if not is_closing(self._writer):
+ self.logger.debug("Closing StreamWriter.")
+ self._writer.close()
+
+ self.logger.debug("Waiting for StreamWriter to close ...")
+ try:
+ await wait_closed(self._writer)
+ except Exception: # pylint: disable=broad-except
+ # It's hard to tell if the Stream is already closed or
+ # not. Even if one of the tasks has failed, it may have
+ # failed for a higher-layered protocol reason. The
+ # stream could still be open and perfectly fine.
+ # I don't know how to discern its health here.
+
+ if error_pathway:
+ # We already know that *something* went wrong. Let's
+ # just trust that the Exception we already have is the
+ # better one to present to the user, even if we don't
+ # genuinely *know* the relationship between the two.
+ self.logger.debug(
+ "Discarding Exception from wait_closed:\n%s\n",
+ pretty_traceback(),
+ )
+ else:
+ # Oops, this is a brand-new error!
+ raise
+ finally:
+ self.logger.debug("StreamWriter closed.")
+
+ @bottom_half
+ async def _bh_loop_forever(self, async_fn: _TaskFN, name: str) -> None:
+ """
+ Run one of the bottom-half methods in a loop forever.
+
+ If the bottom half ever raises any exception, schedule a
+ disconnect that will terminate the entire loop.
+
+ :param async_fn: The bottom-half method to run in a loop.
+ :param name: The name of this task, used for logging.
+ """
+ try:
+ while True:
+ await async_fn()
+ except asyncio.CancelledError:
+ # We have been cancelled by _bh_disconnect, exit gracefully.
+ self.logger.debug("Task.%s: cancelled.", name)
+ return
+ except BaseException as err:
+ self.logger.error("Task.%s: %s",
+ name, exception_summary(err))
+ self.logger.debug("Task.%s: failure:\n%s\n",
+ name, pretty_traceback())
+ self._schedule_disconnect()
+ raise
+ finally:
+ self.logger.debug("Task.%s: exiting.", name)
+
+ @bottom_half
+ async def _bh_send_message(self) -> None:
+ """
+ Wait for an outgoing message, then send it.
+
+ Designed to be run in `_bh_loop_forever()`.
+ """
+ msg = await self._outgoing.get()
+ try:
+ await self._send(msg)
+ finally:
+ self._outgoing.task_done()
+
+ @bottom_half
+ async def _bh_recv_message(self) -> None:
+ """
+ Wait for an incoming message and call `_on_message` to route it.
+
+ Designed to be run in `_bh_loop_forever()`.
+ """
+ msg = await self._recv()
+ await self._on_message(msg)
+
+ # --------------------
+ # Section: Message I/O
+ # --------------------
+
+ @upper_half
+ @bottom_half
+ def _cb_outbound(self, msg: T) -> T:
+ """
+ Callback: outbound message hook.
+
+ This is intended for subclasses to be able to add arbitrary
+ hooks to filter or manipulate outgoing messages. The base
+ implementation does nothing but log the message without any
+ manipulation of the message.
+
+ :param msg: raw outbound message
+ :return: final outbound message
+ """
+ self.logger.debug("--> %s", str(msg))
+ return msg
+
+ @upper_half
+ @bottom_half
+ def _cb_inbound(self, msg: T) -> T:
+ """
+ Callback: inbound message hook.
+
+ This is intended for subclasses to be able to add arbitrary
+ hooks to filter or manipulate incoming messages. The base
+ implementation does nothing but log the message without any
+ manipulation of the message.
+
+ This method does not "handle" incoming messages; it is a filter.
+ The actual "endpoint" for incoming messages is `_on_message()`.
+
+ :param msg: raw inbound message
+ :return: processed inbound message
+ """
+ self.logger.debug("<-- %s", str(msg))
+ return msg
+
+ @upper_half
+ @bottom_half
+ async def _readline(self) -> bytes:
+ """
+ Wait for a newline from the incoming reader.
+
+ This method is provided as a convenience for upper-layer
+ protocols, as many are line-based.
+
+ This method *may* return a sequence of bytes without a trailing
+ newline if EOF occurs, but *some* bytes were received. In this
+ case, the next call will raise `EOFError`. It is assumed that
+ the layer 5 protocol will decide if there is anything meaningful
+ to be done with a partial message.
+
+ :raise OSError: For stream-related errors.
+ :raise EOFError:
+ If the reader stream is at EOF and there are no bytes to return.
+ :return: bytes, including the newline.
+ """
+ assert self._reader is not None
+ msg_bytes = await self._reader.readline()
+
+ if not msg_bytes:
+ if self._reader.at_eof():
+ raise EOFError
+
+ return msg_bytes
+
+ @upper_half
+ @bottom_half
+ async def _do_recv(self) -> T:
+ """
+ Abstract: Read from the stream and return a message.
+
+ Very low-level; intended to only be called by `_recv()`.
+ """
+ raise NotImplementedError
+
+ @upper_half
+ @bottom_half
+ async def _recv(self) -> T:
+ """
+ Read an arbitrary protocol message.
+
+ .. warning::
+ This method is intended primarily for `_bh_recv_message()`
+ to use in an asynchronous task loop. Using it outside of
+ this loop will "steal" messages from the normal routing
+ mechanism. It is safe to use prior to `_establish_session()`,
+ but should not be used otherwise.
+
+ This method uses `_do_recv()` to retrieve the raw message, and
+ then transforms it using `_cb_inbound()`.
+
+ :return: A single (filtered, processed) protocol message.
+ """
+ message = await self._do_recv()
+ return self._cb_inbound(message)
+
+ @upper_half
+ @bottom_half
+ def _do_send(self, msg: T) -> None:
+ """
+ Abstract: Write a message to the stream.
+
+ Very low-level; intended to only be called by `_send()`.
+ """
+ raise NotImplementedError
+
+ @upper_half
+ @bottom_half
+ async def _send(self, msg: T) -> None:
+ """
+ Send an arbitrary protocol message.
+
+ This method will transform any outgoing messages according to
+ `_cb_outbound()`.
+
+ .. warning::
+ Like `_recv()`, this method is intended to be called by
+ the writer task loop that processes outgoing
+ messages. Calling it directly may circumvent logic
+ implemented by the caller meant to correlate outgoing and
+ incoming messages.
+
+ :raise OSError: For problems with the underlying stream.
+ """
+ msg = self._cb_outbound(msg)
+ self._do_send(msg)
+
+ @bottom_half
+ async def _on_message(self, msg: T) -> None:
+ """
+ Called to handle the receipt of a new message.
+
+ .. caution::
+ This is executed from within the reader loop, so be advised
+ that waiting on either the reader or writer task will lead
+ to deadlock. Additionally, any unhandled exceptions will
+ directly cause the loop to halt, so logic may be best-kept
+ to a minimum if at all possible.
+
+ :param msg: The incoming message, already logged/filtered.
+ """
+ # Nothing to do in the abstract case.
diff --git a/python/qemu/aqmp/py.typed b/python/qemu/aqmp/py.typed
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/python/qemu/aqmp/py.typed
diff --git a/python/qemu/aqmp/qmp_client.py b/python/qemu/aqmp/qmp_client.py
new file mode 100644
index 0000000000..82e9dab124
--- /dev/null
+++ b/python/qemu/aqmp/qmp_client.py
@@ -0,0 +1,621 @@
+"""
+QMP Protocol Implementation
+
+This module provides the `QMPClient` class, which can be used to connect
+and send commands to a QMP server such as QEMU. The QMP class can be
+used to either connect to a listening server, or used to listen and
+accept an incoming connection from that server.
+"""
+
+import asyncio
+import logging
+from typing import (
+ Dict,
+ List,
+ Mapping,
+ Optional,
+ Union,
+ cast,
+)
+
+from .error import AQMPError, ProtocolError
+from .events import Events
+from .message import Message
+from .models import ErrorResponse, Greeting
+from .protocol import AsyncProtocol, Runstate, require
+from .util import (
+ bottom_half,
+ exception_summary,
+ pretty_traceback,
+ upper_half,
+)
+
+
+class _WrappedProtocolError(ProtocolError):
+ """
+ Abstract exception class for Protocol errors that wrap an Exception.
+
+ :param error_message: Human-readable string describing the error.
+ :param exc: The root-cause exception.
+ """
+ def __init__(self, error_message: str, exc: Exception):
+ super().__init__(error_message)
+ self.exc = exc
+
+ def __str__(self) -> str:
+ return f"{self.error_message}: {self.exc!s}"
+
+
+class GreetingError(_WrappedProtocolError):
+ """
+ An exception occurred during the Greeting phase.
+
+ :param error_message: Human-readable string describing the error.
+ :param exc: The root-cause exception.
+ """
+
+
+class NegotiationError(_WrappedProtocolError):
+ """
+ An exception occurred during the Negotiation phase.
+
+ :param error_message: Human-readable string describing the error.
+ :param exc: The root-cause exception.
+ """
+
+
+class ExecuteError(AQMPError):
+ """
+ Exception raised by `QMPClient.execute()` on RPC failure.
+
+ :param error_response: The RPC error response object.
+ :param sent: The sent RPC message that caused the failure.
+ :param received: The raw RPC error reply received.
+ """
+ def __init__(self, error_response: ErrorResponse,
+ sent: Message, received: Message):
+ super().__init__(error_response.error.desc)
+ #: The sent `Message` that caused the failure
+ self.sent: Message = sent
+ #: The received `Message` that indicated failure
+ self.received: Message = received
+ #: The parsed error response
+ self.error: ErrorResponse = error_response
+ #: The QMP error class
+ self.error_class: str = error_response.error.class_
+
+
+class ExecInterruptedError(AQMPError):
+ """
+ Exception raised by `execute()` (et al) when an RPC is interrupted.
+
+ This error is raised when an `execute()` statement could not be
+ completed. This can occur because the connection itself was
+ terminated before a reply was received.
+
+ The true cause of the interruption will be available via `disconnect()`.
+ """
+
+
+class _MsgProtocolError(ProtocolError):
+ """
+ Abstract error class for protocol errors that have a `Message` object.
+
+ This Exception class is used for protocol errors where the `Message`
+ was mechanically understood, but was found to be inappropriate or
+ malformed.
+
+ :param error_message: Human-readable string describing the error.
+ :param msg: The QMP `Message` that caused the error.
+ """
+ def __init__(self, error_message: str, msg: Message):
+ super().__init__(error_message)
+ #: The received `Message` that caused the error.
+ self.msg: Message = msg
+
+ def __str__(self) -> str:
+ return "\n".join([
+ super().__str__(),
+ f" Message was: {str(self.msg)}\n",
+ ])
+
+
+class ServerParseError(_MsgProtocolError):
+ """
+ The Server sent a `Message` indicating parsing failure.
+
+ i.e. A reply has arrived from the server, but it is missing the "ID"
+ field, indicating a parsing error.
+
+ :param error_message: Human-readable string describing the error.
+ :param msg: The QMP `Message` that caused the error.
+ """
+
+
+class BadReplyError(_MsgProtocolError):
+ """
+ An execution reply was successfully routed, but not understood.
+
+ If a QMP message is received with an 'id' field to allow it to be
+ routed, but is otherwise malformed, this exception will be raised.
+
+ A reply message is malformed if it is missing either the 'return' or
+ 'error' keys, or if the 'error' value has missing keys or members of
+ the wrong type.
+
+ :param error_message: Human-readable string describing the error.
+ :param msg: The malformed reply that was received.
+ :param sent: The message that was sent that prompted the error.
+ """
+ def __init__(self, error_message: str, msg: Message, sent: Message):
+ super().__init__(error_message, msg)
+ #: The sent `Message` that caused the failure
+ self.sent = sent
+
+
+class QMPClient(AsyncProtocol[Message], Events):
+ """
+ Implements a QMP client connection.
+
+ QMP can be used to establish a connection as either the transport
+ client or server, though this class always acts as the QMP client.
+
+ :param name: Optional nickname for the connection, used for logging.
+
+ Basic script-style usage looks like this::
+
+ qmp = QMPClient('my_virtual_machine_name')
+ await qmp.connect(('127.0.0.1', 1234))
+ ...
+ res = await qmp.execute('block-query')
+ ...
+ await qmp.disconnect()
+
+ Basic async client-style usage looks like this::
+
+ class Client:
+ def __init__(self, name: str):
+ self.qmp = QMPClient(name)
+
+ async def watch_events(self):
+ try:
+ async for event in self.qmp.events:
+ print(f"Event: {event['event']}")
+ except asyncio.CancelledError:
+ return
+
+ async def run(self, address='/tmp/qemu.socket'):
+ await self.qmp.connect(address)
+ asyncio.create_task(self.watch_events())
+ await self.qmp.runstate_changed.wait()
+ await self.disconnect()
+
+ See `aqmp.events` for more detail on event handling patterns.
+ """
+ #: Logger object used for debugging messages.
+ logger = logging.getLogger(__name__)
+
+ # Read buffer limit; large enough to accept query-qmp-schema
+ _limit = (256 * 1024)
+
+ # Type alias for pending execute() result items
+ _PendingT = Union[Message, ExecInterruptedError]
+
+ def __init__(self, name: Optional[str] = None) -> None:
+ super().__init__(name)
+ Events.__init__(self)
+
+ #: Whether or not to await a greeting after establishing a connection.
+ self.await_greeting: bool = True
+
+ #: Whether or not to perform capabilities negotiation upon connection.
+ #: Implies `await_greeting`.
+ self.negotiate: bool = True
+
+ # Cached Greeting, if one was awaited.
+ self._greeting: Optional[Greeting] = None
+
+ # Command ID counter
+ self._execute_id = 0
+
+ # Incoming RPC reply messages.
+ self._pending: Dict[
+ Union[str, None],
+ 'asyncio.Queue[QMPClient._PendingT]'
+ ] = {}
+
+ @upper_half
+ async def _establish_session(self) -> None:
+ """
+ Initiate the QMP session.
+
+ Wait for the QMP greeting and perform capabilities negotiation.
+
+ :raise GreetingError: When the greeting is not understood.
+ :raise NegotiationError: If the negotiation fails.
+ :raise EOFError: When the server unexpectedly hangs up.
+ :raise OSError: For underlying stream errors.
+ """
+ self._greeting = None
+ self._pending = {}
+
+ if self.await_greeting or self.negotiate:
+ self._greeting = await self._get_greeting()
+
+ if self.negotiate:
+ await self._negotiate()
+
+ # This will start the reader/writers:
+ await super()._establish_session()
+
+ @upper_half
+ async def _get_greeting(self) -> Greeting:
+ """
+ :raise GreetingError: When the greeting is not understood.
+ :raise EOFError: When the server unexpectedly hangs up.
+ :raise OSError: For underlying stream errors.
+
+ :return: the Greeting object given by the server.
+ """
+ self.logger.debug("Awaiting greeting ...")
+
+ try:
+ msg = await self._recv()
+ return Greeting(msg)
+ except (ProtocolError, KeyError, TypeError) as err:
+ emsg = "Did not understand Greeting"
+ self.logger.error("%s: %s", emsg, exception_summary(err))
+ self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
+ raise GreetingError(emsg, err) from err
+ except BaseException as err:
+ # EOFError, OSError, or something unexpected.
+ emsg = "Failed to receive Greeting"
+ self.logger.error("%s: %s", emsg, exception_summary(err))
+ self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
+ raise
+
+ @upper_half
+ async def _negotiate(self) -> None:
+ """
+ Perform QMP capabilities negotiation.
+
+ :raise NegotiationError: When negotiation fails.
+ :raise EOFError: When the server unexpectedly hangs up.
+ :raise OSError: For underlying stream errors.
+ """
+ self.logger.debug("Negotiating capabilities ...")
+
+ arguments: Dict[str, List[str]] = {'enable': []}
+ if self._greeting and 'oob' in self._greeting.QMP.capabilities:
+ arguments['enable'].append('oob')
+ msg = self.make_execute_msg('qmp_capabilities', arguments=arguments)
+
+ # It's not safe to use execute() here, because the reader/writers
+ # aren't running. AsyncProtocol *requires* that a new session
+ # does not fail after the reader/writers are running!
+ try:
+ await self._send(msg)
+ reply = await self._recv()
+ assert 'return' in reply
+ assert 'error' not in reply
+ except (ProtocolError, AssertionError) as err:
+ emsg = "Negotiation failed"
+ self.logger.error("%s: %s", emsg, exception_summary(err))
+ self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
+ raise NegotiationError(emsg, err) from err
+ except BaseException as err:
+ # EOFError, OSError, or something unexpected.
+ emsg = "Negotiation failed"
+ self.logger.error("%s: %s", emsg, exception_summary(err))
+ self.logger.debug("%s:\n%s\n", emsg, pretty_traceback())
+ raise
+
+ @bottom_half
+ async def _bh_disconnect(self) -> None:
+ try:
+ await super()._bh_disconnect()
+ finally:
+ if self._pending:
+ self.logger.debug("Cancelling pending executions")
+ keys = self._pending.keys()
+ for key in keys:
+ self.logger.debug("Cancelling execution '%s'", key)
+ self._pending[key].put_nowait(
+ ExecInterruptedError("Disconnected")
+ )
+
+ self.logger.debug("QMP Disconnected.")
+
+ @upper_half
+ def _cleanup(self) -> None:
+ super()._cleanup()
+ assert not self._pending
+
+ @bottom_half
+ async def _on_message(self, msg: Message) -> None:
+ """
+ Add an incoming message to the appropriate queue/handler.
+
+ :raise ServerParseError: When Message indicates server parse failure.
+ """
+ # Incoming messages are not fully parsed/validated here;
+ # do only light peeking to know how to route the messages.
+
+ if 'event' in msg:
+ await self._event_dispatch(msg)
+ return
+
+ # Below, we assume everything left is an execute/exec-oob response.
+
+ exec_id = cast(Optional[str], msg.get('id'))
+
+ if exec_id in self._pending:
+ await self._pending[exec_id].put(msg)
+ return
+
+ # We have a message we can't route back to a caller.
+
+ is_error = 'error' in msg
+ has_id = 'id' in msg
+
+ if is_error and not has_id:
+ # This is very likely a server parsing error.
+ # It doesn't inherently belong to any pending execution.
+ # Instead of performing clever recovery, just terminate.
+ # See "NOTE" in qmp-spec.txt, section 2.4.2
+ raise ServerParseError(
+ ("Server sent an error response without an ID, "
+ "but there are no ID-less executions pending. "
+ "Assuming this is a server parser failure."),
+ msg
+ )
+
+ # qmp-spec.txt, section 2.4:
+ # 'Clients should drop all the responses
+ # that have an unknown "id" field.'
+ self.logger.log(
+ logging.ERROR if is_error else logging.WARNING,
+ "Unknown ID '%s', message dropped.",
+ exec_id,
+ )
+ self.logger.debug("Unroutable message: %s", str(msg))
+
+ @upper_half
+ @bottom_half
+ async def _do_recv(self) -> Message:
+ """
+ :raise OSError: When a stream error is encountered.
+ :raise EOFError: When the stream is at EOF.
+ :raise ProtocolError:
+ When the Message is not understood.
+ See also `Message._deserialize`.
+
+ :return: A single QMP `Message`.
+ """
+ msg_bytes = await self._readline()
+ msg = Message(msg_bytes, eager=True)
+ return msg
+
+ @upper_half
+ @bottom_half
+ def _do_send(self, msg: Message) -> None:
+ """
+ :raise ValueError: JSON serialization failure
+ :raise TypeError: JSON serialization failure
+ :raise OSError: When a stream error is encountered.
+ """
+ assert self._writer is not None
+ self._writer.write(bytes(msg))
+
+ @upper_half
+ def _get_exec_id(self) -> str:
+ exec_id = f"__aqmp#{self._execute_id:05d}"
+ self._execute_id += 1
+ return exec_id
+
+ @upper_half
+ async def _issue(self, msg: Message) -> Union[None, str]:
+ """
+ Issue a QMP `Message` and do not wait for a reply.
+
+ :param msg: The QMP `Message` to send to the server.
+
+ :return: The ID of the `Message` sent.
+ """
+ msg_id: Optional[str] = None
+ if 'id' in msg:
+ assert isinstance(msg['id'], str)
+ msg_id = msg['id']
+
+ self._pending[msg_id] = asyncio.Queue(maxsize=1)
+ await self._outgoing.put(msg)
+
+ return msg_id
+
+ @upper_half
+ async def _reply(self, msg_id: Union[str, None]) -> Message:
+ """
+ Await a reply to a previously issued QMP message.
+
+ :param msg_id: The ID of the previously issued message.
+
+ :return: The reply from the server.
+ :raise ExecInterruptedError:
+ When the reply could not be retrieved because the connection
+ was lost, or some other problem.
+ """
+ queue = self._pending[msg_id]
+ result = await queue.get()
+
+ try:
+ if isinstance(result, ExecInterruptedError):
+ raise result
+ return result
+ finally:
+ del self._pending[msg_id]
+
+ @upper_half
+ async def _execute(self, msg: Message, assign_id: bool = True) -> Message:
+ """
+ Send a QMP `Message` to the server and await a reply.
+
+ This method *assumes* you are sending some kind of an execute
+ statement that *will* receive a reply.
+
+ An execution ID will be assigned if assign_id is `True`. It can be
+ disabled, but this requires that an ID is manually assigned
+ instead. For manually assigned IDs, you must not use the string
+ '__aqmp#' anywhere in the ID.
+
+ :param msg: The QMP `Message` to execute.
+ :param assign_id: If True, assign a new execution ID.
+
+ :return: Execution reply from the server.
+ :raise ExecInterruptedError:
+ When the reply could not be retrieved because the connection
+ was lost, or some other problem.
+ """
+ if assign_id:
+ msg['id'] = self._get_exec_id()
+ elif 'id' in msg:
+ assert isinstance(msg['id'], str)
+ assert '__aqmp#' not in msg['id']
+
+ exec_id = await self._issue(msg)
+ return await self._reply(exec_id)
+
+ @upper_half
+ @require(Runstate.RUNNING)
+ async def _raw(
+ self,
+ msg: Union[Message, Mapping[str, object], bytes],
+ assign_id: bool = True,
+ ) -> Message:
+ """
+ Issue a raw `Message` to the QMP server and await a reply.
+
+ :param msg:
+ A Message to send to the server. It may be a `Message`, any
+ Mapping (including Dict), or raw bytes.
+ :param assign_id:
+ Assign an arbitrary execution ID to this message. If
+ `False`, the existing id must either be absent (and no other
+ such pending execution may omit an ID) or a string. If it is
+ a string, it must not start with '__aqmp#' and no other such
+ pending execution may currently be using that ID.
+
+ :return: Execution reply from the server.
+
+ :raise ExecInterruptedError:
+ When the reply could not be retrieved because the connection
+ was lost, or some other problem.
+ :raise TypeError:
+ When assign_id is `False`, an ID is given, and it is not a string.
+ :raise ValueError:
+ When assign_id is `False`, but the ID is not usable;
+ Either because it starts with '__aqmp#' or it is already in-use.
+ """
+ # 1. convert generic Mapping or bytes to a QMP Message
+ # 2. copy Message objects so that we assign an ID only to the copy.
+ msg = Message(msg)
+
+ exec_id = msg.get('id')
+ if not assign_id and 'id' in msg:
+ if not isinstance(exec_id, str):
+ raise TypeError(f"ID ('{exec_id}') must be a string.")
+ if exec_id.startswith('__aqmp#'):
+ raise ValueError(
+ f"ID ('{exec_id}') must not start with '__aqmp#'."
+ )
+
+ if not assign_id and exec_id in self._pending:
+ raise ValueError(
+ f"ID '{exec_id}' is in-use and cannot be used."
+ )
+
+ return await self._execute(msg, assign_id=assign_id)
+
+ @upper_half
+ @require(Runstate.RUNNING)
+ async def execute_msg(self, msg: Message) -> object:
+ """
+ Execute a QMP command and return its value.
+
+ :param msg: The QMP `Message` to execute.
+
+ :return:
+ The command execution return value from the server. The type of
+ object returned depends on the command that was issued,
+ though most in QEMU return a `dict`.
+ :raise ValueError:
+ If the QMP `Message` does not have either the 'execute' or
+ 'exec-oob' fields set.
+ :raise ExecuteError: When the server returns an error response.
+ :raise ExecInterruptedError: if the connection was terminated early.
+ """
+ if not ('execute' in msg or 'exec-oob' in msg):
+ raise ValueError("Requires 'execute' or 'exec-oob' message")
+
+ # Copy the Message so that the ID assigned by _execute() is
+ # local to this method; allowing the ID to be seen in raised
+ # Exceptions but without modifying the caller's held copy.
+ msg = Message(msg)
+ reply = await self._execute(msg)
+
+ if 'error' in reply:
+ try:
+ error_response = ErrorResponse(reply)
+ except (KeyError, TypeError) as err:
+ # Error response was malformed.
+ raise BadReplyError(
+ "QMP error reply is malformed", reply, msg,
+ ) from err
+
+ raise ExecuteError(error_response, msg, reply)
+
+ if 'return' not in reply:
+ raise BadReplyError(
+ "QMP reply is missing a 'error' or 'return' member",
+ reply, msg,
+ )
+
+ return reply['return']
+
+ @classmethod
+ def make_execute_msg(cls, cmd: str,
+ arguments: Optional[Mapping[str, object]] = None,
+ oob: bool = False) -> Message:
+ """
+ Create an executable message to be sent by `execute_msg` later.
+
+ :param cmd: QMP command name.
+ :param arguments: Arguments (if any). Must be JSON-serializable.
+ :param oob: If `True`, execute "out of band".
+
+ :return: An executable QMP `Message`.
+ """
+ msg = Message({'exec-oob' if oob else 'execute': cmd})
+ if arguments is not None:
+ msg['arguments'] = arguments
+ return msg
+
+ @upper_half
+ async def execute(self, cmd: str,
+ arguments: Optional[Mapping[str, object]] = None,
+ oob: bool = False) -> object:
+ """
+ Execute a QMP command and return its value.
+
+ :param cmd: QMP command name.
+ :param arguments: Arguments (if any). Must be JSON-serializable.
+ :param oob: If `True`, execute "out of band".
+
+ :return:
+ The command execution return value from the server. The type of
+ object returned depends on the command that was issued,
+ though most in QEMU return a `dict`.
+ :raise ExecuteError: When the server returns an error response.
+ :raise ExecInterruptedError: if the connection was terminated early.
+ """
+ msg = self.make_execute_msg(cmd, arguments, oob=oob)
+ return await self.execute_msg(msg)
diff --git a/python/qemu/aqmp/util.py b/python/qemu/aqmp/util.py
new file mode 100644
index 0000000000..eaa5fc7d5f
--- /dev/null
+++ b/python/qemu/aqmp/util.py
@@ -0,0 +1,217 @@
+"""
+Miscellaneous Utilities
+
+This module provides asyncio utilities and compatibility wrappers for
+Python 3.6 to provide some features that otherwise become available in
+Python 3.7+.
+
+Various logging and debugging utilities are also provided, such as
+`exception_summary()` and `pretty_traceback()`, used primarily for
+adding information into the logging stream.
+"""
+
+import asyncio
+import sys
+import traceback
+from typing import (
+ Any,
+ Coroutine,
+ Optional,
+ TypeVar,
+ cast,
+)
+
+
+T = TypeVar('T')
+
+
+# --------------------------
+# Section: Utility Functions
+# --------------------------
+
+
+async def flush(writer: asyncio.StreamWriter) -> None:
+ """
+ Utility function to ensure a StreamWriter is *fully* drained.
+
+ `asyncio.StreamWriter.drain` only promises we will return to below
+ the "high-water mark". This function ensures we flush the entire
+ buffer -- by setting the high water mark to 0 and then calling
+ drain. The flow control limits are restored after the call is
+ completed.
+ """
+ transport = cast(asyncio.WriteTransport, writer.transport)
+
+ # https://github.com/python/typeshed/issues/5779
+ low, high = transport.get_write_buffer_limits() # type: ignore
+ transport.set_write_buffer_limits(0, 0)
+ try:
+ await writer.drain()
+ finally:
+ transport.set_write_buffer_limits(high, low)
+
+
+def upper_half(func: T) -> T:
+ """
+ Do-nothing decorator that annotates a method as an "upper-half" method.
+
+ These methods must not call bottom-half functions directly, but can
+ schedule them to run.
+ """
+ return func
+
+
+def bottom_half(func: T) -> T:
+ """
+ Do-nothing decorator that annotates a method as a "bottom-half" method.
+
+ These methods must take great care to handle their own exceptions whenever
+ possible. If they go unhandled, they will cause termination of the loop.
+
+ These methods do not, in general, have the ability to directly
+ report information to a caller’s context and will usually be
+ collected as a Task result instead.
+
+ They must not call upper-half functions directly.
+ """
+ return func
+
+
+# -------------------------------
+# Section: Compatibility Wrappers
+# -------------------------------
+
+
+def create_task(coro: Coroutine[Any, Any, T],
+ loop: Optional[asyncio.AbstractEventLoop] = None
+ ) -> 'asyncio.Future[T]':
+ """
+ Python 3.6-compatible `asyncio.create_task` wrapper.
+
+ :param coro: The coroutine to execute in a task.
+ :param loop: Optionally, the loop to create the task in.
+
+ :return: An `asyncio.Future` object.
+ """
+ if sys.version_info >= (3, 7):
+ if loop is not None:
+ return loop.create_task(coro)
+ return asyncio.create_task(coro) # pylint: disable=no-member
+
+ # Python 3.6:
+ return asyncio.ensure_future(coro, loop=loop)
+
+
+def is_closing(writer: asyncio.StreamWriter) -> bool:
+ """
+ Python 3.6-compatible `asyncio.StreamWriter.is_closing` wrapper.
+
+ :param writer: The `asyncio.StreamWriter` object.
+ :return: `True` if the writer is closing, or closed.
+ """
+ if sys.version_info >= (3, 7):
+ return writer.is_closing()
+
+ # Python 3.6:
+ transport = writer.transport
+ assert isinstance(transport, asyncio.WriteTransport)
+ return transport.is_closing()
+
+
+async def wait_closed(writer: asyncio.StreamWriter) -> None:
+ """
+ Python 3.6-compatible `asyncio.StreamWriter.wait_closed` wrapper.
+
+ :param writer: The `asyncio.StreamWriter` to wait on.
+ """
+ if sys.version_info >= (3, 7):
+ await writer.wait_closed()
+ return
+
+ # Python 3.6
+ transport = writer.transport
+ assert isinstance(transport, asyncio.WriteTransport)
+
+ while not transport.is_closing():
+ await asyncio.sleep(0)
+
+ # This is an ugly workaround, but it's the best I can come up with.
+ sock = transport.get_extra_info('socket')
+
+ if sock is None:
+ # Our transport doesn't have a socket? ...
+ # Nothing we can reasonably do.
+ return
+
+ while sock.fileno() != -1:
+ await asyncio.sleep(0)
+
+
+def asyncio_run(coro: Coroutine[Any, Any, T], *, debug: bool = False) -> T:
+ """
+ Python 3.6-compatible `asyncio.run` wrapper.
+
+ :param coro: A coroutine to execute now.
+ :return: The return value from the coroutine.
+ """
+ if sys.version_info >= (3, 7):
+ return asyncio.run(coro, debug=debug)
+
+ # Python 3.6
+ loop = asyncio.get_event_loop()
+ loop.set_debug(debug)
+ ret = loop.run_until_complete(coro)
+ loop.close()
+
+ return ret
+
+
+# ----------------------------
+# Section: Logging & Debugging
+# ----------------------------
+
+
+def exception_summary(exc: BaseException) -> str:
+ """
+ Return a summary string of an arbitrary exception.
+
+ It will be of the form "ExceptionType: Error Message", if the error
+ string is non-empty, and just "ExceptionType" otherwise.
+ """
+ name = type(exc).__qualname__
+ smod = type(exc).__module__
+ if smod not in ("__main__", "builtins"):
+ name = smod + '.' + name
+
+ error = str(exc)
+ if error:
+ return f"{name}: {error}"
+ return name
+
+
+def pretty_traceback(prefix: str = " | ") -> str:
+ """
+ Formats the current traceback, indented to provide visual distinction.
+
+ This is useful for printing a traceback within a traceback for
+ debugging purposes when encapsulating errors to deliver them up the
+ stack; when those errors are printed, this helps provide a nice
+ visual grouping to quickly identify the parts of the error that
+ belong to the inner exception.
+
+ :param prefix: The prefix to append to each line of the traceback.
+ :return: A string, formatted something like the following::
+
+ | Traceback (most recent call last):
+ | File "foobar.py", line 42, in arbitrary_example
+ | foo.baz()
+ | ArbitraryError: [Errno 42] Something bad happened!
+ """
+ output = "".join(traceback.format_exception(*sys.exc_info()))
+
+ exc_lines = []
+ for line in output.split('\n'):
+ exc_lines.append(prefix + line)
+
+ # The last line is always empty, omit it
+ return "\n".join(exc_lines[:-1])
diff --git a/python/setup.cfg b/python/setup.cfg
index fdca265fec..417e937839 100644
--- a/python/setup.cfg
+++ b/python/setup.cfg
@@ -27,6 +27,7 @@ packages =
qemu.qmp
qemu.machine
qemu.utils
+ qemu.aqmp
[options.package_data]
* = py.typed
@@ -36,18 +37,27 @@ packages =
# version, use e.g. "pipenv install --dev pylint==3.0.0".
# Subsequently, edit 'Pipfile' to remove e.g. 'pylint = "==3.0.0'.
devel =
- avocado-framework >= 87.0
+ avocado-framework >= 90.0
flake8 >= 3.6.0
fusepy >= 2.0.4
isort >= 5.1.2
mypy >= 0.770
pylint >= 2.8.0
tox >= 3.18.0
+ urwid >= 2.1.2
+ urwid-readline >= 0.13
+ Pygments >= 2.9.0
# Provides qom-fuse functionality
fuse =
fusepy >= 2.0.4
+# AQMP TUI dependencies
+tui =
+ urwid >= 2.1.2
+ urwid-readline >= 0.13
+ Pygments >= 2.9.0
+
[options.entry_points]
console_scripts =
qom = qemu.qmp.qom:main
@@ -58,6 +68,7 @@ console_scripts =
qom-fuse = qemu.qmp.qom_fuse:QOMFuse.entry_point [fuse]
qemu-ga-client = qemu.qmp.qemu_ga_client:main
qmp-shell = qemu.qmp.qmp_shell:main
+ aqmp-tui = qemu.aqmp.aqmp_tui:main [tui]
[flake8]
extend-ignore = E722 # Prefer pylint's bare-except checks to flake8's
@@ -73,8 +84,22 @@ namespace_packages = True
# fusepy has no type stubs:
allow_subclassing_any = True
+[mypy-qemu.aqmp.aqmp_tui]
+# urwid and urwid_readline have no type stubs:
+allow_subclassing_any = True
+
+# The following missing import directives are because these libraries do not
+# provide type stubs. Allow them on an as-needed basis for mypy.
[mypy-fuse]
-# fusepy has no type stubs:
+ignore_missing_imports = True
+
+[mypy-urwid]
+ignore_missing_imports = True
+
+[mypy-urwid_readline]
+ignore_missing_imports = True
+
+[mypy-pygments]
ignore_missing_imports = True
[pylint.messages control]
@@ -88,6 +113,8 @@ ignore_missing_imports = True
# no Warning level messages displayed, use "--disable=all --enable=classes
# --disable=W".
disable=consider-using-f-string,
+ too-many-function-args, # mypy handles this with less false positives.
+ no-member, # mypy also handles this better.
[pylint.basic]
# Good variable names which should always be accepted, separated by a comma.
@@ -100,6 +127,7 @@ good-names=i,
fh, # fh = open(...)
fd, # fd = os.open(...)
c, # for c in string: ...
+ T, # for TypeVars. See pylint#3401
[pylint.similarities]
# Ignore imports when computing similarities.
@@ -134,5 +162,16 @@ allowlist_externals = make
deps =
.[devel]
.[fuse] # Workaround to trigger tox venv rebuild
+ .[tui] # Workaround to trigger tox venv rebuild
commands =
make check
+
+# Coverage.py [https://coverage.readthedocs.io/en/latest/] is a tool for
+# measuring code coverage of Python programs. It monitors your program,
+# noting which parts of the code have been executed, then analyzes the
+# source to identify code that could have been executed but was not.
+
+[coverage:run]
+concurrency = multiprocessing
+source = qemu/
+parallel = true
diff --git a/python/tests/protocol.py b/python/tests/protocol.py
new file mode 100644
index 0000000000..5cd7938be3
--- /dev/null
+++ b/python/tests/protocol.py
@@ -0,0 +1,583 @@
+import asyncio
+from contextlib import contextmanager
+import os
+import socket
+from tempfile import TemporaryDirectory
+
+import avocado
+
+from qemu.aqmp import ConnectError, Runstate
+from qemu.aqmp.protocol import AsyncProtocol, StateError
+from qemu.aqmp.util import asyncio_run, create_task
+
+
+class NullProtocol(AsyncProtocol[None]):
+ """
+ NullProtocol is a test mockup of an AsyncProtocol implementation.
+
+ It adds a fake_session instance variable that enables a code path
+ that bypasses the actual connection logic, but still allows the
+ reader/writers to start.
+
+ Because the message type is defined as None, an asyncio.Event named
+ 'trigger_input' is created that prohibits the reader from
+ incessantly being able to yield None; this event can be poked to
+ simulate an incoming message.
+
+ For testing symmetry with do_recv, an interface is added to "send" a
+ Null message.
+
+ For testing purposes, a "simulate_disconnection" method is also
+ added which allows us to trigger a bottom half disconnect without
+ injecting any real errors into the reader/writer loops; in essence
+ it performs exactly half of what disconnect() normally does.
+ """
+ def __init__(self, name=None):
+ self.fake_session = False
+ self.trigger_input: asyncio.Event
+ super().__init__(name)
+
+ async def _establish_session(self):
+ self.trigger_input = asyncio.Event()
+ await super()._establish_session()
+
+ async def _do_accept(self, address, ssl=None):
+ if not self.fake_session:
+ await super()._do_accept(address, ssl)
+
+ async def _do_connect(self, address, ssl=None):
+ if not self.fake_session:
+ await super()._do_connect(address, ssl)
+
+ async def _do_recv(self) -> None:
+ await self.trigger_input.wait()
+ self.trigger_input.clear()
+
+ def _do_send(self, msg: None) -> None:
+ pass
+
+ async def send_msg(self) -> None:
+ await self._outgoing.put(None)
+
+ async def simulate_disconnect(self) -> None:
+ """
+ Simulates a bottom-half disconnect.
+
+ This method schedules a disconnection but does not wait for it
+ to complete. This is used to put the loop into the DISCONNECTING
+ state without fully quiescing it back to IDLE. This is normally
+ something you cannot coax AsyncProtocol to do on purpose, but it
+ will be similar to what happens with an unhandled Exception in
+ the reader/writer.
+
+ Under normal circumstances, the library design requires you to
+ await on disconnect(), which awaits the disconnect task and
+ returns bottom half errors as a pre-condition to allowing the
+ loop to return back to IDLE.
+ """
+ self._schedule_disconnect()
+
+
+class LineProtocol(AsyncProtocol[str]):
+ def __init__(self, name=None):
+ super().__init__(name)
+ self.rx_history = []
+
+ async def _do_recv(self) -> str:
+ raw = await self._readline()
+ msg = raw.decode()
+ self.rx_history.append(msg)
+ return msg
+
+ def _do_send(self, msg: str) -> None:
+ assert self._writer is not None
+ self._writer.write(msg.encode() + b'\n')
+
+ async def send_msg(self, msg: str) -> None:
+ await self._outgoing.put(msg)
+
+
+def run_as_task(coro, allow_cancellation=False):
+ """
+ Run a given coroutine as a task.
+
+ Optionally, wrap it in a try..except block that allows this
+ coroutine to be canceled gracefully.
+ """
+ async def _runner():
+ try:
+ await coro
+ except asyncio.CancelledError:
+ if allow_cancellation:
+ return
+ raise
+ return create_task(_runner())
+
+
+@contextmanager
+def jammed_socket():
+ """
+ Opens up a random unused TCP port on localhost, then jams it.
+ """
+ socks = []
+
+ try:
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+ sock.bind(('127.0.0.1', 0))
+ sock.listen(1)
+ address = sock.getsockname()
+
+ socks.append(sock)
+
+ # I don't *fully* understand why, but it takes *two* un-accepted
+ # connections to start jamming the socket.
+ for _ in range(2):
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ sock.connect(address)
+ socks.append(sock)
+
+ yield address
+
+ finally:
+ for sock in socks:
+ sock.close()
+
+
+class Smoke(avocado.Test):
+
+ def setUp(self):
+ self.proto = NullProtocol()
+
+ def test__repr__(self):
+ self.assertEqual(
+ repr(self.proto),
+ "<NullProtocol runstate=IDLE>"
+ )
+
+ def testRunstate(self):
+ self.assertEqual(
+ self.proto.runstate,
+ Runstate.IDLE
+ )
+
+ def testDefaultName(self):
+ self.assertEqual(
+ self.proto.name,
+ None
+ )
+
+ def testLogger(self):
+ self.assertEqual(
+ self.proto.logger.name,
+ 'qemu.aqmp.protocol'
+ )
+
+ def testName(self):
+ self.proto = NullProtocol('Steve')
+
+ self.assertEqual(
+ self.proto.name,
+ 'Steve'
+ )
+
+ self.assertEqual(
+ self.proto.logger.name,
+ 'qemu.aqmp.protocol.Steve'
+ )
+
+ self.assertEqual(
+ repr(self.proto),
+ "<NullProtocol name='Steve' runstate=IDLE>"
+ )
+
+
+class TestBase(avocado.Test):
+
+ def setUp(self):
+ self.proto = NullProtocol(type(self).__name__)
+ self.assertEqual(self.proto.runstate, Runstate.IDLE)
+ self.runstate_watcher = None
+
+ def tearDown(self):
+ self.assertEqual(self.proto.runstate, Runstate.IDLE)
+
+ async def _asyncSetUp(self):
+ pass
+
+ async def _asyncTearDown(self):
+ if self.runstate_watcher:
+ await self.runstate_watcher
+
+ @staticmethod
+ def async_test(async_test_method):
+ """
+ Decorator; adds SetUp and TearDown to async tests.
+ """
+ async def _wrapper(self, *args, **kwargs):
+ loop = asyncio.get_event_loop()
+ loop.set_debug(True)
+
+ await self._asyncSetUp()
+ await async_test_method(self, *args, **kwargs)
+ await self._asyncTearDown()
+
+ return _wrapper
+
+ # Definitions
+
+ # The states we expect a "bad" connect/accept attempt to transition through
+ BAD_CONNECTION_STATES = (
+ Runstate.CONNECTING,
+ Runstate.DISCONNECTING,
+ Runstate.IDLE,
+ )
+
+ # The states we expect a "good" session to transition through
+ GOOD_CONNECTION_STATES = (
+ Runstate.CONNECTING,
+ Runstate.RUNNING,
+ Runstate.DISCONNECTING,
+ Runstate.IDLE,
+ )
+
+ # Helpers
+
+ async def _watch_runstates(self, *states):
+ """
+ This launches a task alongside (most) tests below to confirm that
+ the sequence of runstate changes that occur is exactly as
+ anticipated.
+ """
+ async def _watcher():
+ for state in states:
+ new_state = await self.proto.runstate_changed()
+ self.assertEqual(
+ new_state,
+ state,
+ msg=f"Expected state '{state.name}'",
+ )
+
+ self.runstate_watcher = create_task(_watcher())
+ # Kick the loop and force the task to block on the event.
+ await asyncio.sleep(0)
+
+
+class State(TestBase):
+
+ @TestBase.async_test
+ async def testSuperfluousDisconnect(self):
+ """
+ Test calling disconnect() while already disconnected.
+ """
+ await self._watch_runstates(
+ Runstate.DISCONNECTING,
+ Runstate.IDLE,
+ )
+ await self.proto.disconnect()
+
+
+class Connect(TestBase):
+ """
+ Tests primarily related to calling Connect().
+ """
+ async def _bad_connection(self, family: str):
+ assert family in ('INET', 'UNIX')
+
+ if family == 'INET':
+ await self.proto.connect(('127.0.0.1', 0))
+ elif family == 'UNIX':
+ await self.proto.connect('/dev/null')
+
+ async def _hanging_connection(self):
+ with jammed_socket() as addr:
+ await self.proto.connect(addr)
+
+ async def _bad_connection_test(self, family: str):
+ await self._watch_runstates(*self.BAD_CONNECTION_STATES)
+
+ with self.assertRaises(ConnectError) as context:
+ await self._bad_connection(family)
+
+ self.assertIsInstance(context.exception.exc, OSError)
+ self.assertEqual(
+ context.exception.error_message,
+ "Failed to establish connection"
+ )
+
+ @TestBase.async_test
+ async def testBadINET(self):
+ """
+ Test an immediately rejected call to an IP target.
+ """
+ await self._bad_connection_test('INET')
+
+ @TestBase.async_test
+ async def testBadUNIX(self):
+ """
+ Test an immediately rejected call to a UNIX socket target.
+ """
+ await self._bad_connection_test('UNIX')
+
+ @TestBase.async_test
+ async def testCancellation(self):
+ """
+ Test what happens when a connection attempt is aborted.
+ """
+ # Note that accept() cannot be cancelled outright, as it isn't a task.
+ # However, we can wrap it in a task and cancel *that*.
+ await self._watch_runstates(*self.BAD_CONNECTION_STATES)
+ task = run_as_task(self._hanging_connection(), allow_cancellation=True)
+
+ state = await self.proto.runstate_changed()
+ self.assertEqual(state, Runstate.CONNECTING)
+
+ # This is insider baseball, but the connection attempt has
+ # yielded *just* before the actual connection attempt, so kick
+ # the loop to make sure it's truly wedged.
+ await asyncio.sleep(0)
+
+ task.cancel()
+ await task
+
+ @TestBase.async_test
+ async def testTimeout(self):
+ """
+ Test what happens when a connection attempt times out.
+ """
+ await self._watch_runstates(*self.BAD_CONNECTION_STATES)
+ task = run_as_task(self._hanging_connection())
+
+ # More insider baseball: to improve the speed of this test while
+ # guaranteeing that the connection even gets a chance to start,
+ # verify that the connection hangs *first*, then await the
+ # result of the task with a nearly-zero timeout.
+
+ state = await self.proto.runstate_changed()
+ self.assertEqual(state, Runstate.CONNECTING)
+ await asyncio.sleep(0)
+
+ with self.assertRaises(asyncio.TimeoutError):
+ await asyncio.wait_for(task, timeout=0)
+
+ @TestBase.async_test
+ async def testRequire(self):
+ """
+ Test what happens when a connection attempt is made while CONNECTING.
+ """
+ await self._watch_runstates(*self.BAD_CONNECTION_STATES)
+ task = run_as_task(self._hanging_connection(), allow_cancellation=True)
+
+ state = await self.proto.runstate_changed()
+ self.assertEqual(state, Runstate.CONNECTING)
+
+ with self.assertRaises(StateError) as context:
+ await self._bad_connection('UNIX')
+
+ self.assertEqual(
+ context.exception.error_message,
+ "NullProtocol is currently connecting."
+ )
+ self.assertEqual(context.exception.state, Runstate.CONNECTING)
+ self.assertEqual(context.exception.required, Runstate.IDLE)
+
+ task.cancel()
+ await task
+
+ @TestBase.async_test
+ async def testImplicitRunstateInit(self):
+ """
+ Test what happens if we do not wait on the runstate event until
+ AFTER a connection is made, i.e., connect()/accept() themselves
+ initialize the runstate event. All of the above tests force the
+ initialization by waiting on the runstate *first*.
+ """
+ task = run_as_task(self._hanging_connection(), allow_cancellation=True)
+
+ # Kick the loop to coerce the state change
+ await asyncio.sleep(0)
+ assert self.proto.runstate == Runstate.CONNECTING
+
+ # We already missed the transition to CONNECTING
+ await self._watch_runstates(Runstate.DISCONNECTING, Runstate.IDLE)
+
+ task.cancel()
+ await task
+
+
+class Accept(Connect):
+ """
+ All of the same tests as Connect, but using the accept() interface.
+ """
+ async def _bad_connection(self, family: str):
+ assert family in ('INET', 'UNIX')
+
+ if family == 'INET':
+ await self.proto.accept(('example.com', 1))
+ elif family == 'UNIX':
+ await self.proto.accept('/dev/null')
+
+ async def _hanging_connection(self):
+ with TemporaryDirectory(suffix='.aqmp') as tmpdir:
+ sock = os.path.join(tmpdir, type(self.proto).__name__ + ".sock")
+ await self.proto.accept(sock)
+
+
+class FakeSession(TestBase):
+
+ def setUp(self):
+ super().setUp()
+ self.proto.fake_session = True
+
+ async def _asyncSetUp(self):
+ await super()._asyncSetUp()
+ await self._watch_runstates(*self.GOOD_CONNECTION_STATES)
+
+ async def _asyncTearDown(self):
+ await self.proto.disconnect()
+ await super()._asyncTearDown()
+
+ ####
+
+ @TestBase.async_test
+ async def testFakeConnect(self):
+
+ """Test the full state lifecycle (via connect) with a no-op session."""
+ await self.proto.connect('/not/a/real/path')
+ self.assertEqual(self.proto.runstate, Runstate.RUNNING)
+
+ @TestBase.async_test
+ async def testFakeAccept(self):
+ """Test the full state lifecycle (via accept) with a no-op session."""
+ await self.proto.accept('/not/a/real/path')
+ self.assertEqual(self.proto.runstate, Runstate.RUNNING)
+
+ @TestBase.async_test
+ async def testFakeRecv(self):
+ """Test receiving a fake/null message."""
+ await self.proto.accept('/not/a/real/path')
+
+ logname = self.proto.logger.name
+ with self.assertLogs(logname, level='DEBUG') as context:
+ self.proto.trigger_input.set()
+ self.proto.trigger_input.clear()
+ await asyncio.sleep(0) # Kick reader.
+
+ self.assertEqual(
+ context.output,
+ [f"DEBUG:{logname}:<-- None"],
+ )
+
+ @TestBase.async_test
+ async def testFakeSend(self):
+ """Test sending a fake/null message."""
+ await self.proto.accept('/not/a/real/path')
+
+ logname = self.proto.logger.name
+ with self.assertLogs(logname, level='DEBUG') as context:
+ # Cheat: Send a Null message to nobody.
+ await self.proto.send_msg()
+ # Kick writer; awaiting on a queue.put isn't sufficient to yield.
+ await asyncio.sleep(0)
+
+ self.assertEqual(
+ context.output,
+ [f"DEBUG:{logname}:--> None"],
+ )
+
+ async def _prod_session_api(
+ self,
+ current_state: Runstate,
+ error_message: str,
+ accept: bool = True
+ ):
+ with self.assertRaises(StateError) as context:
+ if accept:
+ await self.proto.accept('/not/a/real/path')
+ else:
+ await self.proto.connect('/not/a/real/path')
+
+ self.assertEqual(context.exception.error_message, error_message)
+ self.assertEqual(context.exception.state, current_state)
+ self.assertEqual(context.exception.required, Runstate.IDLE)
+
+ @TestBase.async_test
+ async def testAcceptRequireRunning(self):
+ """Test that accept() cannot be called when Runstate=RUNNING"""
+ await self.proto.accept('/not/a/real/path')
+
+ await self._prod_session_api(
+ Runstate.RUNNING,
+ "NullProtocol is already connected and running.",
+ accept=True,
+ )
+
+ @TestBase.async_test
+ async def testConnectRequireRunning(self):
+ """Test that connect() cannot be called when Runstate=RUNNING"""
+ await self.proto.accept('/not/a/real/path')
+
+ await self._prod_session_api(
+ Runstate.RUNNING,
+ "NullProtocol is already connected and running.",
+ accept=False,
+ )
+
+ @TestBase.async_test
+ async def testAcceptRequireDisconnecting(self):
+ """Test that accept() cannot be called when Runstate=DISCONNECTING"""
+ await self.proto.accept('/not/a/real/path')
+
+ # Cheat: force a disconnect.
+ await self.proto.simulate_disconnect()
+
+ await self._prod_session_api(
+ Runstate.DISCONNECTING,
+ ("NullProtocol is disconnecting."
+ " Call disconnect() to return to IDLE state."),
+ accept=True,
+ )
+
+ @TestBase.async_test
+ async def testConnectRequireDisconnecting(self):
+ """Test that connect() cannot be called when Runstate=DISCONNECTING"""
+ await self.proto.accept('/not/a/real/path')
+
+ # Cheat: force a disconnect.
+ await self.proto.simulate_disconnect()
+
+ await self._prod_session_api(
+ Runstate.DISCONNECTING,
+ ("NullProtocol is disconnecting."
+ " Call disconnect() to return to IDLE state."),
+ accept=False,
+ )
+
+
+class SimpleSession(TestBase):
+
+ def setUp(self):
+ super().setUp()
+ self.server = LineProtocol(type(self).__name__ + '-server')
+
+ async def _asyncSetUp(self):
+ await super()._asyncSetUp()
+ await self._watch_runstates(*self.GOOD_CONNECTION_STATES)
+
+ async def _asyncTearDown(self):
+ await self.proto.disconnect()
+ try:
+ await self.server.disconnect()
+ except EOFError:
+ pass
+ await super()._asyncTearDown()
+
+ @TestBase.async_test
+ async def testSmoke(self):
+ with TemporaryDirectory(suffix='.aqmp') as tmpdir:
+ sock = os.path.join(tmpdir, type(self.proto).__name__ + ".sock")
+ server_task = create_task(self.server.accept(sock))
+
+ # give the server a chance to start listening [...]
+ await asyncio.sleep(0)
+ await self.proto.connect(sock)