Changeset d5f5356 in mod_gnutls


Ignore:
Timestamp:
Dec 1, 2019, 8:54:37 AM (3 years ago)
Author:
Fiona Klute <fiona.klute@…>
Branches:
asyncio, main, master, proxy-ticket
Children:
a4e136a
Parents:
618ee14
Message:

https-test-client.py: Support YAML test configuration

Location:
test
Files:
1 added
1 edited

Legend:

Unmodified
Added
Removed
  • test/https-test-client.py

    r618ee14 rd5f5356  
    1818import socket
    1919import subprocess
     20import yaml
    2021
    2122from http.client import HTTPConnection
     
    6566
    6667
    67 def format_response(resp):
    68     print('{} {}'.format(resp.status, resp.reason))
    69     for name, value in resp.getheaders():
    70         print('{}: {}'.format(name, value))
    71     print()
    72     print(resp.read().decode())
     68class TestRequest(yaml.YAMLObject):
     69    yaml_tag = '!request'
     70    def __init__(self, path, expect=dict(status=200), method='GET'):
     71        self.method = method
     72        self.path = path
     73        self.expect = expect
     74
     75    def __repr__(self):
     76        return (f'{self.__class__.__name__!s}(path={self.path!r}, '
     77                f'expect={self.expect!r}, method={self.method!r})')
     78
     79    def check_response(self, response, body):
     80        if response.status != self.expect['status']:
     81            raise TestExpectationFailed(
     82                f'Unexpected status: {response.status} != '
     83                f'{self.expect["status"]}')
     84        if 'body' in self.expect and self.expect['body'] != body:
     85            raise TestExpectationFailed(
     86                f'Unexpected body: {body!r} != {self.expect["body"]!r}')
     87
     88    @classmethod
     89    def _from_yaml(cls, loader, node):
     90        fields = loader.construct_mapping(node)
     91        req = TestRequest(**fields)
     92        return req
     93
     94class TestConnection(yaml.YAMLObject):
     95    yaml_tag = '!connection'
     96
     97    def __init__(self, actions, gnutls_params=[], protocol='https'):
     98        self.gnutls_params = gnutls_params
     99        self.actions = actions
     100        self.protocol = protocol
     101
     102    def __repr__(self):
     103        return (f'{self.__class__.__name__!s}'
     104                f'(gnutls_params={self.gnutls_params!r}, '
     105                f'actions={self.actions!r}, protocol={self.protocol!r})')
     106
     107    @classmethod
     108    def _from_yaml(cls, loader, node):
     109        fields = loader.construct_mapping(node)
     110        conn = TestConnection(**fields)
     111        return conn
     112
     113# Override the default constructors. Pyyaml ignores default parameters
     114# otherwise.
     115yaml.add_constructor('!request', TestRequest._from_yaml, yaml.Loader)
     116yaml.add_constructor('!connection', TestConnection._from_yaml, yaml.Loader)
     117
     118
     119
     120class TestExpectationFailed(Exception):
     121    """Raise if a test failed. The constructor should be called with a
     122    string describing the problem."""
     123    pass
     124
     125
     126
     127def format_response(resp, body):
     128    s = f'{resp.status} {resp.reason}\n'
     129    s = s + '\n'.join(f'{name}: {value}' for name, value in resp.getheaders())
     130    s = s + '\n\n' + body
     131    return s
    73132
    74133
     
    88147                        help='Use the specified CA to validate the '
    89148                        'server certificate')
     149    parser.add_argument('--test-config', type=argparse.FileType('r'),
     150                        help='load YAML test configuration')
    90151
    91152    # enable bash completion if argcomplete is available
     
    97158
    98159    args = parser.parse_args()
     160
     161    test_conn = None
     162    test_actions = None
     163
     164    if args.test_config:
     165        config = yaml.load(args.test_config, Loader=yaml.Loader)
     166        if type(config) is TestConnection:
     167            test_conn = config
     168            print(test_conn)
     169            test_actions = test_conn.actions
     170    else:
     171        # simple default request
     172        test_actions = [TestRequest(path='/test.txt',
     173                                    expect={'status': 200, 'body': 'test\n'},
     174                                    method='GET')]
     175
    99176
    100177    # note: "--logfile" option requires GnuTLS version >= 3.6.7
     
    105182        command.append('--x509cafile')
    106183        command.append(args.x509cafile)
     184    if test_conn != None:
     185        for s in test_conn.gnutls_params:
     186            command.append('--' + s)
    107187    command = command + ['-p', str(args.port), args.host]
    108188
    109189    conn = HTTPSubprocessConnection(command, args.host, port=args.port,
    110190                                    timeout=6.0)
    111     # Maybe call connect() here to detect handshake errors before
    112     # sending the request?
    113 
    114     # Add headers={'Host': 'test.host'} to provoke "421 Misdirected
    115     # Request"
    116     conn.request('GET', '/')
    117     resp = conn.getresponse()
    118     format_response(resp)
    119 
    120     # This could be used to test keepalive behavior
    121     #sleep(2)
    122 
    123     conn.request('GET', '/test.txt')
    124     resp = conn.getresponse()
    125     format_response(resp)
    126 
    127     conn.close()
    128     exit(conn.returncode)
     191
     192    try:
     193        for act in test_actions:
     194            if type(act) is TestRequest:
     195                # Add headers={'Host': 'test.host'} to provoke "421
     196                # Misdirected
     197                conn.request(act.method, act.path)
     198                resp = conn.getresponse()
     199                body = resp.read().decode()
     200                print(format_response(resp, body))
     201                act.check_response(resp, body)
     202            else:
     203                raise TypeError(f'Unsupported action requested: {act!r}')
     204    finally:
     205        conn.close()
Note: See TracChangeset for help on using the changeset viewer.