summaryrefslogtreecommitdiff
path: root/lib/ansible/playbook/conditional.py
diff options
context:
space:
mode:
Diffstat (limited to 'lib/ansible/playbook/conditional.py')
-rw-r--r--lib/ansible/playbook/conditional.py197
1 files changed, 152 insertions, 45 deletions
diff --git a/lib/ansible/playbook/conditional.py b/lib/ansible/playbook/conditional.py
index 449b4a9c..d994f8f4 100644
--- a/lib/ansible/playbook/conditional.py
+++ b/lib/ansible/playbook/conditional.py
@@ -19,18 +19,28 @@
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
-import typing as t
+import ast
+import re
+from jinja2.compiler import generate
+from jinja2.exceptions import UndefinedError
+
+from ansible import constants as C
from ansible.errors import AnsibleError, AnsibleUndefinedVariable, AnsibleTemplateError
-from ansible.module_utils.common.text.converters import to_native
+from ansible.module_utils.six import text_type
+from ansible.module_utils._text import to_native, to_text
from ansible.playbook.attribute import FieldAttribute
-from ansible.template import Templar
from ansible.utils.display import Display
display = Display()
+DEFINED_REGEX = re.compile(r'(hostvars\[.+\]|[\w_]+)\s+(not\s+is|is|is\s+not)\s+(defined|undefined)')
+LOOKUP_REGEX = re.compile(r'lookup\s*\(')
+VALID_VAR_REGEX = re.compile("^[_A-Za-z][_a-zA-Z0-9]*$")
+
class Conditional:
+
'''
This is a mix-in class, to be used with Base to allow the object
to be run conditionally when a condition is met or skipped.
@@ -47,69 +57,166 @@ class Conditional:
raise AnsibleError("a loader must be specified when using Conditional() directly")
else:
self._loader = loader
- super().__init__()
+ super(Conditional, self).__init__()
def _validate_when(self, attr, name, value):
if not isinstance(value, list):
setattr(self, name, [value])
- def evaluate_conditional(self, templar: Templar, all_vars: dict[str, t.Any]) -> bool:
+ def extract_defined_undefined(self, conditional):
+ results = []
+
+ cond = conditional
+ m = DEFINED_REGEX.search(cond)
+ while m:
+ results.append(m.groups())
+ cond = cond[m.end():]
+ m = DEFINED_REGEX.search(cond)
+
+ return results
+
+ def evaluate_conditional(self, templar, all_vars):
'''
Loops through the conditionals set on this object, returning
False if any of them evaluate as such.
'''
- return self.evaluate_conditional_with_result(templar, all_vars)[0]
-
- def evaluate_conditional_with_result(self, templar: Templar, all_vars: dict[str, t.Any]) -> tuple[bool, t.Optional[str]]:
- """Loops through the conditionals set on this object, returning
- False if any of them evaluate as such as well as the condition
- that was false.
- """
- for conditional in self.when:
- if conditional is None or conditional == "":
- res = True
- elif isinstance(conditional, bool):
- res = conditional
- else:
- try:
+
+ # since this is a mix-in, it may not have an underlying datastructure
+ # associated with it, so we pull it out now in case we need it for
+ # error reporting below
+ ds = None
+ if hasattr(self, '_ds'):
+ ds = getattr(self, '_ds')
+
+ result = True
+ try:
+ for conditional in self.when:
+
+ # do evaluation
+ if conditional is None or conditional == '':
+ res = True
+ elif isinstance(conditional, bool):
+ res = conditional
+ else:
res = self._check_conditional(conditional, templar, all_vars)
- except AnsibleError as e:
- raise AnsibleError(
- "The conditional check '%s' failed. The error was: %s" % (to_native(conditional), to_native(e)),
- obj=getattr(self, '_ds', None)
- )
- display.debug("Evaluated conditional (%s): %s" % (conditional, res))
- if not res:
- return res, conditional
+ # only update if still true, preserve false
+ if result:
+ result = res
- return True, None
+ display.debug("Evaluated conditional (%s): %s" % (conditional, res))
+ if not result:
+ break
+
+ except Exception as e:
+ raise AnsibleError("The conditional check '%s' failed. The error was: %s" % (to_native(conditional), to_native(e)), obj=ds)
+
+ return result
+
+ def _check_conditional(self, conditional, templar, all_vars):
+ '''
+ This method does the low-level evaluation of each conditional
+ set on this object, using jinja2 to wrap the conditionals for
+ evaluation.
+ '''
- def _check_conditional(self, conditional: str, templar: Templar, all_vars: dict[str, t.Any]) -> bool:
original = conditional
+
+ if templar.is_template(conditional):
+ display.warning('conditional statements should not include jinja2 '
+ 'templating delimiters such as {{ }} or {%% %%}. '
+ 'Found: %s' % conditional)
+
+ # make sure the templar is using the variables specified with this method
templar.available_variables = all_vars
+
try:
- if templar.is_template(conditional):
- display.warning(
- "conditional statements should not include jinja2 "
- "templating delimiters such as {{ }} or {%% %%}. "
- "Found: %s" % conditional
- )
- conditional = templar.template(conditional)
- if isinstance(conditional, bool):
- return conditional
- elif conditional == "":
- return False
+ # if the conditional is "unsafe", disable lookups
+ disable_lookups = hasattr(conditional, '__UNSAFE__')
+ conditional = templar.template(conditional, disable_lookups=disable_lookups)
+
+ if not isinstance(conditional, text_type) or conditional == "":
+ return conditional
# If the result of the first-pass template render (to resolve inline templates) is marked unsafe,
# explicitly fail since the next templating operation would never evaluate
if hasattr(conditional, '__UNSAFE__'):
raise AnsibleTemplateError('Conditional is marked as unsafe, and cannot be evaluated.')
+ # First, we do some low-level jinja2 parsing involving the AST format of the
+ # statement to ensure we don't do anything unsafe (using the disable_lookup flag above)
+ class CleansingNodeVisitor(ast.NodeVisitor):
+ def generic_visit(self, node, inside_call=False, inside_yield=False):
+ if isinstance(node, ast.Call):
+ inside_call = True
+ elif isinstance(node, ast.Yield):
+ inside_yield = True
+ elif isinstance(node, ast.Str):
+ if disable_lookups:
+ if inside_call and node.s.startswith("__"):
+ # calling things with a dunder is generally bad at this point...
+ raise AnsibleError(
+ "Invalid access found in the conditional: '%s'" % conditional
+ )
+ elif inside_yield:
+ # we're inside a yield, so recursively parse and traverse the AST
+ # of the result to catch forbidden syntax from executing
+ parsed = ast.parse(node.s, mode='exec')
+ cnv = CleansingNodeVisitor()
+ cnv.visit(parsed)
+ # iterate over all child nodes
+ for child_node in ast.iter_child_nodes(node):
+ self.generic_visit(
+ child_node,
+ inside_call=inside_call,
+ inside_yield=inside_yield
+ )
+ try:
+ res = templar.environment.parse(conditional, None, None)
+ res = generate(res, templar.environment, None, None)
+ parsed = ast.parse(res, mode='exec')
+
+ cnv = CleansingNodeVisitor()
+ cnv.visit(parsed)
+ except Exception as e:
+ raise AnsibleError("Invalid conditional detected: %s" % to_native(e))
+
+ # and finally we generate and template the presented string and look at the resulting string
# NOTE The spaces around True and False are intentional to short-circuit literal_eval for
# jinja2_native=False and avoid its expensive calls.
- return templar.template(
- "{%% if %s %%} True {%% else %%} False {%% endif %%}" % conditional,
- ).strip() == "True"
- except AnsibleUndefinedVariable as e:
- raise AnsibleUndefinedVariable("error while evaluating conditional (%s): %s" % (original, e))
+ presented = "{%% if %s %%} True {%% else %%} False {%% endif %%}" % conditional
+ val = templar.template(presented, disable_lookups=disable_lookups).strip()
+ if val == "True":
+ return True
+ elif val == "False":
+ return False
+ else:
+ raise AnsibleError("unable to evaluate conditional: %s" % original)
+ except (AnsibleUndefinedVariable, UndefinedError) as e:
+ # the templating failed, meaning most likely a variable was undefined. If we happened
+ # to be looking for an undefined variable, return True, otherwise fail
+ try:
+ # first we extract the variable name from the error message
+ var_name = re.compile(r"'(hostvars\[.+\]|[\w_]+)' is undefined").search(str(e)).groups()[0]
+ # next we extract all defined/undefined tests from the conditional string
+ def_undef = self.extract_defined_undefined(conditional)
+ # then we loop through these, comparing the error variable name against
+ # each def/undef test we found above. If there is a match, we determine
+ # whether the logic/state mean the variable should exist or not and return
+ # the corresponding True/False
+ for (du_var, logic, state) in def_undef:
+ # when we compare the var names, normalize quotes because something
+ # like hostvars['foo'] may be tested against hostvars["foo"]
+ if var_name.replace("'", '"') == du_var.replace("'", '"'):
+ # the should exist is a xor test between a negation in the logic portion
+ # against the state (defined or undefined)
+ should_exist = ('not' in logic) != (state == 'defined')
+ if should_exist:
+ return False
+ else:
+ return True
+ # as nothing above matched the failed var name, re-raise here to
+ # trigger the AnsibleUndefinedVariable exception again below
+ raise
+ except Exception:
+ raise AnsibleUndefinedVariable("error while evaluating conditional (%s): %s" % (original, e))