summaryrefslogtreecommitdiff
path: root/lib/ansible/executor/task_queue_manager.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/ansible/executor/task_queue_manager.py')
-rw-r--r--lib/ansible/executor/task_queue_manager.py35
1 files changed, 25 insertions, 10 deletions
diff --git a/lib/ansible/executor/task_queue_manager.py b/lib/ansible/executor/task_queue_manager.py
index dcfc38a7..3bbf3d59 100644
--- a/lib/ansible/executor/task_queue_manager.py
+++ b/lib/ansible/executor/task_queue_manager.py
@@ -24,6 +24,7 @@ import sys
import tempfile
import threading
import time
+import typing as t
import multiprocessing.queues
from ansible import constants as C
@@ -33,7 +34,7 @@ from ansible.executor.play_iterator import PlayIterator
from ansible.executor.stats import AggregateStats
from ansible.executor.task_result import TaskResult
from ansible.module_utils.six import string_types
-from ansible.module_utils._text import to_text, to_native
+from ansible.module_utils.common.text.converters import to_text, to_native
from ansible.playbook.play_context import PlayContext
from ansible.playbook.task import Task
from ansible.plugins.loader import callback_loader, strategy_loader, module_loader
@@ -45,6 +46,7 @@ from ansible.utils.display import Display
from ansible.utils.lock import lock_decorator
from ansible.utils.multiprocessing import context as multiprocessing_context
+from dataclasses import dataclass
__all__ = ['TaskQueueManager']
@@ -59,20 +61,30 @@ class CallbackSend:
class DisplaySend:
- def __init__(self, *args, **kwargs):
+ def __init__(self, method, *args, **kwargs):
+ self.method = method
self.args = args
self.kwargs = kwargs
-class FinalQueue(multiprocessing.queues.Queue):
+@dataclass
+class PromptSend:
+ worker_id: int
+ prompt: str
+ private: bool = True
+ seconds: int = None
+ interrupt_input: t.Iterable[bytes] = None
+ complete_input: t.Iterable[bytes] = None
+
+
+class FinalQueue(multiprocessing.queues.SimpleQueue):
def __init__(self, *args, **kwargs):
kwargs['ctx'] = multiprocessing_context
- super(FinalQueue, self).__init__(*args, **kwargs)
+ super().__init__(*args, **kwargs)
def send_callback(self, method_name, *args, **kwargs):
self.put(
CallbackSend(method_name, *args, **kwargs),
- block=False
)
def send_task_result(self, *args, **kwargs):
@@ -82,13 +94,16 @@ class FinalQueue(multiprocessing.queues.Queue):
tr = TaskResult(*args, **kwargs)
self.put(
tr,
- block=False
)
- def send_display(self, *args, **kwargs):
+ def send_display(self, method, *args, **kwargs):
+ self.put(
+ DisplaySend(method, *args, **kwargs),
+ )
+
+ def send_prompt(self, **kwargs):
self.put(
- DisplaySend(*args, **kwargs),
- block=False
+ PromptSend(**kwargs),
)
@@ -217,7 +232,7 @@ class TaskQueueManager:
callback_name = cnames[0]
else:
# fallback to 'old loader name'
- (callback_name, _) = os.path.splitext(os.path.basename(callback_plugin._original_path))
+ (callback_name, ext) = os.path.splitext(os.path.basename(callback_plugin._original_path))
display.vvvvv("Attempting to use '%s' callback." % (callback_name))
if callback_type == 'stdout':