Last active
March 26, 2024 07:45
-
-
Save korrosivesec/0311282d327c58c7c8aa7004ffcab1ba to your computer and use it in GitHub Desktop.
Extract and parse an untrusted, self-signed certificate from an aiohttp ClientResponse object.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import aiohttp | |
import asyncio | |
from typing import Text, Awaitable, Tuple | |
from cryptography import x509 | |
from cryptography.x509 import Certificate | |
from cryptography.hazmat.primitives import hashes | |
# Additional fields that can be parsed are documented at https://cryptography.io/en/latest/x509/reference/#x-509-certificate-object
def parse_cert(cert: Certificate) -> dict:
    """Summarise selected fields of an X.509 certificate as a plain dict.

    Args:
        cert: A parsed ``cryptography`` certificate object.

    Returns:
        A dict with the following string values:

        * ``fingerprint`` - SHA-1 fingerprint as colon-separated,
          upper-case hex byte pairs.
        * ``serial`` - ``str()`` of the certificate serial number.
        * ``start_date`` / ``end_date`` - validity bounds formatted as
          ``YYYY/MM/DD``.
        * ``issuer`` / ``subject`` - distinguished names rendered as
          RFC 4514 strings.
    """
    # NOTE(review): newer cryptography releases appear to deprecate
    # not_valid_before / not_valid_after in favour of the *_utc variants;
    # confirm against the installed version before switching.
    date_format = '%Y/%m/%d'
    sha1_digest = cert.fingerprint(hashes.SHA1())
    return {
        'fingerprint': ':'.join(format(octet, '02X') for octet in sha1_digest),
        'serial': str(cert.serial_number),
        'start_date': cert.not_valid_before.strftime(date_format),
        'end_date': cert.not_valid_after.strftime(date_format),
        'issuer': cert.issuer.rfc4514_string(),
        'subject': cert.subject.rfc4514_string(),
    }
async def make_request(
    session: aiohttp.ClientSession,
    url: Text,
) -> Tuple[aiohttp.ClientResponse, dict]:
    """GET *url* without certificate validation and parse the peer certificate.

    Args:
        session: An open aiohttp client session used to issue the request.
        url: The HTTPS URL to request.

    Returns:
        A ``(response, cert_dict)`` tuple, where ``cert_dict`` is the result
        of ``parse_cert`` for the server's certificate. The response is
        handed back after its context manager has exited, so read any body
        data you need before relying on it.

    Raises:
        RuntimeError: If the response no longer holds a connection, or the
            connection is not TLS, so no peer certificate can be retrieved.
    """
    # ssl=False so that we can connect to servers with self-signed certificates.
    async with session.get(url, ssl=False) as response:
        # Get the SSL object inside the context manager, before the response
        # object is closed.
        connection = response.connection
        if connection is None or connection.transport is None:
            # The connection may already have been released back to the pool
            # (e.g. when there is no body left to read).
            raise RuntimeError(
                'Connection already released; cannot read peer certificate '
                'for {}'.format(url)
            )
        ssl_object = connection.transport.get_extra_info('ssl_object')
        if ssl_object is None:
            raise RuntimeError(
                'No TLS session for {}; is this an https:// URL?'.format(url)
            )
        # getpeercert(True) to retrieve the binary (DER) version of the cert.
        # Without specifying binary output, the certificate is never parsed
        # when certificate validation is disabled.
        # https://github.com/python/cpython/blob/401272e6e660445d6556d5cd4db88ed4267a50b3/Modules/_ssl.c#L1819
        cert_binary = ssl_object.getpeercert(True)
        cert_x509 = x509.load_der_x509_certificate(cert_binary)
        cert_dict = parse_cert(cert_x509)
        # The original returned the undefined name `certificate` (NameError).
        return response, cert_dict
async def main():
    """Demo: fetch one HTTPS URL and print its certificate summary and response."""
    target_url = 'https://google.com'
    async with aiohttp.ClientSession() as http_session:
        resp, cert_info = await make_request(http_session, target_url)
        # Certificate summary first, then the response object.
        print(cert_info)
        print(resp)
# Run the demo coroutine. asyncio.run() creates a fresh event loop, runs
# main() to completion and closes the loop afterwards; it replaces the
# deprecated get_event_loop()/run_until_complete() pairing, which also left
# the loop open.
asyncio.run(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment