summaryrefslogtreecommitdiff
path: root/test/units/executor/module_common/test_module_common.py
blob: fa6add8cd61b7df810c15fc10923726bb78834e6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
# (c) 2017, Toshio Kuratomi <tkuratomi@ansible.com>
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible.  If not, see <http://www.gnu.org/licenses/>.

# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type

import os.path

import pytest

import ansible.errors

from ansible.executor import module_common as amc
from ansible.executor.interpreter_discovery import InterpreterDiscoveryRequiredError
from ansible.module_utils.six import PY2


class TestStripComments:
    """Tests for ``amc._strip_comments``."""

    def test_no_changes(self):
        """Source without comments or blank lines comes back unmodified."""
        source = u"""def some_code():
    return False"""
        stripped = amc._strip_comments(source)
        assert stripped == source

    def test_all_comments(self):
        """Source consisting only of comment lines is reduced to an empty string."""
        source = u"""# This is a test
            # Being as it is
            # To be
            """
        stripped = amc._strip_comments(source)
        assert stripped == u""

    def test_all_whitespace(self):
        """Lines that hold nothing but whitespace are dropped entirely."""
        # The whitespace-only lines are the test data, so they are spelled out
        # explicitly here rather than relying on trailing spaces surviving
        # inside a multi-line literal.
        source = u"\n".join([
            u"",
            u" " * 14,
            u"",
            u" " * 16,
            u"\t\t\r\n",
            u" " * 12,
        ])
        stripped = amc._strip_comments(source)
        assert stripped == u""

    def test_somewhat_normal(self):
        """Shebang, comment and blank lines go away; a '#' inside a string literal stays."""
        source = u"""#!/usr/bin/python

# here we go
def test(arg):
    # this is a thing
    thing = '# test'
    return thing
# End
"""
        expected = u"""def test(arg):
    thing = '# test'
    return thing"""
        stripped = amc._strip_comments(source)
        assert stripped == expected


class TestSlurp:
    """Tests for ``amc._slurp`` with ``os.path.exists`` and ``open`` mocked out."""

    @staticmethod
    def _install_fake_file(mocker, contents):
        """Make every path appear to exist and have ``open`` hand back ``contents``."""
        mocker.patch('os.path.exists', side_effect=lambda _path: True)
        fake_open = mocker.mock_open(read_data=contents)
        # The builtins module is named differently on Python 2 and Python 3
        open_target = '__builtin__.open' if PY2 else 'builtins.open'
        mocker.patch(open_target, fake_open)

    def test_slurp_nonexistent(self, mocker):
        """A path that does not exist makes ``_slurp`` raise ``AnsibleError``."""
        mocker.patch('os.path.exists', side_effect=lambda _path: False)
        with pytest.raises(ansible.errors.AnsibleError):
            amc._slurp('no_file')

    def test_slurp_file(self, mocker):
        """The content read from the file is returned as-is."""
        self._install_fake_file(mocker, 'This is a test')
        assert amc._slurp('some_file') == 'This is a test'

    def test_slurp_file_with_newlines(self, mocker):
        """Embedded newlines in the file content are preserved."""
        self._install_fake_file(mocker, '#!/usr/bin/python\ndef test(args):\nprint("hi")\n')
        assert amc._slurp('some_file') == '#!/usr/bin/python\ndef test(args):\nprint("hi")\n'


@pytest.fixture
def templar():
    """Provide a stand-in templar whose ``template`` returns its input unchanged."""
    class _IdentityTemplar:
        """Minimal templar double: performs no templating at all."""

        def template(self, template_string, *args, **kwargs):
            # Extra positional/keyword arguments are accepted and ignored
            return template_string

    fake = _IdentityTemplar()
    return fake


class TestGetShebang:
    """Tests for ``amc._get_shebang``.

    Note: We may want to change the API of this function in the future.  It isn't a great API
    """

    def test_no_interpreter_set(self, templar):
        """With no interpreter in task_vars, plain python requires interpreter discovery."""
        # normally this would return /usr/bin/python, but so long as we're defaulting to auto python discovery, we'll get
        # an InterpreterDiscoveryRequiredError here instead
        task_vars = {}
        with pytest.raises(InterpreterDiscoveryRequiredError):
            amc._get_shebang(u'/usr/bin/python', task_vars, templar)

    def test_python_interpreter(self, templar):
        """A versioned python path is returned unchanged when task_vars is empty."""
        result = amc._get_shebang(u'/usr/bin/python3.8', {}, templar)
        assert result == ('#!/usr/bin/python3.8', u'/usr/bin/python3.8')

    def test_non_python_interpreter(self, templar):
        """A non-python interpreter is returned unchanged when task_vars is empty."""
        result = amc._get_shebang(u'/usr/bin/ruby', {}, templar)
        assert result == ('#!/usr/bin/ruby', u'/usr/bin/ruby')

    def test_interpreter_set_in_task_vars(self, templar):
        """``ansible_python_interpreter`` in task_vars replaces the python interpreter."""
        task_vars = {u'ansible_python_interpreter': u'/usr/bin/pypy'}
        result = amc._get_shebang(u'/usr/bin/python', task_vars, templar)
        assert result == (u'#!/usr/bin/pypy', u'/usr/bin/pypy')

    def test_non_python_interpreter_in_task_vars(self, templar):
        """``ansible_ruby_interpreter`` in task_vars replaces the ruby interpreter."""
        task_vars = {u'ansible_ruby_interpreter': u'/usr/local/bin/ruby'}
        result = amc._get_shebang(u'/usr/bin/ruby', task_vars, templar)
        assert result == (u'#!/usr/local/bin/ruby', u'/usr/local/bin/ruby')

    def test_with_args(self, templar):
        """Extra ``args`` are appended to the shebang line but not to the interpreter."""
        task_vars = {u'ansible_python_interpreter': u'/usr/bin/python3'}
        result = amc._get_shebang(u'/usr/bin/python', task_vars, templar, args=('-tt', '-OO'))
        assert result == (u'#!/usr/bin/python3 -tt -OO', u'/usr/bin/python3')

    def test_python_via_env(self, templar):
        """An ``/usr/bin/env python`` style interpreter value is passed through whole."""
        task_vars = {u'ansible_python_interpreter': u'/usr/bin/env python'}
        result = amc._get_shebang(u'/usr/bin/python', task_vars, templar)
        assert result == (u'#!/usr/bin/env python', u'/usr/bin/env python')


class TestDetectionRegexes:
    """Exercise the import-detection and path-matching regexes exposed by module_common.

    Covers ``NEW_STYLE_PYTHON_MODULE_RE`` (searched against import statements
    given as bytes), ``CORE_LIBRARY_PATH_RE`` and ``COLLECTION_PATH_RE``
    (searched against file paths, exposing a ``path`` group).
    """

    # Import statements that NEW_STYLE_PYTHON_MODULE_RE must match
    ANSIBLE_MODULE_UTIL_STRINGS = (
        # Absolute collection imports
        b'import ansible_collections.my_ns.my_col.plugins.module_utils.my_util',
        b'from ansible_collections.my_ns.my_col.plugins.module_utils import my_util',
        b'from ansible_collections.my_ns.my_col.plugins.module_utils.my_util import my_func',
        # Absolute core imports
        b'import ansible.module_utils.basic',
        b'from ansible.module_utils import basic',
        b'from ansible.module_utils.basic import AnsibleModule',
        # Relative imports
        b'from ..module_utils import basic',
        b'from .. module_utils import basic',
        b'from ....module_utils import basic',
        b'from ..module_utils.basic import AnsibleModule',
    )
    # Import statements that NEW_STYLE_PYTHON_MODULE_RE must NOT match
    NOT_ANSIBLE_MODULE_UTIL_STRINGS = (
        b'from ansible import release',
        b'from ..release import __version__',
        b'from .. import release',
        b'from ansible.modules.system import ping',
        # A correctly spelled collection import of plugins.modules (not
        # plugins.module_utils).  A misspelled package name here would make
        # this negative case pass no matter what the regex does.
        b'from ansible_collections.my_ns.my_col.plugins.modules import function',
    )

    # Directory two levels above module_common's own file, i.e. the directory
    # of the ``ansible`` package that ``ansible.executor.module_common`` lives in.
    OFFSET = os.path.dirname(os.path.dirname(amc.__file__))
    # (module file path, expected value of the regex's ``path`` group)
    CORE_PATHS = (
        ('%s/modules/from_role.py' % OFFSET, 'ansible/modules/from_role'),
        ('%s/modules/system/ping.py' % OFFSET, 'ansible/modules/system/ping'),
        ('%s/modules/cloud/amazon/s3.py' % OFFSET, 'ansible/modules/cloud/amazon/s3'),
    )

    # (module file path, expected value of the regex's ``path`` group)
    COLLECTION_PATHS = (
        ('/root/ansible_collections/ns/col/plugins/modules/ping.py',
         'ansible_collections/ns/col/plugins/modules/ping'),
        ('/root/ansible_collections/ns/col/plugins/modules/subdir/ping.py',
         'ansible_collections/ns/col/plugins/modules/subdir/ping'),
    )

    @pytest.mark.parametrize('testcase', ANSIBLE_MODULE_UTIL_STRINGS)
    def test_detect_new_style_python_module_re(self, testcase):
        """Every module_utils import form is detected."""
        assert amc.NEW_STYLE_PYTHON_MODULE_RE.search(testcase)

    @pytest.mark.parametrize('testcase', NOT_ANSIBLE_MODULE_UTIL_STRINGS)
    def test_no_detect_new_style_python_module_re(self, testcase):
        """Imports that do not target module_utils are not detected."""
        assert not amc.NEW_STYLE_PYTHON_MODULE_RE.search(testcase)

    # pylint bug: https://github.com/PyCQA/pylint/issues/511
    @pytest.mark.parametrize('testcase, result', CORE_PATHS)  # pylint: disable=undefined-variable
    def test_detect_core_library_path_re(self, testcase, result):
        """Core module paths match and yield the expected ``path`` group."""
        assert amc.CORE_LIBRARY_PATH_RE.search(testcase).group('path') == result

    @pytest.mark.parametrize('testcase', (p[0] for p in COLLECTION_PATHS))  # pylint: disable=undefined-variable
    def test_no_detect_core_library_path_re(self, testcase):
        """Collection module paths are not matched by the core-path regex."""
        assert not amc.CORE_LIBRARY_PATH_RE.search(testcase)

    @pytest.mark.parametrize('testcase, result', COLLECTION_PATHS)  # pylint: disable=undefined-variable
    def test_detect_collection_path_re(self, testcase, result):
        """Collection module paths match and yield the expected ``path`` group."""
        assert amc.COLLECTION_PATH_RE.search(testcase).group('path') == result

    @pytest.mark.parametrize('testcase', (p[0] for p in CORE_PATHS))  # pylint: disable=undefined-variable
    def test_no_detect_collection_path_re(self, testcase):
        """Core module paths are not matched by the collection-path regex."""
        assert not amc.COLLECTION_PATH_RE.search(testcase)