diff options
Diffstat (limited to 'lib/ansible/utils/display.py')
-rw-r--r-- | lib/ansible/utils/display.py | 374 |
1 files changed, 52 insertions, 322 deletions
diff --git a/lib/ansible/utils/display.py b/lib/ansible/utils/display.py index 3f331ad8..7d98ad47 100644 --- a/lib/ansible/utils/display.py +++ b/lib/ansible/utils/display.py @@ -15,49 +15,34 @@ # You should have received a copy of the GNU General Public License # along with Ansible. If not, see <http://www.gnu.org/licenses/>. -from __future__ import annotations - -try: - import curses -except ImportError: - HAS_CURSES = False -else: - # this will be set to False if curses.setupterm() fails - HAS_CURSES = True - -import collections.abc as c -import codecs +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + import ctypes.util import fcntl import getpass -import io import logging import os import random import subprocess import sys -import termios import textwrap import threading import time -import tty -import typing as t -from functools import wraps from struct import unpack, pack +from termios import TIOCGWINSZ from ansible import constants as C -from ansible.errors import AnsibleError, AnsibleAssertionError, AnsiblePromptInterrupt, AnsiblePromptNoninteractive -from ansible.module_utils.common.text.converters import to_bytes, to_text +from ansible.errors import AnsibleError, AnsibleAssertionError +from ansible.module_utils._text import to_bytes, to_text from ansible.module_utils.six import text_type from ansible.utils.color import stringc from ansible.utils.multiprocessing import context as multiprocessing_context from ansible.utils.singleton import Singleton from ansible.utils.unsafe_proxy import wrap_var +from functools import wraps -if t.TYPE_CHECKING: - # avoid circular import at runtime - from ansible.executor.task_queue_manager import FinalQueue _LIBC = ctypes.cdll.LoadLibrary(ctypes.util.find_library('c')) # Set argtypes, to avoid segfault if the wrong type is provided, @@ -67,11 +52,8 @@ _LIBC.wcswidth.argtypes = (ctypes.c_wchar_p, ctypes.c_int) # Max for c_int _MAX_INT = 2 ** (ctypes.sizeof(ctypes.c_int) * 8 - 1) - 1 -MOVE_TO_BOL = b'\r' -CLEAR_TO_EOL = b'\x1b[K' - -def get_text_width(text: str) -> int: +def get_text_width(text): """Function that utilizes ``wcswidth`` or ``wcwidth`` to determine the number of columns used to display a text string. @@ -122,20 +104,6 @@ def get_text_width(text: str) -> int: return width if width >= 0 else 0 -def proxy_display(method): - - def proxyit(self, *args, **kwargs): - if self._final_q: - # If _final_q is set, that means we are in a WorkerProcess - # and instead of displaying messages directly from the fork - # we will proxy them through the queue - return self._final_q.send_display(method.__name__, *args, **kwargs) - else: - return method(self, *args, **kwargs) - - return proxyit - - class FilterBlackList(logging.Filter): def __init__(self, blacklist): self.blacklist = [logging.Filter(name) for name in blacklist] @@ -196,7 +164,7 @@ b_COW_PATHS = ( ) -def _synchronize_textiowrapper(tio: t.TextIO, lock: threading.RLock): +def _synchronize_textiowrapper(tio, lock): # Ensure that a background thread can't hold the internal buffer lock on a file object # during a fork, which causes forked children to hang. We're using display's existing lock for # convenience (and entering the lock before a fork). @@ -211,70 +179,15 @@ def _synchronize_textiowrapper(tio: t.TextIO, lock: threading.RLock): buffer = tio.buffer # monkeypatching the underlying file-like object isn't great, but likely safer than subclassing - buffer.write = _wrap_with_lock(buffer.write, lock) # type: ignore[method-assign] - buffer.flush = _wrap_with_lock(buffer.flush, lock) # type: ignore[method-assign] - - -def setraw(fd: int, when: int = termios.TCSAFLUSH) -> None: - """Put terminal into a raw mode. - - Copied from ``tty`` from CPython 3.11.0, and modified to not remove OPOST from OFLAG - - OPOST is kept to prevent an issue with multi line prompts from being corrupted now that display - is proxied via the queue from forks. The problem is a race condition, in that we proxy the display - over the fork, but before it can be displayed, this plugin will have continued executing, potentially - setting stdout and stdin to raw which remove output post processing that commonly converts NL to CRLF - """ - mode = termios.tcgetattr(fd) - mode[tty.IFLAG] = mode[tty.IFLAG] & ~(termios.BRKINT | termios.ICRNL | termios.INPCK | termios.ISTRIP | termios.IXON) - mode[tty.OFLAG] = mode[tty.OFLAG] & ~(termios.OPOST) - mode[tty.CFLAG] = mode[tty.CFLAG] & ~(termios.CSIZE | termios.PARENB) - mode[tty.CFLAG] = mode[tty.CFLAG] | termios.CS8 - mode[tty.LFLAG] = mode[tty.LFLAG] & ~(termios.ECHO | termios.ICANON | termios.IEXTEN | termios.ISIG) - mode[tty.CC][termios.VMIN] = 1 - mode[tty.CC][termios.VTIME] = 0 - termios.tcsetattr(fd, when, mode) - - -def clear_line(stdout: t.BinaryIO) -> None: - stdout.write(b'\x1b[%s' % MOVE_TO_BOL) - stdout.write(b'\x1b[%s' % CLEAR_TO_EOL) - - -def setup_prompt(stdin_fd: int, stdout_fd: int, seconds: int, echo: bool) -> None: - setraw(stdin_fd) - - # Only set stdout to raw mode if it is a TTY. This is needed when redirecting - # stdout to a file since a file cannot be set to raw mode. - if os.isatty(stdout_fd): - setraw(stdout_fd) - - if echo: - new_settings = termios.tcgetattr(stdin_fd) - new_settings[3] = new_settings[3] | termios.ECHO - termios.tcsetattr(stdin_fd, termios.TCSANOW, new_settings) - - -def setupterm() -> None: - # Nest the try except since curses.error is not available if curses did not import - try: - curses.setupterm() - except (curses.error, TypeError, io.UnsupportedOperation): - global HAS_CURSES - HAS_CURSES = False - else: - global MOVE_TO_BOL - global CLEAR_TO_EOL - # curses.tigetstr() returns None in some circumstances - MOVE_TO_BOL = curses.tigetstr('cr') or MOVE_TO_BOL - CLEAR_TO_EOL = curses.tigetstr('el') or CLEAR_TO_EOL + buffer.write = _wrap_with_lock(buffer.write, lock) + buffer.flush = _wrap_with_lock(buffer.flush, lock) class Display(metaclass=Singleton): - def __init__(self, verbosity: int = 0) -> None: + def __init__(self, verbosity=0): - self._final_q: FinalQueue | None = None + self._final_q = None # NB: this lock is used to both prevent intermingled output between threads and to block writes during forks. # Do not change the type of this lock or upgrade to a shared lock (eg multiprocessing.RLock). @@ -284,11 +197,11 @@ class Display(metaclass=Singleton): self.verbosity = verbosity # list of all deprecation messages to prevent duplicate display - self._deprecations: dict[str, int] = {} - self._warns: dict[str, int] = {} - self._errors: dict[str, int] = {} + self._deprecations = {} + self._warns = {} + self._errors = {} - self.b_cowsay: bytes | None = None + self.b_cowsay = None self.noncow = C.ANSIBLE_COW_SELECTION self.set_cowsay_info() @@ -299,12 +212,12 @@ class Display(metaclass=Singleton): (out, err) = cmd.communicate() if cmd.returncode: raise Exception - self.cows_available: set[str] = {to_text(c) for c in out.split()} + self.cows_available = {to_text(c) for c in out.split()} # set comprehension if C.ANSIBLE_COW_ACCEPTLIST and any(C.ANSIBLE_COW_ACCEPTLIST): self.cows_available = set(C.ANSIBLE_COW_ACCEPTLIST).intersection(self.cows_available) except Exception: # could not execute cowsay for some reason - self.b_cowsay = None + self.b_cowsay = False self._set_column_width() @@ -315,25 +228,13 @@ class Display(metaclass=Singleton): except Exception as ex: self.warning(f"failed to patch stdout/stderr for fork-safety: {ex}") - codecs.register_error('_replacing_warning_handler', self._replacing_warning_handler) try: - sys.stdout.reconfigure(errors='_replacing_warning_handler') - sys.stderr.reconfigure(errors='_replacing_warning_handler') + sys.stdout.reconfigure(errors='replace') + sys.stderr.reconfigure(errors='replace') except Exception as ex: - self.warning(f"failed to reconfigure stdout/stderr with custom encoding error handler: {ex}") + self.warning(f"failed to reconfigure stdout/stderr with the replace error handler: {ex}") - self.setup_curses = False - - def _replacing_warning_handler(self, exception: UnicodeError) -> tuple[str | bytes, int]: - # TODO: This should probably be deferred until after the current display is completed - # this will require some amount of new functionality - self.deprecated( - 'Non UTF-8 encoded data replaced with "?" while displaying text to stdout/stderr, this is temporary and will become an error', - version='2.18', - ) - return '?', exception.end - - def set_queue(self, queue: FinalQueue) -> None: + def set_queue(self, queue): """Set the _final_q on Display, so that we know to proxy display over the queue instead of directly writing to stdout/stderr from forks @@ -343,7 +244,7 @@ class Display(metaclass=Singleton): raise RuntimeError('queue cannot be set in parent process') self._final_q = queue - def set_cowsay_info(self) -> None: + def set_cowsay_info(self): if C.ANSIBLE_NOCOWS: return @@ -354,23 +255,18 @@ class Display(metaclass=Singleton): if os.path.exists(b_cow_path): self.b_cowsay = b_cow_path - @proxy_display - def display( - self, - msg: str, - color: str | None = None, - stderr: bool = False, - screen_only: bool = False, - log_only: bool = False, - newline: bool = True, - ) -> None: + def display(self, msg, color=None, stderr=False, screen_only=False, log_only=False, newline=True): """ Display a message to the user Note: msg *must* be a unicode string to prevent UnicodeError tracebacks. """ - if not isinstance(msg, str): - raise TypeError(f'Display message must be str, not: {msg.__class__.__name__}') + if self._final_q: + # If _final_q is set, that means we are in a WorkerProcess + # and instead of displaying messages directly from the fork + # we will proxy them through the queue + return self._final_q.send_display(msg, color=color, stderr=stderr, + screen_only=screen_only, log_only=log_only, newline=newline) nocolor = msg @@ -425,32 +321,32 @@ class Display(metaclass=Singleton): # actually log logger.log(lvl, msg2) - def v(self, msg: str, host: str | None = None) -> None: + def v(self, msg, host=None): return self.verbose(msg, host=host, caplevel=0) - def vv(self, msg: str, host: str | None = None) -> None: + def vv(self, msg, host=None): return self.verbose(msg, host=host, caplevel=1) - def vvv(self, msg: str, host: str | None = None) -> None: + def vvv(self, msg, host=None): return self.verbose(msg, host=host, caplevel=2) - def vvvv(self, msg: str, host: str | None = None) -> None: + def vvvv(self, msg, host=None): return self.verbose(msg, host=host, caplevel=3) - def vvvvv(self, msg: str, host: str | None = None) -> None: + def vvvvv(self, msg, host=None): return self.verbose(msg, host=host, caplevel=4) - def vvvvvv(self, msg: str, host: str | None = None) -> None: + def vvvvvv(self, msg, host=None): return self.verbose(msg, host=host, caplevel=5) - def debug(self, msg: str, host: str | None = None) -> None: + def debug(self, msg, host=None): if C.DEFAULT_DEBUG: if host is None: self.display("%6d %0.5f: %s" % (os.getpid(), time.time(), msg), color=C.COLOR_DEBUG) else: self.display("%6d %0.5f [%s]: %s" % (os.getpid(), time.time(), host, msg), color=C.COLOR_DEBUG) - def verbose(self, msg: str, host: str | None = None, caplevel: int = 2) -> None: + def verbose(self, msg, host=None, caplevel=2): to_stderr = C.VERBOSE_TO_STDERR if self.verbosity > caplevel: @@ -459,14 +355,7 @@ class Display(metaclass=Singleton): else: self.display("<%s> %s" % (host, msg), color=C.COLOR_VERBOSE, stderr=to_stderr) - def get_deprecation_message( - self, - msg: str, - version: str | None = None, - removed: bool = False, - date: str | None = None, - collection_name: str | None = None, - ) -> str: + def get_deprecation_message(self, msg, version=None, removed=False, date=None, collection_name=None): ''' used to print out a deprecation message.''' msg = msg.strip() if msg and msg[-1] not in ['!', '?', '.']: @@ -501,15 +390,7 @@ class Display(metaclass=Singleton): return message_text - @proxy_display - def deprecated( - self, - msg: str, - version: str | None = None, - removed: bool = False, - date: str | None = None, - collection_name: str | None = None, - ) -> None: + def deprecated(self, msg, version=None, removed=False, date=None, collection_name=None): if not removed and not C.DEPRECATION_WARNINGS: return @@ -525,8 +406,7 @@ class Display(metaclass=Singleton): self.display(message_text.strip(), color=C.COLOR_DEPRECATE, stderr=True) self._deprecations[message_text] = 1 - @proxy_display - def warning(self, msg: str, formatted: bool = False) -> None: + def warning(self, msg, formatted=False): if not formatted: new_msg = "[WARNING]: %s" % msg @@ -539,11 +419,11 @@ class Display(metaclass=Singleton): self.display(new_msg, color=C.COLOR_WARN, stderr=True) self._warns[new_msg] = 1 - def system_warning(self, msg: str) -> None: + def system_warning(self, msg): if C.SYSTEM_WARNINGS: self.warning(msg) - def banner(self, msg: str, color: str | None = None, cows: bool = True) -> None: + def banner(self, msg, color=None, cows=True): ''' Prints a header-looking line with cowsay or stars with length depending on terminal width (3 minimum) ''' @@ -566,7 +446,7 @@ class Display(metaclass=Singleton): stars = u"*" * star_len self.display(u"\n%s %s" % (msg, stars), color=color) - def banner_cowsay(self, msg: str, color: str | None = None) -> None: + def banner_cowsay(self, msg, color=None): if u": [" in msg: msg = msg.replace(u"[", u"") if msg.endswith(u"]"): @@ -583,7 +463,7 @@ class Display(metaclass=Singleton): (out, err) = cmd.communicate() self.display(u"%s\n" % to_text(out), color=color) - def error(self, msg: str, wrap_text: bool = True) -> None: + def error(self, msg, wrap_text=True): if wrap_text: new_msg = u"\n[ERROR]: %s" % msg wrapped = textwrap.wrap(new_msg, self.columns) @@ -595,24 +475,14 @@ class Display(metaclass=Singleton): self._errors[new_msg] = 1 @staticmethod - def prompt(msg: str, private: bool = False) -> str: + def prompt(msg, private=False): if private: return getpass.getpass(msg) else: return input(msg) - def do_var_prompt( - self, - varname: str, - private: bool = True, - prompt: str | None = None, - encrypt: str | None = None, - confirm: bool = False, - salt_size: int | None = None, - salt: str | None = None, - default: str | None = None, - unsafe: bool = False, - ) -> str: + def do_var_prompt(self, varname, private=True, prompt=None, encrypt=None, confirm=False, salt_size=None, salt=None, default=None, unsafe=None): + result = None if sys.__stdin__.isatty(): @@ -645,7 +515,7 @@ class Display(metaclass=Singleton): if encrypt: # Circular import because encrypt needs a display class from ansible.utils.encrypt import do_encrypt - result = do_encrypt(result, encrypt, salt_size=salt_size, salt=salt) + result = do_encrypt(result, encrypt, salt_size, salt) # handle utf-8 chars result = to_text(result, errors='surrogate_or_strict') @@ -654,149 +524,9 @@ class Display(metaclass=Singleton): result = wrap_var(result) return result - def _set_column_width(self) -> None: + def _set_column_width(self): if os.isatty(1): - tty_size = unpack('HHHH', fcntl.ioctl(1, termios.TIOCGWINSZ, pack('HHHH', 0, 0, 0, 0)))[1] + tty_size = unpack('HHHH', fcntl.ioctl(1, TIOCGWINSZ, pack('HHHH', 0, 0, 0, 0)))[1] else: tty_size = 0 self.columns = max(79, tty_size - 1) - - def prompt_until( - self, - msg: str, - private: bool = False, - seconds: int | None = None, - interrupt_input: c.Container[bytes] | None = None, - complete_input: c.Container[bytes] | None = None, - ) -> bytes: - if self._final_q: - from ansible.executor.process.worker import current_worker - self._final_q.send_prompt( - worker_id=current_worker.worker_id, prompt=msg, private=private, seconds=seconds, - interrupt_input=interrupt_input, complete_input=complete_input - ) - return current_worker.worker_queue.get() - - if HAS_CURSES and not self.setup_curses: - setupterm() - self.setup_curses = True - - if ( - self._stdin_fd is None - or not os.isatty(self._stdin_fd) - # Compare the current process group to the process group associated - # with terminal of the given file descriptor to determine if the process - # is running in the background. - or os.getpgrp() != os.tcgetpgrp(self._stdin_fd) - ): - raise AnsiblePromptNoninteractive('stdin is not interactive') - - # When seconds/interrupt_input/complete_input are all None, this does mostly the same thing as input/getpass, - # but self.prompt may raise a KeyboardInterrupt, which must be caught in the main thread. - # If the main thread handled this, it would also need to send a newline to the tty of any hanging pids. - # if seconds is None and interrupt_input is None and complete_input is None: - # try: - # return self.prompt(msg, private=private) - # except KeyboardInterrupt: - # # can't catch in the results_thread_main daemon thread - # raise AnsiblePromptInterrupt('user interrupt') - - self.display(msg) - result = b'' - with self._lock: - original_stdin_settings = termios.tcgetattr(self._stdin_fd) - try: - setup_prompt(self._stdin_fd, self._stdout_fd, seconds, not private) - - # flush the buffer to make sure no previous key presses - # are read in below - termios.tcflush(self._stdin, termios.TCIFLUSH) - - # read input 1 char at a time until the optional timeout or complete/interrupt condition is met - return self._read_non_blocking_stdin(echo=not private, seconds=seconds, interrupt_input=interrupt_input, complete_input=complete_input) - finally: - # restore the old settings for the duped stdin stdin_fd - termios.tcsetattr(self._stdin_fd, termios.TCSADRAIN, original_stdin_settings) - - def _read_non_blocking_stdin( - self, - echo: bool = False, - seconds: int | None = None, - interrupt_input: c.Container[bytes] | None = None, - complete_input: c.Container[bytes] | None = None, - ) -> bytes: - if self._final_q: - raise NotImplementedError - - if seconds is not None: - start = time.time() - if interrupt_input is None: - try: - interrupt = termios.tcgetattr(sys.stdin.buffer.fileno())[6][termios.VINTR] - except Exception: - interrupt = b'\x03' # value for Ctrl+C - - try: - backspace_sequences = [termios.tcgetattr(self._stdin_fd)[6][termios.VERASE]] - except Exception: - # unsupported/not present, use default - backspace_sequences = [b'\x7f', b'\x08'] - - result_string = b'' - while seconds is None or (time.time() - start < seconds): - key_pressed = None - try: - os.set_blocking(self._stdin_fd, False) - while key_pressed is None and (seconds is None or (time.time() - start < seconds)): - key_pressed = self._stdin.read(1) - # throttle to prevent excess CPU consumption - time.sleep(C.DEFAULT_INTERNAL_POLL_INTERVAL) - finally: - os.set_blocking(self._stdin_fd, True) - if key_pressed is None: - key_pressed = b'' - - if (interrupt_input is None and key_pressed == interrupt) or (interrupt_input is not None and key_pressed.lower() in interrupt_input): - clear_line(self._stdout) - raise AnsiblePromptInterrupt('user interrupt') - if (complete_input is None and key_pressed in (b'\r', b'\n')) or (complete_input is not None and key_pressed.lower() in complete_input): - clear_line(self._stdout) - break - elif key_pressed in backspace_sequences: - clear_line(self._stdout) - result_string = result_string[:-1] - if echo: - self._stdout.write(result_string) - self._stdout.flush() - else: - result_string += key_pressed - return result_string - - @property - def _stdin(self) -> t.BinaryIO | None: - if self._final_q: - raise NotImplementedError - try: - return sys.stdin.buffer - except AttributeError: - return None - - @property - def _stdin_fd(self) -> int | None: - try: - return self._stdin.fileno() - except (ValueError, AttributeError): - return None - - @property - def _stdout(self) -> t.BinaryIO: - if self._final_q: - raise NotImplementedError - return sys.stdout.buffer - - @property - def _stdout_fd(self) -> int | None: - try: - return self._stdout.fileno() - except (ValueError, AttributeError): - return None |