[c9e720f] | 1 | import contextlib |
---|
| 2 | import os |
---|
| 3 | import re |
---|
| 4 | import socket |
---|
| 5 | import ssl |
---|
| 6 | import struct |
---|
| 7 | |
---|
| 8 | from urllib.request import urlopen |
---|
| 9 | |
---|
| 10 | CRLF = b'\r\n\r\n' |
---|
| 11 | |
---|
| 12 | |
---|
| 13 | class TLSRecord: |
---|
| 14 | header = struct.Struct('!BHH') |
---|
| 15 | def __init__(self, data): |
---|
| 16 | self.type, self.legacy_proto, self.length = \ |
---|
| 17 | self.header.unpack(data[:5]) |
---|
| 18 | self.data = data[5:] |
---|
| 19 | if len(self.data) != self.length: |
---|
| 20 | raise ValueError('Actual data length does not match header!') |
---|
| 21 | |
---|
| 22 | def __repr__(self): |
---|
| 23 | return f'<{__name__}.{self.__class__.__name__}, type: {self.type}>' |
---|
| 24 | |
---|
| 25 | @property |
---|
| 26 | def is_alert(self): |
---|
| 27 | return self.type == 21 |
---|
| 28 | |
---|
| 29 | @property |
---|
| 30 | def is_app_data(self): |
---|
| 31 | return self.type == 23 |
---|
| 32 | |
---|
| 33 | |
---|
| 34 | def test_immediate_plaintext(host, port, req): |
---|
| 35 | """Send plaintext to the HTTPS socket""" |
---|
| 36 | with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: |
---|
| 37 | s.connect((host, port)) |
---|
| 38 | s.sendall(req) |
---|
| 39 | data = s.recv(1024) |
---|
| 40 | print(f'Received: {data!r}') |
---|
| 41 | record = TLSRecord(data) |
---|
| 42 | # Expect an unencrypted alert |
---|
| 43 | assert(record.is_alert) |
---|
| 44 | assert(record.length == 2) |
---|
| 45 | |
---|
| 46 | |
---|
| 47 | def test_plaintext_after_https(host, port, req, context): |
---|
| 48 | """Send an HTTPS request and then plaintext on the same TCP connection""" |
---|
| 49 | with contextlib.ExitStack() as stack: |
---|
| 50 | s = stack.enter_context(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) |
---|
| 51 | s.connect((host, port)) |
---|
| 52 | |
---|
| 53 | # Duplicate s so we can still use it. |
---|
| 54 | tls_sock = stack.enter_context( |
---|
| 55 | context.wrap_socket(s.dup(), server_hostname='localhost', |
---|
| 56 | do_handshake_on_connect=False, |
---|
| 57 | suppress_ragged_eofs=False)) |
---|
| 58 | tls_sock.do_handshake() |
---|
| 59 | |
---|
| 60 | # Send request |
---|
| 61 | tls_sock.sendall(req) |
---|
| 62 | |
---|
| 63 | # Read header |
---|
| 64 | buf = bytearray(2048) |
---|
| 65 | pos = 0 |
---|
| 66 | while not CRLF in buf: |
---|
| 67 | received = tls_sock.recv_into(memoryview(buf)[pos:]) |
---|
| 68 | # If we get 0 it means the connection ended before the |
---|
| 69 | # header was complete. |
---|
| 70 | assert(received > 0) |
---|
| 71 | pos += received |
---|
| 72 | print(f'Received HTTPS header: {bytes(memoryview(buf)[:pos])!r}') |
---|
| 73 | data_start = buf.index(CRLF) + len(CRLF) |
---|
| 74 | |
---|
| 75 | # Read body |
---|
| 76 | m = re.search(rb'\r\nContent-Length: (\d+)\r\n', buf) |
---|
| 77 | assert(m is not None) |
---|
| 78 | clen = int(m.group(1)) |
---|
| 79 | while pos < (data_start + clen): |
---|
| 80 | received = tls_sock.recv_into(memoryview(buf)[pos:]) |
---|
| 81 | # If we get 0 it means the connection ended before the |
---|
| 82 | # body was complete. |
---|
| 83 | assert(received > 0) |
---|
| 84 | pos += received |
---|
| 85 | body_data = bytes(memoryview(buf)[data_start:pos]) |
---|
| 86 | print(f'Received HTTPS data: {body_data!r}') |
---|
| 87 | assert(body_data == b'test\n') |
---|
| 88 | |
---|
| 89 | print('Sending plaintext request') |
---|
| 90 | s.sendall(req) |
---|
| 91 | # Peek read so the TLS socket can also get the alert. |
---|
| 92 | data = s.recv(1024, socket.MSG_PEEK) |
---|
| 93 | print(f'Received: {data!r}') |
---|
| 94 | record = TLSRecord(data) |
---|
| 95 | # Expect application data (TLS 1.3 encrypted alert, hopefully) |
---|
| 96 | assert(record.is_app_data) |
---|
| 97 | assert(record.length > 2) |
---|
| 98 | |
---|
| 99 | tls_sock.sendall(req) |
---|
| 100 | data = tls_sock.recv(clen) |
---|
| 101 | print(f'Received TLS data: {data!r}') |
---|
| 102 | assert(len(data) == 0) |
---|
| 103 | print('Connection has been closed as expected.') |
---|
| 104 | |
---|
| 105 | |
---|
| 106 | def run_connection(testname, conn_log, response_log): |
---|
| 107 | """Inject unencrypted requests into TCP connections.""" |
---|
| 108 | |
---|
| 109 | host = os.environ['TEST_HOST'] |
---|
| 110 | port = int(os.environ['TEST_PORT']) |
---|
| 111 | http_req = f'GET /test.txt HTTP/1.1\r\nHost: {host}\r\n\r\n'.encode() |
---|
| 112 | |
---|
| 113 | context = ssl.SSLContext() |
---|
| 114 | context.load_verify_locations(cafile='authority/x509.pem') |
---|
| 115 | context.verify_mode = ssl.CERT_REQUIRED |
---|
| 116 | context.check_hostname = True |
---|
| 117 | |
---|
| 118 | print(test_immediate_plaintext.__doc__) |
---|
| 119 | test_immediate_plaintext(host, port, http_req) |
---|
| 120 | print() |
---|
| 121 | |
---|
| 122 | print(test_plaintext_after_https.__doc__) |
---|
| 123 | test_plaintext_after_https(host, port, http_req, context) |
---|
| 124 | print() |
---|
| 125 | |
---|
| 126 | print('Send a good HTTPS request, and expect it to work') |
---|
| 127 | with urlopen(f'https://{host}:{port}/test.txt', context=context) as f: |
---|
| 128 | print(f.read().decode()) |
---|
| 129 | |
---|
| 130 | |
---|
| 131 | if __name__ == '__main__': |
---|
| 132 | run_connection(None, None, None) |
---|