source: mod_gnutls/test/runtest.py @ b457e67

asyncioproxy-ticket
Last change on this file since b457e67 was b457e67, checked in by Fiona Klute <fiona.klute@…>, 19 months ago

Open log files through argparse

The with block around main ensures they get closed even in case of
exceptions.

  • Property mode set to 100755
File size: 8.5 KB
Line 
1#!/usr/bin/python3
2# PYTHON_ARGCOMPLETE_OK
3
4# Copyright 2019 Fiona Klute
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17
18import contextlib
19import os
20import os.path
21import subprocess
22import sys
23
24import mgstest.hooks
25from mgstest import lockfile, TestExpectationFailed
26from mgstest.services import ApacheService, TestService
27from mgstest.tests import run_test_conf
28
29
30
def find_testdir(number, dir):
    """Find the configuration directory for a test based on its number.

    The given directory must contain exactly one sub-directory whose
    name, up to the first underscore, is the integer NUMBER (that is,
    a name matching "NUMBER_*"). Entries that are not directories, and
    directories whose name does not start with an integer (for
    example "__pycache__"), are ignored.

    Parameters:
        number: integer test number to look for
        dir: path of the directory that contains the test directories

    Returns a tuple (path, name) of the matching directory.

    Raises LookupError if no directory or more than one directory
    matches the number.

    """
    found = None
    with os.scandir(dir) as it:
        for entry in it:
            if not entry.is_dir():
                continue
            prefix = entry.name.split('_', maxsplit=1)[0]
            try:
                num = int(prefix)
            except ValueError:
                # Not a test directory, unrelated directories must
                # not break the lookup.
                continue
            if num != number:
                continue
            if found is not None:
                # duplicate numbers are an error
                raise LookupError('Multiple directories found for '
                                  f'test number {number}: '
                                  f'{found.name} and {entry.name}')
            found = entry
    if found is None:
        raise LookupError('No test directory found for test number '
                          f'{number}!')
    return (found.path, found.name)
56
57
58
def check_ocsp_responder():
    """Check if the OCSP responder works.

    Runs "ocsptool --ask --nonce" with the test CA certificate as
    issuer and the server certificate as the certificate to check.
    Returns True if ocsptool exits with status 0, False otherwise.

    """
    result = subprocess.run([
        'ocsptool', '--ask', '--nonce',
        '--load-issuer', 'authority/x509.pem',
        '--load-cert', 'authority/server/x509.pem',
    ])
    return result.returncode == 0
67
def check_msva():
    """Check if MSVA is up.

    Reads the client UID from authority/client/uid and runs
    msva-query-agent for it, with the client certificate
    (authority/client/x509.pem) connected to the command's standard
    input. Returns True if the command exits with status 0, False
    otherwise.

    """
    with open('authority/client/uid', 'r') as uid_source:
        uid = uid_source.read().strip()
    query = ['msva-query-agent', 'https', uid, 'x509pem', 'client']
    with open('authority/client/x509.pem', 'r') as cert:
        result = subprocess.run(query, stdin=cert)
    return result.returncode == 0
77
78
79
def main(args):
    """Run the test selected by args.test_number.

    The test configuration directory is looked up below
    ${srcdir}/tests/ using find_testdir(). The background services
    (OCSP responder, backend server, MSVA) are started while holding
    the test lock, then the main Apache instance, and finally the
    test connections are run. After all services have been stopped
    the post_check hook of the test (if any) is called.

    Parameters:
        args: parsed command line arguments, the attributes used are
            test_number, log_connection and log_responses. The two
            log attributes are either None or open files.

    Environment variables read: srcdir, USE_TEST_NAMESPACE, VERBOSE,
    APACHE2, USE_MSVA, MSVA_PORT (required if USE_MSVA is set),
    MGS_NETNS_ACTIVE, TEST_SERVICE_MAX_WAIT, TARGET_IP, TEST_HOST
    (required if TARGET_IP is not set), and TEST_QUERY_TIMEOUT.

    Environment variables set: srcdir (made absolute), TEST_NAME,
    TEST_TARGET, and MONKEYSPHERE_VALIDATION_AGENT_SOCKET (only if
    USE_MSVA is set).

    Raises TestExpectationFailed if the test directory contains
    "fail.server" but the Apache server started anyway. A
    subprocess.CalledProcessError from the server start is passed on
    unless "fail.server" exists.

    """
    # Automake always exports srcdir, "." is the fallback for manual
    # runs. Store the absolute path back into the environment.
    srcdir = os.path.realpath(os.environ.get('srcdir', '.'))
    os.environ['srcdir'] = srcdir

    # The test number selects the configuration directory in
    # ${srcdir}/tests/.
    tests_root = os.path.join(srcdir, 'tests')
    testdir, testname = find_testdir(args.test_number, tests_root)
    print(f'Found test {testname}, test dir is {testdir}')
    os.environ['TEST_NAME'] = testname

    def in_testdir(filename):
        # path of a file inside the test configuration directory
        return os.path.join(testdir, filename)

    # Test case hooks, if the test has any
    plugin = mgstest.hooks.load_hooks_plugin(in_testdir('hooks.py'))

    # With namespaces the PID file names include the test name.
    #
    # TODO: Check if having the different names is really necessary.
    pidaffix = f'-{testname}' if 'USE_TEST_NAMESPACE' in os.environ else ''

    # Available services
    apache = ApacheService(config=in_testdir('apache.conf'),
                           pidfile=f'apache2{pidaffix}.pid')
    backend = ApacheService(config=in_testdir('backend.conf'),
                            pidfile=f'backend{pidaffix}.pid')
    ocsp = ApacheService(config=in_testdir('ocsp.conf'),
                         pidfile=f'ocsp{pidaffix}.pid',
                         check=check_ocsp_responder)
    msva = TestService(start=['monkeysphere-validation-agent'],
                       env={'GNUPGHOME': 'msva.gnupghome',
                            'MSVA_KEYSERVER_POLICY': 'never'},
                       condition=lambda: 'USE_MSVA' in os.environ,
                       check=check_msva)

    # Background services have to be ready before the main apache
    # instance is started.
    bg_services = [backend, ocsp, msva]

    # Presence of this file marks tests where the server start is
    # expected to fail.
    fail_marker = in_testdir('fail.server')

    # TODO: check extra requirements (e.g. specific modules)

    # TODO: add hook to modify environment (unless made obsolete by
    # parameters)

    # Log the HTTPD build configuration if VERBOSE is enabled
    if 'VERBOSE' in os.environ:
        httpd = os.environ.get('APACHE2', 'apache2')
        subprocess.run([httpd, '-f', f'{srcdir}/base_apache.conf', '-V'],
                       check=True)

    if 'USE_MSVA' in os.environ:
        msva_url = f'http://127.0.0.1:{os.environ["MSVA_PORT"]}'
        os.environ['MONKEYSPHERE_VALIDATION_AGENT_SOCKET'] = msva_url

    with contextlib.ExitStack() as service_stack:
        service_stack.enter_context(
            lockfile('test.lock', nolock='MGS_NETNS_ACTIVE' in os.environ))
        for service in (ocsp, backend, msva):
            service_stack.enter_context(service.run())

        # TEST_SERVICE_MAX_WAIT is in milliseconds, wait_ready() gets
        # the value divided by 1000
        max_wait_ms = int(os.environ.get('TEST_SERVICE_MAX_WAIT', 10000))
        wait_timeout = max_wait_ms / 1000
        for service in bg_services:
            if service.condition():
                service.wait_ready(timeout=wait_timeout)

        # The server start is expected to fail for a few tests
        try:
            service_stack.enter_context(apache.run())
            if os.path.exists(fail_marker):
                raise TestExpectationFailed(
                    'Server start did not fail as expected!')
            apache.wait_ready()
        except subprocess.CalledProcessError as e:
            if not os.path.exists(fail_marker):
                raise e
            print('Apache server failed to start as expected',
                  file=sys.stderr)

        # TEST_TARGET is used for the request. Might be replaced with
        # a parameter later.
        os.environ['TEST_TARGET'] = (
            os.environ['TARGET_IP'] if 'TARGET_IP' in os.environ
            else os.environ['TEST_HOST'])

        # Test connections: a run_connection hook replaces the
        # default of running the test.yml configuration.
        if plugin.run_connection:
            plugin.run_connection(testname,
                                  conn_log=args.log_connection,
                                  response_log=args.log_responses)
        else:
            with open(in_testdir('test.yml'), 'r') as test_conf:
                query_timeout = float(
                    os.environ.get('TEST_QUERY_TIMEOUT', 5.0))
                run_test_conf(test_conf,
                              query_timeout,
                              conn_log=args.log_connection,
                              response_log=args.log_responses)

    # Extra checks the hooks.py of the test might define. The log
    # files are rewound so the hook can read them from the start.
    if plugin.post_check:
        for log in (args.log_connection, args.log_responses):
            if log:
                log.seek(0)
        plugin.post_check(conn_log=args.log_connection,
                          response_log=args.log_responses)
195
196
197
if __name__ == "__main__":
    # Command line entry point: parse the arguments, then run main()
    # with the log files (if any) registered for closing.
    import argparse
    parser = argparse.ArgumentParser(
        description='Run a mod_gnutls server test')
    # The number is passed to find_testdir() to select the test
    # configuration directory.
    parser.add_argument('--test-number', type=int, required=True,
                        help='number of the test to run, selects the '
                        'configuration directory tests/NUMBER_*')
    # TODO: The log files should be created as temporary
    # files if needed by the plugin but not configured.
    #
    # Mode "w+" so main() can rewind and pass the files to a
    # post_check hook after writing.
    parser.add_argument('--log-connection', type=argparse.FileType('w+'),
                        default=None,
                        help='write connection log to this file')
    parser.add_argument('--log-responses', type=argparse.FileType('w+'),
                        default=None,
                        help='write HTTP responses to this file')

    # enable bash completion if argcomplete is available
    try:
        import argcomplete
        argcomplete.autocomplete(parser)
    except ImportError:
        pass

    args = parser.parse_args()

    # argparse opened the log files during parsing, the exit stack
    # ensures they get closed even if main() raises an exception.
    with contextlib.ExitStack() as stack:
        for log in (args.log_connection, args.log_responses):
            if log:
                stack.enter_context(contextlib.closing(log))
        main(args)
Note: See TracBrowser for help on using the repository browser.