source: mod_gnutls/test/https-test-client.py @ a4e136a

asyncioproxy-ticket
Last change on this file since a4e136a was a4e136a, checked in by Fiona Klute <fiona.klute@…>, 17 months ago

https-test-client.py: Allow overriding headers

  • Property mode set to 100755
File size: 7.3 KB
Line 
1#!/usr/bin/python3
2# PYTHON_ARGCOMPLETE_OK
3
4# Copyright 2019 Fiona Klute
5#
6# Licensed under the Apache License, Version 2.0 (the "License");
7# you may not use this file except in compliance with the License.
8# You may obtain a copy of the License at
9#
10#     http://www.apache.org/licenses/LICENSE-2.0
11#
12# Unless required by applicable law or agreed to in writing, software
13# distributed under the License is distributed on an "AS IS" BASIS,
14# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15# See the License for the specific language governing permissions and
16# limitations under the License.
17
18import socket
19import subprocess
20import yaml
21
22from http.client import HTTPConnection
23from time import sleep
24
class HTTPSubprocessConnection(HTTPConnection):
    """HTTP connection that talks to a subprocess instead of a TCP socket.

    On connect() the given command is started with its stdin and
    stdout attached to one end of a Unix socket pair; the other end
    is used as the connection socket by the HTTPConnection machinery.

    Attributes:
        command: List containing the binary and its command line
            parameters.
        returncode: Exit status of the subprocess, set by close().
            None while no subprocess has been reaped.
    """

    def __init__(self, command, host, port=None,
                 timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
                 blocksize=8192):
        """Prepare a connection; the subprocess starts in connect().

        Args:
            command: List containing binary and command line
                parameters of the process to run.
            host: Host name passed on to HTTPConnection.
            port: Port passed on to HTTPConnection.
            timeout: Timeout in seconds for socket operations and for
                each wait on the subprocess in close().
            blocksize: Passed on to HTTPConnection.
        """
        super().__init__(host, port, timeout, source_address=None,
                         blocksize=blocksize)
        # "command" must be a list containing binary and command line
        # parameters
        self.command = command
        # This will be the subprocess reference when connected
        self._sproc = None
        # The subprocess return code is stored here on close()
        self.returncode = None
        # The set_tunnel method of the super class is not supported
        self.set_tunnel = None

    def _effective_timeout(self):
        """Return the timeout as a number of seconds or None.

        The default sentinel of HTTPConnection is not a usable value
        for socket.settimeout() or Popen.wait(), so it is resolved to
        the global socket default here.
        """
        if self.timeout is socket._GLOBAL_DEFAULT_TIMEOUT:
            return socket.getdefaulttimeout()
        return self.timeout

    def connect(self):
        """Start the subprocess and connect to it via a socket pair."""
        s_local, s_remote = socket.socketpair(socket.AF_UNIX,
                                              socket.SOCK_STREAM)
        try:
            s_local.settimeout(self._effective_timeout())
            # TODO: Maybe capture stderr?
            self._sproc = subprocess.Popen(self.command, stdout=s_remote,
                                           stdin=s_remote, close_fds=True)
        except BaseException:
            # Do not leak the local end if the process failed to start
            s_local.close()
            raise
        finally:
            # The child holds its own copy of the remote end (if it
            # was started at all), the parent does not need it.
            s_remote.close()
        self.sock = s_local

    def close(self):
        """Close the socket and reap the subprocess.

        Waits for the process to stop, escalating to SIGTERM and then
        SIGKILL if it does not exit within the timeout. The exit
        status is stored in self.returncode; it stays None if no
        subprocess was started or the process could not be reaped.
        Safe to call before connect() and more than once.
        """
        super().close()
        proc = self._sproc
        if proc is None:
            # close() without a prior connect(): nothing to reap
            return

        timeout = self._effective_timeout()
        # Popen.wait() raises TimeoutExpired instead of returning
        # None, so each escalation step is driven by the exception.
        for escalate in (None, proc.terminate, proc.kill):
            if escalate is not None:
                escalate()
            try:
                self.returncode = proc.wait(timeout)
                return
            except subprocess.TimeoutExpired:
                continue
65
66
67
class TestRequest(yaml.YAMLObject):
    """One HTTP request of a test run and the response expected for it.

    Loaded from YAML nodes tagged "!request".

    Attributes:
        path: Request path.
        method: HTTP method, "GET" by default.
        headers: Dict of request headers, empty by default.
        expect: Dict describing the expected response. The "status"
            key is required by check_response(), "body" is optional.
            Defaults to {'status': 200}.
    """
    yaml_tag = '!request'

    def __init__(self, path, method='GET', headers=None, expect=None):
        self.method = method
        self.path = path
        # Fresh containers per instance; mutable defaults in the
        # signature would be shared between all requests. Passed-in
        # objects are stored by reference, not copied.
        self.headers = {} if headers is None else headers
        self.expect = {'status': 200} if expect is None else expect

    def __repr__(self):
        return (f'{self.__class__.__name__!s}(path={self.path!r}, '
                f'method={self.method!r}, headers={self.headers!r}, '
                f'expect={self.expect!r})')

    def check_response(self, response, body):
        """Compare a response with the expectations of this request.

        Args:
            response: Response object providing a "status" attribute.
            body: Response body, compared with expect['body'] if that
                key is present.

        Raises:
            TestExpectationFailed: If status or body do not match.
        """
        if response.status != self.expect['status']:
            raise TestExpectationFailed(
                f'Unexpected status: {response.status} != '
                f'{self.expect["status"]}')
        if 'body' in self.expect and self.expect['body'] != body:
            raise TestExpectationFailed(
                f'Unexpected body: {body!r} != {self.expect["body"]!r}')

    @classmethod
    def _from_yaml(cls, loader, node):
        """Construct an instance from a YAML mapping node via __init__."""
        fields = loader.construct_mapping(node)
        return cls(**fields)
96
class TestConnection(yaml.YAMLObject):
    """A connection to the server and the actions to run over it.

    Loaded from YAML nodes tagged "!connection".

    Attributes:
        actions: Actions to perform on the connection, in order.
        gnutls_params: List of additional gnutls-cli parameters,
            given without the leading "--". Empty by default.
        protocol: Protocol name, "https" by default.
    """
    yaml_tag = '!connection'

    def __init__(self, actions, gnutls_params=None, protocol='https'):
        # Fresh list per instance; a mutable default in the signature
        # would be shared between all connections. A passed-in list
        # is stored by reference, not copied.
        self.gnutls_params = [] if gnutls_params is None else gnutls_params
        self.actions = actions
        self.protocol = protocol

    def __repr__(self):
        return (f'{self.__class__.__name__!s}'
                f'(gnutls_params={self.gnutls_params!r}, '
                f'actions={self.actions!r}, protocol={self.protocol!r})')

    @classmethod
    def _from_yaml(cls, loader, node):
        """Construct an instance from a YAML mapping node via __init__."""
        fields = loader.construct_mapping(node)
        return cls(**fields)
115
# Register explicit constructors for the custom tags on yaml.Loader.
# Without them PyYAML would ignore the default parameter values of
# the classes' __init__ methods.
yaml.add_constructor(TestRequest.yaml_tag, TestRequest._from_yaml,
                     Loader=yaml.Loader)
yaml.add_constructor(TestConnection.yaml_tag, TestConnection._from_yaml,
                     Loader=yaml.Loader)
120
121
122
class TestExpectationFailed(Exception):
    """Signal that a test did not meet its expectations.

    Instantiate with a string that describes the problem.
    """
127
128
129
def format_response(resp, body):
    """Render a response as status line, headers, blank line, body.

    "resp" must provide "status" and "reason" attributes and a
    getheaders() method returning (name, value) pairs; "body" is the
    already decoded response body string.
    """
    status_line = f'{resp.status} {resp.reason}\n'
    header_block = '\n'.join(
        f'{key}: {val}' for key, val in resp.getheaders())
    return ''.join((status_line, header_block, '\n\n', body))
135
136
137
if __name__ == "__main__":
    # Command line entry point: build a gnutls-cli command from the
    # arguments, run the configured requests through it and check
    # each response against its expectations.
    import argparse
    parser = argparse.ArgumentParser(
        description='Send HTTP requests through gnutls-cli',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('host', nargs='?', help='Access the specified host',
                        default='localhost')
    parser.add_argument('--insecure', action='store_true',
                        help='do not validate the server certificate')
    parser.add_argument('-p', '--port', type=int,
                        help='Access the specified port', default=8000)
    parser.add_argument('--x509cafile', type=str,
                        help='Use the specified CA to validate the '
                        'server certificate')
    parser.add_argument('--test-config', type=argparse.FileType('r'),
                        help='load YAML test configuration')

    # enable bash completion if argcomplete is available
    try:
        import argcomplete
        argcomplete.autocomplete(parser)
    except ImportError:
        pass

    args = parser.parse_args()

    test_conn = None
    test_actions = None

    if args.test_config:
        # NOTE: yaml.Loader can construct arbitrary Python objects,
        # only load test configurations from trusted sources.
        with args.test_config as config_file:
            config = yaml.load(config_file, Loader=yaml.Loader)
        if not isinstance(config, TestConnection):
            # Fail early with a clear message instead of crashing
            # later while iterating over a missing action list.
            parser.error('test configuration must contain a '
                         f'!connection object, got: {config!r}')
        test_conn = config
        print(test_conn)
        test_actions = test_conn.actions
    else:
        # simple default request
        test_actions = [TestRequest(path='/test.txt',
                                    expect={'status': 200, 'body': 'test\n'},
                                    method='GET')]

    # note: "--logfile" option requires GnuTLS version >= 3.6.7
    command = ['gnutls-cli', '--logfile=/dev/stderr']
    if args.insecure:
        command.append('--insecure')
    if args.x509cafile:
        command.append('--x509cafile')
        command.append(args.x509cafile)
    if test_conn is not None:
        # gnutls_params are given without the leading "--"
        command.extend('--' + param for param in test_conn.gnutls_params)
    command = command + ['-p', str(args.port), args.host]

    conn = HTTPSubprocessConnection(command, args.host, port=args.port,
                                    timeout=6.0)

    try:
        for act in test_actions:
            if isinstance(act, TestRequest):
                # Request headers come from the action, e.g.
                # headers={'Host': 'test.host'} can be used to
                # provoke a "421 Misdirected Request" response.
                conn.request(act.method, act.path, headers=act.headers)
                resp = conn.getresponse()
                body = resp.read().decode()
                print(format_response(resp, body))
                act.check_response(resp, body)
            else:
                raise TypeError(f'Unsupported action requested: {act!r}')
    finally:
        conn.close()
Note: See TracBrowser for help on using the repository browser.