source: mod_gnutls/test/tests/06_invalid_plaintext/hooks.py @ 83b5bf6

asynciomain
Last change on this file since 83b5bf6 was 5357109, checked in by Fiona Klute <fiona.klute@…>, 2 years ago

Fix flake8 warnings in test hooks.py

  • Property mode set to 100644
File size: 4.2 KB
Line 
1import contextlib
2import os
3import re
4import socket
5import ssl
6import struct
7
8from urllib.request import urlopen
9
10CRLF = b'\r\n\r\n'
11
12
class TLSRecord:
    """A single TLS record parsed from raw bytes.

    The constructor expects exactly one complete record: the 5 byte
    record header followed by as many payload bytes as the header
    announces.

    Attributes:
        type: Content type from the record header.
        legacy_proto: Protocol version field from the record header.
        length: Payload length announced by the record header.
        data: The payload bytes following the header.

    Raises:
        ValueError: If ``data`` is too short to contain a record
            header, or if the payload size differs from the length
            given in the header.
    """

    # Header layout in network byte order: content type (1 byte),
    # legacy protocol version (2 bytes), payload length (2 bytes).
    header = struct.Struct('!BHH')

    def __init__(self, data):
        header_size = self.header.size
        # Check the size up front so that truncated input (e.g. an
        # empty read from a closed connection) is reported with the
        # same exception type as any other malformed record.
        if len(data) < header_size:
            raise ValueError(
                f'Need at least {header_size} bytes for the record '
                f'header, got {len(data)}!')
        self.type, self.legacy_proto, self.length = \
            self.header.unpack_from(data)
        self.data = data[header_size:]
        if len(self.data) != self.length:
            raise ValueError('Actual data length does not match header!')

    def __repr__(self):
        return f'<{__name__}.{self.__class__.__name__}, type: {self.type}>'

    @property
    def is_alert(self):
        """True if the record has content type 21 (alert)."""
        return self.type == 21

    @property
    def is_app_data(self):
        """True if the record has content type 23 (application data)."""
        return self.type == 23
33
34
def test_immediate_plaintext(host, port, req):
    """Send plaintext to the HTTPS socket"""
    # NOTE: the docstring is printed verbatim by run_connection(), so
    # further documentation lives in comments.
    #
    # Opens a plain TCP connection to (host, port), sends the bytes in
    # req without any TLS handshake, and checks that the response is a
    # single TLS alert record with a two byte payload.
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    with sock:
        sock.connect((host, port))
        sock.sendall(req)
        reply = sock.recv(1024)
    print(f'Received: {reply!r}')

    # No handshake took place, so the alert must be unencrypted.
    alert = TLSRecord(reply)
    assert alert.is_alert
    assert alert.length == 2
46
47
def test_plaintext_after_https(host, port, req, context):
    """Send an HTTPS request and then plaintext on the same TCP connection"""
    # NOTE: the docstring is printed verbatim by run_connection(), so
    # further documentation lives in comments.
    #
    # host, port: address to connect to
    # req: request bytes, sent first via TLS and then as plaintext
    # context: ssl.SSLContext used to wrap the connection
    with contextlib.ExitStack() as cleanup:
        raw_sock = cleanup.enter_context(
            socket.socket(socket.AF_INET, socket.SOCK_STREAM))
        raw_sock.connect((host, port))

        # The TLS layer works on a duplicate of the socket, which
        # keeps raw_sock usable for injecting plaintext later.
        tls = cleanup.enter_context(context.wrap_socket(
            raw_sock.dup(),
            server_hostname='localhost',
            do_handshake_on_connect=False,
            suppress_ragged_eofs=False))
        tls.do_handshake()

        # Regular HTTPS request first
        tls.sendall(req)

        response = bytearray(2048)
        view = memoryview(response)
        filled = 0

        def receive_more(offset):
            # Append TLS data to the buffer at offset and return the
            # new fill level. Receiving nothing means the connection
            # ended before the expected data was complete.
            count = tls.recv_into(view[offset:])
            assert count > 0
            return offset + count

        # Collect data until the end of the response header shows up
        while CRLF not in response:
            filled = receive_more(filled)
        print(f'Received HTTPS header: {bytes(view[:filled])!r}')
        body_start = response.index(CRLF) + len(CRLF)

        # The header tells how much body data to wait for
        length_match = re.search(rb'\r\nContent-Length: (\d+)\r\n', response)
        assert length_match is not None
        body_len = int(length_match.group(1))
        body_end = body_start + body_len
        while filled < body_end:
            filled = receive_more(filled)
        body = bytes(view[body_start:filled])
        print(f'Received HTTPS data: {body!r}')
        assert body == b'test\n'

        print('Sending plaintext request')
        raw_sock.sendall(req)
        # Only peek at the response, so the TLS socket can still read
        # the alert afterwards.
        peeked = raw_sock.recv(1024, socket.MSG_PEEK)
        print(f'Received: {peeked!r}')
        record = TLSRecord(peeked)
        # Should be application data (hopefully a TLS 1.3 encrypted
        # alert)
        assert record.is_app_data
        assert record.length > 2

        # Reading via TLS must now return no data
        tls.sendall(req)
        leftover = tls.recv(body_len)
        print(f'Received TLS data: {leftover!r}')
        assert len(leftover) == 0
        print('Connection has been closed as expected.')
106
107
def run_connection(testname, conn_log, response_log):
    """Inject unencrypted requests into TCP connections.

    Runs the two plaintext injection tests and then one regular HTTPS
    request, which is expected to succeed. The server address is read
    from the environment variables TEST_HOST and TEST_PORT, and the
    CA certificate from 'authority/x509.pem' relative to the current
    working directory.

    The parameters testname, conn_log, and response_log are not used
    by this hook.
    """

    host = os.environ['TEST_HOST']
    port = int(os.environ['TEST_PORT'])
    http_req = f'GET /test.txt HTTP/1.1\r\nHost: {host}\r\n\r\n'.encode()

    # Creating an SSLContext without a protocol argument is
    # deprecated since Python 3.10. The context is only used for
    # client connections, so PROTOCOL_TLS_CLIENT is the right choice.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.load_verify_locations(cafile='authority/x509.pem')
    # PROTOCOL_TLS_CLIENT already enables both by default, keep the
    # settings explicit to show what the tests rely on.
    context.verify_mode = ssl.CERT_REQUIRED
    context.check_hostname = True

    print(test_immediate_plaintext.__doc__)
    test_immediate_plaintext(host, port, http_req)
    print()

    print(test_plaintext_after_https.__doc__)
    test_plaintext_after_https(host, port, http_req, context)
    print()

    print('Send a good HTTPS request, and expect it to work')
    with urlopen(f'https://{host}:{port}/test.txt', context=context) as f:
        print(f.read().decode())
131
132
# When executed as a script, run the hook without any of its arguments.
if __name__ == '__main__':
    run_connection(testname=None, conn_log=None, response_log=None)
Note: See TracBrowser for help on using the repository browser.