From 968e9491cf36f949d2e421e37a422c82e4843c78 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn-Michael=20Miehe?= <40151420+ldericher@users.noreply.github.com> Date: Wed, 30 Mar 2022 23:41:34 +0000 Subject: [PATCH] use ORG mode for client- and server certs --- api/kiwi_vpn_api/easyrsa.py | 158 ++++++++++++++++++++++++++++++------ 1 file changed, 131 insertions(+), 27 deletions(-) diff --git a/api/kiwi_vpn_api/easyrsa.py b/api/kiwi_vpn_api/easyrsa.py index 450baa8..827fad1 100644 --- a/api/kiwi_vpn_api/easyrsa.py +++ b/api/kiwi_vpn_api/easyrsa.py @@ -2,14 +2,99 @@ Python interface to EasyRSA CA. """ +from __future__ import annotations + import subprocess from datetime import datetime from pathlib import Path from OpenSSL import crypto from passlib import pwd +from pydantic import BaseModel from .config import CertificateAlgo, Config, Settings +from .db import Device + + +class DistinguishedName(BaseModel): + """ + An `X.509 distinguished name` (DN) as specified in RFC 5280 + """ + + country: str + state: str + city: str + organization: str + organizational_unit: str + email: str + common_name: str + + @classmethod + def build(cls, device: Device | None = None) -> DistinguishedName: + """ + Create a DN from the current config and an optional device + """ + + # extract server DN config + server_dn = Config._.server_dn + result = cls( + country=server_dn.country.value, + state=server_dn.state.value, + city=server_dn.city.value, + organization=server_dn.organization.value, + organizational_unit=server_dn.organizational_unit.value, + email=server_dn.email.value, + common_name=server_dn.common_name, + ) + + # no device specified -> done + if device is None: + return result + + # don't override locked or empty fields + if not (server_dn.country.locked + or device.owner.country is None): + result.country = device.owner.country + + if not (server_dn.state.locked + or device.owner.state is None): + result.state = device.owner.state + + if not (server_dn.city.locked + or device.owner.city is None): + result.city = device.owner.city + + if not (server_dn.organization.locked + or device.owner.organization is None): + result.organization = device.owner.organization + + if not (server_dn.organizational_unit.locked + or device.owner.organizational_unit is None): + result.organizational_unit = device.owner.organizational_unit + + # definitely use derived email and common_name + result.email = device.owner.email + result.common_name = f"{device.owner.name}_{device.name}" + + return result + + @property + def easyrsa_args(self) -> tuple[str]: + """ + Pass this DN as arguments to easyrsa + """ + + return ( + "--dn-mode=org", + + f"--req-c={self.country}", + f"--req-st={self.state}", + f"--req-city={self.city}", + f"--req-org={self.organization}", + f"--req-ou={self.organizational_unit}", + f"--req-email={self.email}", + f"--req-cn={self.common_name}", + ) class EasyRSA: @@ -18,11 +103,19 @@ class EasyRSA: """ @property - def pki_directory(self) -> Path: + def output_directory(self) -> Path: + """ + Where certificates are stored + """ + return Settings._.data_dir.joinpath("pki") @property def ca_password(self) -> str: + """ + Get CA password from config, or generate a new one + """ + config = Config._ if (ca_password := config.crypto.ca_password) is None: @@ -40,10 +133,14 @@ class EasyRSA: self, *easyrsa_args: str, ) -> subprocess.CompletedProcess: + """ + Call the `easyrsa` executable + """ + return subprocess.run( [ "easyrsa", "--batch", - f"--pki-dir={self.pki_directory}", + f"--pki-dir={self.output_directory}", *easyrsa_args, ], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, @@ -56,9 +153,16 @@ class EasyRSA: expiry_days: int | None, *easyrsa_args: str, ) -> crypto.X509: + """ + Create an X.509 certificate + """ + config = Config._ - extra_args: tuple[str] = tuple() + extra_args: tuple[str] = ( + f"--passout=pass:{self.ca_password}", + f"--passin=pass:{self.ca_password}", + ) if expiry_days is not None: extra_args += tuple([f"--days={expiry_days}"]) @@ -88,71 +192,71 @@ class EasyRSA: ) with open( - self.pki_directory.joinpath(cert_filename), "r" + self.output_directory.joinpath(cert_filename), "r" ) as cert_file: return crypto.load_certificate( crypto.FILETYPE_PEM, cert_file.read() ) def init_pki(self) -> bool: + """ + Clean the working directory + """ + self.__easyrsa("init-pki") def build_ca(self) -> crypto.X509: - config = Config._ - server_dn = config.server_dn + """ + Build the CA certificate + """ cert = self.__build_cert( Path("ca.crt"), - config.crypto.ca_expiry_days, + Config._.crypto.ca_expiry_days, - f"--passout=pass:{self.ca_password}", - f"--passin=pass:{self.ca_password}", - - "--dn-mode=org", - f"--req-c={server_dn.country.value}", - f"--req-st={server_dn.state.value}", - f"--req-city={server_dn.city.value}", - f"--req-org={server_dn.organization.value}", - f"--req-ou={server_dn.organizational_unit.value}", - f"--req-email={server_dn.email.value}", - f"--req-cn={server_dn.common_name}", + "--req-cn=kiwi-vpn-ca", "build-ca", ) + # # this takes long! # self.__easyrsa("gen-dh") return cert def issue( self, cert_type: str = "client", - cn: str = "kiwi-vpn-client", + dn: DistinguishedName = DistinguishedName.build(), ) -> crypto.X509: - config = Config._ + """ + Issue a client or server certificate + """ return self.__build_cert( - Path(f"issued/{cn}.crt"), - config.crypto.cert_expiry_days, + Path(f"issued/{dn.common_name}.crt"), + Config._.crypto.cert_expiry_days, - f"--passin=pass:{self.ca_password}", + *dn.easyrsa_args, f"build-{cert_type}-full", - cn, + dn.common_name, "nopass", ) +# some basic test + if __name__ == "__main__": easy_rsa = EasyRSA() easy_rsa.init_pki() ca = easy_rsa.build_ca() - server = easy_rsa.issue(cert_type="server", cn="kiwi-vpn-server") - client = easy_rsa.issue(cert_type="client", cn="kiwi-vpn-client") + server = easy_rsa.issue("server") + # client = easy_rsa.issue(cert_type="client", cn="kiwi-vpn-client") date_format, encoding = "%Y%m%d%H%M%SZ", "ascii" - for cert in [ca, server, client]: + for cert in (ca, server): print(cert.get_subject().CN) print(cert.get_signature_algorithm().decode(encoding)) print(datetime.strptime(