source: mod_gnutls/test/https-test-client.py @ 019ab8e

asyncioproxy-ticket
Last change on this file since 019ab8e was 019ab8e, checked in by Fiona Klute <fiona.klute@…>, 17 months ago

https-test-client.py: Catch expected connection errors

  • Property mode set to 100755
File size: 9.2 KB
Line 
1#!/usr/bin/python3
2# PYTHON_ARGCOMPLETE_OK
3
4# Copyright 2019 Fiona Klute
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17
18import socket
19import subprocess
20import yaml
21
22from http.client import HTTPConnection
23from time import sleep
24
class HTTPSubprocessConnection(HTTPConnection):
    """HTTP connection whose transport is a subprocess.

    On connect() the given command is started with its stdin and
    stdout attached to one end of a Unix socket pair; the other end
    becomes the socket this connection reads from and writes to.

    Parameters:
    command -- list containing the binary and its command line
        parameters
    host, port, timeout, blocksize -- passed on to HTTPConnection

    After close() the exit status of the subprocess is available in
    the "returncode" attribute (None if no subprocess was started).
    """

    def __init__(self, command, host, port=None,
                 timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
                 blocksize=8192):
        super().__init__(host, port, timeout,
                         source_address=None,
                         blocksize=blocksize)
        # "command" must be a list containing binary and command line
        # parameters
        self.command = command
        # This will be the subprocess reference when connected
        self._sproc = None
        # The subprocess return code is stored here on close()
        self.returncode = None
        # The set_tunnel method of the super class is not supported:
        # replacing it with None makes any attempt to call it fail.
        self.set_tunnel = None

    def _effective_timeout(self):
        """Return the timeout as a number of seconds or None.

        The default value of the "timeout" parameter is a sentinel
        object that neither socket.settimeout() nor Popen.wait()
        accept, so it is translated to the global socket default.
        """
        if self.timeout is socket._GLOBAL_DEFAULT_TIMEOUT:
            return socket.getdefaulttimeout()
        return self.timeout

    def connect(self):
        """Start the subprocess and attach it to a new socket pair."""
        s_local, s_remote = socket.socketpair(socket.AF_UNIX,
                                              socket.SOCK_STREAM)
        try:
            s_local.settimeout(self._effective_timeout())
            # TODO: Maybe capture stderr?
            self._sproc = subprocess.Popen(self.command, stdout=s_remote,
                                           stdin=s_remote, close_fds=True)
        except BaseException:
            # Do not leak our end of the pair if startup failed
            s_local.close()
            raise
        finally:
            # The child holds its own copy of the remote end (if it
            # was started), the parent never needs it.
            s_remote.close()
        self.sock = s_local

    def close(self):
        """Close the socket and wait for the subprocess to stop.

        If the process does not exit within the timeout it is sent
        SIGTERM, and SIGKILL if that is not enough either. Calling
        close() without a running subprocess only closes the socket.
        """
        super().close()
        if self._sproc is None:
            # never connected, nothing to wait for
            return
        timeout = self._effective_timeout()
        # Popen.wait() signals an expired timeout by raising
        # TimeoutExpired, not by returning None.
        try:
            self.returncode = self._sproc.wait(timeout)
            return
        except subprocess.TimeoutExpired:
            self._sproc.terminate()
        try:
            self.returncode = self._sproc.wait(timeout)
            return
        except subprocess.TimeoutExpired:
            self._sproc.kill()
        self.returncode = self._sproc.wait(timeout)
65
66
67
class TestRequest(yaml.YAMLObject):
    """One HTTP request of a test together with its expected outcome.

    Parameters:
    path -- request path
    method -- HTTP method
    headers -- mapping of request headers (default: no headers)
    expect -- mapping describing the expected result (default:
        status 200). Keys read by this class are "status", "body"
        (a mapping with "exactly" and/or "contains") and "reset".
    """
    yaml_tag = '!request'

    def __init__(self, path, method='GET', headers=None, expect=None):
        self.method = method
        self.path = path
        # Create the defaults per instance so they are never shared
        # between requests. Mappings passed in by the caller are
        # stored as they are, without copying.
        self.headers = {} if headers is None else headers
        self.expect = {'status': 200} if expect is None else expect

    def __repr__(self):
        return (f'{self.__class__.__name__!s}(path={self.path!r}, '
                f'method={self.method!r}, headers={self.headers!r}, '
                f'expect={self.expect!r})')

    def _check_body(self, body):
        """Compare body with the "body" expectation, raise
        TestExpectationFailed on mismatch.

        >>> r1 = TestRequest(path='/test.txt', method='GET', headers={}, expect={'status': 200, 'body': {'exactly': 'test\\n'}})
        >>> r1._check_body('test\\n')
        >>> r1._check_body('xyz\\n')
        Traceback (most recent call last):
        ...
        https-test-client.TestExpectationFailed: Unexpected body: 'xyz\\n' != 'test\\n'
        >>> r2 = TestRequest(path='/test.txt', method='GET', headers={}, expect={'status': 200, 'body': {'contains': ['tes', 'est']}})
        >>> r2._check_body('test\\n')
        >>> r2._check_body('est\\n')
        Traceback (most recent call last):
        ...
        https-test-client.TestExpectationFailed: Unexpected body: 'est\\n' does not contain 'tes'
        >>> r3 = TestRequest(path='/test.txt', method='GET', headers={}, expect={'status': 200, 'body': {'contains': 'test'}})
        >>> r3._check_body('test\\n')
        """
        expected = self.expect['body']
        if 'exactly' in expected and body != expected['exactly']:
            raise TestExpectationFailed(
                f'Unexpected body: {body!r} != '
                f'{expected["exactly"]!r}')
        if 'contains' in expected:
            needles = expected['contains']
            # A single string is shorthand for a one element list.
            # Use a local list so the expectation itself is not
            # modified by running the check.
            if isinstance(needles, str):
                needles = [needles]
            for s in needles:
                if s not in body:
                    raise TestExpectationFailed(
                        f'Unexpected body: {body!r} does not contain '
                        f'{s!r}')

    def check_response(self, response, body):
        """Check a received response against the expectations.

        Raises TestExpectationFailed if a connection reset was
        expected instead of a response, if the status does not match,
        or if a "body" expectation exists and the body does not match.
        """
        if self.expects_conn_reset():
            raise TestExpectationFailed(
                'Got a response, but connection should have failed!')
        if response.status != self.expect['status']:
            raise TestExpectationFailed(
                f'Unexpected status: {response.status} != '
                f'{self.expect["status"]}')
        if 'body' in self.expect:
            self._check_body(body)

    def expects_conn_reset(self):
        """Return the "reset" expectation, or False if there is none."""
        return self.expect.get('reset', False)

    @classmethod
    def _from_yaml(cls, loader, node):
        """Build an instance from a YAML mapping node by passing its
        fields to the constructor as keyword arguments."""
        fields = loader.construct_mapping(node)
        return cls(**fields)
135
class TestConnection(yaml.YAMLObject):
    """A test connection: a list of actions plus connection settings.

    Parameters:
    actions -- the actions to run over the connection
    gnutls_params -- list of additional parameters for the TLS
        client (default: none)
    protocol -- protocol name, stored in the "protocol" attribute
    """
    yaml_tag = '!connection'

    def __init__(self, actions, gnutls_params=None, protocol='https'):
        # Create the default list per instance so it is never shared
        # between connections. A list passed in by the caller is
        # stored as it is, without copying.
        self.gnutls_params = [] if gnutls_params is None else gnutls_params
        self.actions = actions
        self.protocol = protocol

    def __repr__(self):
        return (f'{self.__class__.__name__!s}'
                f'(gnutls_params={self.gnutls_params!r}, '
                f'actions={self.actions!r}, protocol={self.protocol!r})')

    @classmethod
    def _from_yaml(cls, loader, node):
        """Build an instance from a YAML mapping node by passing its
        fields to the constructor as keyword arguments."""
        fields = loader.construct_mapping(node)
        return cls(**fields)
154
# Register explicit constructors for both tags in place of the default
# ones. Without this PyYAML ignores the default parameters of the
# classes.
yaml.add_constructor(tag='!request',
                     constructor=TestRequest._from_yaml,
                     Loader=yaml.Loader)
yaml.add_constructor(tag='!connection',
                     constructor=TestConnection._from_yaml,
                     Loader=yaml.Loader)
159
160
161
class TestExpectationFailed(Exception):
    """Signal that a test did not meet its expectations.

    Construct it with a string that describes the problem.
    """
166
167
168
def format_response(resp, body):
    """Return status line, headers and body of a response as one string.

    The status line is followed by one "name: value" line per header
    returned by resp.getheaders(), an empty line, and then the body.
    """
    status_line = f'{resp.status} {resp.reason}'
    header_lines = [f'{name}: {value}' for name, value in resp.getheaders()]
    return status_line + '\n' + '\n'.join(header_lines) + '\n\n' + body
174
175
176
# Script entry point: parse the command line, load the test
# configuration (or fall back to a simple default request), start
# gnutls-cli as transport and run the requested actions over it.
if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser(
        description='Send HTTP requests through gnutls-cli',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('host', nargs='?', help='Access the specified host',
                        default='localhost')
    parser.add_argument('--insecure', action='store_true',
                        help='do not validate the server certificate')
    parser.add_argument('-p', '--port', type=int,
                        help='Access the specified port', default=8000)
    parser.add_argument('--x509cafile', type=str,
                        help='Use the specified CA to validate the '
                        'server certificate')
    parser.add_argument('--test-config', type=argparse.FileType('r'),
                        help='load YAML test configuration')

    # enable bash completion if argcomplete is available
    try:
        import argcomplete
        argcomplete.autocomplete(parser)
    except ImportError:
        pass

    args = parser.parse_args()

    test_conn = None
    test_actions = None

    if args.test_config:
        # NOTE: yaml.Loader can construct arbitrary Python objects,
        # only use test configurations from a trusted source.
        with args.test_config as config_file:
            config = yaml.load(config_file, Loader=yaml.Loader)
        if not isinstance(config, TestConnection):
            # Fail with a clear message instead of trying to iterate
            # over missing actions later.
            raise TypeError(f'Unsupported test configuration: {config!r}')
        test_conn = config
        print(test_conn)
        test_actions = test_conn.actions
    else:
        # simple default request; the body expectation must be a
        # mapping, a plain string would not be checked at all
        test_actions = [TestRequest(path='/test.txt',
                                    expect={'status': 200,
                                            'body': {'exactly': 'test\n'}},
                                    method='GET')]

    # note: "--logfile" option requires GnuTLS version >= 3.6.7
    command = ['gnutls-cli', '--logfile=/dev/stderr']
    if args.insecure:
        command.append('--insecure')
    if args.x509cafile:
        command.append('--x509cafile')
        command.append(args.x509cafile)
    if test_conn is not None:
        for s in test_conn.gnutls_params:
            command.append('--' + s)
    command = command + ['-p', str(args.port), args.host]

    conn = HTTPSubprocessConnection(command, args.host, port=args.port,
                                    timeout=6.0)

    try:
        for act in test_actions:
            if not isinstance(act, TestRequest):
                raise TypeError(f'Unsupported action requested: {act!r}')
            try:
                conn.request(act.method, act.path, headers=act.headers)
                resp = conn.getresponse()
            except ConnectionResetError:
                if act.expects_conn_reset():
                    print('connection reset as expected.')
                    # the connection is gone, no further actions
                    break
                # unexpected reset: re-raise with original traceback
                raise
            body = resp.read().decode()
            print(format_response(resp, body))
            act.check_response(resp, body)
    finally:
        conn.close()
Note: See TracBrowser for help on using the repository browser.