1 | import contextlib |
---|
2 | import os |
---|
3 | import re |
---|
4 | import socket |
---|
5 | import ssl |
---|
6 | import struct |
---|
7 | |
---|
8 | from urllib.request import urlopen |
---|
9 | |
---|
# Two consecutive CRLF pairs, i.e. the empty line that terminates an HTTP
# header block. Despite the name this is NOT a single CRLF: it is used both
# to end the request that gets sent and to find the end of the response
# header.
CRLF = b'\r\n\r\n'
---|
11 | |
---|
12 | |
---|
class TLSRecord:
    """A single TLS record parsed from raw bytes.

    The constructor splits the input into the fixed-size record header and
    the payload that follows it, and sets these attributes:

    * ``type`` -- content type byte from the header
    * ``legacy_proto`` -- 16 bit protocol version field from the header
    * ``length`` -- payload length announced in the header
    * ``data`` -- the payload bytes following the header
    """

    # Record header layout in network byte order: content type (1 byte),
    # legacy protocol version (2 bytes), payload length (2 bytes).
    header = struct.Struct('!BHH')

    def __init__(self, data):
        """Parse *data* as exactly one TLS record.

        Raises:
            ValueError: if *data* is too short to contain a record header,
                or if the payload size differs from the length announced
                in the header.
        """
        header_size = self.header.size
        # Report truncated input (e.g. b'' from a closed connection) with
        # the same exception type as the other malformed-record case
        # instead of leaking a low-level struct.error.
        if len(data) < header_size:
            raise ValueError(
                f'Need at least {header_size} bytes for a TLS record '
                f'header, got {len(data)}!')
        self.type, self.legacy_proto, self.length = \
            self.header.unpack_from(data)
        self.data = data[header_size:]
        if len(self.data) != self.length:
            raise ValueError('Actual data length does not match header!')

    def __repr__(self):
        return f'<{__name__}.{self.__class__.__name__}, type: {self.type}>'

    @property
    def is_alert(self):
        """True if the record's content type is 21 (alert)."""
        return self.type == 21

    @property
    def is_app_data(self):
        """True if the record's content type is 23 (application data)."""
        return self.type == 23
---|
33 | |
---|
34 | |
---|
def test_immediate_plaintext(host, port, req):
    """Send plaintext to the HTTPS socket"""
    # The docstring doubles as the label run_connection prints before
    # calling this check, so it is kept to one short line.
    #
    # host/port: address of the HTTPS server under test
    # req: raw request bytes, sent without any TLS handshake
    conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    with conn:
        conn.connect((host, port))
        conn.sendall(req)
        reply = conn.recv(1024)
        print(f'Received: {reply!r}')
        # The reply must parse as exactly one TLS record ...
        alert = TLSRecord(reply)
        # ... which has to be an unencrypted alert: alert content type and
        # a two byte payload.
        assert alert.is_alert
        assert alert.length == 2
---|
46 | |
---|
47 | |
---|
def test_plaintext_after_https(host, port, req, context):
    """Send an HTTPS request and then plaintext on the same TCP connection"""
    # The docstring doubles as the label run_connection prints before
    # calling this check, so it is kept to one short line.
    #
    # host/port: address of the HTTPS server under test
    # req: raw HTTP request bytes, sent over TLS and later as plaintext
    # context: ssl.SSLContext used to wrap the connection
    with contextlib.ExitStack() as cleanup:
        raw_sock = cleanup.enter_context(
            socket.socket(socket.AF_INET, socket.SOCK_STREAM))
        raw_sock.connect((host, port))

        # TLS runs on a duplicate of the descriptor so the raw socket
        # stays usable for injecting plaintext later on.
        tls_sock = cleanup.enter_context(
            context.wrap_socket(raw_sock.dup(),
                                server_hostname='localhost',
                                do_handshake_on_connect=False,
                                suppress_ragged_eofs=False))
        tls_sock.do_handshake()

        # Step 1: a regular request over TLS.
        tls_sock.sendall(req)

        # Step 2: collect the response in a fixed-size buffer until the
        # end of the header shows up.
        buf = bytearray(2048)
        filled = 0
        header_end = buf.find(CRLF)
        while header_end < 0:
            count = tls_sock.recv_into(memoryview(buf)[filled:])
            # Zero bytes means the connection ended before the header
            # was complete.
            assert count > 0
            filled += count
            header_end = buf.find(CRLF)
        header_bytes = bytes(memoryview(buf)[:filled])
        print(f'Received HTTPS header: {header_bytes!r}')
        body_start = header_end + len(CRLF)

        # Step 3: keep reading until the body announced by
        # Content-Length has arrived completely.
        length_match = re.search(rb'\r\nContent-Length: (\d+)\r\n', buf)
        assert length_match is not None
        clen = int(length_match.group(1))
        body_end = body_start + clen
        while filled < body_end:
            count = tls_sock.recv_into(memoryview(buf)[filled:])
            # Zero bytes means the connection ended before the body
            # was complete.
            assert count > 0
            filled += count
        body_data = bytes(memoryview(buf)[body_start:filled])
        print(f'Received HTTPS data: {body_data!r}')
        assert body_data == b'test\n'

        # Step 4: bypass TLS and push the same request as plaintext
        # through the raw socket.
        print('Sending plaintext request')
        raw_sock.sendall(req)
        # Only peek at the answer, so the TLS socket still gets to read
        # the same bytes afterwards.
        peeked = raw_sock.recv(1024, socket.MSG_PEEK)
        print(f'Received: {peeked!r}')
        record = TLSRecord(peeked)
        # Expect application data (TLS 1.3 encrypted alert, hopefully)
        assert record.is_app_data
        assert record.length > 2

        # Step 5: another request over TLS must yield no data at all.
        tls_sock.sendall(req)
        leftover = tls_sock.recv(clen)
        print(f'Received TLS data: {leftover!r}')
        assert len(leftover) == 0
        print('Connection has been closed as expected.')
---|
106 | |
---|
107 | |
---|
def run_connection(testname, conn_log, response_log):
    """Inject unencrypted requests into TCP connections.

    Runs three checks against the server named by the TEST_HOST and
    TEST_PORT environment variables: plaintext sent to a fresh connection,
    plaintext sent after a successful HTTPS request on the same
    connection, and finally a regular HTTPS request that is expected to
    work.

    The parameters testname, conn_log and response_log are not used by
    this function.

    Raises:
        KeyError: if TEST_HOST or TEST_PORT is not set.
        ValueError: if TEST_PORT is not an integer.
        AssertionError: if one of the checks fails.
    """

    host = os.environ['TEST_HOST']
    port = int(os.environ['TEST_PORT'])
    http_req = f'GET /test.txt HTTP/1.1\r\nHost: {host}\r\n\r\n'.encode()

    # Creating an SSLContext without a protocol argument is deprecated
    # since Python 3.10, so request the client-side protocol explicitly.
    # Verification settings are still spelled out below to make the
    # requirements of this test obvious.
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    context.load_verify_locations(cafile='authority/x509.pem')
    context.verify_mode = ssl.CERT_REQUIRED
    context.check_hostname = True

    # Each check's docstring serves as its label in the output.
    print(test_immediate_plaintext.__doc__)
    test_immediate_plaintext(host, port, http_req)
    print()

    print(test_plaintext_after_https.__doc__)
    test_plaintext_after_https(host, port, http_req, context)
    print()

    print('Send a good HTTPS request, and expect it to work')
    with urlopen(f'https://{host}:{port}/test.txt', context=context) as f:
        print(f.read().decode())
---|
131 | |
---|
132 | |
---|
# Allow running the checks directly as a script. run_connection does not
# read any of its three parameters, so None is passed for each of them.
if __name__ == '__main__':
    run_connection(None, None, None)
---|