diff options
Diffstat (limited to 'lib/ansible/executor/process/worker.py')
-rw-r--r-- | lib/ansible/executor/process/worker.py | 54 |
1 files changed, 44 insertions, 10 deletions
diff --git a/lib/ansible/executor/process/worker.py b/lib/ansible/executor/process/worker.py index 5113b83d..c043137c 100644 --- a/lib/ansible/executor/process/worker.py +++ b/lib/ansible/executor/process/worker.py @@ -24,10 +24,11 @@ import sys import traceback from jinja2.exceptions import TemplateNotFound +from multiprocessing.queues import Queue -from ansible.errors import AnsibleConnectionFailure +from ansible.errors import AnsibleConnectionFailure, AnsibleError from ansible.executor.task_executor import TaskExecutor -from ansible.module_utils._text import to_text +from ansible.module_utils.common.text.converters import to_text from ansible.utils.display import Display from ansible.utils.multiprocessing import context as multiprocessing_context @@ -35,6 +36,17 @@ __all__ = ['WorkerProcess'] display = Display() +current_worker = None + + +class WorkerQueue(Queue): + """Queue that raises AnsibleError items on get().""" + def get(self, *args, **kwargs): + result = super(WorkerQueue, self).get(*args, **kwargs) + if isinstance(result, AnsibleError): + raise result + return result + class WorkerProcess(multiprocessing_context.Process): # type: ignore[name-defined] ''' @@ -43,7 +55,7 @@ class WorkerProcess(multiprocessing_context.Process): # type: ignore[name-defin for reading later. ''' - def __init__(self, final_q, task_vars, host, task, play_context, loader, variable_manager, shared_loader_obj): + def __init__(self, final_q, task_vars, host, task, play_context, loader, variable_manager, shared_loader_obj, worker_id): super(WorkerProcess, self).__init__() # takes a task queue manager as the sole param: @@ -60,6 +72,9 @@ class WorkerProcess(multiprocessing_context.Process): # type: ignore[name-defin # clear var to ensure we only delete files for this child self._loader._tempfiles = set() + self.worker_queue = WorkerQueue(ctx=multiprocessing_context) + self.worker_id = worker_id + def _save_stdin(self): self._new_stdin = None try: @@ -155,6 +170,9 @@ class WorkerProcess(multiprocessing_context.Process): # type: ignore[name-defin # Set the queue on Display so calls to Display.display are proxied over the queue display.set_queue(self._final_q) + global current_worker + current_worker = self + try: # execute the task and build a TaskResult from the result display.debug("running TaskExecutor() for %s/%s" % (self._host, self._task)) @@ -166,7 +184,8 @@ class WorkerProcess(multiprocessing_context.Process): # type: ignore[name-defin self._new_stdin, self._loader, self._shared_loader_obj, - self._final_q + self._final_q, + self._variable_manager, ).run() display.debug("done running TaskExecutor() for %s/%s [%s]" % (self._host, self._task, self._task._uuid)) @@ -175,12 +194,27 @@ class WorkerProcess(multiprocessing_context.Process): # type: ignore[name-defin # put the result on the result queue display.debug("sending task result for task %s" % self._task._uuid) - self._final_q.send_task_result( - self._host.name, - self._task._uuid, - executor_result, - task_fields=self._task.dump_attrs(), - ) + try: + self._final_q.send_task_result( + self._host.name, + self._task._uuid, + executor_result, + task_fields=self._task.dump_attrs(), + ) + except Exception as e: + display.debug(f'failed to send task result ({e}), sending surrogate result') + self._final_q.send_task_result( + self._host.name, + self._task._uuid, + # Overriding the task result, to represent the failure + { + 'failed': True, + 'msg': f'{e}', + 'exception': traceback.format_exc(), + }, + # The failure pickling may have been caused by the task attrs, omit for safety + {}, + ) display.debug("done sending task result for task %s" % self._task._uuid) except AnsibleConnectionFailure: |