diff options
Diffstat (limited to 'lib/ansible/playbook/conditional.py')
-rw-r--r-- | lib/ansible/playbook/conditional.py | 197 |
1 files changed, 152 insertions, 45 deletions
diff --git a/lib/ansible/playbook/conditional.py b/lib/ansible/playbook/conditional.py index 449b4a9c..d994f8f4 100644 --- a/lib/ansible/playbook/conditional.py +++ b/lib/ansible/playbook/conditional.py @@ -19,18 +19,28 @@ from __future__ import (absolute_import, division, print_function) __metaclass__ = type -import typing as t +import ast +import re +from jinja2.compiler import generate +from jinja2.exceptions import UndefinedError + +from ansible import constants as C from ansible.errors import AnsibleError, AnsibleUndefinedVariable, AnsibleTemplateError -from ansible.module_utils.common.text.converters import to_native +from ansible.module_utils.six import text_type +from ansible.module_utils._text import to_native, to_text from ansible.playbook.attribute import FieldAttribute -from ansible.template import Templar from ansible.utils.display import Display display = Display() +DEFINED_REGEX = re.compile(r'(hostvars\[.+\]|[\w_]+)\s+(not\s+is|is|is\s+not)\s+(defined|undefined)') +LOOKUP_REGEX = re.compile(r'lookup\s*\(') +VALID_VAR_REGEX = re.compile("^[_A-Za-z][_a-zA-Z0-9]*$") + class Conditional: + ''' This is a mix-in class, to be used with Base to allow the object to be run conditionally when a condition is met or skipped. @@ -47,69 +57,166 @@ class Conditional: raise AnsibleError("a loader must be specified when using Conditional() directly") else: self._loader = loader - super().__init__() + super(Conditional, self).__init__() def _validate_when(self, attr, name, value): if not isinstance(value, list): setattr(self, name, [value]) - def evaluate_conditional(self, templar: Templar, all_vars: dict[str, t.Any]) -> bool: + def extract_defined_undefined(self, conditional): + results = [] + + cond = conditional + m = DEFINED_REGEX.search(cond) + while m: + results.append(m.groups()) + cond = cond[m.end():] + m = DEFINED_REGEX.search(cond) + + return results + + def evaluate_conditional(self, templar, all_vars): ''' Loops through the conditionals set on this object, returning False if any of them evaluate as such. ''' - return self.evaluate_conditional_with_result(templar, all_vars)[0] - - def evaluate_conditional_with_result(self, templar: Templar, all_vars: dict[str, t.Any]) -> tuple[bool, t.Optional[str]]: - """Loops through the conditionals set on this object, returning - False if any of them evaluate as such as well as the condition - that was false. - """ - for conditional in self.when: - if conditional is None or conditional == "": - res = True - elif isinstance(conditional, bool): - res = conditional - else: - try: + + # since this is a mix-in, it may not have an underlying datastructure + # associated with it, so we pull it out now in case we need it for + # error reporting below + ds = None + if hasattr(self, '_ds'): + ds = getattr(self, '_ds') + + result = True + try: + for conditional in self.when: + + # do evaluation + if conditional is None or conditional == '': + res = True + elif isinstance(conditional, bool): + res = conditional + else: res = self._check_conditional(conditional, templar, all_vars) - except AnsibleError as e: - raise AnsibleError( - "The conditional check '%s' failed. The error was: %s" % (to_native(conditional), to_native(e)), - obj=getattr(self, '_ds', None) - ) - display.debug("Evaluated conditional (%s): %s" % (conditional, res)) - if not res: - return res, conditional + # only update if still true, preserve false + if result: + result = res - return True, None + display.debug("Evaluated conditional (%s): %s" % (conditional, res)) + if not result: + break + + except Exception as e: + raise AnsibleError("The conditional check '%s' failed. The error was: %s" % (to_native(conditional), to_native(e)), obj=ds) + + return result + + def _check_conditional(self, conditional, templar, all_vars): + ''' + This method does the low-level evaluation of each conditional + set on this object, using jinja2 to wrap the conditionals for + evaluation. + ''' - def _check_conditional(self, conditional: str, templar: Templar, all_vars: dict[str, t.Any]) -> bool: original = conditional + + if templar.is_template(conditional): + display.warning('conditional statements should not include jinja2 ' + 'templating delimiters such as {{ }} or {%% %%}. ' + 'Found: %s' % conditional) + + # make sure the templar is using the variables specified with this method templar.available_variables = all_vars + try: - if templar.is_template(conditional): - display.warning( - "conditional statements should not include jinja2 " - "templating delimiters such as {{ }} or {%% %%}. " - "Found: %s" % conditional - ) - conditional = templar.template(conditional) - if isinstance(conditional, bool): - return conditional - elif conditional == "": - return False + # if the conditional is "unsafe", disable lookups + disable_lookups = hasattr(conditional, '__UNSAFE__') + conditional = templar.template(conditional, disable_lookups=disable_lookups) + + if not isinstance(conditional, text_type) or conditional == "": + return conditional # If the result of the first-pass template render (to resolve inline templates) is marked unsafe, # explicitly fail since the next templating operation would never evaluate if hasattr(conditional, '__UNSAFE__'): raise AnsibleTemplateError('Conditional is marked as unsafe, and cannot be evaluated.') + # First, we do some low-level jinja2 parsing involving the AST format of the + # statement to ensure we don't do anything unsafe (using the disable_lookup flag above) + class CleansingNodeVisitor(ast.NodeVisitor): + def generic_visit(self, node, inside_call=False, inside_yield=False): + if isinstance(node, ast.Call): + inside_call = True + elif isinstance(node, ast.Yield): + inside_yield = True + elif isinstance(node, ast.Str): + if disable_lookups: + if inside_call and node.s.startswith("__"): + # calling things with a dunder is generally bad at this point... + raise AnsibleError( + "Invalid access found in the conditional: '%s'" % conditional + ) + elif inside_yield: + # we're inside a yield, so recursively parse and traverse the AST + # of the result to catch forbidden syntax from executing + parsed = ast.parse(node.s, mode='exec') + cnv = CleansingNodeVisitor() + cnv.visit(parsed) + # iterate over all child nodes + for child_node in ast.iter_child_nodes(node): + self.generic_visit( + child_node, + inside_call=inside_call, + inside_yield=inside_yield + ) + try: + res = templar.environment.parse(conditional, None, None) + res = generate(res, templar.environment, None, None) + parsed = ast.parse(res, mode='exec') + + cnv = CleansingNodeVisitor() + cnv.visit(parsed) + except Exception as e: + raise AnsibleError("Invalid conditional detected: %s" % to_native(e)) + + # and finally we generate and template the presented string and look at the resulting string # NOTE The spaces around True and False are intentional to short-circuit literal_eval for # jinja2_native=False and avoid its expensive calls. - return templar.template( - "{%% if %s %%} True {%% else %%} False {%% endif %%}" % conditional, - ).strip() == "True" - except AnsibleUndefinedVariable as e: - raise AnsibleUndefinedVariable("error while evaluating conditional (%s): %s" % (original, e)) + presented = "{%% if %s %%} True {%% else %%} False {%% endif %%}" % conditional + val = templar.template(presented, disable_lookups=disable_lookups).strip() + if val == "True": + return True + elif val == "False": + return False + else: + raise AnsibleError("unable to evaluate conditional: %s" % original) + except (AnsibleUndefinedVariable, UndefinedError) as e: + # the templating failed, meaning most likely a variable was undefined. If we happened + # to be looking for an undefined variable, return True, otherwise fail + try: + # first we extract the variable name from the error message + var_name = re.compile(r"'(hostvars\[.+\]|[\w_]+)' is undefined").search(str(e)).groups()[0] + # next we extract all defined/undefined tests from the conditional string + def_undef = self.extract_defined_undefined(conditional) + # then we loop through these, comparing the error variable name against + # each def/undef test we found above. If there is a match, we determine + # whether the logic/state mean the variable should exist or not and return + # the corresponding True/False + for (du_var, logic, state) in def_undef: + # when we compare the var names, normalize quotes because something + # like hostvars['foo'] may be tested against hostvars["foo"] + if var_name.replace("'", '"') == du_var.replace("'", '"'): + # the should exist is a xor test between a negation in the logic portion + # against the state (defined or undefined) + should_exist = ('not' in logic) != (state == 'defined') + if should_exist: + return False + else: + return True + # as nothing above matched the failed var name, re-raise here to + # trigger the AnsibleUndefinedVariable exception again below + raise + except Exception: + raise AnsibleUndefinedVariable("error while evaluating conditional (%s): %s" % (original, e)) |