source: mod_gnutls/test/runtest.py @ ac516aa

proxy-ticket
Last change on this file since ac516aa was ac516aa, checked in by Fiona Klute <fiona.klute@…>, 8 months ago

runtest.py: Split actions after argument parsing into a main function

This will make handling log files easier.

  • Property mode set to 100755
File size: 8.6 KB
Line 
1#!/usr/bin/python3
2# PYTHON_ARGCOMPLETE_OK
3
4# Copyright 2019 Fiona Klute
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17
18import contextlib
19import os
20import os.path
21import subprocess
22import sys
23
24import mgstest.hooks
25from mgstest import lockfile, TestExpectationFailed
26from mgstest.services import ApacheService, TestService
27from mgstest.tests import run_test_conf
28
29
30
31def find_testdir(number, dir):
32    """Find the configuration directory for a test based on its
33    number. The given directory must contain exactly one directory
34    with a name matching "NUMBER_*", otherwise a LookupError is
35    raised.
36
37    """
38    with os.scandir(dir) as it:
39        found = None
40        for entry in it:
41            if entry.is_dir():
42                num = int(entry.name.split('_', maxsplit=1)[0])
43                if number == num:
44                    if found:
45                        # duplicate numbers are an error
46                        raise LookupError('Multiple directories found for '
47                                          f'test number {number}: '
48                                          f'{found.name} and {entry.name}')
49                    else:
50                        found = entry
51        if found == None:
52            raise LookupError('No test directory found for test number '
53                              f'{number}!')
54        else:
55            return (found.path, found.name)
56
57
58
59def check_ocsp_responder():
60    # Check if OCSP responder works
61    issuer_cert = 'authority/x509.pem'
62    check_cert = 'authority/server/x509.pem'
63    command = ['ocsptool', '--ask', '--nonce',
64               '--load-issuer', issuer_cert,
65               '--load-cert', check_cert]
66    return subprocess.run(command).returncode == 0
67
68def check_msva():
69    # Check if MSVA is up
70    cert_file = 'authority/client/x509.pem'
71    uid_file = 'authority/client/uid'
72    with open(uid_file, 'r') as file:
73        uid = file.read().strip()
74        command = ['msva-query-agent', 'https', uid, 'x509pem', 'client']
75        with open(cert_file, 'r') as cert:
76            return subprocess.run(command, stdin=cert).returncode == 0
77
78
79
80def main(args):
81    # The Automake environment always provides srcdir, the default is
82    # for manual use.
83    srcdir = os.path.realpath(os.environ.get('srcdir', '.'))
84    # ensure environment srcdir is absolute
85    os.environ['srcdir'] = srcdir
86
87    # Find the configuration directory for the test in
88    # ${srcdir}/tests/, based on the test number.
89    testdir, testname = find_testdir(args.test_number,
90                                     os.path.join(srcdir, 'tests'))
91    print(f'Found test {testname}, test dir is {testdir}')
92    os.environ['TEST_NAME'] = testname
93
94    # Load test case hooks (if any)
95    plugin_path = os.path.join(testdir, 'hooks.py')
96    plugin = mgstest.hooks.load_hooks_plugin(plugin_path)
97
98    # PID file name varies depending on whether we're using
99    # namespaces.
100    #
101    # TODO: Check if having the different names is really necessary.
102    pidaffix = ''
103    if 'USE_TEST_NAMESPACE' in os.environ:
104        pidaffix = f'-{testname}'
105
106    # Define the available services
107    apache = ApacheService(config=os.path.join(testdir, 'apache.conf'),
108                           pidfile=f'apache2{pidaffix}.pid')
109    backend = ApacheService(config=os.path.join(testdir, 'backend.conf'),
110                            pidfile=f'backend{pidaffix}.pid')
111    ocsp = ApacheService(config=os.path.join(testdir, 'ocsp.conf'),
112                         pidfile=f'ocsp{pidaffix}.pid',
113                         check=check_ocsp_responder)
114    msva = TestService(start=['monkeysphere-validation-agent'],
115                       env={'GNUPGHOME': 'msva.gnupghome',
116                            'MSVA_KEYSERVER_POLICY': 'never'},
117                       condition=lambda: 'USE_MSVA' in os.environ,
118                       check=check_msva)
119
120    # background services: must be ready before the main apache
121    # instance is started
122    bg_services = [backend, ocsp, msva]
123
124    # TODO: check extra requirements (e.g. specific modules)
125
126    # TODO: add hook to modify environment (unless made obsolete by
127    # parameters)
128
129    # If VERBOSE is enabled, log the HTTPD build configuration
130    if 'VERBOSE' in os.environ:
131        apache2 = os.environ.get('APACHE2', 'apache2')
132        subprocess.run([apache2, '-f', f'{srcdir}/base_apache.conf', '-V'],
133                       check=True)
134
135    if 'USE_MSVA' in os.environ:
136        os.environ['MONKEYSPHERE_VALIDATION_AGENT_SOCKET'] = \
137            f'http://127.0.0.1:{os.environ["MSVA_PORT"]}'
138
139    with contextlib.ExitStack() as service_stack:
140        service_stack.enter_context(lockfile('test.lock', nolock='MGS_NETNS_ACTIVE' in os.environ))
141        service_stack.enter_context(ocsp.run())
142        service_stack.enter_context(backend.run())
143        service_stack.enter_context(msva.run())
144
145        # TEST_SERVICE_MAX_WAIT is in milliseconds
146        wait_timeout = \
147            int(os.environ.get('TEST_SERVICE_MAX_WAIT', 10000)) / 1000
148        for s in bg_services:
149            if s.condition():
150                s.wait_ready(timeout=wait_timeout)
151
152        # special case: expected to fail in a few cases
153        try:
154            service_stack.enter_context(apache.run())
155            if os.path.exists(os.path.join(testdir, 'fail.server')):
156                raise TestExpectationFailed(
157                    'Server start did not fail as expected!')
158            apache.wait_ready()
159        except subprocess.CalledProcessError as e:
160            if os.path.exists(os.path.join(testdir, 'fail.server')):
161                print('Apache server failed to start as expected',
162                      file=sys.stderr)
163            else:
164                raise e
165
166        # Set TEST_TARGET for the request. Might be replaced with a
167        # parameter later.
168        if 'TARGET_IP' in os.environ:
169            os.environ['TEST_TARGET'] = os.environ['TARGET_IP']
170        else:
171            os.environ['TEST_TARGET'] = os.environ['TEST_HOST']
172
173        # Run the test connections
174        with contextlib.ExitStack() as stack:
175            log_file = None
176            output_file = None
177            if args.log_connection:
178                log_file = stack.enter_context(open(args.log_connection, 'w'))
179            if args.log_responses:
180                output_file = stack.enter_context(open(args.log_responses, 'w'))
181
182            if plugin.run_connection:
183                plugin.run_connection(testname,
184                                      conn_log=log_file,
185                                      response_log=output_file)
186            else:
187                test_conf = stack.enter_context(
188                    open(os.path.join(testdir, 'test.yml'), 'r'))
189                run_test_conf(test_conf,
190                              float(os.environ.get('TEST_QUERY_TIMEOUT', 5.0)),
191                              conn_log=log_file, response_log=output_file)
192
193    # run extra checks the test's hooks.py might define
194    if plugin.post_check:
195        log_file = None
196        output_file = None
197        with contextlib.ExitStack() as stack:
198            # TODO: The log files should be created as temporary
199            # files if needed by the plugin but not configured.
200            if args.log_connection:
201                log_file = stack.enter_context(open(args.log_connection, 'r'))
202            if args.log_responses:
203                output_file = stack.enter_context(open(args.log_responses, 'r'))
204            plugin.post_check(conn_log=log_file, response_log=output_file)
205
206
207
208if __name__ == "__main__":
209    import argparse
210    parser = argparse.ArgumentParser(
211        description='Run a mod_gnutls server test')
212    parser.add_argument('--test-number', type=int,
213                        required=True, help='load YAML test configuration')
214    parser.add_argument('--log-connection', type=str, default=None,
215                        help='write connection log to this file')
216    parser.add_argument('--log-responses', type=str, default=None,
217                        help='write HTTP responses to this file')
218
219    # enable bash completion if argcomplete is available
220    try:
221        import argcomplete
222        argcomplete.autocomplete(parser)
223    except ImportError:
224        pass
225
226    args = parser.parse_args()
227
228    main(args)
Note: See TracBrowser for help on using the repository browser.