source: mod_gnutls/test/runtest.py @ 8666b50

asyncioproxy-ticket
Last change on this file since 8666b50 was 8666b50, checked in by Fiona Klute <fiona.klute@…>, 23 months ago

Implement "prepare_env" hook for tests

Test 15_basic_msva uses it to enable MSVA.

  • Property mode set to 100755
File size: 8.4 KB
Line 
1#!/usr/bin/python3
2# PYTHON_ARGCOMPLETE_OK
3
4# Copyright 2019 Fiona Klute
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17
18import contextlib
19import os
20import os.path
21import subprocess
22import sys
23import tempfile
24
25import mgstest.hooks
26from mgstest import lockfile, TestExpectationFailed
27from mgstest.services import ApacheService, TestService
28from mgstest.tests import run_test_conf
29
30
31
32def find_testdir(number, dir):
33    """Find the configuration directory for a test based on its
34    number. The given directory must contain exactly one directory
35    with a name matching "NUMBER_*", otherwise a LookupError is
36    raised.
37
38    """
39    with os.scandir(dir) as it:
40        found = None
41        for entry in it:
42            if entry.is_dir():
43                num = int(entry.name.split('_', maxsplit=1)[0])
44                if number == num:
45                    if found:
46                        # duplicate numbers are an error
47                        raise LookupError('Multiple directories found for '
48                                          f'test number {number}: '
49                                          f'{found.name} and {entry.name}')
50                    else:
51                        found = entry
52        if found == None:
53            raise LookupError('No test directory found for test number '
54                              f'{number}!')
55        else:
56            return (found.path, found.name)
57
58def temp_logfile():
59    return tempfile.SpooledTemporaryFile(max_size=4096, mode='w+',
60                                         prefix='mod_gnutls', suffix=".log")
61
62
63
64
65def check_ocsp_responder():
66    # Check if OCSP responder works
67    issuer_cert = 'authority/x509.pem'
68    check_cert = 'authority/server/x509.pem'
69    command = ['ocsptool', '--ask', '--nonce',
70               '--load-issuer', issuer_cert,
71               '--load-cert', check_cert]
72    return subprocess.run(command).returncode == 0
73
74def check_msva():
75    # Check if MSVA is up
76    cert_file = 'authority/client/x509.pem'
77    uid_file = 'authority/client/uid'
78    with open(uid_file, 'r') as file:
79        uid = file.read().strip()
80        command = ['msva-query-agent', 'https', uid, 'x509pem', 'client']
81        with open(cert_file, 'r') as cert:
82            return subprocess.run(command, stdin=cert).returncode == 0
83
84
85
86def main(args):
87    # The Automake environment always provides srcdir, the default is
88    # for manual use.
89    srcdir = os.path.realpath(os.environ.get('srcdir', '.'))
90    # ensure environment srcdir is absolute
91    os.environ['srcdir'] = srcdir
92
93    # Find the configuration directory for the test in
94    # ${srcdir}/tests/, based on the test number.
95    testdir, testname = find_testdir(args.test_number,
96                                     os.path.join(srcdir, 'tests'))
97    print(f'Found test {testname}, test dir is {testdir}')
98    os.environ['TEST_NAME'] = testname
99
100    # Load test case hooks (if any)
101    plugin_path = os.path.join(testdir, 'hooks.py')
102    plugin = mgstest.hooks.load_hooks_plugin(plugin_path)
103
104    # PID file name varies depending on whether we're using
105    # namespaces.
106    #
107    # TODO: Check if having the different names is really necessary.
108    pidaffix = ''
109    if 'USE_TEST_NAMESPACE' in os.environ:
110        pidaffix = f'-{testname}'
111
112    # Define the available services
113    apache = ApacheService(config=os.path.join(testdir, 'apache.conf'),
114                           pidfile=f'apache2{pidaffix}.pid')
115    backend = ApacheService(config=os.path.join(testdir, 'backend.conf'),
116                            pidfile=f'backend{pidaffix}.pid')
117    ocsp = ApacheService(config=os.path.join(testdir, 'ocsp.conf'),
118                         pidfile=f'ocsp{pidaffix}.pid',
119                         check=check_ocsp_responder)
120    msva = TestService(start=['monkeysphere-validation-agent'],
121                       env={'GNUPGHOME': 'msva.gnupghome',
122                            'MSVA_KEYSERVER_POLICY': 'never'},
123                       condition=lambda: 'USE_MSVA' in os.environ,
124                       check=check_msva)
125
126    # background services: must be ready before the main apache
127    # instance is started
128    bg_services = [backend, ocsp, msva]
129
130    # TODO: check extra requirements (e.g. specific modules)
131
132    # This hook may modify the environment as needed for the test.
133    if plugin.prepare_env:
134        plugin.prepare_env()
135
136    # If VERBOSE is enabled, log the HTTPD build configuration
137    if 'VERBOSE' in os.environ:
138        apache2 = os.environ.get('APACHE2', 'apache2')
139        subprocess.run([apache2, '-f', f'{srcdir}/base_apache.conf', '-V'],
140                       check=True)
141
142    if 'USE_MSVA' in os.environ:
143        os.environ['MONKEYSPHERE_VALIDATION_AGENT_SOCKET'] = \
144            f'http://127.0.0.1:{os.environ["MSVA_PORT"]}'
145
146    with contextlib.ExitStack() as service_stack:
147        service_stack.enter_context(lockfile('test.lock', nolock='MGS_NETNS_ACTIVE' in os.environ))
148        service_stack.enter_context(ocsp.run())
149        service_stack.enter_context(backend.run())
150        service_stack.enter_context(msva.run())
151
152        # TEST_SERVICE_MAX_WAIT is in milliseconds
153        wait_timeout = \
154            int(os.environ.get('TEST_SERVICE_MAX_WAIT', 10000)) / 1000
155        for s in bg_services:
156            if s.condition():
157                s.wait_ready(timeout=wait_timeout)
158
159        # special case: expected to fail in a few cases
160        try:
161            service_stack.enter_context(apache.run())
162            if os.path.exists(os.path.join(testdir, 'fail.server')):
163                raise TestExpectationFailed(
164                    'Server start did not fail as expected!')
165            apache.wait_ready()
166        except subprocess.CalledProcessError as e:
167            if os.path.exists(os.path.join(testdir, 'fail.server')):
168                print('Apache server failed to start as expected',
169                      file=sys.stderr)
170            else:
171                raise e
172
173        # Set TEST_TARGET for the request. Might be replaced with a
174        # parameter later.
175        if 'TARGET_IP' in os.environ:
176            os.environ['TEST_TARGET'] = os.environ['TARGET_IP']
177        else:
178            os.environ['TEST_TARGET'] = os.environ['TEST_HOST']
179
180        # Run the test connections
181        if plugin.run_connection:
182            plugin.run_connection(testname,
183                                  conn_log=args.log_connection,
184                                  response_log=args.log_responses)
185        else:
186            with open(os.path.join(testdir, 'test.yml'), 'r') as test_conf:
187                run_test_conf(test_conf,
188                              float(os.environ.get('TEST_QUERY_TIMEOUT', 5.0)),
189                              conn_log=args.log_connection,
190                              response_log=args.log_responses)
191
192    # run extra checks the test's hooks.py might define
193    if plugin.post_check:
194        args.log_connection.seek(0)
195        args.log_responses.seek(0)
196        plugin.post_check(conn_log=args.log_connection,
197                          response_log=args.log_responses)
198
199
200
201if __name__ == "__main__":
202    import argparse
203    parser = argparse.ArgumentParser(
204        description='Run a mod_gnutls server test')
205    parser.add_argument('--test-number', type=int,
206                        required=True, help='load YAML test configuration')
207    parser.add_argument('--log-connection', type=argparse.FileType('w+'),
208                        default=temp_logfile(),
209                        help='write connection log to this file')
210    parser.add_argument('--log-responses', type=argparse.FileType('w+'),
211                        default=temp_logfile(),
212                        help='write HTTP responses to this file')
213
214    # enable bash completion if argcomplete is available
215    try:
216        import argcomplete
217        argcomplete.autocomplete(parser)
218    except ImportError:
219        pass
220
221    args = parser.parse_args()
222
223    with contextlib.ExitStack() as stack:
224        stack.enter_context(contextlib.closing(args.log_connection))
225        stack.enter_context(contextlib.closing(args.log_responses))
226        main(args)
Note: See TracBrowser for help on using the repository browser.