source: mod_gnutls/test/runtest.py @ 0f52d48

asyncioproxy-ticket
Last change on this file since 0f52d48 was 9a48691, checked in by Fiona Klute <fiona.klute@…>, 15 months ago

Test suite: Support checking preconditions in prepare_env hook

The hook may now raise unittest.SkipTest to skip the test case if any
preconditions (e.g. availability of a certain Apache module) are not
met.

  • Property mode set to 100755
File size: 8.5 KB
Line 
1#!/usr/bin/python3
2# PYTHON_ARGCOMPLETE_OK
3
4# Copyright 2019 Fiona Klute
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17
18import contextlib
19import os
20import os.path
21import subprocess
22import sys
23import tempfile
24from unittest import SkipTest
25
26import mgstest.hooks
27from mgstest import lockfile, TestExpectationFailed
28from mgstest.services import ApacheService, TestService
29from mgstest.tests import run_test_conf
30
31
32
def find_testdir(number, dir):
    """Find the configuration directory for a test based on its
    number. The given directory must contain exactly one directory
    with a name matching "NUMBER_*", otherwise a LookupError is
    raised.

    Subdirectories whose name does not start with an integer prefix
    (for example "__pycache__") are not test directories and are
    ignored. Entries that are not directories are ignored, too.

    Returns a tuple (path, name) describing the matching directory.

    """
    found = None
    with os.scandir(dir) as it:
        for entry in it:
            if not entry.is_dir():
                continue
            prefix = entry.name.split('_', maxsplit=1)[0]
            try:
                num = int(prefix)
            except ValueError:
                # No numeric prefix, so this cannot be a test
                # directory. Skip it instead of aborting the search.
                continue
            if num != number:
                continue
            if found is not None:
                # duplicate numbers are an error
                raise LookupError('Multiple directories found for '
                                  f'test number {number}: '
                                  f'{found.name} and {entry.name}')
            found = entry
    if found is None:
        raise LookupError('No test directory found for test number '
                          f'{number}!')
    return (found.path, found.name)
58
def temp_logfile():
    """Create a temporary log file.

    The result is a tempfile.SpooledTemporaryFile opened in text mode
    for reading and writing ('w+'), with a spool limit of 4096.

    """
    options = {
        'max_size': 4096,
        'mode': 'w+',
        'prefix': 'mod_gnutls',
        'suffix': ".log",
    }
    return tempfile.SpooledTemporaryFile(**options)
62
63
64
65
def check_ocsp_responder():
    """Check if the OCSP responder works.

    Runs ocsptool to ask for the status of the server certificate,
    using the CA certificate as issuer. Returns True if ocsptool
    exits with status 0, False otherwise.

    """
    result = subprocess.run([
        'ocsptool', '--ask', '--nonce',
        '--load-issuer', 'authority/x509.pem',
        '--load-cert', 'authority/server/x509.pem',
    ])
    return result.returncode == 0
74
def check_msva():
    """Check if MSVA is up.

    Reads the client UID from authority/client/uid and runs
    msva-query-agent for it, feeding the client certificate on
    standard input. Returns True if the query exits with status 0,
    False otherwise.

    """
    with open('authority/client/uid', 'r') as uid_source:
        uid = uid_source.read().strip()
        query = ['msva-query-agent', 'https', uid, 'x509pem', 'client']
        with open('authority/client/x509.pem', 'r') as pem:
            status = subprocess.run(query, stdin=pem).returncode
    return status == 0
84
85
86
def main(args):
    """Run the test case selected by args.test_number.

    Locates the test directory below ${srcdir}/tests/, loads the
    optional hooks.py plugin of the test, starts the background
    services and the main Apache instance, runs the test connections
    and finally calls the post_check hook, if the plugin has one.

    args must provide test_number as well as the file objects
    log_connection and log_responses.

    Exits with status 77 if the prepare_env hook raises SkipTest.

    """
    # Automake always exports srcdir, the fallback to the current
    # directory is meant for manual runs. Export the absolute path so
    # everything started from here sees the same value.
    srcdir = os.path.realpath(os.environ.get('srcdir', '.'))
    os.environ['srcdir'] = srcdir

    # The test number selects the configuration directory in
    # ${srcdir}/tests/.
    tests_root = os.path.join(srcdir, 'tests')
    testdir, testname = find_testdir(args.test_number, tests_root)
    print(f'Found test {testname}, test dir is {testdir}')
    os.environ['TEST_NAME'] = testname

    def in_testdir(filename):
        # path of a file inside the test directory
        return os.path.join(testdir, filename)

    # Test case hooks (if any)
    hooks = mgstest.hooks.load_hooks_plugin(in_testdir('hooks.py'))

    # The PID file names carry the test name if namespaces are in
    # use.
    #
    # TODO: Check if having the different names is really necessary.
    pidaffix = f'-{testname}' if 'USE_TEST_NAMESPACE' in os.environ else ''

    # The services a test may use
    apache = ApacheService(config=in_testdir('apache.conf'),
                           pidfile=f'apache2{pidaffix}.pid')
    backend = ApacheService(config=in_testdir('backend.conf'),
                            pidfile=f'backend{pidaffix}.pid')
    ocsp = ApacheService(config=in_testdir('ocsp.conf'),
                         pidfile=f'ocsp{pidaffix}.pid',
                         check=check_ocsp_responder)
    msva = TestService(start=['monkeysphere-validation-agent'],
                       env={'GNUPGHOME': 'msva.gnupghome',
                            'MSVA_KEYSERVER_POLICY': 'never'},
                       condition=lambda: 'USE_MSVA' in os.environ,
                       check=check_msva)

    # These run in the background and have to be ready before the
    # main apache instance starts.
    bg_services = [backend, ocsp, msva]

    # TODO: check extra requirements (e.g. specific modules)

    # Give the test a chance to adjust the environment, or to skip
    # itself by raising SkipTest.
    try:
        if hooks.prepare_env:
            hooks.prepare_env()
    except SkipTest as skip:
        print(f'Skipping: {skip!s}')
        sys.exit(77)

    # With VERBOSE set, log the HTTPD build configuration
    if 'VERBOSE' in os.environ:
        show_build_config = [os.environ.get('APACHE2', 'apache2'),
                             '-f', f'{srcdir}/base_apache.conf', '-V']
        subprocess.run(show_build_config, check=True)

    if 'USE_MSVA' in os.environ:
        os.environ['MONKEYSPHERE_VALIDATION_AGENT_SOCKET'] = \
            f'http://127.0.0.1:{os.environ["MSVA_PORT"]}'

    with contextlib.ExitStack() as service_stack:
        enter = service_stack.enter_context
        enter(lockfile('test.lock',
                       nolock='MGS_NETNS_ACTIVE' in os.environ))
        enter(ocsp.run())
        enter(backend.run())
        enter(msva.run())

        # TEST_SERVICE_MAX_WAIT is given in milliseconds, wait_ready()
        # gets the value divided by 1000.
        max_wait = int(os.environ.get('TEST_SERVICE_MAX_WAIT', 10000)) / 1000
        for service in bg_services:
            if service.condition():
                service.wait_ready(timeout=max_wait)

        # Special case: a test directory containing "fail.server"
        # expects the main server to fail at startup.
        fail_marker = in_testdir('fail.server')
        try:
            enter(apache.run())
            if os.path.exists(fail_marker):
                raise TestExpectationFailed(
                    'Server start did not fail as expected!')
            apache.wait_ready()
        except subprocess.CalledProcessError as err:
            if not os.path.exists(fail_marker):
                raise err
            print('Apache server failed to start as expected',
                  file=sys.stderr)

        # TEST_TARGET is the target of the requests, TARGET_IP takes
        # precedence over TEST_HOST. Might be replaced with a
        # parameter later.
        target_source = \
            'TARGET_IP' if 'TARGET_IP' in os.environ else 'TEST_HOST'
        os.environ['TEST_TARGET'] = os.environ[target_source]

        # Run the test connections, either through the hook or as
        # described in the test.yml file of the test.
        logs = {'conn_log': args.log_connection,
                'response_log': args.log_responses}
        if hooks.run_connection:
            hooks.run_connection(testname, **logs)
        else:
            with open(in_testdir('test.yml'), 'r') as yaml_conf:
                query_timeout = \
                    float(os.environ.get('TEST_QUERY_TIMEOUT', 5.0))
                run_test_conf(yaml_conf, query_timeout, **logs)

    # Extra checks the hooks.py of the test might define. Both logs
    # are rewound so the hook can read them from the beginning.
    if hooks.post_check:
        for log in (args.log_connection, args.log_responses):
            log.seek(0)
        hooks.post_check(conn_log=args.log_connection,
                         response_log=args.log_responses)
203
204
205
if __name__ == "__main__":
    import argparse

    # Command line interface of the test runner
    cli = argparse.ArgumentParser(
        description='Run a mod_gnutls server test')
    cli.add_argument('--test-number', type=int,
                     required=True, help='load YAML test configuration')
    # Both log options take a file name and default to a fresh
    # temporary log file.
    for option, purpose in (
            ('--log-connection', 'write connection log to this file'),
            ('--log-responses', 'write HTTP responses to this file')):
        cli.add_argument(option, type=argparse.FileType('w+'),
                         default=temp_logfile(), help=purpose)

    # bash completion is optional, carry on if argcomplete is not
    # available
    with contextlib.suppress(ImportError):
        import argcomplete
        argcomplete.autocomplete(cli)

    args = cli.parse_args()

    # Close both log files when the test run ends, no matter how.
    with contextlib.closing(args.log_connection), \
            contextlib.closing(args.log_responses):
        main(args)
Note: See TracBrowser for help on using the repository browser.