source: mod_gnutls/test/runtest.py @ c33b0ea

asyncio, main, proxy-ticket
Last change on this file since c33b0ea was 60ed7d1, checked in by Fiona Klute <fiona.klute@…>, 3 years ago

Allow the prepare_env hook to return a cleanup callback

  • Property mode set to 100755
File size: 8.6 KB
Line 
1#!/usr/bin/python3
2# PYTHON_ARGCOMPLETE_OK
3
4# Copyright 2019 Fiona Klute
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17
18import contextlib
19import os
20import os.path
21import subprocess
22import sys
23import tempfile
24from unittest import SkipTest
25
26import mgstest.hooks
27from mgstest import lockfile, TestExpectationFailed
28from mgstest.services import ApacheService, TestService
29from mgstest.tests import run_test_conf
30
31
32
def find_testdir(number, dir):
    """Find the configuration directory for a test based on its
    number. The given directory must contain exactly one directory
    with a name matching "NUMBER_*", otherwise a LookupError is
    raised.

    Directory entries whose name does not start with an integer
    (the part before the first underscore), such as "__pycache__",
    are not test directories and are ignored. Entries that are not
    directories are ignored as well.

    Returns a tuple (path, name) describing the matching directory.

    """
    found = None
    with os.scandir(dir) as it:
        for entry in it:
            if not entry.is_dir():
                continue
            prefix = entry.name.split('_', maxsplit=1)[0]
            try:
                num = int(prefix)
            except ValueError:
                # Not a numbered test directory; without this guard
                # any unrelated directory would abort the search
                # with a ValueError instead of the documented
                # LookupError.
                continue
            if num != number:
                continue
            if found is not None:
                # duplicate numbers are an error
                raise LookupError('Multiple directories found for '
                                  f'test number {number}: '
                                  f'{found.name} and {entry.name}')
            found = entry
    if found is None:
        raise LookupError('No test directory found for test number '
                          f'{number}!')
    return (found.path, found.name)
58
def temp_logfile():
    """Create a temporary log file opened for text reading and writing.

    The file is a tempfile.SpooledTemporaryFile, so content is kept
    in memory until it grows beyond max_size (4096) and is only then
    moved to a real temporary file.

    """
    spool_options = {
        'max_size': 4096,
        'mode': 'w+',
        'prefix': 'mod_gnutls',
        'suffix': ".log",
    }
    return tempfile.SpooledTemporaryFile(**spool_options)
62
63
64
65
def check_ocsp_responder():
    """Check if the OCSP responder works.

    Runs ocsptool to ask for the status of the test server
    certificate (with a nonce) and returns True if the tool exits
    with status 0, False otherwise. The certificate paths are
    relative to the current working directory.

    """
    result = subprocess.run([
        'ocsptool', '--ask', '--nonce',
        '--load-issuer', 'authority/x509.pem',
        '--load-cert', 'authority/server/x509.pem',
    ])
    return result.returncode == 0
74
def check_msva():
    """Check if MSVA is up.

    Reads the client UID from authority/client/uid, then runs
    msva-query-agent for that UID with the client certificate
    (authority/client/x509.pem) supplied on standard input. Returns
    True if the query exits with status 0, False otherwise.

    """
    with open('authority/client/uid', 'r') as uid_source:
        uid = uid_source.read().strip()

    query = ['msva-query-agent', 'https', uid, 'x509pem', 'client']
    with open('authority/client/x509.pem', 'r') as cert:
        result = subprocess.run(query, stdin=cert)
    return result.returncode == 0
84
85
86
def main(args):
    """Run the test selected by args.test_number.

    The test directory is looked up below ${srcdir}/tests/, optional
    hooks are loaded from its hooks.py, the background services
    (backend, OCSP responder, MSVA) and the main Apache instance are
    started, and then the test connections are run: either through
    the run_connection hook or from the test.yml file in the test
    directory. After all services have been stopped the optional
    post_check hook is called with the rewound log files.

    Parameters:

    args -- parsed command line arguments; the attributes
        test_number, log_connection and log_responses are used

    Exits with status 77 if the prepare_env hook raises SkipTest.
    Raises TestExpectationFailed if the test directory contains a
    "fail.server" file but the server started anyway, and re-raises
    subprocess.CalledProcessError if the server failed to start
    without such a file.

    """
    # The Automake environment always provides srcdir, the default is
    # for manual use.
    srcdir = os.path.realpath(os.environ.get('srcdir', '.'))
    # ensure environment srcdir is absolute
    os.environ['srcdir'] = srcdir

    # Find the configuration directory for the test in
    # ${srcdir}/tests/, based on the test number.
    testdir, testname = find_testdir(args.test_number,
                                     os.path.join(srcdir, 'tests'))
    print(f'Found test {testname}, test dir is {testdir}')
    os.environ['TEST_NAME'] = testname

    # Load test case hooks (if any)
    plugin_path = os.path.join(testdir, 'hooks.py')
    plugin = mgstest.hooks.load_hooks_plugin(plugin_path)

    # PID file name varies depending on whether we're using
    # namespaces.
    #
    # TODO: Check if having the different names is really necessary.
    pidaffix = ''
    if 'USE_TEST_NAMESPACE' in os.environ:
        pidaffix = f'-{testname}'

    # Define the available services
    apache = ApacheService(config=os.path.join(testdir, 'apache.conf'),
                           pidfile=f'apache2{pidaffix}.pid')
    backend = ApacheService(config=os.path.join(testdir, 'backend.conf'),
                            pidfile=f'backend{pidaffix}.pid')
    ocsp = ApacheService(config=os.path.join(testdir, 'ocsp.conf'),
                         pidfile=f'ocsp{pidaffix}.pid',
                         check=check_ocsp_responder)
    msva = TestService(start=['monkeysphere-validation-agent'],
                       env={'GNUPGHOME': 'msva.gnupghome',
                            'MSVA_KEYSERVER_POLICY': 'never'},
                       condition=lambda: 'USE_MSVA' in os.environ,
                       check=check_msva)

    # background services: must be ready before the main apache
    # instance is started
    bg_services = [backend, ocsp, msva]

    # If VERBOSE is enabled, log the HTTPD build configuration
    if 'VERBOSE' in os.environ:
        apache2 = os.environ.get('APACHE2', 'apache2')
        subprocess.run([apache2, '-f', f'{srcdir}/base_apache.conf', '-V'],
                       check=True)

    # This hook may modify the environment as needed for the test.
    cleanup_callback = None
    try:
        if plugin.prepare_env:
            cleanup_callback = plugin.prepare_env()
    except SkipTest as skip:
        print(f'Skipping: {skip!s}')
        sys.exit(77)

    with contextlib.ExitStack() as service_stack:
        # Register the cleanup callback before anything else that
        # might fail, so whatever prepare_env set up is cleaned up
        # even if a later setup step raises.
        if cleanup_callback:
            service_stack.callback(cleanup_callback)

        # Must stay after the prepare_env hook because the hook may
        # change the environment variables evaluated here.
        if 'USE_MSVA' in os.environ:
            os.environ['MONKEYSPHERE_VALIDATION_AGENT_SOCKET'] = \
                f'http://127.0.0.1:{os.environ["MSVA_PORT"]}'

        service_stack.enter_context(
            lockfile('test.lock', nolock='MGS_NETNS_ACTIVE' in os.environ))
        service_stack.enter_context(ocsp.run())
        service_stack.enter_context(backend.run())
        service_stack.enter_context(msva.run())

        # TEST_SERVICE_MAX_WAIT is in milliseconds
        wait_timeout = \
            int(os.environ.get('TEST_SERVICE_MAX_WAIT', 10000)) / 1000
        for s in bg_services:
            if s.condition():
                s.wait_ready(timeout=wait_timeout)

        # special case: expected to fail in a few cases, marked by a
        # "fail.server" file in the test directory
        expect_start_failure = \
            os.path.exists(os.path.join(testdir, 'fail.server'))
        try:
            service_stack.enter_context(apache.run())
            if expect_start_failure:
                raise TestExpectationFailed(
                    'Server start did not fail as expected!')
            apache.wait_ready()
        except subprocess.CalledProcessError:
            if expect_start_failure:
                print('Apache server failed to start as expected',
                      file=sys.stderr)
            else:
                raise

        # Set TEST_TARGET for the request. Might be replaced with a
        # parameter later.
        if 'TARGET_IP' in os.environ:
            os.environ['TEST_TARGET'] = os.environ['TARGET_IP']
        else:
            os.environ['TEST_TARGET'] = os.environ['TEST_HOST']

        # Run the test connections
        if plugin.run_connection:
            plugin.run_connection(testname,
                                  conn_log=args.log_connection,
                                  response_log=args.log_responses)
        else:
            with open(os.path.join(testdir, 'test.yml'), 'r') as test_conf:
                run_test_conf(test_conf,
                              float(os.environ.get('TEST_QUERY_TIMEOUT', 5.0)),
                              conn_log=args.log_connection,
                              response_log=args.log_responses)

    # run extra checks the test's hooks.py might define; the logs are
    # rewound so the hook can read them from the beginning
    if plugin.post_check:
        args.log_connection.seek(0)
        args.log_responses.seek(0)
        plugin.post_check(conn_log=args.log_connection,
                          response_log=args.log_responses)
204
205
206
if __name__ == "__main__":
    import argparse

    # Command line interface: the test number is mandatory, the two
    # log files default to temporary files.
    parser = argparse.ArgumentParser(
        description='Run a mod_gnutls server test')
    parser.add_argument('--test-number', type=int,
                        required=True, help='load YAML test configuration')
    log_options = (
        ('--log-connection', 'write connection log to this file'),
        ('--log-responses', 'write HTTP responses to this file'),
    )
    for option, description in log_options:
        # each option gets its own temporary file as default
        parser.add_argument(option, type=argparse.FileType('w+'),
                            default=temp_logfile(),
                            help=description)

    # enable bash completion if argcomplete is available
    try:
        import argcomplete
        argcomplete.autocomplete(parser)
    except ImportError:
        pass

    args = parser.parse_args()

    # make sure both log files are closed when the test is done,
    # whether it succeeded or not
    with contextlib.closing(args.log_connection), \
            contextlib.closing(args.log_responses):
        main(args)
Note: See TracBrowser for help on using the repository browser.