source: mod_gnutls/test/mgstest/tests.py @ 0560bb9

proxy-ticket
Last change on this file since 0560bb9 was 0560bb9, checked in by Fiona Klute <fiona.klute@…>, 12 months ago

Add some documentation for the Python test modules

  • Property mode set to 100644
File size: 11.8 KB
Line 
1#!/usr/bin/python3
2
3# Copyright 2019 Fiona Klute
4#
5# Licensed under the Apache License, Version 2.0 (the "License");
6# you may not use this file except in compliance with the License.
7# You may obtain a copy of the License at
8#
9#     http://www.apache.org/licenses/LICENSE-2.0
10#
11# Unless required by applicable law or agreed to in writing, software
12# distributed under the License is distributed on an "AS IS" BASIS,
13# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14# See the License for the specific language governing permissions and
15# limitations under the License.
16
17"""Test objects and support functions for the mod_gnutls test
18suite. The classes defined in this module represent structures in the
19YAML test configuration files.
20
21"""
22
23import re
24import subprocess
25import yaml
26
27from . import TestExpectationFailed
28from .http import HTTPSubprocessConnection
29
class TestConnection(yaml.YAMLObject):
    """An HTTP connection in a test. It includes parameters for the
    transport (currently gnutls-cli only), and the actions
    (e.g. sending requests) to take using this connection.

    Note that running one TestConnection object may result in multiple
    sequential network connections, if the transport gets closed in a
    non-failure way (e.g. following a "Connection: close" request) and
    there are more actions, or (rarely) if an action requires its own
    transport.

    """
    yaml_tag = '!connection'

    def __init__(self, actions, gnutls_params=None, transport='gnutls'):
        """Store the connection parameters.

        actions: the actions to run, in order (TestRequest or
            TestRaw10 objects, see run()).
        gnutls_params: option names for gnutls-cli, without the
            leading "--"; defaults to no extra options.
        transport: name of the transport (not evaluated by run()).

        """
        # Use a fresh list per instance instead of a shared mutable
        # default. A list passed in is stored by reference, not copied.
        # NOTE(review): PyYAML may fill nested containers only after
        # the constructor has returned when construct_mapping() is
        # used without deep=True -- confirm before adding a copy here.
        self.gnutls_params = [] if gnutls_params is None else gnutls_params
        self.actions = actions
        self.transport = transport

    def __repr__(self):
        return (f'{self.__class__.__name__!s}'
                f'(gnutls_params={self.gnutls_params!r}, '
                f'actions={self.actions!r}, transport={self.transport!r})')

    def run(self, host, port, timeout=5.0):
        """Run all actions of this connection against host:port.

        A gnutls-cli command line is built from gnutls_params and used
        to open an HTTPSubprocessConnection, which is closed again
        when the actions are done or one of them raised. TestRequest
        actions use that connection, TestRaw10 actions get the command
        line and timeout to start their own client process.

        Raises TypeError if an action is of an unsupported type.

        """
        # note: "--logfile" option requires GnuTLS version >= 3.6.7
        command = ['gnutls-cli', '--logfile=/dev/stderr']
        command.extend('--' + param for param in self.gnutls_params)
        command.extend(['-p', str(port), host])

        conn = HTTPSubprocessConnection(command, host, port,
                                        output_filter=filter_cert_log,
                                        timeout=timeout)

        try:
            for act in self.actions:
                # TestRaw10 is a subclass of TestRequest, so it must
                # be checked first.
                if isinstance(act, TestRaw10):
                    act.run(command, timeout)
                elif isinstance(act, TestRequest):
                    act.run(conn)
                else:
                    raise TypeError(f'Unsupported action requested: {act!r}')
        finally:
            conn.close()

    @classmethod
    def _from_yaml(cls, loader, node):
        """YAML constructor that calls __init__ with the mapping from
        the node, so that default parameter values apply."""
        fields = loader.construct_mapping(node)
        return cls(**fields)
81
82
83
class TestRequest(yaml.YAMLObject):
    """Test action that sends an HTTP/1.1 request.

    The path must be specified in the configuration file, all other
    parameters (method, headers, expected response) have
    defaults.

    Options for checking the response currently are:
    * require a specific response status
    * require the body to exactly match a specific string
    * require the body to contain all of a list of strings

    """
    yaml_tag = '!request'

    def __init__(self, path, method='GET', headers=None, expect=None):
        """Store the request parameters.

        path: request path (required).
        method: HTTP method, default 'GET'.
        headers: mapping of request headers, default: none.
        expect: mapping describing the expected outcome, default:
            status 200. Keys evaluated in this class are 'status',
            'body' (with 'exactly' and/or 'contains') and 'reset'.

        """
        self.method = method
        self.path = path
        # Create the defaults per instance so that no mutable default
        # object is shared between requests. Mappings passed in are
        # stored by reference, not copied.
        self.headers = {} if headers is None else headers
        self.expect = {'status': 200} if expect is None else expect

    def __repr__(self):
        return (f'{self.__class__.__name__!s}(path={self.path!r}, '
                f'method={self.method!r}, headers={self.headers!r}, '
                f'expect={self.expect!r})')

    def run(self, conn):
        """Send the request over conn, print the response, and check
        it against the expectations.

        A ConnectionResetError is swallowed if the configuration
        expects a reset, otherwise it is re-raised.

        """
        try:
            conn.request(self.method, self.path, headers=self.headers)
            resp = conn.getresponse()
        except ConnectionResetError:
            if not self.expects_conn_reset():
                # bare raise keeps the original traceback
                raise
            print('connection reset as expected.')
            return
        body = resp.read().decode()
        print(format_response(resp, body))
        self.check_response(resp, body)

    def check_body(self, body):
        """Check body against the 'body' expectations, raise
        TestExpectationFailed on mismatch.

        >>> r1 = TestRequest(path='/test.txt', method='GET', headers={}, expect={'status': 200, 'body': {'exactly': 'test\\n'}})
        >>> r1.check_body('test\\n')
        >>> r1.check_body('xyz\\n')
        Traceback (most recent call last):
        ...
        mgstest.TestExpectationFailed: Unexpected body: 'xyz\\n' != 'test\\n'
        >>> r2 = TestRequest(path='/test.txt', method='GET', headers={}, expect={'status': 200, 'body': {'contains': ['tes', 'est']}})
        >>> r2.check_body('test\\n')
        >>> r2.check_body('est\\n')
        Traceback (most recent call last):
        ...
        mgstest.TestExpectationFailed: Unexpected body: 'est\\n' does not contain 'tes'
        >>> r3 = TestRequest(path='/test.txt', method='GET', headers={}, expect={'status': 200, 'body': {'contains': 'test'}})
        >>> r3.check_body('test\\n')
        """
        expected = self.expect['body']
        if 'exactly' in expected and body != expected['exactly']:
            raise TestExpectationFailed(
                f'Unexpected body: {body!r} != '
                f'{expected["exactly"]!r}')
        if 'contains' in expected:
            # A single string is treated like a one-element list. The
            # normalization is local, the configured expectation is
            # not modified.
            needles = expected['contains']
            if isinstance(needles, str):
                needles = [needles]
            for s in needles:
                if s not in body:
                    raise TestExpectationFailed(
                        f'Unexpected body: {body!r} does not contain '
                        f'{s!r}')

    def check_response(self, response, body):
        """Check status and (if configured) body of a received
        response, raise TestExpectationFailed on mismatch or if a
        connection reset was expected instead of a response."""
        if self.expects_conn_reset():
            raise TestExpectationFailed(
                'Got a response, but connection should have failed!')
        if response.status != self.expect['status']:
            raise TestExpectationFailed(
                f'Unexpected status: {response.status} != '
                f'{self.expect["status"]}')
        if 'body' in self.expect:
            self.check_body(body)

    def expects_conn_reset(self):
        """Returns True if running this request is expected to fail due to the
        connection being reset. That usually means the underlying TLS
        connection failed.

        >>> r1 = TestRequest(path='/test.txt', method='GET', headers={}, expect={'status': 200, 'body': {'contains': 'test'}})
        >>> r1.expects_conn_reset()
        False
        >>> r2 = TestRequest(path='/test.txt', method='GET', headers={}, expect={'reset': True})
        >>> r2.expects_conn_reset()
        True
        """
        return self.expect.get('reset', False)

    @classmethod
    def _from_yaml(cls, loader, node):
        """YAML constructor that calls __init__ with the mapping from
        the node, so that default parameter values apply."""
        fields = loader.construct_mapping(node)
        return cls(**fields)
188
189
190
class TestRaw10(TestRequest):
    """Test action that sends a request using a minimal (and likely
    incomplete) HTTP/1.0 test client for the one test case that
    strictly requires HTTP/1.0.

    All request parameters (method, path, headers) MUST be specified
    in the config file. Checks on status and body work the same as for
    TestRequest.

    """
    yaml_tag = '!raw10'
    # Groups: HTTP version, numeric status code, reason phrase. Raw
    # string so the backslashes reach the regex engine unchanged.
    status_re = re.compile(r'^HTTP/([\d\.]+) (\d+) (.*)$')

    def __init__(self, method, path, headers, expect):
        self.method = method
        self.path = path
        self.headers = headers
        self.expect = expect

    def __repr__(self):
        return (f'{self.__class__.__name__!s}'
                f'(method={self.method!r}, path={self.path!r}, '
                f'headers={self.headers!r}, expect={self.expect!r})')

    def run(self, command, timeout=None):
        """Send the request through a new process started from
        command and check the response.

        command: argument list for subprocess.Popen, the request is
            written to the stdin of the process and the response read
            from its stdout.
        timeout: seconds to wait for the process to finish; when it
            expires the process is killed and whatever output it
            produced so far is evaluated.

        Raises TestExpectationFailed if the status line is invalid or
        status or body do not match the expectations, and ValueError
        if the output does not contain the line breaks separating
        status line, headers, and body.

        """
        header_lines = ''.join(f'{name}: {value}\r\n'
                               for name, value in self.headers.items())
        req = f'{self.method} {self.path} HTTP/1.0\r\n{header_lines}\r\n'
        proc = subprocess.Popen(command, stdout=subprocess.PIPE,
                                stdin=subprocess.PIPE, close_fds=True,
                                bufsize=0)
        try:
            # Note: the second element is always None because stderr
            # is not captured
            outs, _ = proc.communicate(input=req.encode(),
                                       timeout=timeout)
        except subprocess.TimeoutExpired:
            # Kill the child, then collect the remaining output
            proc.kill()
            outs, _ = proc.communicate()

        # first line of the received data must be the status
        status, rest = outs.decode().split('\r\n', maxsplit=1)
        # headers and body are separated by double newline
        headers, body = rest.split('\r\n\r\n', maxsplit=1)
        # log response for debugging
        print(f'{status}\n{headers}\n\n{body}')

        m = self.status_re.match(status)
        if m is None:
            raise TestExpectationFailed(f'Invalid status line: "{status}"')

        status_code = int(m.group(2))
        # The status is only checked if an expectation is configured
        status_expect = self.expect.get('status')
        if status_expect and status_code != status_expect:
            raise TestExpectationFailed('Unexpected status code: '
                                        f'{status}, expected '
                                        f'{status_expect}')

        if 'body' in self.expect:
            self.check_body(body)
251
252
253
# Register explicit constructors for the tags of the classes that have
# default parameter values in __init__. Without this override PyYAML
# ignores those defaults.
yaml.add_constructor(tag='!connection',
                     constructor=TestConnection._from_yaml,
                     Loader=yaml.Loader)
yaml.add_constructor(tag='!request',
                     constructor=TestRequest._from_yaml,
                     Loader=yaml.Loader)
258
259
260
def filter_cert_log(in_stream, out_stream):
    """Copy data from in_stream to out_stream, dropping a stray
    gnutls-cli log message.

    Some gnutls-cli binaries mistakenly write a log line about
    loading client certificates to stdout. The fix
    (https://gitlab.com/gnutls/gnutls/merge_requests/1125) has been
    merged, but buggy binaries will probably be around for a while,
    so the line is removed here.

    The function is meant to run in a multiprocessing.Process or
    threading.Thread. in_stream is the stdout of gnutls-cli (it needs
    fileno(), read() and close()), out_stream is a connection for
    further processing (it needs send() and close()). The function
    returns after the input signalled a hang-up, and closes both
    streams before doing so.

    """
    import fcntl
    import os
    import select

    # message to filter
    cert_log = b'Processed 1 client X.509 certificates...\n'
    # events that indicate data is available for reading
    readable = select.POLLIN | select.POLLPRI

    # Switch the input to non-blocking mode, keeping all other flags
    fd = in_stream.fileno()
    flags = fcntl.fcntl(fd, fcntl.F_GETFL)
    fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

    # The poll object is used to wait for events on the non-blocking
    # input.
    poller = select.poll()
    poller.register(fd)

    first_chunk = True
    hangup_seen = False
    while not hangup_seen:
        # poll() yields (file descriptor, event) tuples. Only one
        # descriptor is registered, so the descriptor itself does not
        # need to be looked at.
        for _, event in poller.poll():
            # "event" is a bitwise OR of the POLL* constants, so it
            # must be tested with a mask, not compared for equality.
            if event & readable:
                data = in_stream.read()
                if first_chunk:
                    # The erroneous log line can only be in the first
                    # piece of data received, everything after that
                    # is copied unchanged.
                    first_chunk = False
                    if cert_log in data:
                        data = data.replace(cert_log, b'')
                out_stream.send(data)
            if event & select.POLLHUP or event & select.POLLRDHUP:
                # Leave the outer loop, but only after the remaining
                # events returned by this poll() call are handled.
                hangup_seen = True

    in_stream.close()
    out_stream.close()
316
317
318
def format_response(resp, body):
    """Return status line, headers, and body of a response as one
    string for logging.

    resp must provide the attributes status and reason and the method
    getheaders() returning (name, value) pairs; body is the already
    decoded response body.

    """
    status_line = f'{resp.status} {resp.reason}\n'
    header_lines = [f'{name}: {value}' for name, value in resp.getheaders()]
    return status_line + '\n'.join(header_lines) + '\n\n' + body
Note: See TracBrowser for help on using the repository browser.