summaryrefslogtreecommitdiff
path: root/lib/ansible/executor/process/worker.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/ansible/executor/process/worker.py')
-rw-r--r--lib/ansible/executor/process/worker.py54
1 files changed, 44 insertions, 10 deletions
diff --git a/lib/ansible/executor/process/worker.py b/lib/ansible/executor/process/worker.py
index 5113b83d..c043137c 100644
--- a/lib/ansible/executor/process/worker.py
+++ b/lib/ansible/executor/process/worker.py
@@ -24,10 +24,11 @@ import sys
import traceback
from jinja2.exceptions import TemplateNotFound
+from multiprocessing.queues import Queue
-from ansible.errors import AnsibleConnectionFailure
+from ansible.errors import AnsibleConnectionFailure, AnsibleError
from ansible.executor.task_executor import TaskExecutor
-from ansible.module_utils._text import to_text
+from ansible.module_utils.common.text.converters import to_text
from ansible.utils.display import Display
from ansible.utils.multiprocessing import context as multiprocessing_context
@@ -35,6 +36,17 @@ __all__ = ['WorkerProcess']
display = Display()
+current_worker = None
+
+
+class WorkerQueue(Queue):
+ """Queue that raises AnsibleError items on get()."""
+ def get(self, *args, **kwargs):
+ result = super(WorkerQueue, self).get(*args, **kwargs)
+ if isinstance(result, AnsibleError):
+ raise result
+ return result
+
class WorkerProcess(multiprocessing_context.Process): # type: ignore[name-defined]
'''
@@ -43,7 +55,7 @@ class WorkerProcess(multiprocessing_context.Process): # type: ignore[name-defin
for reading later.
'''
- def __init__(self, final_q, task_vars, host, task, play_context, loader, variable_manager, shared_loader_obj):
+ def __init__(self, final_q, task_vars, host, task, play_context, loader, variable_manager, shared_loader_obj, worker_id):
super(WorkerProcess, self).__init__()
# takes a task queue manager as the sole param:
@@ -60,6 +72,9 @@ class WorkerProcess(multiprocessing_context.Process): # type: ignore[name-defin
# clear var to ensure we only delete files for this child
self._loader._tempfiles = set()
+ self.worker_queue = WorkerQueue(ctx=multiprocessing_context)
+ self.worker_id = worker_id
+
def _save_stdin(self):
self._new_stdin = None
try:
@@ -155,6 +170,9 @@ class WorkerProcess(multiprocessing_context.Process): # type: ignore[name-defin
# Set the queue on Display so calls to Display.display are proxied over the queue
display.set_queue(self._final_q)
+ global current_worker
+ current_worker = self
+
try:
# execute the task and build a TaskResult from the result
display.debug("running TaskExecutor() for %s/%s" % (self._host, self._task))
@@ -166,7 +184,8 @@ class WorkerProcess(multiprocessing_context.Process): # type: ignore[name-defin
self._new_stdin,
self._loader,
self._shared_loader_obj,
- self._final_q
+ self._final_q,
+ self._variable_manager,
).run()
display.debug("done running TaskExecutor() for %s/%s [%s]" % (self._host, self._task, self._task._uuid))
@@ -175,12 +194,27 @@ class WorkerProcess(multiprocessing_context.Process): # type: ignore[name-defin
# put the result on the result queue
display.debug("sending task result for task %s" % self._task._uuid)
- self._final_q.send_task_result(
- self._host.name,
- self._task._uuid,
- executor_result,
- task_fields=self._task.dump_attrs(),
- )
+ try:
+ self._final_q.send_task_result(
+ self._host.name,
+ self._task._uuid,
+ executor_result,
+ task_fields=self._task.dump_attrs(),
+ )
+ except Exception as e:
+ display.debug(f'failed to send task result ({e}), sending surrogate result')
+ self._final_q.send_task_result(
+ self._host.name,
+ self._task._uuid,
+ # Overriding the task result, to represent the failure
+ {
+ 'failed': True,
+ 'msg': f'{e}',
+ 'exception': traceback.format_exc(),
+ },
+ # The failure pickling may have been caused by the task attrs, omit for safety
+ {},
+ )
display.debug("done sending task result for task %s" % self._task._uuid)
except AnsibleConnectionFailure: