""" Python interface to EasyRSA CA. """ import subprocess from datetime import datetime from pathlib import Path from OpenSSL import crypto from passlib import pwd from .config import CertificateAlgo, Config, Settings class EasyRSA: """ Represents an EasyRSA PKI. """ @property def pki_directory(self) -> Path: return Settings._.data_dir.joinpath("pki") @property def ca_password(self) -> str: config = Config._ if (ca_password := config.crypto.ca_password) is None: ca_password = pwd.genword( length=32, charset="ascii_62", ) config.crypto.ca_password = ca_password config.save() return config.crypto.ca_password def __easyrsa( self, *easyrsa_args: str, ) -> subprocess.CompletedProcess: return subprocess.run( [ "easyrsa", "--batch", f"--pki-dir={self.pki_directory}", *easyrsa_args, ], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=True, ) def __build_cert( self, cert_filename: Path, expiry_days: int | None, *easyrsa_args: str, ) -> crypto.X509: config = Config._ extra_args: tuple[str] = tuple() if expiry_days is not None: extra_args += tuple([f"--days={expiry_days}"]) if (algo := config.crypto.cert_algo) is not None: if algo is CertificateAlgo.rsa2048: extra_args += ("--use-algo=rsa", "--keysize=2048") elif algo is CertificateAlgo.rsa4096: extra_args += ("--use-algo=rsa", "--keysize=4096") elif algo is CertificateAlgo.secp256r1: extra_args += ("--use-algo=ec", "--curve=secp256r1") elif algo is CertificateAlgo.secp384r1: extra_args += ("--use-algo=ec", "--curve=secp384r1") elif algo is CertificateAlgo.ed25519: extra_args += ("--use-algo=ed", "--curve=ed25519") else: raise ValueError(f"Unexpected algorithm: {algo}") self.__easyrsa( *extra_args, *easyrsa_args ) with open( self.pki_directory.joinpath(cert_filename), "r" ) as cert_file: return crypto.load_certificate( crypto.FILETYPE_PEM, cert_file.read() ) def init_pki(self) -> bool: self.__easyrsa("init-pki") def build_ca(self) -> crypto.X509: config = Config._ server_dn = config.server_dn cert = self.__build_cert( Path("ca.crt"), config.crypto.ca_expiry_days, f"--passout=pass:{self.ca_password}", f"--passin=pass:{self.ca_password}", "--dn-mode=org", f"--req-c={server_dn.country.value}", f"--req-st={server_dn.state.value}", f"--req-city={server_dn.city.value}", f"--req-org={server_dn.organization.value}", f"--req-ou={server_dn.organizational_unit.value}", f"--req-email={server_dn.email.value}", f"--req-cn={server_dn.common_name}", "build-ca", ) # self.__easyrsa("gen-dh") return cert def issue( self, cert_type: str = "client", cn: str = "kiwi-vpn-client", ) -> crypto.X509: config = Config._ return self.__build_cert( Path(f"issued/{cn}.crt"), config.crypto.cert_expiry_days, f"--passin=pass:{self.ca_password}", f"build-{cert_type}-full", cn, "nopass", ) if __name__ == "__main__": easy_rsa = EasyRSA() easy_rsa.init_pki() ca = easy_rsa.build_ca() server = easy_rsa.issue(cert_type="server", cn="kiwi-vpn-server") client = easy_rsa.issue(cert_type="client", cn="kiwi-vpn-client") date_format, encoding = "%Y%m%d%H%M%SZ", "ascii" for cert in [ca, server, client]: print(cert.get_subject().CN) print(cert.get_signature_algorithm().decode(encoding)) print(datetime.strptime( cert.get_notAfter().decode(encoding), date_format))