Created
October 25, 2016 08:20
-
-
Save ellepdesk/3bc4ace904bb276584dbecea348f0d39 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#! /usr/bin/python3 | |
import sys | |
import unittest | |
from unittest import mock | |
import logging | |
from OpenSSL import crypto | |
from datetime import datetime | |
from datetime import timedelta | |
import tempfile | |
import os | |
import math | |
def createCertRequest(pkey, **name):
    """Build a certificate signing request that is signed with its own key.

    Args:
        pkey: Key pair whose public half is embedded in the request and
            whose private half signs it.
        **name: Subject attributes, set one by one on the request's subject
            (for example ``CN='server'``).

    Returns:
        The signed ``crypto.X509Req``.
    """
    request = crypto.X509Req()
    request.set_pubkey(pkey)

    # Copy every keyword argument onto the subject as an attribute.
    subject = request.get_subject()
    for field, text in name.items():
        setattr(subject, field, text)

    request.sign(pkey, "sha256")
    return request
def createCertificate(req, issuerCertKey, serial, validityPeriod,
                      digest="sha256"):
    """Issue a certificate for a request, signed by the given issuer.

    Args:
        req: Certificate request supplying the subject and public key.
        issuerCertKey: Pair ``(issuer_cert, issuer_key)``; the first element
            only needs ``get_subject()``, the second signs the certificate.
        serial: Serial number stored in the certificate.
        validityPeriod: Pair ``(notBefore, notAfter)`` handed to
            ``gmtime_adj_notBefore`` / ``gmtime_adj_notAfter`` (offsets
            relative to the current time).
        digest: Name of the digest used for the signature.

    Returns:
        The signed ``crypto.X509`` certificate.
    """
    signer_cert, signer_key = issuerCertKey
    starts_in, ends_in = validityPeriod

    certificate = crypto.X509()
    certificate.set_serial_number(serial)

    # Validity window, expressed relative to "now".
    certificate.gmtime_adj_notBefore(starts_in)
    certificate.gmtime_adj_notAfter(ends_in)

    # Identity: who issued it, who it is for, and which key it certifies.
    certificate.set_issuer(signer_cert.get_subject())
    certificate.set_subject(req.get_subject())
    certificate.set_pubkey(req.get_pubkey())

    certificate.sign(signer_key, digest)
    return certificate
def createSelfSignedCertificate(valid_period=(0, 600)):
    """Create a fresh 2048-bit RSA key and a self-signed CA certificate.

    Args:
        valid_period: Pair ``(notBefore, notAfter)`` forwarded unchanged to
            ``createCertificate``.

    Returns:
        Tuple ``(certificate, private_key)``.
    """
    ca_key = crypto.PKey()
    ca_key.generate_key(crypto.TYPE_RSA, 2048)
    ca_request = createCertRequest(ca_key, CN='Certificate Authority')

    # Self-signed: the request itself stands in for the issuer certificate,
    # because createCertificate only reads the issuer's subject from it.
    issuer = (ca_request, ca_key)
    return createCertificate(ca_request, issuer, 1, valid_period), ca_key
def createSignedCertificate(ca, name, serial, valid_period=(0, 600)):
    """Create a fresh 2048-bit RSA key and a certificate signed by ``ca``.

    Args:
        ca: Pair ``(issuer_cert, issuer_key)`` used to sign the certificate.
        name: Common name (CN) placed in the certificate subject.
        serial: Serial number of the new certificate.
        valid_period: Pair ``(notBefore, notAfter)`` forwarded unchanged to
            ``createCertificate``.

    Returns:
        Tuple ``(certificate, private_key)``.
    """
    subject_key = crypto.PKey()
    subject_key.generate_key(crypto.TYPE_RSA, 2048)
    request = createCertRequest(subject_key, CN=name)
    signed = createCertificate(request, ca, serial, valid_period,
                               digest="sha256")
    return signed, subject_key
def asn1_time_now(offset=None):
    """Return the current UTC time, optionally shifted, as a timestamp.

    Args:
        offset: Optional ``timedelta`` added to the current time.

    Returns:
        ``bytes`` in the ``YYYYMMDDHHMMSSZ`` form produced by
        ``to_asn1_time``.
    """
    # datetime.utcnow() is deprecated since Python 3.12; an aware UTC
    # datetime formats to exactly the same string.
    from datetime import timezone

    now_utc = datetime.now(timezone.utc)
    return to_asn1_time(now_utc, offset)
def to_asn1_time(utcnow, offset=None):
    """Format a datetime as a ``YYYYMMDDHHMMSSZ`` byte string.

    Args:
        utcnow: The ``datetime`` to format; it is formatted as-is, with no
            time zone conversion.
        offset: Optional ``timedelta`` added before formatting. A falsy
            value (``None`` or a zero delta) leaves the time untouched.

    Returns:
        The formatted timestamp encoded as UTF-8 ``bytes``.
    """
    moment = utcnow + offset if offset else utcnow
    return moment.strftime("%Y%m%d%H%M%SZ").encode('UTF-8')
def save_certificates(certificate_list, filename):
    """Write certificates to one file as concatenated PEM blocks.

    Args:
        certificate_list: Iterable of certificates, written in order.
        filename: Path of the file to create or overwrite.
    """
    with open(filename, 'wb') as pem_file:
        pem_file.writelines(
            crypto.dump_certificate(crypto.FILETYPE_PEM, cert)
            for cert in certificate_list
        )
def verify_openssl(certname, ca_file="CAchain.pem"):
    """Verify a certificate file with the ``openssl verify`` command.

    Args:
        certname: Path of the certificate file to verify.
        ca_file: Path of the trusted chain passed to openssl as ``-CAfile``.

    Returns:
        Tuple ``(stdout, stderr, returncode)`` of the openssl process.

    Raises:
        Exception: If openssl wrote anything to stderr or exited with a
            non-zero status (carries stderr), or if its stdout contains
            ``b"error"`` (carries stdout).
        OSError: If the ``openssl`` executable cannot be started.
    """
    import subprocess

    # Pass an argument list and skip the shell: file names containing
    # spaces or shell metacharacters are then handed to openssl verbatim
    # instead of being split or executed.
    p = subprocess.Popen(
        ["openssl", "verify", "-CAfile", ca_file, certname],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE)
    stdout, stderr = p.communicate()

    if stderr or p.returncode != 0:
        raise Exception(stderr)
    # openssl can report a verification problem on stdout as well.
    if b"error" in stdout:
        raise Exception(stdout)
    return stdout, stderr, p.returncode
class KeyServer:
    """Checks certificates against a store holding the CA and server certs."""

    def __init__(self, ca_cert, server):
        """Set up the trust store.

        Args:
            ca_cert: CA certificate added to the store.
            server: Pair ``(server_cert, server_pkey)``; the certificate is
                added to the store as well.
        """
        self.server = server
        self.server_cert, self.server_pkey = server

        trusted = crypto.X509Store()
        for trusted_cert in (ca_cert, self.server_cert):
            trusted.add_cert(trusted_cert)
        self.store = trusted

    def verify_certificate(self, cert):
        """Verify ``cert`` against the store; returns nothing.

        Any error raised by pyOpenSSL's verification propagates to the
        caller.
        """
        crypto.X509StoreContext(self.store, cert).verify_certificate()
# Build the CA and the server certificate once, at import time; the test
# case below reads these module-level names.
ca = createSelfSignedCertificate()
ca_cert, ca_key = ca

server = createSignedCertificate(ca, name="server", serial=1)
server_cert, server_key = server

# Write the trusted chain to the working directory so that the openssl
# command-line tool can use it as its -CAfile.
save_certificates([ca_cert, server_cert], 'CAchain.pem')
class key_server_test(unittest.TestCase):
    """Compares pyOpenSSL and the openssl tool on a tampered certificate."""

    def setUp(self):
        """Create the KeyServer under test from the module-level certs."""
        self.sut = KeyServer(ca_cert, server)

    def tearDown(self):
        """Nothing to clean up."""

    def test_tampered_valid_cert(self):
        """An expired cert whose notAfter was edited must still be rejected."""
        # Issue a certificate whose validity window is already in the past.
        leaf_cert, _ = createSignedCertificate(
            server,
            serial=2,
            name=self.id(),
            valid_period=(-61, -1),
        )
        save_certificates([leaf_cert], 'expired_client.crt')

        # pyopenssl finds an issue
        self.assertRaises(crypto.X509StoreContextError,
                          self.sut.verify_certificate, leaf_cert)
        # and so does openssl
        self.assertRaises(Exception, verify_openssl, 'expired_client.crt')

        # Modify notAfter date of certificate to make it seem valid
        leaf_cert.set_notAfter(asn1_time_now(offset=timedelta(days=1)))
        save_certificates([leaf_cert], 'tampered_client.crt')

        # Both checks should fail, since the signature of the certificate
        # is no longer correct.
        # openssl finds an issue
        self.assertRaises(Exception, verify_openssl, 'tampered_client.crt')
        # but pyopenssl does not!
        self.assertRaises(crypto.X509StoreContextError,
                          self.sut.verify_certificate, leaf_cert)
# Run the test case when this file is executed as a script.
if "__main__" == __name__:
    unittest.main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment