summaryrefslogtreecommitdiff
path: root/lib/ansible/executor/task_executor.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/ansible/executor/task_executor.py')
-rw-r--r--lib/ansible/executor/task_executor.py126
1 files changed, 74 insertions, 52 deletions
diff --git a/lib/ansible/executor/task_executor.py b/lib/ansible/executor/task_executor.py
index 02ace8f5..0e7394f6 100644
--- a/lib/ansible/executor/task_executor.py
+++ b/lib/ansible/executor/task_executor.py
@@ -20,14 +20,14 @@ from ansible.executor.task_result import TaskResult
from ansible.executor.module_common import get_action_args_with_defaults
from ansible.module_utils.parsing.convert_bool import boolean
from ansible.module_utils.six import binary_type
-from ansible.module_utils._text import to_text, to_native
+from ansible.module_utils.common.text.converters import to_text, to_native
from ansible.module_utils.connection import write_to_file_descriptor
from ansible.playbook.conditional import Conditional
from ansible.playbook.task import Task
from ansible.plugins import get_plugin_class
from ansible.plugins.loader import become_loader, cliconf_loader, connection_loader, httpapi_loader, netconf_loader, terminal_loader
from ansible.template import Templar
-from ansible.utils.collection_loader import AnsibleCollectionConfig, AnsibleCollectionRef
+from ansible.utils.collection_loader import AnsibleCollectionConfig
from ansible.utils.listify import listify_lookup_plugin_terms
from ansible.utils.unsafe_proxy import to_unsafe_text, wrap_var
from ansible.vars.clean import namespace_facts, clean_facts
@@ -82,7 +82,7 @@ class TaskExecutor:
class.
'''
- def __init__(self, host, task, job_vars, play_context, new_stdin, loader, shared_loader_obj, final_q):
+ def __init__(self, host, task, job_vars, play_context, new_stdin, loader, shared_loader_obj, final_q, variable_manager):
self._host = host
self._task = task
self._job_vars = job_vars
@@ -92,6 +92,7 @@ class TaskExecutor:
self._shared_loader_obj = shared_loader_obj
self._connection = None
self._final_q = final_q
+ self._variable_manager = variable_manager
self._loop_eval_error = None
self._task.squash()
@@ -136,6 +137,12 @@ class TaskExecutor:
self._task.ignore_errors = item_ignore
elif self._task.ignore_errors and not item_ignore:
self._task.ignore_errors = item_ignore
+ if 'unreachable' in item and item['unreachable']:
+ item_ignore_unreachable = item.pop('_ansible_ignore_unreachable')
+ if not res.get('unreachable'):
+ self._task.ignore_unreachable = item_ignore_unreachable
+ elif self._task.ignore_unreachable and not item_ignore_unreachable:
+ self._task.ignore_unreachable = item_ignore_unreachable
# ensure to accumulate these
for array in ['warnings', 'deprecations']:
@@ -215,21 +222,13 @@ class TaskExecutor:
templar = Templar(loader=self._loader, variables=self._job_vars)
items = None
- loop_cache = self._job_vars.get('_ansible_loop_cache')
- if loop_cache is not None:
- # _ansible_loop_cache may be set in `get_vars` when calculating `delegate_to`
- # to avoid reprocessing the loop
- items = loop_cache
- elif self._task.loop_with:
+ if self._task.loop_with:
if self._task.loop_with in self._shared_loader_obj.lookup_loader:
- fail = True
- if self._task.loop_with == 'first_found':
- # first_found loops are special. If the item is undefined then we want to fall through to the next value rather than failing.
- fail = False
+ # TODO: hardcoded so it fails for non first_found lookups, but thhis shoudl be generalized for those that don't do their own templating
+ # lookup prop/attribute?
+ fail = bool(self._task.loop_with != 'first_found')
loop_terms = listify_lookup_plugin_terms(terms=self._task.loop, templar=templar, fail_on_undefined=fail, convert_bare=False)
- if not fail:
- loop_terms = [t for t in loop_terms if not templar.is_template(t)]
# get lookup
mylookup = self._shared_loader_obj.lookup_loader.get(self._task.loop_with, loader=self._loader, templar=templar)
@@ -281,6 +280,7 @@ class TaskExecutor:
u" to something else to avoid variable collisions and unexpected behavior." % (self._task, loop_var))
ran_once = False
+ task_fields = None
no_log = False
items_len = len(items)
results = []
@@ -352,6 +352,7 @@ class TaskExecutor:
res['_ansible_item_result'] = True
res['_ansible_ignore_errors'] = task_fields.get('ignore_errors')
+ res['_ansible_ignore_unreachable'] = task_fields.get('ignore_unreachable')
# gets templated here unlike rest of loop_control fields, depends on loop_var above
try:
@@ -396,9 +397,25 @@ class TaskExecutor:
del task_vars[var]
self._task.no_log = no_log
+ # NOTE: run_once cannot contain loop vars because it's templated earlier also
+ # This is saving the post-validated field from the last loop so the strategy can use the templated value post task execution
+ self._task.run_once = task_fields.get('run_once')
+ self._task.action = task_fields.get('action')
return results
+ def _calculate_delegate_to(self, templar, variables):
+ """This method is responsible for effectively pre-validating Task.delegate_to and will
+ happen before Task.post_validate is executed
+ """
+ delegated_vars, delegated_host_name = self._variable_manager.get_delegated_vars_and_hostname(templar, self._task, variables)
+ # At the point this is executed it is safe to mutate self._task,
+ # since `self._task` is either a copy referred to by `tmp_task` in `_run_loop`
+ # or just a singular non-looped task
+ if delegated_host_name:
+ self._task.delegate_to = delegated_host_name
+ variables.update(delegated_vars)
+
def _execute(self, variables=None):
'''
The primary workhorse of the executor system, this runs the task
@@ -411,6 +428,8 @@ class TaskExecutor:
templar = Templar(loader=self._loader, variables=variables)
+ self._calculate_delegate_to(templar, variables)
+
context_validation_error = None
# a certain subset of variables exist.
@@ -450,9 +469,11 @@ class TaskExecutor:
# the fact that the conditional may specify that the task be skipped due to a
# variable not being present which would otherwise cause validation to fail
try:
- if not self._task.evaluate_conditional(templar, tempvars):
+ conditional_result, false_condition = self._task.evaluate_conditional_with_result(templar, tempvars)
+ if not conditional_result:
display.debug("when evaluation is False, skipping this task")
- return dict(changed=False, skipped=True, skip_reason='Conditional result was False', _ansible_no_log=no_log)
+ return dict(changed=False, skipped=True, skip_reason='Conditional result was False',
+ false_condition=false_condition, _ansible_no_log=no_log)
except AnsibleError as e:
# loop error takes precedence
if self._loop_eval_error is not None:
@@ -486,7 +507,7 @@ class TaskExecutor:
# if this task is a TaskInclude, we just return now with a success code so the
# main thread can expand the task list for the given host
- if self._task.action in C._ACTION_ALL_INCLUDE_TASKS:
+ if self._task.action in C._ACTION_INCLUDE_TASKS:
include_args = self._task.args.copy()
include_file = include_args.pop('_raw_params', None)
if not include_file:
@@ -570,25 +591,14 @@ class TaskExecutor:
# feed back into pc to ensure plugins not using get_option can get correct value
self._connection._play_context = self._play_context.set_task_and_variable_override(task=self._task, variables=vars_copy, templar=templar)
- # for persistent connections, initialize socket path and start connection manager
- if any(((self._connection.supports_persistence and C.USE_PERSISTENT_CONNECTIONS), self._connection.force_persistence)):
- self._play_context.timeout = self._connection.get_option('persistent_command_timeout')
- display.vvvv('attempting to start connection', host=self._play_context.remote_addr)
- display.vvvv('using connection plugin %s' % self._connection.transport, host=self._play_context.remote_addr)
-
- options = self._connection.get_options()
- socket_path = start_connection(self._play_context, options, self._task._uuid)
- display.vvvv('local domain socket path is %s' % socket_path, host=self._play_context.remote_addr)
- setattr(self._connection, '_socket_path', socket_path)
-
- # TODO: eventually remove this block as this should be a 'consequence' of 'forced_local' modules
+ # TODO: eventually remove this block as this should be a 'consequence' of 'forced_local' modules, right now rely on remote_is_local connection
# special handling for python interpreter for network_os, default to ansible python unless overridden
- if 'ansible_network_os' in cvars and 'ansible_python_interpreter' not in cvars:
+ if 'ansible_python_interpreter' not in cvars and 'ansible_network_os' in cvars and getattr(self._connection, '_remote_is_local', False):
# this also avoids 'python discovery'
cvars['ansible_python_interpreter'] = sys.executable
# get handler
- self._handler, module_context = self._get_action_handler_with_module_context(connection=self._connection, templar=templar)
+ self._handler, module_context = self._get_action_handler_with_module_context(templar=templar)
if module_context is not None:
module_defaults_fqcn = module_context.resolved_fqcn
@@ -606,17 +616,11 @@ class TaskExecutor:
if omit_token is not None:
self._task.args = remove_omit(self._task.args, omit_token)
- # Read some values from the task, so that we can modify them if need be
- if self._task.until:
- retries = self._task.retries
- if retries is None:
- retries = 3
- elif retries <= 0:
- retries = 1
- else:
- retries += 1
- else:
- retries = 1
+ retries = 1 # includes the default actual run + retries set by user/default
+ if self._task.retries is not None:
+ retries += max(0, self._task.retries)
+ elif self._task.until:
+ retries += 3 # the default is not set in FA because we need to differentiate "unset" value
delay = self._task.delay
if delay < 0:
@@ -722,7 +726,7 @@ class TaskExecutor:
result['failed'] = False
# Make attempts and retries available early to allow their use in changed/failed_when
- if self._task.until:
+ if retries > 1:
result['attempts'] = attempt
# set the changed property if it was missing.
@@ -754,7 +758,7 @@ class TaskExecutor:
if retries > 1:
cond = Conditional(loader=self._loader)
- cond.when = self._task.until
+ cond.when = self._task.until or [not result['failed']]
if cond.evaluate_conditional(templar, vars_copy):
break
else:
@@ -773,7 +777,7 @@ class TaskExecutor:
)
)
time.sleep(delay)
- self._handler = self._get_action_handler(connection=self._connection, templar=templar)
+ self._handler = self._get_action_handler(templar=templar)
else:
if retries > 1:
# we ran out of attempts, so mark the result as failed
@@ -1091,13 +1095,13 @@ class TaskExecutor:
return varnames
- def _get_action_handler(self, connection, templar):
+ def _get_action_handler(self, templar):
'''
Returns the correct action plugin to handle the requestion task action
'''
- return self._get_action_handler_with_module_context(connection, templar)[0]
+ return self._get_action_handler_with_module_context(templar)[0]
- def _get_action_handler_with_module_context(self, connection, templar):
+ def _get_action_handler_with_module_context(self, templar):
'''
Returns the correct action plugin to handle the requestion task action and the module context
'''
@@ -1134,10 +1138,29 @@ class TaskExecutor:
handler_name = 'ansible.legacy.normal'
collections = None # until then, we don't want the task's collection list to be consulted; use the builtin
+ # networking/psersistent connections handling
+ if any(((self._connection.supports_persistence and C.USE_PERSISTENT_CONNECTIONS), self._connection.force_persistence)):
+
+ # check handler in case we dont need to do all the work to setup persistent connection
+ handler_class = self._shared_loader_obj.action_loader.get(handler_name, class_only=True)
+ if getattr(handler_class, '_requires_connection', True):
+ # for persistent connections, initialize socket path and start connection manager
+ self._play_context.timeout = self._connection.get_option('persistent_command_timeout')
+ display.vvvv('attempting to start connection', host=self._play_context.remote_addr)
+ display.vvvv('using connection plugin %s' % self._connection.transport, host=self._play_context.remote_addr)
+
+ options = self._connection.get_options()
+ socket_path = start_connection(self._play_context, options, self._task._uuid)
+ display.vvvv('local domain socket path is %s' % socket_path, host=self._play_context.remote_addr)
+ setattr(self._connection, '_socket_path', socket_path)
+ else:
+ # TODO: set self._connection to dummy/noop connection, using local for now
+ self._connection = self._get_connection({}, templar, 'local')
+
handler = self._shared_loader_obj.action_loader.get(
handler_name,
task=self._task,
- connection=connection,
+ connection=self._connection,
play_context=self._play_context,
loader=self._loader,
templar=templar,
@@ -1213,8 +1236,7 @@ def start_connection(play_context, options, task_uuid):
else:
try:
result = json.loads(to_text(stderr, errors='surrogate_then_replace'))
- except getattr(json.decoder, 'JSONDecodeError', ValueError):
- # JSONDecodeError only available on Python 3.5+
+ except json.decoder.JSONDecodeError:
result = {'error': to_text(stderr, errors='surrogate_then_replace')}
if 'messages' in result: