source: mod_gnutls/test/tests/06_invalid_plaintext/hooks.py @ c9e720f

asyncio
Last change on this file since c9e720f was c9e720f, checked in by Fiona Klute <fiona.klute@…>, 12 months ago

Test what happens if the HTTPS server receives plaintext

The new test tries to

  • send an unencrypted HTTP request to the HTTPS port,
  • establish a TLS connection, send a proper HTTPS request, then send a plaintext HTTP request over the same TCP connection.

In both cases the server has to respond with an appropriate alert.

  • Property mode set to 100644
File size: 4.2 KB
Line 
1import contextlib
2import os
3import re
4import socket
5import ssl
6import struct
7
8from urllib.request import urlopen
9
# Two consecutive CRLF pairs, i.e. the empty line that terminates an
# HTTP header block. Despite the name this is not a single CRLF; it is
# used below to detect the end of the response header.
CRLF = b'\r\n\r\n'
11
12
class TLSRecord:
    """One TLS record, split into its header fields and payload.

    Attributes set by the constructor:
        type: content type byte from the record header.
        legacy_proto: the 16 bit protocol version field from the header.
        length: payload length announced in the header.
        data: the payload bytes following the header.
    """

    # Record header layout in network byte order: content type
    # (1 byte), protocol version (2 bytes), payload length (2 bytes).
    header = struct.Struct('!BHH')

    def __init__(self, data):
        """Parse `data` as exactly one TLS record.

        Raises:
            ValueError: if `data` is too short to contain a record
                header, or if the number of payload bytes differs
                from the length given in the header.
        """
        # Check explicitly so truncated input (e.g. an empty read
        # after the peer closed the connection) gives a clear
        # ValueError instead of a low-level struct.error.
        if len(data) < self.header.size:
            raise ValueError(
                f'Need at least {self.header.size} bytes for a TLS '
                f'record header, got {len(data)}!')
        self.type, self.legacy_proto, self.length = \
            self.header.unpack_from(data)
        self.data = data[self.header.size:]
        if len(self.data) != self.length:
            raise ValueError('Actual data length does not match header!')

    def __repr__(self):
        return f'<{__name__}.{self.__class__.__name__}, type: {self.type}>'

    @property
    def is_alert(self):
        """True if the record has content type 21 (alert)."""
        return self.type == 21

    @property
    def is_app_data(self):
        """True if the record has content type 23 (application data)."""
        return self.type == 23
32
33
def test_immediate_plaintext(host, port, req):
    """Send plaintext to the HTTPS socket"""
    # The docstring above is printed verbatim by the caller, so any
    # further explanation is kept in comments.
    #
    # Open a plain TCP connection, send the request without any TLS
    # handshake, and take whatever the server answers in one read.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.connect((host, port))
        sock.sendall(req)
        reply = sock.recv(1024)

    print(f'Received: {reply!r}')

    # The answer must parse as one TLS record: an unencrypted alert,
    # whose payload is two bytes long.
    record = TLSRecord(reply)
    assert record.is_alert
    assert record.length == 2
45
46
def test_plaintext_after_https(host, port, req, context):
    """Send an HTTPS request and then plaintext on the same TCP connection"""
    # The docstring above is printed verbatim by the caller, so any
    # further explanation is kept in comments.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as raw_sock:
        raw_sock.connect((host, port))

        # Wrap a duplicate of the socket so the raw one stays usable
        # for sending plaintext later on.
        with context.wrap_socket(raw_sock.dup(),
                                 server_hostname='localhost',
                                 do_handshake_on_connect=False,
                                 suppress_ragged_eofs=False) as tls:
            tls.do_handshake()

            # Proper HTTPS request first.
            tls.sendall(req)

            # Receive into a fixed buffer until the end of the header
            # shows up.
            response = bytearray(2048)
            window = memoryview(response)
            filled = 0
            while CRLF not in response:
                count = tls.recv_into(window[filled:])
                # Zero bytes means the connection ended before the
                # header was complete.
                assert count > 0
                filled += count
            print(f'Received HTTPS header: {bytes(window[:filled])!r}')
            body_start = response.index(CRLF) + len(CRLF)

            # Keep receiving until the announced body length is there.
            match = re.search(rb'\r\nContent-Length: (\d+)\r\n', response)
            assert match is not None
            clen = int(match.group(1))
            body_end = body_start + clen
            while filled < body_end:
                count = tls.recv_into(window[filled:])
                # Zero bytes means the connection ended before the
                # body was complete.
                assert count > 0
                filled += count
            body_data = bytes(window[body_start:filled])
            print(f'Received HTTPS data: {body_data!r}')
            assert body_data == b'test\n'

            print('Sending plaintext request')
            raw_sock.sendall(req)
            # Only peek, so the TLS socket can still read the alert.
            peeked = raw_sock.recv(1024, socket.MSG_PEEK)
            print(f'Received: {peeked!r}')
            record = TLSRecord(peeked)
            # Expect application data (TLS 1.3 encrypted alert, hopefully)
            assert record.is_app_data
            assert record.length > 2

            # After the alert the TLS connection must be closed: a
            # further read returns no data.
            tls.sendall(req)
            leftover = tls.recv(clen)
            print(f'Received TLS data: {leftover!r}')
            assert len(leftover) == 0
            print('Connection has been closed as expected.')
104
105
def run_connection(testname, conn_log, response_log):
    """Inject unencrypted requests into TCP connections.

    Reads the server address from the TEST_HOST and TEST_PORT
    environment variables, runs both plaintext injection checks, and
    finally sends a regular HTTPS request that is expected to work.

    The three parameters are not used by this function.

    Raises:
        KeyError: if TEST_HOST or TEST_PORT is not set.
        AssertionError: if one of the checks fails.
    """

    host = os.environ['TEST_HOST']
    port = int(os.environ['TEST_PORT'])
    http_req = f'GET /test.txt HTTP/1.1\r\nHost: {host}\r\n\r\n'.encode()

    # Creating an SSLContext without a protocol argument is deprecated
    # since Python 3.10, so request a client-side context explicitly.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.load_verify_locations(cafile='authority/x509.pem')
    # Spell out the verification requirements so the intent does not
    # depend on the defaults of the chosen protocol.
    context.verify_mode = ssl.CERT_REQUIRED
    context.check_hostname = True

    print(test_immediate_plaintext.__doc__)
    test_immediate_plaintext(host, port, http_req)
    print()

    print(test_plaintext_after_https.__doc__)
    test_plaintext_after_https(host, port, http_req, context)
    print()

    print('Send a good HTTPS request, and expect it to work')
    with urlopen(f'https://{host}:{port}/test.txt', context=context) as f:
        print(f.read().decode())
129
130
if __name__ == '__main__':
    # Direct invocation: none of the three parameters is used by
    # run_connection, so pass placeholders.
    run_connection(testname=None, conn_log=None, response_log=None)
Note: See TracBrowser for help on using the repository browser.