source: mod_gnutls/test/runtest.py @ 079859e

proxy-ticket
Last change on this file since 079859e was 079859e, checked in by Fiona Klute <fiona.klute@…>, 8 months ago

Fix exception messages for invalid test numbers

  • Property mode set to 100755
File size: 8.5 KB
Line 
1#!/usr/bin/python3
2# PYTHON_ARGCOMPLETE_OK
3
4# Copyright 2019 Fiona Klute
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17
18import contextlib
19import os
20import os.path
21import subprocess
22import sys
23
24import mgstest.hooks
25from mgstest import lockfile, TestExpectationFailed
26from mgstest.services import ApacheService, TestService
27from mgstest.tests import run_test_conf
28
29
30
def find_testdir(number, dir):
    """Find the configuration directory for a test based on its
    number.

    The directory "dir" is scanned for subdirectories named
    "NUMBER_*", where NUMBER is whatever precedes the first
    underscore, parsed as an integer (so "05_foo" matches test number
    5). Entries that are not directories, and directories whose name
    does not start with a number, are ignored.

    Returns a tuple (path, name) with the path and the name of the
    matching directory.

    Raises LookupError unless exactly one directory matches the given
    test number.

    """
    found = None
    with os.scandir(dir) as it:
        for entry in it:
            if not entry.is_dir():
                continue
            prefix = entry.name.split('_', maxsplit=1)[0]
            try:
                num = int(prefix)
            except ValueError:
                # Not a test directory (e.g. "__pycache__"): skip it
                # instead of aborting the whole search.
                continue
            if num != number:
                continue
            if found is not None:
                # duplicate numbers are an error
                raise LookupError('Multiple directories found for '
                                  f'test number {number}: '
                                  f'{found.name} and {entry.name}')
            found = entry
    if found is None:
        raise LookupError('No test directory found for test number '
                          f'{number}!')
    return (found.path, found.name)
56
57
58
def check_ocsp_responder():
    """Check if the OCSP responder works.

    Runs "ocsptool --ask --nonce" for the server certificate, using
    the authority certificate as issuer. Returns True if the command
    exits with status 0, False otherwise.

    """
    result = subprocess.run([
        'ocsptool', '--ask', '--nonce',
        '--load-issuer', 'authority/x509.pem',
        '--load-cert', 'authority/server/x509.pem',
    ])
    return result.returncode == 0
67
def check_msva():
    """Check if the MSVA is up.

    Reads the client UID from "authority/client/uid" (surrounding
    whitespace stripped) and runs "msva-query-agent https UID x509pem
    client" with the client certificate file as standard input.
    Returns True if the command exits with status 0, False otherwise.

    """
    with open('authority/client/uid', 'r') as uid_source:
        uid = uid_source.read().strip()
        query = ['msva-query-agent', 'https', uid, 'x509pem', 'client']
        with open('authority/client/x509.pem', 'r') as pem:
            result = subprocess.run(query, stdin=pem)
    return result.returncode == 0
77
78
79
if __name__ == "__main__":
    # Script entry point: parse the command line, locate the test
    # directory, start the services for the test, run the test
    # connections, and finally run the post check from the test's
    # hooks (if any).
    import argparse
    parser = argparse.ArgumentParser(
        description='Run a mod_gnutls server test')
    parser.add_argument('--test-number', type=int,
                        required=True,
                        help='number of the test to run, selects the '
                        'test directory')
    parser.add_argument('--log-connection', type=str, default=None,
                        help='write connection log to this file')
    parser.add_argument('--log-responses', type=str, default=None,
                        help='write HTTP responses to this file')

    # enable bash completion if argcomplete is available
    try:
        import argcomplete
        argcomplete.autocomplete(parser)
    except ImportError:
        pass

    args = parser.parse_args()

    # The Automake environment always provides srcdir, the default is
    # for manual use.
    srcdir = os.path.realpath(os.environ.get('srcdir', '.'))
    # ensure environment srcdir is absolute
    os.environ['srcdir'] = srcdir

    # Find the configuration directory for the test in
    # ${srcdir}/tests/, based on the test number.
    testdir, testname = find_testdir(args.test_number,
                                     os.path.join(srcdir, 'tests'))
    print(f'Found test {testname}, test dir is {testdir}')
    os.environ['TEST_NAME'] = testname

    # Load test case hooks (if any)
    plugin_path = os.path.join(testdir, 'hooks.py')
    plugin = mgstest.hooks.load_hooks_plugin(plugin_path)

    # PID file name varies depending on whether we're using
    # namespaces.
    #
    # TODO: Check if having the different names is really necessary.
    pidaffix = ''
    if 'USE_TEST_NAMESPACE' in os.environ:
        pidaffix = f'-{testname}'

    # Define the available services
    apache = ApacheService(config=os.path.join(testdir, 'apache.conf'),
                           pidfile=f'apache2{pidaffix}.pid')
    backend = ApacheService(config=os.path.join(testdir, 'backend.conf'),
                            pidfile=f'backend{pidaffix}.pid')
    ocsp = ApacheService(config=os.path.join(testdir, 'ocsp.conf'),
                         pidfile=f'ocsp{pidaffix}.pid',
                         check=check_ocsp_responder)
    msva = TestService(start=['monkeysphere-validation-agent'],
                       env={'GNUPGHOME': 'msva.gnupghome',
                            'MSVA_KEYSERVER_POLICY': 'never'},
                       condition=lambda: 'USE_MSVA' in os.environ,
                       check=check_msva)

    # background services: must be ready before the main apache
    # instance is started
    bg_services = [backend, ocsp, msva]

    # TODO: check extra requirements (e.g. specific modules)

    # TODO: add hook to modify environment (unless made obsolete by
    # parameters)

    # If VERBOSE is enabled, log the HTTPD build configuration
    if 'VERBOSE' in os.environ:
        apache2 = os.environ.get('APACHE2', 'apache2')
        subprocess.run([apache2, '-f', f'{srcdir}/base_apache.conf', '-V'],
                       check=True)

    if 'USE_MSVA' in os.environ:
        os.environ['MONKEYSPHERE_VALIDATION_AGENT_SOCKET'] = \
            f'http://127.0.0.1:{os.environ["MSVA_PORT"]}'

    # Everything entered into service_stack is exited in reverse order
    # when this block ends, whether the test passed or failed.
    with contextlib.ExitStack() as service_stack:
        # NOTE(review): presumably "nolock" skips the actual locking
        # when running inside a network namespace -- confirm in
        # mgstest.lockfile.
        service_stack.enter_context(
            lockfile('test.lock', nolock='MGS_NETNS_ACTIVE' in os.environ))
        service_stack.enter_context(ocsp.run())
        service_stack.enter_context(backend.run())
        service_stack.enter_context(msva.run())

        # TEST_SERVICE_MAX_WAIT is in milliseconds
        wait_timeout = \
            int(os.environ.get('TEST_SERVICE_MAX_WAIT', 10000)) / 1000
        for s in bg_services:
            if s.condition():
                s.wait_ready(timeout=wait_timeout)

        # special case: expected to fail in a few cases, marked by a
        # "fail.server" file in the test directory
        expect_server_fail = os.path.exists(
            os.path.join(testdir, 'fail.server'))
        try:
            service_stack.enter_context(apache.run())
            if expect_server_fail:
                raise TestExpectationFailed(
                    'Server start did not fail as expected!')
            apache.wait_ready()
        except subprocess.CalledProcessError:
            if not expect_server_fail:
                # unexpected start failure: propagate unchanged
                raise
            print('Apache server failed to start as expected',
                  file=sys.stderr)

        # Set TEST_TARGET for the request. Might be replaced with a
        # parameter later.
        if 'TARGET_IP' in os.environ:
            os.environ['TEST_TARGET'] = os.environ['TARGET_IP']
        else:
            os.environ['TEST_TARGET'] = os.environ['TEST_HOST']

        # Run the test connections
        with contextlib.ExitStack() as stack:
            # The log files are optional, they stay None unless
            # requested on the command line.
            log_file = None
            output_file = None
            if args.log_connection:
                log_file = stack.enter_context(
                    open(args.log_connection, 'w'))
            if args.log_responses:
                output_file = stack.enter_context(
                    open(args.log_responses, 'w'))

            # A run_connection hook replaces the connections
            # described in test.yml.
            if plugin.run_connection:
                plugin.run_connection(testname,
                                      conn_log=log_file,
                                      response_log=output_file)
            else:
                test_conf = stack.enter_context(
                    open(os.path.join(testdir, 'test.yml'), 'r'))
                run_test_conf(test_conf,
                              float(os.environ.get('TEST_QUERY_TIMEOUT', 5.0)),
                              conn_log=log_file, response_log=output_file)

    # run extra checks the test's hooks.py might define
    if plugin.post_check:
        log_file = None
        output_file = None
        with contextlib.ExitStack() as stack:
            # TODO: The log files should be created as temporary
            # files if needed by the plugin but not configured.
            if args.log_connection:
                log_file = stack.enter_context(
                    open(args.log_connection, 'r'))
            if args.log_responses:
                output_file = stack.enter_context(
                    open(args.log_responses, 'r'))
            plugin.post_check(conn_log=log_file, response_log=output_file)
Note: See TracBrowser for help on using the repository browser.