diff options
Diffstat (limited to 'lib/ansible/executor/task_queue_manager.py')
-rw-r--r-- | lib/ansible/executor/task_queue_manager.py | 35 |
1 files changed, 25 insertions, 10 deletions
diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py index dcfc38a7..3bbf3d59 100644 --- a/lib/ansible/executor/task_queue_manager.py +++ b/lib/ansible/executor/task_queue_manager.py @@ -24,6 +24,7 @@ import sys import tempfile import threading import time +import typing as t import multiprocessing.queues from ansible import constants as C @@ -33,7 +34,7 @@ from ansible.executor.play_iterator import PlayIterator from ansible.executor.stats import AggregateStats from ansible.executor.task_result import TaskResult from ansible.module_utils.six import string_types -from ansible.module_utils._text import to_text, to_native +from ansible.module_utils.common.text.converters import to_text, to_native from ansible.playbook.play_context import PlayContext from ansible.playbook.task import Task from ansible.plugins.loader import callback_loader, strategy_loader, module_loader @@ -45,6 +46,7 @@ from ansible.utils.display import Display from ansible.utils.lock import lock_decorator from ansible.utils.multiprocessing import context as multiprocessing_context +from dataclasses import dataclass __all__ = ['TaskQueueManager'] @@ -59,20 +61,30 @@ class CallbackSend: class DisplaySend: - def __init__(self, *args, **kwargs): + def __init__(self, method, *args, **kwargs): + self.method = method self.args = args self.kwargs = kwargs -class FinalQueue(multiprocessing.queues.Queue): +@dataclass +class PromptSend: + worker_id: int + prompt: str + private: bool = True + seconds: int = None + interrupt_input: t.Iterable[bytes] = None + complete_input: t.Iterable[bytes] = None + + +class FinalQueue(multiprocessing.queues.SimpleQueue): def __init__(self, *args, **kwargs): kwargs['ctx'] = multiprocessing_context - super(FinalQueue, self).__init__(*args, **kwargs) + super().__init__(*args, **kwargs) def send_callback(self, method_name, *args, **kwargs): self.put( CallbackSend(method_name, *args, **kwargs), - block=False ) def send_task_result(self, *args, **kwargs): @@ -82,13 +94,16 @@ class FinalQueue(multiprocessing.queues.Queue): tr = TaskResult(*args, **kwargs) self.put( tr, - block=False ) - def send_display(self, *args, **kwargs): + def send_display(self, method, *args, **kwargs): + self.put( + DisplaySend(method, *args, **kwargs), + ) + + def send_prompt(self, **kwargs): self.put( - DisplaySend(*args, **kwargs), - block=False + PromptSend(**kwargs), ) @@ -217,7 +232,7 @@ class TaskQueueManager: callback_name = cnames[0] else: # fallback to 'old loader name' - (callback_name, _) = os.path.splitext(os.path.basename(callback_plugin._original_path)) + (callback_name, ext) = os.path.splitext(os.path.basename(callback_plugin._original_path)) display.vvvvv("Attempting to use '%s' callback." % (callback_name)) if callback_type == 'stdout': |