Diffstat (limited to 'lib/ansible/parsing/dataloader.py')
-rw-r--r--  lib/ansible/parsing/dataloader.py | 54
1 file changed, 27 insertions(+), 27 deletions(-)
diff --git a/lib/ansible/parsing/dataloader.py b/lib/ansible/parsing/dataloader.py
index 13a57e41..cbba9668 100644
--- a/lib/ansible/parsing/dataloader.py
+++ b/lib/ansible/parsing/dataloader.py
@@ -11,16 +11,15 @@ import os
import os.path
import re
import tempfile
-import typing as t
from ansible import constants as C
from ansible.errors import AnsibleFileNotFound, AnsibleParserError
from ansible.module_utils.basic import is_executable
from ansible.module_utils.six import binary_type, text_type
-from ansible.module_utils.common.text.converters import to_bytes, to_native, to_text
+from ansible.module_utils._text import to_bytes, to_native, to_text
from ansible.parsing.quoting import unquote
from ansible.parsing.utils.yaml import from_yaml
-from ansible.parsing.vault import VaultLib, b_HEADER, is_encrypted, is_encrypted_file, parse_vaulttext_envelope, PromptVaultSecret
+from ansible.parsing.vault import VaultLib, b_HEADER, is_encrypted, is_encrypted_file, parse_vaulttext_envelope
from ansible.utils.path import unfrackpath
from ansible.utils.display import Display
@@ -46,7 +45,7 @@ class DataLoader:
Usage:
dl = DataLoader()
- # optionally: dl.set_vault_secrets([('default', ansible.parsing.vault.PrompVaultSecret(...),)])
+ # optionally: dl.set_vault_password('foo')
ds = dl.load('...')
ds = dl.load_from_file('/path/to/file')
'''
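Illustrative sketch (not from this commit): basic DataLoader usage matching the docstring above; the inline YAML and the file path are hypothetical.

# Sketch only: parse an inline string and a file with the same loader.
from ansible.parsing.dataloader import DataLoader

dl = DataLoader()
ds = dl.load('{"hello": "world"}')                           # inline JSON/YAML string
ds = dl.load_from_file('/srv/playbooks/group_vars/all.yml')  # hypothetical path; cached by default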
@@ -67,19 +66,20 @@ class DataLoader:
# initialize the vault stuff with an empty password
# TODO: replace with a ref to something that can get the password
# a creds/auth provider
+ # self.set_vault_password(None)
self._vaults = {}
self._vault = VaultLib()
self.set_vault_secrets(None)
# TODO: since we can query vault_secrets late, we could provide this to DataLoader init
- def set_vault_secrets(self, vault_secrets: list[tuple[str, PromptVaultSecret]] | None) -> None:
+ def set_vault_secrets(self, vault_secrets):
self._vault.secrets = vault_secrets
- def load(self, data: str, file_name: str = '<string>', show_content: bool = True, json_only: bool = False) -> t.Any:
+ def load(self, data, file_name='<string>', show_content=True, json_only=False):
'''Backwards compat for now'''
return from_yaml(data, file_name, show_content, self._vault.secrets, json_only=json_only)
- def load_from_file(self, file_name: str, cache: bool = True, unsafe: bool = False, json_only: bool = False) -> t.Any:
+ def load_from_file(self, file_name, cache=True, unsafe=False, json_only=False):
''' Loads data from a file, which can contain either JSON or YAML. '''
file_name = self.path_dwim(file_name)
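Illustrative sketch (not from this commit): wiring a vault secret into the loader, as the replaced docstring line hints; the vault id and the PromptVaultSecret constructor kwargs are assumptions, not taken from this diff.

# Sketch only: attach a secret so load()/load_from_file() can decrypt vaulted data.
from ansible.parsing.dataloader import DataLoader
from ansible.parsing.vault import PromptVaultSecret

dl = DataLoader()
secret = PromptVaultSecret(vault_id='default')  # constructor kwargs assumed
secret.load()                                   # interactively prompts for the password
dl.set_vault_secrets([('default', secret)])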
@@ -105,28 +105,28 @@ class DataLoader:
# return a deep copy here, so the cache is not affected
return copy.deepcopy(parsed_data)
- def path_exists(self, path: str) -> bool:
+ def path_exists(self, path):
path = self.path_dwim(path)
return os.path.exists(to_bytes(path, errors='surrogate_or_strict'))
- def is_file(self, path: str) -> bool:
+ def is_file(self, path):
path = self.path_dwim(path)
return os.path.isfile(to_bytes(path, errors='surrogate_or_strict')) or path == os.devnull
- def is_directory(self, path: str) -> bool:
+ def is_directory(self, path):
path = self.path_dwim(path)
return os.path.isdir(to_bytes(path, errors='surrogate_or_strict'))
- def list_directory(self, path: str) -> list[str]:
+ def list_directory(self, path):
path = self.path_dwim(path)
return os.listdir(path)
- def is_executable(self, path: str) -> bool:
+ def is_executable(self, path):
'''is the given path executable?'''
path = self.path_dwim(path)
return is_executable(path)
- def _decrypt_if_vault_data(self, b_vault_data: bytes, b_file_name: bytes | None = None) -> tuple[bytes, bool]:
+ def _decrypt_if_vault_data(self, b_vault_data, b_file_name=None):
'''Decrypt b_vault_data if encrypted and return b_data and the show_content flag'''
if not is_encrypted(b_vault_data):
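Illustrative sketch (not from this commit): the filesystem wrappers above all resolve their argument through path_dwim() first; the basedir and names are hypothetical.

# Sketch only: relative arguments are resolved against the loader's basedir.
from ansible.parsing.dataloader import DataLoader

dl = DataLoader()
dl.set_basedir('/srv/playbooks')
dl.path_exists('group_vars')      # checks /srv/playbooks/group_vars
dl.is_directory('group_vars')
dl.is_file('site.yml')
dl.list_directory('roles')        # plain os.listdir() of the resolved path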
@@ -139,7 +139,7 @@ class DataLoader:
show_content = False
return b_data, show_content
- def _get_file_contents(self, file_name: str) -> tuple[bytes, bool]:
+ def _get_file_contents(self, file_name):
'''
Reads the file contents from the given file name
@@ -168,17 +168,17 @@ class DataLoader:
except (IOError, OSError) as e:
raise AnsibleParserError("an error occurred while trying to read the file '%s': %s" % (file_name, to_native(e)), orig_exc=e)
- def get_basedir(self) -> str:
+ def get_basedir(self):
''' returns the current basedir '''
return self._basedir
- def set_basedir(self, basedir: str) -> None:
+ def set_basedir(self, basedir):
''' sets the base directory, used to find files when a relative path is given '''
if basedir is not None:
self._basedir = to_text(basedir)
- def path_dwim(self, given: str) -> str:
+ def path_dwim(self, given):
'''
make relative paths work like folks expect.
'''
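Illustrative sketch (not from this commit): what "make relative paths work like folks expect" means in practice; the basedir and file names are hypothetical.

# Sketch only: path_dwim() resolution behaviour.
from ansible.parsing.dataloader import DataLoader

dl = DataLoader()
dl.set_basedir('/srv/playbooks')
dl.path_dwim('vars/web.yml')   # resolved relative to the basedir
dl.path_dwim('~/secrets.yml')  # tilde paths are expanded
dl.path_dwim('/etc/motd')      # absolute paths are only normalized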
@@ -194,7 +194,7 @@ class DataLoader:
return unfrackpath(path, follow=False)
- def _is_role(self, path: str) -> bool:
+ def _is_role(self, path):
''' imperfect role detection, roles are still valid w/o tasks|meta/main.yml|yaml|etc '''
b_path = to_bytes(path, errors='surrogate_or_strict')
@@ -228,7 +228,7 @@ class DataLoader:
return False
- def path_dwim_relative(self, path: str, dirname: str, source: str, is_role: bool = False) -> str:
+ def path_dwim_relative(self, path, dirname, source, is_role=False):
'''
find one file in either a role or playbook dir with or without
explicitly named dirname subdirs
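Illustrative sketch (not from this commit): a typical path_dwim_relative() lookup; the role path, subdir, and file name are hypothetical.

# Sketch only: prefer <path>/templates/motd.j2, with fallbacks handled internally.
from ansible.parsing.dataloader import DataLoader

dl = DataLoader()
found = dl.path_dwim_relative('/srv/playbooks/roles/common', 'templates', 'motd.j2')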
@@ -283,7 +283,7 @@ class DataLoader:
return candidate
- def path_dwim_relative_stack(self, paths: list[str], dirname: str, source: str, is_role: bool = False) -> str:
+ def path_dwim_relative_stack(self, paths, dirname, source, is_role=False):
'''
find one file in first path in stack taking roles into account and adding play basedir as fallback
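Illustrative sketch (not from this commit): the stack variant searches a list of base paths in order; the paths are hypothetical.

# Sketch only: check each base directory for files/app.conf, first match wins.
from ansible.parsing.dataloader import DataLoader

dl = DataLoader()
search = ['/srv/playbooks/roles/common', '/srv/playbooks']
found = dl.path_dwim_relative_stack(search, 'files', 'app.conf')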
@@ -342,7 +342,7 @@ class DataLoader:
return result
- def _create_content_tempfile(self, content: str | bytes) -> str:
+ def _create_content_tempfile(self, content):
''' Create a tempfile containing defined content '''
fd, content_tempfile = tempfile.mkstemp(dir=C.DEFAULT_LOCAL_TMP)
f = os.fdopen(fd, 'wb')
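Illustrative sketch (not from this commit): the mkstemp/fdopen pattern used by _create_content_tempfile(), shown standalone with a hypothetical temp directory instead of C.DEFAULT_LOCAL_TMP.

# Sketch only: write bytes to a securely created temp file and keep its path.
import os
import tempfile

fd, tmp_path = tempfile.mkstemp(dir='/tmp')
f = os.fdopen(fd, 'wb')
try:
    f.write(b'rendered content')
finally:
    f.close()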
@@ -356,7 +356,7 @@ class DataLoader:
f.close()
return content_tempfile
- def get_real_file(self, file_path: str, decrypt: bool = True) -> str:
+ def get_real_file(self, file_path, decrypt=True):
"""
If the file is vault encrypted return a path to a temporary decrypted file
If the file is not encrypted then the path is returned
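Illustrative sketch (not from this commit): the get_real_file()/cleanup_tmp_file() pairing described above; the file path is hypothetical, and vault secrets must already be set for encrypted files.

# Sketch only: read a possibly vault-encrypted file via a decrypted temp copy.
from ansible.parsing.dataloader import DataLoader

dl = DataLoader()
real_path = dl.get_real_file('group_vars/all/vault.yml')  # temp decrypted copy if vaulted
try:
    with open(real_path, 'rb') as f:
        data = f.read()
finally:
    dl.cleanup_tmp_file(real_path)  # only removes paths the loader created itself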
@@ -396,7 +396,7 @@ class DataLoader:
except (IOError, OSError) as e:
raise AnsibleParserError("an error occurred while trying to read the file '%s': %s" % (to_native(real_path), to_native(e)), orig_exc=e)
- def cleanup_tmp_file(self, file_path: str) -> None:
+ def cleanup_tmp_file(self, file_path):
"""
Removes any temporary files created from a previous call to
get_real_file. file_path must be the path returned from a
@@ -406,7 +406,7 @@ class DataLoader:
os.unlink(file_path)
self._tempfiles.remove(file_path)
- def cleanup_all_tmp_files(self) -> None:
+ def cleanup_all_tmp_files(self):
"""
Removes all temporary files that DataLoader has created
NOTE: not thread safe, forks also need special handling see __init__ for details.
@@ -417,7 +417,7 @@ class DataLoader:
except Exception as e:
display.warning("Unable to cleanup temp files: %s" % to_text(e))
- def find_vars_files(self, path: str, name: str, extensions: list[str] | None = None, allow_dir: bool = True) -> list[str]:
+ def find_vars_files(self, path, name, extensions=None, allow_dir=True):
"""
Find vars files in a given path with specified name. This will find
files in a dir named <name>/ or a file called <name> ending in known
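Illustrative sketch (not from this commit): a typical find_vars_files() call; the group_vars path and name are hypothetical.

# Sketch only: match 'webservers', 'webservers.yml', 'webservers.json', ...,
# or the contents of a 'webservers/' directory under the given path.
from ansible.parsing.dataloader import DataLoader

dl = DataLoader()
for vars_file in dl.find_vars_files('/srv/playbooks/group_vars', 'webservers'):
    data = dl.load_from_file(vars_file)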
@@ -447,11 +447,11 @@ class DataLoader:
else:
continue
else:
- found.append(to_text(full_path))
+ found.append(full_path)
break
return found
- def _get_dir_vars_files(self, path: str, extensions: list[str]) -> list[str]:
+ def _get_dir_vars_files(self, path, extensions):
found = []
for spath in sorted(self.list_directory(path)):
if not spath.startswith(u'.') and not spath.endswith(u'~'): # skip hidden and backups