""" Python interface to EasyRSA CA. """ from __future__ import annotations import subprocess from datetime import datetime from pathlib import Path from OpenSSL import crypto from passlib import pwd from pydantic import BaseModel from .config import CertificateAlgo, Config, Settings from .db import Device class DistinguishedName(BaseModel): """ An `X.509 distinguished name` (DN) as specified in RFC 5280 """ country: str state: str city: str organization: str organizational_unit: str email: str common_name: str @classmethod def build(cls, device: Device | None = None) -> DistinguishedName: """ Create a DN from the current config and an optional device """ # extract server DN config server_dn = Config._.server_dn result = cls( country=server_dn.country.value, state=server_dn.state.value, city=server_dn.city.value, organization=server_dn.organization.value, organizational_unit=server_dn.organizational_unit.value, email=server_dn.email.value, common_name=server_dn.common_name, ) # no device specified -> done if device is None: return result # don't override locked or empty fields if not (server_dn.country.locked or device.owner.country is None): result.country = device.owner.country if not (server_dn.state.locked or device.owner.state is None): result.state = device.owner.state if not (server_dn.city.locked or device.owner.city is None): result.city = device.owner.city if not (server_dn.organization.locked or device.owner.organization is None): result.organization = device.owner.organization if not (server_dn.organizational_unit.locked or device.owner.organizational_unit is None): result.organizational_unit = device.owner.organizational_unit # definitely use derived email and common_name result.email = device.owner.email result.common_name = f"{device.owner.name}_{device.name}" return result @property def easyrsa_args(self) -> tuple[str]: """ Pass this DN as arguments to easyrsa """ return ( "--dn-mode=org", f"--req-c={self.country}", f"--req-st={self.state}", f"--req-city={self.city}", f"--req-org={self.organization}", f"--req-ou={self.organizational_unit}", f"--req-email={self.email}", f"--req-cn={self.common_name}", ) class EasyRSA: """ Represents an EasyRSA PKI. """ @property def output_directory(self) -> Path: """ Where certificates are stored """ return Settings._.data_dir.joinpath("pki") @property def ca_password(self) -> str: """ Get CA password from config, or generate a new one """ config = Config._ if (ca_password := config.crypto.ca_password) is None: ca_password = pwd.genword( length=32, charset="ascii_62", ) config.crypto.ca_password = ca_password config.save() return config.crypto.ca_password def __easyrsa( self, *easyrsa_args: str, ) -> subprocess.CompletedProcess: """ Call the `easyrsa` executable """ return subprocess.run( [ "easyrsa", "--batch", f"--pki-dir={self.output_directory}", *easyrsa_args, ], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True, ) def __build_cert( self, cert_filename: Path, expiry_days: int | None, *easyrsa_args: str, ) -> crypto.X509: """ Create an X.509 certificate """ config = Config._ extra_args: tuple[str] = ( f"--passout=pass:{self.ca_password}", f"--passin=pass:{self.ca_password}", ) if expiry_days is not None: extra_args += tuple([f"--days={expiry_days}"]) if (algo := config.crypto.cert_algo) is not None: if algo is CertificateAlgo.rsa2048: extra_args += ("--use-algo=rsa", "--keysize=2048") elif algo is CertificateAlgo.rsa4096: extra_args += ("--use-algo=rsa", "--keysize=4096") elif algo is CertificateAlgo.secp256r1: extra_args += ("--use-algo=ec", "--curve=secp256r1") elif algo is CertificateAlgo.secp384r1: extra_args += ("--use-algo=ec", "--curve=secp384r1") elif algo is CertificateAlgo.ed25519: extra_args += ("--use-algo=ed", "--curve=ed25519") else: raise ValueError(f"Unexpected algorithm: {algo}") self.__easyrsa( *extra_args, *easyrsa_args ) with open( self.output_directory.joinpath(cert_filename), "r" ) as cert_file: return crypto.load_certificate( crypto.FILETYPE_PEM, cert_file.read() ) def init_pki(self) -> bool: """ Clean the working directory """ self.__easyrsa("init-pki") def build_ca(self) -> crypto.X509: """ Build the CA certificate """ cert = self.__build_cert( Path("ca.crt"), Config._.crypto.ca_expiry_days, "--req-cn=kiwi-vpn-ca", "build-ca", ) # # this takes long! # self.__easyrsa("gen-dh") return cert def issue( self, cert_type: str = "client", dn: DistinguishedName = DistinguishedName.build(), ) -> crypto.X509: """ Issue a client or server certificate """ return self.__build_cert( Path(f"issued/{dn.common_name}.crt"), Config._.crypto.cert_expiry_days, *dn.easyrsa_args, f"build-{cert_type}-full", dn.common_name, "nopass", ) # some basic test if __name__ == "__main__": easy_rsa = EasyRSA() easy_rsa.init_pki() ca = easy_rsa.build_ca() server = easy_rsa.issue("server") # client = easy_rsa.issue(cert_type="client", cn="kiwi-vpn-client") date_format, encoding = "%Y%m%d%H%M%SZ", "ascii" for cert in (ca, server): print(cert.get_subject().CN) print(cert.get_signature_algorithm().decode(encoding)) print(datetime.strptime( cert.get_notAfter().decode(encoding), date_format))