kiwi-vpn/api/kiwi_vpn_api/easyrsa.py

396 lines
9.8 KiB
Python

"""
Python interface to EasyRSA CA.
"""
from __future__ import annotations
import subprocess
from enum import Enum, auto
from pathlib import Path
from cryptography import x509
from passlib import pwd
from pydantic import BaseModel
from .config import SETTINGS, Config, KeyAlgorithm
from .db import Connection, Device
class DistinguishedName(BaseModel):
"""
An `X.509 distinguished name` (DN) as specified in RFC 5280
"""
country: str
state: str
city: str
organization: str
organizational_unit: str
email: str
common_name: str
@classmethod
def build(cls, device: Device | None = None) -> DistinguishedName:
"""
Create a DN from the current config and an optional device
"""
# extract server DN config
server_dn = Config._.server_dn
result = cls(
country=server_dn.country.value,
state=server_dn.state.value,
city=server_dn.city.value,
organization=server_dn.organization.value,
organizational_unit=server_dn.organizational_unit.value,
email=server_dn.email.value,
common_name=server_dn.common_name,
)
# no device specified -> done
if device is None:
return result
# don't override locked or empty fields
if not (server_dn.country.locked
or device.owner.country is None):
result.country = device.owner.country
if not (server_dn.state.locked
or device.owner.state is None):
result.state = device.owner.state
if not (server_dn.city.locked
or device.owner.city is None):
result.city = device.owner.city
if not (server_dn.organization.locked
or device.owner.organization is None):
result.organization = device.owner.organization
if not (server_dn.organizational_unit.locked
or device.owner.organizational_unit is None):
result.organizational_unit = device.owner.organizational_unit
# definitely use derived email and common_name
result.email = device.owner.email
result.common_name = f"{device.owner.name}_{device.name}"
return result
@property
def easyrsa_env(self) -> dict[str, str]:
"""
Pass this DN as arguments to easyrsa
"""
return {
"EASYRSA_DN": "org",
"EASYRSA_REQ_COUNTRY": self.country,
"EASYRSA_REQ_PROVINCE": self.state,
"EASYRSA_REQ_CITY": self.city,
"EASYRSA_REQ_ORG": self.organization,
"EASYRSA_REQ_OU": self.organizational_unit,
"EASYRSA_REQ_EMAIL": self.email,
"EASYRSA_REQ_CN": self.common_name,
}
class CertificateType(Enum):
"""
Possible types of certificates
"""
ca = auto()
client = auto()
server = auto()
def __str__(self) -> str:
return self._name_
class EasyRSA:
"""
Represents an EasyRSA PKI.
"""
__mapKeyAlgorithm = {
KeyAlgorithm.rsa2048: {
"EASYRSA_ALGO": "rsa",
"EASYRSA_KEY_SIZE": "2048",
},
KeyAlgorithm.rsa4096: {
"EASYRSA_ALGO": "rsa",
"EASYRSA_KEY_SIZE": "4096",
},
KeyAlgorithm.secp256r1: {
"EASYRSA_ALGO": "ec",
"EASYRSA_CURVE": "secp256r1",
},
KeyAlgorithm.secp384r1: {
"EASYRSA_ALGO": "ec",
"EASYRSA_CURVE": "secp384r1",
},
KeyAlgorithm.ed25519: {
"EASYRSA_ALGO": "ed",
"EASYRSA_CURVE": "ed25519",
},
None: {},
}
@property
def output_directory(self) -> Path:
"""
Where certificates are stored
"""
return SETTINGS.data_dir.joinpath("pki")
@property
def ca_password(self) -> str:
"""
Get CA password from config, or generate a new one
"""
config = Config._
if (ca_password := config.crypto.ca_password) is None:
# generate and save new CA password
ca_password = pwd.genword(
length=32,
charset="ascii_62",
)
config.crypto.ca_password = ca_password
config.save()
return ca_password
def __easyrsa(
self,
*easyrsa_cmd: str,
**easyrsa_env: str,
) -> subprocess.CompletedProcess:
"""
Call the `easyrsa` executable
"""
return subprocess.run(
[
"/usr/local/bin/easyrsa",
*easyrsa_cmd,
],
env={
# base settings
"EASYRSA_BATCH": "1",
"EASYRSA_PKI": str(self.output_directory),
# always include CA password
"EASYRSA_PASSOUT": f"pass:{self.ca_password}",
"EASYRSA_PASSIN": f"pass:{self.ca_password}",
# include env from parameters
**easyrsa_env,
},
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
check=True,
)
def __build_cert(
self,
*easyrsa_cmd: str,
**easyrsa_env: str,
) -> None:
"""
Create an X.509 certificate
"""
config = Config._
if ((algorithm := config.crypto.key_algorithm)
not in EasyRSA.__mapKeyAlgorithm):
raise ValueError(f"Unexpected algorithm: {algorithm}")
# include expiry options
if (ca_expiry_days := config.crypto.ca_expiry_days) is not None:
easyrsa_env["EASYRSA_CA_EXPIRE"] = str(ca_expiry_days)
if (cert_expiry_days := config.crypto.cert_expiry_days) is not None:
easyrsa_env["EASYRSA_CERT_EXPIRE"] = str(cert_expiry_days)
try:
# call easyrsa
self.__easyrsa(
*easyrsa_cmd,
# include algorithm options
**EasyRSA.__mapKeyAlgorithm[algorithm],
**easyrsa_env,
)
except (subprocess.CalledProcessError):
# certificate couldn't be built
pass
return None
def get_certificate(
self,
*,
cert_type: CertificateType | None = None,
dn: DistinguishedName | None = None,
) -> x509.Certificate | None:
"""
Get a certificate from the PKI directory
"""
if cert_type is CertificateType.ca:
cert_filename = self.output_directory.joinpath("ca.crt")
else:
if dn is None:
dn = DistinguishedName.build()
cert_filename = (self.output_directory.joinpath("issued")
.joinpath(f"{dn.common_name}.crt"))
try:
# parse the certificate
with open(cert_filename, "rb") as cert_file:
return x509.load_pem_x509_certificate(
cert_file.read()
)
except FileNotFoundError:
return None
def init_pki(self) -> None:
"""
Clean working directory
"""
self.__easyrsa("init-pki")
def build_ca(self) -> x509.Certificate:
"""
Build the CA certificate
"""
self.__build_cert(
"build-ca",
EASYRSA_DN="cn_only",
EASYRSA_REQ_CN="kiwi-vpn-ca",
)
cert = self.get_certificate(cert_type=CertificateType.ca)
assert cert is not None
# # this takes long!
# self.__easyrsa("gen-dh")
return cert
def issue(
self,
cert_type: CertificateType = CertificateType.client,
dn: DistinguishedName | None = None,
) -> x509.Certificate | None:
"""
Issue a client or server certificate
"""
if dn is None:
dn = DistinguishedName.build()
if not (cert_type is CertificateType.client
or cert_type is CertificateType.server):
return None
self.__build_cert(
f"build-{cert_type}-full",
dn.common_name,
"nopass",
**dn.easyrsa_env,
)
return self.get_certificate(
cert_type=cert_type,
dn=dn,
)
def renew(
self,
dn: DistinguishedName | None = None,
) -> x509.Certificate | None:
"""
Renew a client or server certificate
"""
if dn is None:
dn = DistinguishedName.build()
self.__build_cert(
"renew",
dn.common_name,
"nopass",
# allow renewal 14 days before cert expiry
EASYRSA_CERT_RENEW="14",
**dn.easyrsa_env,
)
return self.get_certificate(dn=dn)
def revoke(
self,
dn: DistinguishedName | None = None,
) -> bool:
"""
Revoke a client or server certificate
"""
if dn is None:
dn = DistinguishedName.build()
try:
self.__easyrsa(
"revoke",
dn.common_name,
**dn.easyrsa_env,
)
except subprocess.CalledProcessError:
return False
return True
EASYRSA = EasyRSA()
# some basic test
if __name__ == "__main__":
ca = EASYRSA.build_ca()
server = EASYRSA.issue(CertificateType.server)
client = None
# check if configured
if (current_config := Config.load()) is not None:
# connect to database
Connection.connect(current_config.db.uri)
if (device := Device.get(1)) is not None:
client = EASYRSA.issue(
dn=DistinguishedName.build(device)
)
for cert in (ca, server, client):
if cert is not None:
print(cert.subject)
print(cert.signature_hash_algorithm)
print(cert.not_valid_after)