source: mod_gnutls/test/runtest.py @ 3deb86e

proxy-ticket
Last change on this file since 3deb86e was 4de8cd3, checked in by Fiona Klute <fiona.klute@…>, 11 months ago

Remove an ExitStack? made obsolete by wrapping "with" around main()

  • Property mode set to 100755
File size: 8.4 KB
Line 
1#!/usr/bin/python3
2# PYTHON_ARGCOMPLETE_OK
3
4# Copyright 2019 Fiona Klute
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17
18import contextlib
19import os
20import os.path
21import subprocess
22import sys
23import tempfile
24
25import mgstest.hooks
26from mgstest import lockfile, TestExpectationFailed
27from mgstest.services import ApacheService, TestService
28from mgstest.tests import run_test_conf
29
30
31
32def find_testdir(number, dir):
33    """Find the configuration directory for a test based on its
34    number. The given directory must contain exactly one directory
35    with a name matching "NUMBER_*", otherwise a LookupError is
36    raised.
37
38    """
39    with os.scandir(dir) as it:
40        found = None
41        for entry in it:
42            if entry.is_dir():
43                num = int(entry.name.split('_', maxsplit=1)[0])
44                if number == num:
45                    if found:
46                        # duplicate numbers are an error
47                        raise LookupError('Multiple directories found for '
48                                          f'test number {number}: '
49                                          f'{found.name} and {entry.name}')
50                    else:
51                        found = entry
52        if found == None:
53            raise LookupError('No test directory found for test number '
54                              f'{number}!')
55        else:
56            return (found.path, found.name)
57
58def temp_logfile():
59    return tempfile.SpooledTemporaryFile(max_size=4096, mode='w+',
60                                         prefix='mod_gnutls', suffix=".log")
61
62
63
64
65def check_ocsp_responder():
66    # Check if OCSP responder works
67    issuer_cert = 'authority/x509.pem'
68    check_cert = 'authority/server/x509.pem'
69    command = ['ocsptool', '--ask', '--nonce',
70               '--load-issuer', issuer_cert,
71               '--load-cert', check_cert]
72    return subprocess.run(command).returncode == 0
73
74def check_msva():
75    # Check if MSVA is up
76    cert_file = 'authority/client/x509.pem'
77    uid_file = 'authority/client/uid'
78    with open(uid_file, 'r') as file:
79        uid = file.read().strip()
80        command = ['msva-query-agent', 'https', uid, 'x509pem', 'client']
81        with open(cert_file, 'r') as cert:
82            return subprocess.run(command, stdin=cert).returncode == 0
83
84
85
86def main(args):
87    # The Automake environment always provides srcdir, the default is
88    # for manual use.
89    srcdir = os.path.realpath(os.environ.get('srcdir', '.'))
90    # ensure environment srcdir is absolute
91    os.environ['srcdir'] = srcdir
92
93    # Find the configuration directory for the test in
94    # ${srcdir}/tests/, based on the test number.
95    testdir, testname = find_testdir(args.test_number,
96                                     os.path.join(srcdir, 'tests'))
97    print(f'Found test {testname}, test dir is {testdir}')
98    os.environ['TEST_NAME'] = testname
99
100    # Load test case hooks (if any)
101    plugin_path = os.path.join(testdir, 'hooks.py')
102    plugin = mgstest.hooks.load_hooks_plugin(plugin_path)
103
104    # PID file name varies depending on whether we're using
105    # namespaces.
106    #
107    # TODO: Check if having the different names is really necessary.
108    pidaffix = ''
109    if 'USE_TEST_NAMESPACE' in os.environ:
110        pidaffix = f'-{testname}'
111
112    # Define the available services
113    apache = ApacheService(config=os.path.join(testdir, 'apache.conf'),
114                           pidfile=f'apache2{pidaffix}.pid')
115    backend = ApacheService(config=os.path.join(testdir, 'backend.conf'),
116                            pidfile=f'backend{pidaffix}.pid')
117    ocsp = ApacheService(config=os.path.join(testdir, 'ocsp.conf'),
118                         pidfile=f'ocsp{pidaffix}.pid',
119                         check=check_ocsp_responder)
120    msva = TestService(start=['monkeysphere-validation-agent'],
121                       env={'GNUPGHOME': 'msva.gnupghome',
122                            'MSVA_KEYSERVER_POLICY': 'never'},
123                       condition=lambda: 'USE_MSVA' in os.environ,
124                       check=check_msva)
125
126    # background services: must be ready before the main apache
127    # instance is started
128    bg_services = [backend, ocsp, msva]
129
130    # TODO: check extra requirements (e.g. specific modules)
131
132    # TODO: add hook to modify environment (unless made obsolete by
133    # parameters)
134
135    # If VERBOSE is enabled, log the HTTPD build configuration
136    if 'VERBOSE' in os.environ:
137        apache2 = os.environ.get('APACHE2', 'apache2')
138        subprocess.run([apache2, '-f', f'{srcdir}/base_apache.conf', '-V'],
139                       check=True)
140
141    if 'USE_MSVA' in os.environ:
142        os.environ['MONKEYSPHERE_VALIDATION_AGENT_SOCKET'] = \
143            f'http://127.0.0.1:{os.environ["MSVA_PORT"]}'
144
145    with contextlib.ExitStack() as service_stack:
146        service_stack.enter_context(lockfile('test.lock', nolock='MGS_NETNS_ACTIVE' in os.environ))
147        service_stack.enter_context(ocsp.run())
148        service_stack.enter_context(backend.run())
149        service_stack.enter_context(msva.run())
150
151        # TEST_SERVICE_MAX_WAIT is in milliseconds
152        wait_timeout = \
153            int(os.environ.get('TEST_SERVICE_MAX_WAIT', 10000)) / 1000
154        for s in bg_services:
155            if s.condition():
156                s.wait_ready(timeout=wait_timeout)
157
158        # special case: expected to fail in a few cases
159        try:
160            service_stack.enter_context(apache.run())
161            if os.path.exists(os.path.join(testdir, 'fail.server')):
162                raise TestExpectationFailed(
163                    'Server start did not fail as expected!')
164            apache.wait_ready()
165        except subprocess.CalledProcessError as e:
166            if os.path.exists(os.path.join(testdir, 'fail.server')):
167                print('Apache server failed to start as expected',
168                      file=sys.stderr)
169            else:
170                raise e
171
172        # Set TEST_TARGET for the request. Might be replaced with a
173        # parameter later.
174        if 'TARGET_IP' in os.environ:
175            os.environ['TEST_TARGET'] = os.environ['TARGET_IP']
176        else:
177            os.environ['TEST_TARGET'] = os.environ['TEST_HOST']
178
179        # Run the test connections
180        if plugin.run_connection:
181            plugin.run_connection(testname,
182                                  conn_log=args.log_connection,
183                                  response_log=args.log_responses)
184        else:
185            with open(os.path.join(testdir, 'test.yml'), 'r') as test_conf:
186                run_test_conf(test_conf,
187                              float(os.environ.get('TEST_QUERY_TIMEOUT', 5.0)),
188                              conn_log=args.log_connection,
189                              response_log=args.log_responses)
190
191    # run extra checks the test's hooks.py might define
192    if plugin.post_check:
193        args.log_connection.seek(0)
194        args.log_responses.seek(0)
195        plugin.post_check(conn_log=args.log_connection,
196                          response_log=args.log_responses)
197
198
199
200if __name__ == "__main__":
201    import argparse
202    parser = argparse.ArgumentParser(
203        description='Run a mod_gnutls server test')
204    parser.add_argument('--test-number', type=int,
205                        required=True, help='load YAML test configuration')
206    parser.add_argument('--log-connection', type=argparse.FileType('w+'),
207                        default=temp_logfile(),
208                        help='write connection log to this file')
209    parser.add_argument('--log-responses', type=argparse.FileType('w+'),
210                        default=temp_logfile(),
211                        help='write HTTP responses to this file')
212
213    # enable bash completion if argcomplete is available
214    try:
215        import argcomplete
216        argcomplete.autocomplete(parser)
217    except ImportError:
218        pass
219
220    args = parser.parse_args()
221
222    with contextlib.ExitStack() as stack:
223        stack.enter_context(contextlib.closing(args.log_connection))
224        stack.enter_context(contextlib.closing(args.log_responses))
225        main(args)
Note: See TracBrowser for help on using the repository browser.