Compare commits

..

No commits in common. "968e9491cf36f949d2e421e37a422c82e4843c78" and "23a806e325818f4228f57f7e0806602529c6d6c1" have entirely different histories.

2 changed files with 41 additions and 171 deletions

View file

@ -194,7 +194,7 @@ class LockableCountry(LockableString):
value: constr(max_length=2) value: constr(max_length=2)
class ServerDN(BaseModel): class DNParts(BaseModel):
""" """
This server's "distinguished name" This server's "distinguished name"
""" """
@ -204,8 +204,6 @@ class ServerDN(BaseModel):
city: LockableString city: LockableString
organization: LockableString organization: LockableString
organizational_unit: LockableString organizational_unit: LockableString
email: LockableString
common_name: str
class CertificateAlgo(Enum): class CertificateAlgo(Enum):
@ -229,10 +227,9 @@ class CryptoConfig(BaseModel):
schemes: list[str] = ["bcrypt"] schemes: list[str] = ["bcrypt"]
# pki settings # pki settings
cert_algo: CertificateAlgo | None
ca_password: str | None ca_password: str | None
ca_expiry_days: int | None cert_algo: CertificateAlgo | None
cert_expiry_days: int | None expiry_days: int | None
@property @property
def context(self) -> CryptContext: def context(self) -> CryptContext:
@ -247,13 +244,15 @@ class Config(BaseModel):
Configuration for `kiwi-vpn-api` Configuration for `kiwi-vpn-api`
""" """
# common name for the server
server_name: str
# may include client-to-client, cipher etc. # may include client-to-client, cipher etc.
openvpn_extra_options: dict[str, Any] | None openvpn_extra_options: dict[str, Any] | None
db: DBConfig db: DBConfig
jwt: JWTConfig jwt: JWTConfig
crypto: CryptoConfig crypto: CryptoConfig
server_dn: ServerDN default_dn: DNParts
__singleton: Config | None = None __singleton: Config | None = None

View file

@ -2,99 +2,14 @@
Python interface to EasyRSA CA. Python interface to EasyRSA CA.
""" """
from __future__ import annotations
import subprocess import subprocess
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from OpenSSL import crypto from OpenSSL import crypto
from passlib import pwd from passlib import pwd
from pydantic import BaseModel
from .config import CertificateAlgo, Config, Settings from .config import Config, Settings
from .db import Device
class DistinguishedName(BaseModel):
"""
An `X.509 distinguished name` (DN) as specified in RFC 5280
"""
country: str
state: str
city: str
organization: str
organizational_unit: str
email: str
common_name: str
@classmethod
def build(cls, device: Device | None = None) -> DistinguishedName:
"""
Create a DN from the current config and an optional device
"""
# extract server DN config
server_dn = Config._.server_dn
result = cls(
country=server_dn.country.value,
state=server_dn.state.value,
city=server_dn.city.value,
organization=server_dn.organization.value,
organizational_unit=server_dn.organizational_unit.value,
email=server_dn.email.value,
common_name=server_dn.common_name,
)
# no device specified -> done
if device is None:
return result
# don't override locked or empty fields
if not (server_dn.country.locked
or device.owner.country is None):
result.country = device.owner.country
if not (server_dn.state.locked
or device.owner.state is None):
result.state = device.owner.state
if not (server_dn.city.locked
or device.owner.city is None):
result.city = device.owner.city
if not (server_dn.organization.locked
or device.owner.organization is None):
result.organization = device.owner.organization
if not (server_dn.organizational_unit.locked
or device.owner.organizational_unit is None):
result.organizational_unit = device.owner.organizational_unit
# definitely use derived email and common_name
result.email = device.owner.email
result.common_name = f"{device.owner.name}_{device.name}"
return result
@property
def easyrsa_args(self) -> tuple[str]:
"""
Pass this DN as arguments to easyrsa
"""
return (
"--dn-mode=org",
f"--req-c={self.country}",
f"--req-st={self.state}",
f"--req-city={self.city}",
f"--req-org={self.organization}",
f"--req-ou={self.organizational_unit}",
f"--req-email={self.email}",
f"--req-cn={self.common_name}",
)
class EasyRSA: class EasyRSA:
@ -103,19 +18,11 @@ class EasyRSA:
""" """
@property @property
def output_directory(self) -> Path: def pki_directory(self) -> Path:
"""
Where certificates are stored
"""
return Settings._.data_dir.joinpath("pki") return Settings._.data_dir.joinpath("pki")
@property @property
def ca_password(self) -> str: def ca_password(self) -> str:
"""
Get CA password from config, or generate a new one
"""
config = Config._ config = Config._
if (ca_password := config.crypto.ca_password) is None: if (ca_password := config.crypto.ca_password) is None:
@ -133,14 +40,10 @@ class EasyRSA:
self, self,
*easyrsa_args: str, *easyrsa_args: str,
) -> subprocess.CompletedProcess: ) -> subprocess.CompletedProcess:
"""
Call the `easyrsa` executable
"""
return subprocess.run( return subprocess.run(
[ [
"easyrsa", "--batch", "easyrsa", "--batch",
f"--pki-dir={self.output_directory}", f"--pki-dir={self.pki_directory}",
*easyrsa_args, *easyrsa_args,
], ],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
@ -150,113 +53,81 @@ class EasyRSA:
def __build_cert( def __build_cert(
self, self,
cert_filename: Path, cert_filename: Path,
expiry_days: int | None,
*easyrsa_args: str, *easyrsa_args: str,
) -> crypto.X509: ) -> crypto.X509:
""" self.__easyrsa(*easyrsa_args)
Create an X.509 certificate
"""
config = Config._
extra_args: tuple[str] = (
f"--passout=pass:{self.ca_password}",
f"--passin=pass:{self.ca_password}",
)
if expiry_days is not None:
extra_args += tuple([f"--days={expiry_days}"])
if (algo := config.crypto.cert_algo) is not None:
if algo is CertificateAlgo.rsa2048:
extra_args += ("--use-algo=rsa", "--keysize=2048")
elif algo is CertificateAlgo.rsa4096:
extra_args += ("--use-algo=rsa", "--keysize=4096")
elif algo is CertificateAlgo.secp256r1:
extra_args += ("--use-algo=ec", "--curve=secp256r1")
elif algo is CertificateAlgo.secp384r1:
extra_args += ("--use-algo=ec", "--curve=secp384r1")
elif algo is CertificateAlgo.ed25519:
extra_args += ("--use-algo=ed", "--curve=ed25519")
else:
raise ValueError(f"Unexpected algorithm: {algo}")
self.__easyrsa(
*extra_args,
*easyrsa_args
)
with open( with open(
self.output_directory.joinpath(cert_filename), "r" self.pki_directory.joinpath(cert_filename), "r"
) as cert_file: ) as cert_file:
return crypto.load_certificate( return crypto.load_certificate(
crypto.FILETYPE_PEM, cert_file.read() crypto.FILETYPE_PEM, cert_file.read()
) )
def init_pki(self) -> bool: def init_pki(self) -> bool:
"""
Clean the working directory
"""
self.__easyrsa("init-pki") self.__easyrsa("init-pki")
def build_ca(self) -> crypto.X509: def build_ca(
""" self,
Build the CA certificate ) -> crypto.X509:
""" config = Config._
cert = self.__build_cert( cert = self.__build_cert(
Path("ca.crt"), Path("ca.crt"),
Config._.crypto.ca_expiry_days,
"--req-cn=kiwi-vpn-ca", f"--passout=pass:{self.ca_password}",
f"--passin=pass:{self.ca_password}",
# "--dn-mode=org",
# "--req-c=EX",
# "--req-st=EXAMPLE",
# "--req-city=EXAMPLE",
# "--req-org=EXAMPLE",
# "--req-ou=EXAMPLE",
# "--req-email=EXAMPLE",
f"--req-cn={config.server_name}",
f"--days={config.crypto.expiry_days}",
# "--use-algo=ed",
# "--curve=ed25519",
"build-ca", "build-ca",
) )
# # this takes long!
# self.__easyrsa("gen-dh") # self.__easyrsa("gen-dh")
return cert return cert
def issue( def issue(
self, self,
cert_type: str = "client", cn: str = "kiwi-vpn-client",
dn: DistinguishedName = DistinguishedName.build(), cert_type: str = "client"
) -> crypto.X509: ) -> crypto.X509:
""" config = Config._
Issue a client or server certificate
"""
return self.__build_cert( return self.__build_cert(
Path(f"issued/{dn.common_name}.crt"), Path(f"issued/{cn}.crt"),
Config._.crypto.cert_expiry_days,
*dn.easyrsa_args, f"--passin=pass:{self.ca_password}",
f"--days={config.crypto.expiry_days}",
f"build-{cert_type}-full", f"build-{cert_type}-full",
dn.common_name, cn,
"nopass", "nopass",
) )
# some basic test
if __name__ == "__main__": if __name__ == "__main__":
easy_rsa = EasyRSA() easy_rsa = EasyRSA()
easy_rsa.init_pki() easy_rsa.init_pki()
ca = easy_rsa.build_ca() ca = easy_rsa.build_ca()
server = easy_rsa.issue("server") server = easy_rsa.issue(cert_type="server", cn="kiwi-vpn-server")
# client = easy_rsa.issue(cert_type="client", cn="kiwi-vpn-client") client = easy_rsa.issue(cert_type="client", cn="kiwi-vpn-client")
date_format, encoding = "%Y%m%d%H%M%SZ", "ascii" date_format, encoding = "%Y%m%d%H%M%SZ", "ascii"
for cert in (ca, server): for cert in [ca, server, client]:
print(cert.get_subject().CN) print(cert.get_subject().CN)
print(cert.get_signature_algorithm().decode(encoding)) print(cert.get_signature_algorithm().decode(encoding))
print(datetime.strptime( print(datetime.strptime(