Skip to content

Instantly share code, notes, and snippets.

@b0bu
Last active July 15, 2021 12:58
Show Gist options
  • Save b0bu/49f304263785bed46c2fb4aff9544196 to your computer and use it in GitHub Desktop.
Save b0bu/49f304263785bed46c2fb4aff9544196 to your computer and use it in GitHub Desktop.
custom-acme-client

I'd like to talk about custom ACME protocol clients this time. I'm just showing some dummy implementation code here, with the threading and auth removed, to show how you might go about setting up a core interface that can obtain certificates from a Let's Encrypt (LE) environment endpoint. This is part of a client that I may open source in the next few weeks, as it's widely deployed at my company and solved an automation and provisioning problem.

The use case for writing your own is to have more granular programmatic control over DNS record creation, and to speed up the I/O-bound process of obtaining a certificate by threading it and batching the threads based on rate limits.

from providers.mycustomdns import client as dns
from acme import client as acme

class EmptyDomainListError(Exception):
    """Signals that the ``domains`` argument was not a usable, non-empty list.

    Attributes:
        message: explanation of the constraint that was violated.
        domains: the offending value, echoed back in the string form.
    """

    def __init__(self, domains, message="domains must be a list and cannot be an empty list"):
        # Hand the explanation to Exception first so ``args`` carries it.
        super().__init__(message)
        self.domains = domains
        self.message = message

    def __str__(self):
        # Append the rejected value so the caller sees what was passed in.
        return "{} you provided '{}'".format(self.message, self.domains)


def main(domains: list) -> None:
    """Register an ACME account and open a new order for every domain.

    This is the sequential stand-in for the threaded flow: the account is
    registered once, then one ``newOrder`` request is sent per entry in
    ``domains`` using the account URL as the key id.

    Args:
        domains: non-empty list of DNS names to request certificates for.

    Raises:
        EmptyDomainListError: if ``domains`` is not a list or is empty.
    """
    # Reject bad input before any network traffic is generated.
    if not isinstance(domains, list) or not domains:
        raise EmptyDomainListError(domains)

    # read from config object
    environment = "https://acme-staging-v02.api.letsencrypt.org"
    contacts = ["mailto:[email protected]"]

    # dns-01 presumed; another challenge type could be selected with
    # acme.Client(environment, challenge_type="http-01")
    letsencrypt = acme.Client(environment)
    endpoints = letsencrypt.endpoints()

    # can't continue without agreeing to tos
    agree_tos(endpoints["meta"]["termsOfService"])

    account = letsencrypt.send_request(
        endpoints["newAccount"],
        payload={
            "termsOfServiceAgreed": True,
            "contact": contacts,
        },
    )
    # The account URL comes back in the Location header and identifies the
    # account key (kid) on every subsequent request.
    account_url = account.headers["Location"]

    # usually runs on a thread using "domains" list
    # txt record creation and verify le order status set to "ready" before join
    for domain in domains:
        letsencrypt.send_request(
            endpoints["newOrder"],
            payload={
                "identifiers": [{
                    "type": "dns",
                    "value": domain,
                }],
            },
            kid=account_url,
        )

The bulk of the smarts here goes into what send_request is doing and how to build up a JOSE-encoded JWS. Here's a look at a simple ACME client I wrote; the package itself has other considerations not implemented here that I may post about next, such as:

  • the order and DNS record creation happen on a thread
  • the key material is read from azure_keyvault
  • the key material source is a plugin system where you can specify a backend
""" initialise letsencrypt client """
#%%

import logging
import requests
import json
from utils.request_helpers.encode import base64_encode
from utils.request_helpers.crypto import import_private_key
from utils.request_helpers.crypto import sign


class Client:
    """Small ACME client that talks to a Let's Encrypt environment.

    Every request to the CA is a POST whose body is a JWS built from a
    protected header (with a fresh nonce), a payload and a signature.

    Attributes:
        nonce_uri: path appended to the environment URL to fetch a nonce.
        directory_uri: path appended to the environment URL for the directory.
        request_timeout: seconds to wait on each HTTP call.
        environment: base URL of the ACME environment.
        client_name: value sent as the User-Agent header.
        challenge_type: challenge type selected by the caller (stored only;
            nothing in this class reads it).
    """

    nonce_uri = "/acme/new-nonce"
    directory_uri = "/directory"
    # requests waits indefinitely unless a timeout is given, which would
    # stall a worker forever on a dead connection.
    request_timeout = 30

    def __init__(self, environment: str, challenge_type="dns-01"):
        self.environment = environment
        self.client_name = "my-acme-client/0.0.1.alpha"
        self.challenge_type = challenge_type

    def _nonce(self) -> str:
        """Fetch a fresh anti-replay nonce from the CA.

        Returns:
            The value of the ``Replay-Nonce`` response header.

        Raises:
            KeyError: if the response carries no ``Replay-Nonce`` header.
        """
        response = requests.get(
            "".join([self.environment, self.nonce_uri]),
            timeout=self.request_timeout,
        )
        return response.headers["Replay-Nonce"]

    def endpoints(self) -> dict:
        """Return the directory document (endpoint name -> URL) as a dict."""
        # put this in config certs issued by stage won't be trusted in browser
        response = requests.get(
            "".join([self.environment, self.directory_uri]),
            timeout=self.request_timeout,
        )
        return response.json()

    def send_request(self, url: str, payload="", kid=None) -> "requests.Response":
        """Send a JOSE-formatted request to the CA and return the response.

        Args:
            url: endpoint to POST to; also embedded in the protected header.
            payload: dict to send, or ``""`` for a POST-as-GET request.
            kid: account URL identifying the key; when omitted the public
                key itself (JWK) is embedded in the protected header.

        Returns:
            The ``requests`` response object, so callers can read both the
            headers (e.g. ``Location``) and the body.
        """
        headers = {
            "Content-Type": "application/jose+json",
            "User-Agent": self.client_name,
            "Accept-Language": "en",
        }

        protected_header = self._build_protected_header(url, kid)

        # build request object and jose encode
        jose = self._build_json_web_signature(url, payload, protected_header)

        # post-as-get
        return requests.post(
            url, json=jose, headers=headers, timeout=self.request_timeout
        )

    def _build_json_web_key(self) -> dict:
        """Build and return the JWK for the account's RSA key.

        The exponent and modulus come from ``import_private_key``.
        """
        exponent, modulus = import_private_key()

        return {
            "e": base64_encode(exponent),
            "kty": "RSA",
            "n": base64_encode(modulus),
        }

    def _build_protected_header(self, url: str, kid=None) -> dict:
        """Build and return the JWS protected header for ``url``.

        Args:
            url: target endpoint of the request.
            kid: account URL; when given it is used instead of the JWK.

        Returns:
            Dict with ``alg``, ``nonce`` and ``url`` plus exactly one of
            ``kid`` or ``jwk``.
        """
        metadata = {
            "alg": "RS256",
            "nonce": self._nonce(),
            "url": url,
        }

        if kid:
            metadata["kid"] = kid
        else:
            # Only touch the key material when the JWK is actually sent;
            # with a kid the header never includes it.
            metadata["jwk"] = self._build_json_web_key()

        return metadata

    def _build_json_web_signature(self, url: str, payload: dict, protected_header: dict) -> dict:
        """Build and return the JWS for a request.

        Args:
            url: unused here; the URL is already inside ``protected_header``.
            payload: dict to encode, or a falsy value (``""``) for
                POST-as-GET, which is passed through unencoded.
            protected_header: header dict from ``_build_protected_header``.

        Returns:
            Whatever ``sign`` returns for the assembled request.
            NOTE(review): presumably the same dict with ``signature``
            replaced by the encoded signature -- confirm against ``sign``.
        """
        request = {}

        # dict to json to base64
        # dict() -> json.dumps(dict).encode("utf-8") -> base64.urlsafe_b64encode(json).decode("utf-8")
        # {"a": 1} -> b'{"a": 1}' -> 'eyJhIjogMX0'
        request["protected"] = base64_encode(json.dumps(protected_header).encode("utf-8"))
        if payload:
            request["payload"] = base64_encode(json.dumps(payload).encode("utf-8"))
        else:
            # no payload is required when sending GET request like to new_order['authorizations']
            # since POST-AS-GET is used, the payload is string ''
            request["payload"] = payload

        # Signing input is "<protected>.<payload>"; it is stored under
        # "signature" and handed to sign() together with the rest.
        signing_input = f"{request['protected']}.{request['payload']}".encode("utf-8")
        request["signature"] = signing_input

        return sign(request)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment