|
import json |
|
import os |
|
from certvalidator import CertificateValidator, ValidationContext, errors |
|
import boto3 |
|
from asn1crypto import pem |
|
|
|
''' |
|
Let's load our truststore from s3. Doing this outside of handler function so that this will be loaded only on coldstart. |
|
If the truststore contents change, you need to update the lambda env var 'TRUSTSTORE_FILE_VERSIONID' |
|
with the new files versionId. And also update the same in 'API Gateway > Custom domain names > Domain details > Truststore version' and wait till Status becomes Available. |
|
If APi Gateway finds some problem with the truststore, such as could not find complete chain, it will display a warning. The warning details will tell you which cert it has a problem with and the problem. You need to fix the truststore chain till this warning goes away. |
|
''' |
|
|
|
s3_client = boto3.client('s3') |
|
|
|
bucket = os.environ.get('TRUSTSTORE_BUCKET') |
|
key = os.environ.get('TRUSTSTORE_FILENAME') |
|
versionId = os.environ.get('TRUSTSTORE_FILE_VERSIONID') |
|
|
|
download_path = '/tmp/{}'.format(key) |
|
s3_client.download_file(bucket, key, download_path, ExtraArgs={'VersionId': versionId}) |
|
|
|
trust_roots = [] |
|
with open(download_path, 'rb') as f: |
|
for _, _, der_bytes in pem.unarmor(f.read(), multiple=True): |
|
trust_roots.append(der_bytes) |
|
|
|
def lambda_handler(event, context): |
|
print("Received event: " + json.dumps(event, indent=2)) |
|
''' Get the client cert from lambda event ''' |
|
cert = event["requestContext"]["authentication"]["clientCert"]["clientCertPem"].encode() |
|
|
|
''' |
|
hard-fail mode, any error in checking revocation is considered a failure. However, if there is no known source of revocation information, it is not considered a failure. |
|
This allows us to keep using self signed certs too. |
|
''' |
|
context = ValidationContext(allow_fetching=True, revocation_mode="hard-fail", trust_roots=trust_roots) |
|
|
|
try: |
|
validator = CertificateValidator(cert, validation_context=context) |
|
validator.validate_usage( |
|
set(['digital_signature', 'key_encipherment']), |
|
set(['server_auth', 'client_auth']), |
|
True |
|
) |
|
except Exception as inst: |
|
print(inst) |
|
print("The certificate could not be validated") |
|
return { |
|
"isAuthorized": "false", |
|
"context": { |
|
"exception": str(inst.args) |
|
} |
|
} |
|
else: |
|
print("The certificate is ok") |
|
return { |
|
"isAuthorized": "true", |
|
"context": { |
|
"exception": None |
|
} |
|
} |