source: mod_gnutls/test/runtest.py @ ad2690b

asyncio
Last change on this file since ad2690b was ad2690b, checked in by Fiona Klute <fiona.klute@…>, 2 years ago

Fix flake8 warnings in runtest.py

  • Property mode set to 100755
File size: 9.5 KB
Line 
1#!/usr/bin/python3
2# PYTHON_ARGCOMPLETE_OK
3
4# Copyright 2019-2020 Fiona Klute
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17
18import contextlib
19import os
20import os.path
21import subprocess
22import sys
23import tempfile
24import yaml
25from unittest import SkipTest
26
27import mgstest.hooks
28import mgstest.valgrind
29from mgstest import lockfile, TestExpectationFailed
30from mgstest.services import ApacheService, TestService
31from mgstest.tests import run_test_conf
32
33
34def find_testdir(number, dir):
35    """Find the configuration directory for a test based on its
36    number. The given directory must contain exactly one directory
37    with a name matching "NUMBER_*", otherwise a LookupError is
38    raised.
39
40    """
41    with os.scandir(dir) as it:
42        found = None
43        for entry in it:
44            if entry.is_dir():
45                num = int(entry.name.split('_', maxsplit=1)[0])
46                if number == num:
47                    if found:
48                        # duplicate numbers are an error
49                        raise LookupError('Multiple directories found for '
50                                          f'test number {number}: '
51                                          f'{found.name} and {entry.name}')
52                    else:
53                        found = entry
54        if found is None:
55            raise LookupError('No test directory found for test number '
56                              f'{number}!')
57        else:
58            return (found.path, found.name)
59
60
61def temp_logfile():
62    return tempfile.SpooledTemporaryFile(max_size=4096, mode='w+',
63                                         prefix='mod_gnutls', suffix=".log")
64
65
66def check_ocsp_responder():
67    # Check if OCSP responder works
68    issuer_cert = 'authority/x509.pem'
69    check_cert = 'authority/server/x509.pem'
70    command = ['ocsptool', '--ask', '--nonce',
71               '--load-issuer', issuer_cert,
72               '--load-cert', check_cert]
73    return subprocess.run(command).returncode == 0
74
75
76def check_msva():
77    # Check if MSVA is up
78    cert_file = 'authority/client/x509.pem'
79    uid_file = 'authority/client/uid'
80    with open(uid_file, 'r') as file:
81        uid = file.read().strip()
82        command = ['msva-query-agent', 'https', uid, 'x509pem', 'client']
83        with open(cert_file, 'r') as cert:
84            return subprocess.run(command, stdin=cert).returncode == 0
85
86
87def main(args):
88    # The Automake environment always provides srcdir, the default is
89    # for manual use.
90    srcdir = os.path.realpath(os.environ.get('srcdir', '.'))
91    # ensure environment srcdir is absolute
92    os.environ['srcdir'] = srcdir
93
94    # Find the configuration directory for the test in
95    # ${srcdir}/tests/, based on the test number.
96    testdir, testname = find_testdir(args.test_number,
97                                     os.path.join(srcdir, 'tests'))
98    print(f'Found test {testname}, test dir is {testdir}')
99    os.environ['TEST_NAME'] = testname
100
101    # Load test config
102    try:
103        with open(os.path.join(testdir, 'test.yaml'), 'r') as conf_file:
104            test_conf = yaml.load(conf_file, Loader=yaml.Loader)
105    except FileNotFoundError:
106        test_conf = None
107
108    # Load test case hooks (if any)
109    plugin_path = os.path.join(testdir, 'hooks.py')
110    plugin = mgstest.hooks.load_hooks_plugin(plugin_path)
111
112    # PID file name varies depending on whether we're using
113    # namespaces.
114    #
115    # TODO: Check if having the different names is really necessary.
116    pidaffix = ''
117    if 'USE_TEST_NAMESPACE' in os.environ:
118        pidaffix = f'-{testname}'
119
120    valgrind_log = None
121    if args.valgrind:
122        valgrind_log = os.path.join('logs', f'valgrind-{testname}.log')
123
124    # Define the available services
125    apache = ApacheService(config=os.path.join(testdir, 'apache.conf'),
126                           pidfile=f'apache2{pidaffix}.pid',
127                           valgrind_log=valgrind_log,
128                           valgrind_suppress=args.valgrind_suppressions)
129    backend = ApacheService(config=os.path.join(testdir, 'backend.conf'),
130                            pidfile=f'backend{pidaffix}.pid')
131    ocsp = ApacheService(config=os.path.join(testdir, 'ocsp.conf'),
132                         pidfile=f'ocsp{pidaffix}.pid',
133                         check=check_ocsp_responder)
134    msva = TestService(start=['monkeysphere-validation-agent'],
135                       env={'GNUPGHOME': 'msva.gnupghome',
136                            'MSVA_KEYSERVER_POLICY': 'never'},
137                       condition=lambda: 'USE_MSVA' in os.environ,
138                       check=check_msva)
139
140    # background services: must be ready before the main apache
141    # instance is started
142    bg_services = [backend, ocsp, msva]
143
144    # If VERBOSE is enabled, log the HTTPD build configuration
145    if 'VERBOSE' in os.environ:
146        apache2 = os.environ.get('APACHE2', 'apache2')
147        subprocess.run([apache2, '-f', f'{srcdir}/base_apache.conf', '-V'],
148                       check=True)
149
150    # This hook may modify the environment as needed for the test.
151    cleanup_callback = None
152    try:
153        if plugin.prepare_env:
154            cleanup_callback = plugin.prepare_env()
155    except SkipTest as skip:
156        print(f'Skipping: {skip!s}')
157        sys.exit(77)
158
159    if 'USE_MSVA' in os.environ:
160        os.environ['MONKEYSPHERE_VALIDATION_AGENT_SOCKET'] = \
161            f'http://127.0.0.1:{os.environ["MSVA_PORT"]}'
162
163    with contextlib.ExitStack() as service_stack:
164        if cleanup_callback:
165            service_stack.callback(cleanup_callback)
166        service_stack.enter_context(
167            lockfile('test.lock', nolock='MGS_NETNS_ACTIVE' in os.environ))
168        service_stack.enter_context(ocsp.run())
169        service_stack.enter_context(backend.run())
170        service_stack.enter_context(msva.run())
171
172        # TEST_SERVICE_MAX_WAIT is in milliseconds
173        wait_timeout = \
174            int(os.environ.get('TEST_SERVICE_MAX_WAIT', 10000)) / 1000
175        for s in bg_services:
176            if s.condition():
177                s.wait_ready(timeout=wait_timeout)
178
179        # special case: expected to fail in a few cases
180        service_stack.enter_context(apache.run())
181        failed = apache.wait_ready()
182        if os.path.exists(os.path.join(testdir, 'fail.server')):
183            if failed:
184                print('Apache server failed to start as expected',
185                      file=sys.stderr)
186            else:
187                raise TestExpectationFailed(
188                    'Server start did not fail as expected!')
189
190        # Set TEST_TARGET for the request. Might be replaced with a
191        # parameter later.
192        if 'TARGET_IP' in os.environ:
193            os.environ['TEST_TARGET'] = os.environ['TARGET_IP']
194        else:
195            os.environ['TEST_TARGET'] = os.environ['TEST_HOST']
196
197        # Run the test connections
198        if plugin.run_connection:
199            plugin.run_connection(testname,
200                                  conn_log=args.log_connection,
201                                  response_log=args.log_responses)
202        else:
203            run_test_conf(test_conf,
204                          float(os.environ.get('TEST_QUERY_TIMEOUT', 5.0)),
205                          conn_log=args.log_connection,
206                          response_log=args.log_responses)
207
208    # run extra checks the test's hooks.py might define
209    if plugin.post_check:
210        args.log_connection.seek(0)
211        args.log_responses.seek(0)
212        plugin.post_check(conn_log=args.log_connection,
213                          response_log=args.log_responses)
214
215    if valgrind_log:
216        with open(valgrind_log) as log:
217            errors = mgstest.valgrind.error_summary(log)
218            print(f'Valgrind summary: {errors[0]} errors, '
219                  f'{errors[1]} suppressed')
220            if errors[0] > 0:
221                sys.exit(ord('V'))
222
223
224if __name__ == "__main__":
225    import argparse
226    parser = argparse.ArgumentParser(
227        description='Run a mod_gnutls server test')
228    parser.add_argument('--test-number', type=int,
229                        required=True, help='load YAML test configuration')
230    parser.add_argument('--log-connection', type=argparse.FileType('w+'),
231                        default=temp_logfile(),
232                        help='write connection log to this file')
233    parser.add_argument('--log-responses', type=argparse.FileType('w+'),
234                        default=temp_logfile(),
235                        help='write HTTP responses to this file')
236    parser.add_argument('--valgrind', action='store_true',
237                        help='run primary Apache instance with Valgrind')
238    parser.add_argument('--valgrind-suppressions', action='append',
239                        default=[],
240                        help='use Valgrind suppressions file')
241
242    # enable bash completion if argcomplete is available
243    try:
244        import argcomplete
245        argcomplete.autocomplete(parser)
246    except ImportError:
247        pass
248
249    args = parser.parse_args()
250
251    with contextlib.ExitStack() as stack:
252        stack.enter_context(contextlib.closing(args.log_connection))
253        stack.enter_context(contextlib.closing(args.log_responses))
254        main(args)
Note: See TracBrowser for help on using the repository browser.