summaryrefslogtreecommitdiff
path: root/lib/ansible/utils/display.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/ansible/utils/display.py')
-rw-r--r--lib/ansible/utils/display.py374
1 files changed, 52 insertions, 322 deletions
diff --git a/lib/ansible/utils/display.py b/lib/ansible/utils/display.py
index 3f331ad8..7d98ad47 100644
--- a/lib/ansible/utils/display.py
+++ b/lib/ansible/utils/display.py
@@ -15,49 +15,34 @@
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
-from __future__ import annotations
-
-try:
- import curses
-except ImportError:
- HAS_CURSES = False
-else:
- # this will be set to False if curses.setupterm() fails
- HAS_CURSES = True
-
-import collections.abc as c
-import codecs
+from __future__ import (absolute_import, division, print_function)
+__metaclass__ = type
+
import ctypes.util
import fcntl
import getpass
-import io
import logging
import os
import random
import subprocess
import sys
-import termios
import textwrap
import threading
import time
-import tty
-import typing as t
-from functools import wraps
from struct import unpack, pack
+from termios import TIOCGWINSZ
from ansible import constants as C
-from ansible.errors import AnsibleError, AnsibleAssertionError, AnsiblePromptInterrupt, AnsiblePromptNoninteractive
-from ansible.module_utils.common.text.converters import to_bytes, to_text
+from ansible.errors import AnsibleError, AnsibleAssertionError
+from ansible.module_utils._text import to_bytes, to_text
from ansible.module_utils.six import text_type
from ansible.utils.color import stringc
from ansible.utils.multiprocessing import context as multiprocessing_context
from ansible.utils.singleton import Singleton
from ansible.utils.unsafe_proxy import wrap_var
+from functools import wraps
-if t.TYPE_CHECKING:
- # avoid circular import at runtime
- from ansible.executor.task_queue_manager import FinalQueue
_LIBC = ctypes.cdll.LoadLibrary(ctypes.util.find_library('c'))
# Set argtypes, to avoid segfault if the wrong type is provided,
@@ -67,11 +52,8 @@ _LIBC.wcswidth.argtypes = (ctypes.c_wchar_p, ctypes.c_int)
# Max for c_int
_MAX_INT = 2 ** (ctypes.sizeof(ctypes.c_int) * 8 - 1) - 1
-MOVE_TO_BOL = b'\r'
-CLEAR_TO_EOL = b'\x1b[K'
-
-def get_text_width(text: str) -> int:
+def get_text_width(text):
"""Function that utilizes ``wcswidth`` or ``wcwidth`` to determine the
number of columns used to display a text string.
@@ -122,20 +104,6 @@ def get_text_width(text: str) -> int:
return width if width >= 0 else 0
-def proxy_display(method):
-
- def proxyit(self, *args, **kwargs):
- if self._final_q:
- # If _final_q is set, that means we are in a WorkerProcess
- # and instead of displaying messages directly from the fork
- # we will proxy them through the queue
- return self._final_q.send_display(method.__name__, *args, **kwargs)
- else:
- return method(self, *args, **kwargs)
-
- return proxyit
-
-
class FilterBlackList(logging.Filter):
def __init__(self, blacklist):
self.blacklist = [logging.Filter(name) for name in blacklist]
@@ -196,7 +164,7 @@ b_COW_PATHS = (
)
-def _synchronize_textiowrapper(tio: t.TextIO, lock: threading.RLock):
+def _synchronize_textiowrapper(tio, lock):
# Ensure that a background thread can't hold the internal buffer lock on a file object
# during a fork, which causes forked children to hang. We're using display's existing lock for
# convenience (and entering the lock before a fork).
@@ -211,70 +179,15 @@ def _synchronize_textiowrapper(tio: t.TextIO, lock: threading.RLock):
buffer = tio.buffer
# monkeypatching the underlying file-like object isn't great, but likely safer than subclassing
- buffer.write = _wrap_with_lock(buffer.write, lock) # type: ignore[method-assign]
- buffer.flush = _wrap_with_lock(buffer.flush, lock) # type: ignore[method-assign]
-
-
-def setraw(fd: int, when: int = termios.TCSAFLUSH) -> None:
- """Put terminal into a raw mode.
-
- Copied from ``tty`` from CPython 3.11.0, and modified to not remove OPOST from OFLAG
-
- OPOST is kept to prevent an issue with multi line prompts from being corrupted now that display
- is proxied via the queue from forks. The problem is a race condition, in that we proxy the display
- over the fork, but before it can be displayed, this plugin will have continued executing, potentially
- setting stdout and stdin to raw which remove output post processing that commonly converts NL to CRLF
- """
- mode = termios.tcgetattr(fd)
- mode[tty.IFLAG] = mode[tty.IFLAG] & ~(termios.BRKINT | termios.ICRNL | termios.INPCK | termios.ISTRIP | termios.IXON)
- mode[tty.OFLAG] = mode[tty.OFLAG] & ~(termios.OPOST)
- mode[tty.CFLAG] = mode[tty.CFLAG] & ~(termios.CSIZE | termios.PARENB)
- mode[tty.CFLAG] = mode[tty.CFLAG] | termios.CS8
- mode[tty.LFLAG] = mode[tty.LFLAG] & ~(termios.ECHO | termios.ICANON | termios.IEXTEN | termios.ISIG)
- mode[tty.CC][termios.VMIN] = 1
- mode[tty.CC][termios.VTIME] = 0
- termios.tcsetattr(fd, when, mode)
-
-
-def clear_line(stdout: t.BinaryIO) -> None:
- stdout.write(b'\x1b[%s' % MOVE_TO_BOL)
- stdout.write(b'\x1b[%s' % CLEAR_TO_EOL)
-
-
-def setup_prompt(stdin_fd: int, stdout_fd: int, seconds: int, echo: bool) -> None:
- setraw(stdin_fd)
-
- # Only set stdout to raw mode if it is a TTY. This is needed when redirecting
- # stdout to a file since a file cannot be set to raw mode.
- if os.isatty(stdout_fd):
- setraw(stdout_fd)
-
- if echo:
- new_settings = termios.tcgetattr(stdin_fd)
- new_settings[3] = new_settings[3] | termios.ECHO
- termios.tcsetattr(stdin_fd, termios.TCSANOW, new_settings)
-
-
-def setupterm() -> None:
- # Nest the try except since curses.error is not available if curses did not import
- try:
- curses.setupterm()
- except (curses.error, TypeError, io.UnsupportedOperation):
- global HAS_CURSES
- HAS_CURSES = False
- else:
- global MOVE_TO_BOL
- global CLEAR_TO_EOL
- # curses.tigetstr() returns None in some circumstances
- MOVE_TO_BOL = curses.tigetstr('cr') or MOVE_TO_BOL
- CLEAR_TO_EOL = curses.tigetstr('el') or CLEAR_TO_EOL
+ buffer.write = _wrap_with_lock(buffer.write, lock)
+ buffer.flush = _wrap_with_lock(buffer.flush, lock)
class Display(metaclass=Singleton):
- def __init__(self, verbosity: int = 0) -> None:
+ def __init__(self, verbosity=0):
- self._final_q: FinalQueue | None = None
+ self._final_q = None
# NB: this lock is used to both prevent intermingled output between threads and to block writes during forks.
# Do not change the type of this lock or upgrade to a shared lock (eg multiprocessing.RLock).
@@ -284,11 +197,11 @@ class Display(metaclass=Singleton):
self.verbosity = verbosity
# list of all deprecation messages to prevent duplicate display
- self._deprecations: dict[str, int] = {}
- self._warns: dict[str, int] = {}
- self._errors: dict[str, int] = {}
+ self._deprecations = {}
+ self._warns = {}
+ self._errors = {}
- self.b_cowsay: bytes | None = None
+ self.b_cowsay = None
self.noncow = C.ANSIBLE_COW_SELECTION
self.set_cowsay_info()
@@ -299,12 +212,12 @@ class Display(metaclass=Singleton):
(out, err) = cmd.communicate()
if cmd.returncode:
raise Exception
- self.cows_available: set[str] = {to_text(c) for c in out.split()}
+ self.cows_available = {to_text(c) for c in out.split()} # set comprehension
if C.ANSIBLE_COW_ACCEPTLIST and any(C.ANSIBLE_COW_ACCEPTLIST):
self.cows_available = set(C.ANSIBLE_COW_ACCEPTLIST).intersection(self.cows_available)
except Exception:
# could not execute cowsay for some reason
- self.b_cowsay = None
+ self.b_cowsay = False
self._set_column_width()
@@ -315,25 +228,13 @@ class Display(metaclass=Singleton):
except Exception as ex:
self.warning(f"failed to patch stdout/stderr for fork-safety: {ex}")
- codecs.register_error('_replacing_warning_handler', self._replacing_warning_handler)
try:
- sys.stdout.reconfigure(errors='_replacing_warning_handler')
- sys.stderr.reconfigure(errors='_replacing_warning_handler')
+ sys.stdout.reconfigure(errors='replace')
+ sys.stderr.reconfigure(errors='replace')
except Exception as ex:
- self.warning(f"failed to reconfigure stdout/stderr with custom encoding error handler: {ex}")
+ self.warning(f"failed to reconfigure stdout/stderr with the replace error handler: {ex}")
- self.setup_curses = False
-
- def _replacing_warning_handler(self, exception: UnicodeError) -> tuple[str | bytes, int]:
- # TODO: This should probably be deferred until after the current display is completed
- # this will require some amount of new functionality
- self.deprecated(
- 'Non UTF-8 encoded data replaced with "?" while displaying text to stdout/stderr, this is temporary and will become an error',
- version='2.18',
- )
- return '?', exception.end
-
- def set_queue(self, queue: FinalQueue) -> None:
+ def set_queue(self, queue):
"""Set the _final_q on Display, so that we know to proxy display over the queue
instead of directly writing to stdout/stderr from forks
@@ -343,7 +244,7 @@ class Display(metaclass=Singleton):
raise RuntimeError('queue cannot be set in parent process')
self._final_q = queue
- def set_cowsay_info(self) -> None:
+ def set_cowsay_info(self):
if C.ANSIBLE_NOCOWS:
return
@@ -354,23 +255,18 @@ class Display(metaclass=Singleton):
if os.path.exists(b_cow_path):
self.b_cowsay = b_cow_path
- @proxy_display
- def display(
- self,
- msg: str,
- color: str | None = None,
- stderr: bool = False,
- screen_only: bool = False,
- log_only: bool = False,
- newline: bool = True,
- ) -> None:
+ def display(self, msg, color=None, stderr=False, screen_only=False, log_only=False, newline=True):
""" Display a message to the user
Note: msg *must* be a unicode string to prevent UnicodeError tracebacks.
"""
- if not isinstance(msg, str):
- raise TypeError(f'Display message must be str, not: {msg.__class__.__name__}')
+ if self._final_q:
+ # If _final_q is set, that means we are in a WorkerProcess
+ # and instead of displaying messages directly from the fork
+ # we will proxy them through the queue
+ return self._final_q.send_display(msg, color=color, stderr=stderr,
+ screen_only=screen_only, log_only=log_only, newline=newline)
nocolor = msg
@@ -425,32 +321,32 @@ class Display(metaclass=Singleton):
# actually log
logger.log(lvl, msg2)
- def v(self, msg: str, host: str | None = None) -> None:
+ def v(self, msg, host=None):
return self.verbose(msg, host=host, caplevel=0)
- def vv(self, msg: str, host: str | None = None) -> None:
+ def vv(self, msg, host=None):
return self.verbose(msg, host=host, caplevel=1)
- def vvv(self, msg: str, host: str | None = None) -> None:
+ def vvv(self, msg, host=None):
return self.verbose(msg, host=host, caplevel=2)
- def vvvv(self, msg: str, host: str | None = None) -> None:
+ def vvvv(self, msg, host=None):
return self.verbose(msg, host=host, caplevel=3)
- def vvvvv(self, msg: str, host: str | None = None) -> None:
+ def vvvvv(self, msg, host=None):
return self.verbose(msg, host=host, caplevel=4)
- def vvvvvv(self, msg: str, host: str | None = None) -> None:
+ def vvvvvv(self, msg, host=None):
return self.verbose(msg, host=host, caplevel=5)
- def debug(self, msg: str, host: str | None = None) -> None:
+ def debug(self, msg, host=None):
if C.DEFAULT_DEBUG:
if host is None:
self.display("%6d %0.5f: %s" % (os.getpid(), time.time(), msg), color=C.COLOR_DEBUG)
else:
self.display("%6d %0.5f [%s]: %s" % (os.getpid(), time.time(), host, msg), color=C.COLOR_DEBUG)
- def verbose(self, msg: str, host: str | None = None, caplevel: int = 2) -> None:
+ def verbose(self, msg, host=None, caplevel=2):
to_stderr = C.VERBOSE_TO_STDERR
if self.verbosity > caplevel:
@@ -459,14 +355,7 @@ class Display(metaclass=Singleton):
else:
self.display("<%s> %s" % (host, msg), color=C.COLOR_VERBOSE, stderr=to_stderr)
- def get_deprecation_message(
- self,
- msg: str,
- version: str | None = None,
- removed: bool = False,
- date: str | None = None,
- collection_name: str | None = None,
- ) -> str:
+ def get_deprecation_message(self, msg, version=None, removed=False, date=None, collection_name=None):
''' used to print out a deprecation message.'''
msg = msg.strip()
if msg and msg[-1] not in ['!', '?', '.']:
@@ -501,15 +390,7 @@ class Display(metaclass=Singleton):
return message_text
- @proxy_display
- def deprecated(
- self,
- msg: str,
- version: str | None = None,
- removed: bool = False,
- date: str | None = None,
- collection_name: str | None = None,
- ) -> None:
+ def deprecated(self, msg, version=None, removed=False, date=None, collection_name=None):
if not removed and not C.DEPRECATION_WARNINGS:
return
@@ -525,8 +406,7 @@ class Display(metaclass=Singleton):
self.display(message_text.strip(), color=C.COLOR_DEPRECATE, stderr=True)
self._deprecations[message_text] = 1
- @proxy_display
- def warning(self, msg: str, formatted: bool = False) -> None:
+ def warning(self, msg, formatted=False):
if not formatted:
new_msg = "[WARNING]: %s" % msg
@@ -539,11 +419,11 @@ class Display(metaclass=Singleton):
self.display(new_msg, color=C.COLOR_WARN, stderr=True)
self._warns[new_msg] = 1
- def system_warning(self, msg: str) -> None:
+ def system_warning(self, msg):
if C.SYSTEM_WARNINGS:
self.warning(msg)
- def banner(self, msg: str, color: str | None = None, cows: bool = True) -> None:
+ def banner(self, msg, color=None, cows=True):
'''
Prints a header-looking line with cowsay or stars with length depending on terminal width (3 minimum)
'''
@@ -566,7 +446,7 @@ class Display(metaclass=Singleton):
stars = u"*" * star_len
self.display(u"\n%s %s" % (msg, stars), color=color)
- def banner_cowsay(self, msg: str, color: str | None = None) -> None:
+ def banner_cowsay(self, msg, color=None):
if u": [" in msg:
msg = msg.replace(u"[", u"")
if msg.endswith(u"]"):
@@ -583,7 +463,7 @@ class Display(metaclass=Singleton):
(out, err) = cmd.communicate()
self.display(u"%s\n" % to_text(out), color=color)
- def error(self, msg: str, wrap_text: bool = True) -> None:
+ def error(self, msg, wrap_text=True):
if wrap_text:
new_msg = u"\n[ERROR]: %s" % msg
wrapped = textwrap.wrap(new_msg, self.columns)
@@ -595,24 +475,14 @@ class Display(metaclass=Singleton):
self._errors[new_msg] = 1
@staticmethod
- def prompt(msg: str, private: bool = False) -> str:
+ def prompt(msg, private=False):
if private:
return getpass.getpass(msg)
else:
return input(msg)
- def do_var_prompt(
- self,
- varname: str,
- private: bool = True,
- prompt: str | None = None,
- encrypt: str | None = None,
- confirm: bool = False,
- salt_size: int | None = None,
- salt: str | None = None,
- default: str | None = None,
- unsafe: bool = False,
- ) -> str:
+ def do_var_prompt(self, varname, private=True, prompt=None, encrypt=None, confirm=False, salt_size=None, salt=None, default=None, unsafe=None):
+
result = None
if sys.__stdin__.isatty():
@@ -645,7 +515,7 @@ class Display(metaclass=Singleton):
if encrypt:
# Circular import because encrypt needs a display class
from ansible.utils.encrypt import do_encrypt
- result = do_encrypt(result, encrypt, salt_size=salt_size, salt=salt)
+ result = do_encrypt(result, encrypt, salt_size, salt)
# handle utf-8 chars
result = to_text(result, errors='surrogate_or_strict')
@@ -654,149 +524,9 @@ class Display(metaclass=Singleton):
result = wrap_var(result)
return result
- def _set_column_width(self) -> None:
+ def _set_column_width(self):
if os.isatty(1):
- tty_size = unpack('HHHH', fcntl.ioctl(1, termios.TIOCGWINSZ, pack('HHHH', 0, 0, 0, 0)))[1]
+ tty_size = unpack('HHHH', fcntl.ioctl(1, TIOCGWINSZ, pack('HHHH', 0, 0, 0, 0)))[1]
else:
tty_size = 0
self.columns = max(79, tty_size - 1)
-
- def prompt_until(
- self,
- msg: str,
- private: bool = False,
- seconds: int | None = None,
- interrupt_input: c.Container[bytes] | None = None,
- complete_input: c.Container[bytes] | None = None,
- ) -> bytes:
- if self._final_q:
- from ansible.executor.process.worker import current_worker
- self._final_q.send_prompt(
- worker_id=current_worker.worker_id, prompt=msg, private=private, seconds=seconds,
- interrupt_input=interrupt_input, complete_input=complete_input
- )
- return current_worker.worker_queue.get()
-
- if HAS_CURSES and not self.setup_curses:
- setupterm()
- self.setup_curses = True
-
- if (
- self._stdin_fd is None
- or not os.isatty(self._stdin_fd)
- # Compare the current process group to the process group associated
- # with terminal of the given file descriptor to determine if the process
- # is running in the background.
- or os.getpgrp() != os.tcgetpgrp(self._stdin_fd)
- ):
- raise AnsiblePromptNoninteractive('stdin is not interactive')
-
- # When seconds/interrupt_input/complete_input are all None, this does mostly the same thing as input/getpass,
- # but self.prompt may raise a KeyboardInterrupt, which must be caught in the main thread.
- # If the main thread handled this, it would also need to send a newline to the tty of any hanging pids.
- # if seconds is None and interrupt_input is None and complete_input is None:
- # try:
- # return self.prompt(msg, private=private)
- # except KeyboardInterrupt:
- # # can't catch in the results_thread_main daemon thread
- # raise AnsiblePromptInterrupt('user interrupt')
-
- self.display(msg)
- result = b''
- with self._lock:
- original_stdin_settings = termios.tcgetattr(self._stdin_fd)
- try:
- setup_prompt(self._stdin_fd, self._stdout_fd, seconds, not private)
-
- # flush the buffer to make sure no previous key presses
- # are read in below
- termios.tcflush(self._stdin, termios.TCIFLUSH)
-
- # read input 1 char at a time until the optional timeout or complete/interrupt condition is met
- return self._read_non_blocking_stdin(echo=not private, seconds=seconds, interrupt_input=interrupt_input, complete_input=complete_input)
- finally:
- # restore the old settings for the duped stdin stdin_fd
- termios.tcsetattr(self._stdin_fd, termios.TCSADRAIN, original_stdin_settings)
-
- def _read_non_blocking_stdin(
- self,
- echo: bool = False,
- seconds: int | None = None,
- interrupt_input: c.Container[bytes] | None = None,
- complete_input: c.Container[bytes] | None = None,
- ) -> bytes:
- if self._final_q:
- raise NotImplementedError
-
- if seconds is not None:
- start = time.time()
- if interrupt_input is None:
- try:
- interrupt = termios.tcgetattr(sys.stdin.buffer.fileno())[6][termios.VINTR]
- except Exception:
- interrupt = b'\x03' # value for Ctrl+C
-
- try:
- backspace_sequences = [termios.tcgetattr(self._stdin_fd)[6][termios.VERASE]]
- except Exception:
- # unsupported/not present, use default
- backspace_sequences = [b'\x7f', b'\x08']
-
- result_string = b''
- while seconds is None or (time.time() - start < seconds):
- key_pressed = None
- try:
- os.set_blocking(self._stdin_fd, False)
- while key_pressed is None and (seconds is None or (time.time() - start < seconds)):
- key_pressed = self._stdin.read(1)
- # throttle to prevent excess CPU consumption
- time.sleep(C.DEFAULT_INTERNAL_POLL_INTERVAL)
- finally:
- os.set_blocking(self._stdin_fd, True)
- if key_pressed is None:
- key_pressed = b''
-
- if (interrupt_input is None and key_pressed == interrupt) or (interrupt_input is not None and key_pressed.lower() in interrupt_input):
- clear_line(self._stdout)
- raise AnsiblePromptInterrupt('user interrupt')
- if (complete_input is None and key_pressed in (b'\r', b'\n')) or (complete_input is not None and key_pressed.lower() in complete_input):
- clear_line(self._stdout)
- break
- elif key_pressed in backspace_sequences:
- clear_line(self._stdout)
- result_string = result_string[:-1]
- if echo:
- self._stdout.write(result_string)
- self._stdout.flush()
- else:
- result_string += key_pressed
- return result_string
-
- @property
- def _stdin(self) -> t.BinaryIO | None:
- if self._final_q:
- raise NotImplementedError
- try:
- return sys.stdin.buffer
- except AttributeError:
- return None
-
- @property
- def _stdin_fd(self) -> int | None:
- try:
- return self._stdin.fileno()
- except (ValueError, AttributeError):
- return None
-
- @property
- def _stdout(self) -> t.BinaryIO:
- if self._final_q:
- raise NotImplementedError
- return sys.stdout.buffer
-
- @property
- def _stdout_fd(self) -> int | None:
- try:
- return self._stdout.fileno()
- except (ValueError, AttributeError):
- return None