""" Python interface to EasyRSA CA. """ from __future__ import annotations import subprocess from enum import Enum, auto from pathlib import Path from cryptography import x509 from passlib import pwd from pydantic import BaseModel from .config import SETTINGS, Config, KeyAlgorithm from .db import Connection, Device class DistinguishedName(BaseModel): """ An `X.509 distinguished name` (DN) as specified in RFC 5280 """ country: str state: str city: str organization: str organizational_unit: str email: str common_name: str @classmethod def build(cls, device: Device | None = None) -> DistinguishedName: """ Create a DN from the current config and an optional device """ # extract server DN config server_dn = Config._.server_dn result = cls( country=server_dn.country.value, state=server_dn.state.value, city=server_dn.city.value, organization=server_dn.organization.value, organizational_unit=server_dn.organizational_unit.value, email=server_dn.email.value, common_name=server_dn.common_name, ) # no device specified -> done if device is None: return result # don't override locked or empty fields if not (server_dn.country.locked or device.owner.country is None): result.country = device.owner.country if not (server_dn.state.locked or device.owner.state is None): result.state = device.owner.state if not (server_dn.city.locked or device.owner.city is None): result.city = device.owner.city if not (server_dn.organization.locked or device.owner.organization is None): result.organization = device.owner.organization if not (server_dn.organizational_unit.locked or device.owner.organizational_unit is None): result.organizational_unit = device.owner.organizational_unit # definitely use derived email and common_name result.email = device.owner.email result.common_name = f"{device.owner.name}_{device.name}" return result @property def easyrsa_env(self) -> dict[str, str]: """ Pass this DN as arguments to easyrsa """ return { "EASYRSA_DN": "org", "EASYRSA_REQ_COUNTRY": self.country, "EASYRSA_REQ_PROVINCE": self.state, "EASYRSA_REQ_CITY": self.city, "EASYRSA_REQ_ORG": self.organization, "EASYRSA_REQ_OU": self.organizational_unit, "EASYRSA_REQ_EMAIL": self.email, "EASYRSA_REQ_CN": self.common_name, } class CertificateType(Enum): """ Possible types of certificates """ client = auto() server = auto() def __str__(self) -> str: return self._name_ class EasyRSA: """ Represents an EasyRSA PKI. """ __mapKeyAlgorithm = { KeyAlgorithm.rsa2048: { "EASYRSA_ALGO": "rsa", "EASYRSA_KEY_SIZE": "2048", }, KeyAlgorithm.rsa4096: { "EASYRSA_ALGO": "rsa", "EASYRSA_KEY_SIZE": "4096", }, KeyAlgorithm.secp256r1: { "EASYRSA_ALGO": "ec", "EASYRSA_CURVE": "secp256r1", }, KeyAlgorithm.secp384r1: { "EASYRSA_ALGO": "ec", "EASYRSA_CURVE": "secp384r1", }, KeyAlgorithm.ed25519: { "EASYRSA_ALGO": "ed", "EASYRSA_CURVE": "ed25519", }, None: {}, } @property def output_directory(self) -> Path: """ Where certificates are stored """ return SETTINGS.data_dir.joinpath("pki") @property def ca_password(self) -> str: """ Get CA password from config, or generate a new one """ config = Config._ if (ca_password := config.crypto.ca_password) is None: # generate and save new CA password ca_password = pwd.genword( length=32, charset="ascii_62", ) config.crypto.ca_password = ca_password config.save() return ca_password def __easyrsa( self, *easyrsa_cmd: str, **easyrsa_env: str, ) -> subprocess.CompletedProcess: """ Call the `easyrsa` executable """ return subprocess.run( [ "/usr/local/bin/easyrsa", *easyrsa_cmd, ], env={ # base settings "EASYRSA_BATCH": "1", "EASYRSA_PKI": str(self.output_directory), # always include CA password "EASYRSA_PASSOUT": f"pass:{self.ca_password}", "EASYRSA_PASSIN": f"pass:{self.ca_password}", # include env from parameters **easyrsa_env, }, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True, ) def __build_cert( self, *easyrsa_cmd: str, **easyrsa_env: str, ) -> None: """ Create an X.509 certificate """ config = Config._ if ((algorithm := config.crypto.key_algorithm) not in EasyRSA.__mapKeyAlgorithm): raise ValueError(f"Unexpected algorithm: {algorithm}") # include expiry options if (ca_expiry_days := config.crypto.ca_expiry_days) is not None: easyrsa_env["EASYRSA_CA_EXPIRE"] = str(ca_expiry_days) if (cert_expiry_days := config.crypto.cert_expiry_days) is not None: easyrsa_env["EASYRSA_CERT_EXPIRE"] = str(cert_expiry_days) try: # call easyrsa self.__easyrsa( *easyrsa_cmd, # include algorithm options **EasyRSA.__mapKeyAlgorithm[algorithm], **easyrsa_env, ) except (subprocess.CalledProcessError): # certificate couldn't be built pass return None def get_certificate( self, *, dn: DistinguishedName | None = None, ) -> x509.Certificate | None: """ Get a certificate from the PKI directory """ if dn is None: cert_filename = self.output_directory.joinpath("ca.crt") else: cert_filename = (self.output_directory.joinpath("issued") .joinpath(f"{dn.common_name}.crt")) try: # parse the certificate with open(cert_filename, "rb") as cert_file: return x509.load_pem_x509_certificate( cert_file.read() ) except FileNotFoundError: return None def init_pki(self) -> None: """ Clean working directory """ self.__easyrsa("init-pki") def build_ca(self) -> x509.Certificate: """ Build the CA certificate """ self.__build_cert( "build-ca", EASYRSA_DN="cn_only", EASYRSA_REQ_CN="kiwi-vpn-ca", ) cert = self.get_certificate() assert cert is not None # # this takes long! # self.__easyrsa("gen-dh") return cert def issue( self, cert_type: CertificateType = CertificateType.client, dn: DistinguishedName | None = None, ) -> x509.Certificate | None: """ Issue a client or server certificate """ if dn is None: dn = DistinguishedName.build() if not (cert_type is CertificateType.client or cert_type is CertificateType.server): return None self.__build_cert( f"build-{cert_type}-full", dn.common_name, "nopass", **dn.easyrsa_env, ) return self.get_certificate(dn=dn) def renew( self, dn: DistinguishedName | None = None, ) -> x509.Certificate | None: """ Renew a client or server certificate """ if dn is None: dn = DistinguishedName.build() self.__build_cert( "renew", dn.common_name, "nopass", # allow renewal 14 days before cert expiry EASYRSA_CERT_RENEW="14", **dn.easyrsa_env, ) return self.get_certificate(dn=dn) def revoke( self, dn: DistinguishedName | None = None, ) -> bool: """ Revoke a client or server certificate """ if dn is None: dn = DistinguishedName.build() try: self.__easyrsa( "revoke", dn.common_name, **dn.easyrsa_env, ) except subprocess.CalledProcessError: return False return True EASYRSA = EasyRSA() # some basic test if __name__ == "__main__": ca = EASYRSA.build_ca() server = EASYRSA.issue(CertificateType.server) client = None # check if configured if (current_config := Config.load()) is not None: # connect to database Connection.connect(current_config.db.uri) if (device := Device.get(1)) is not None: client = EASYRSA.issue( dn=DistinguishedName.build(device) ) for cert in (ca, server, client): if cert is not None: print(cert.subject) print(cert.signature_hash_algorithm) print(cert.not_valid_after)