Compare commits

...

5 commits

6 changed files with 61 additions and 73 deletions

View file

@ -56,6 +56,9 @@ class Device(DeviceRead, table=True):
# might be a future problem?
owner: User = Relationship(
back_populates="devices",
sa_relationship_kwargs={
"lazy": "joined",
},
)
@classmethod

View file

@ -282,22 +282,24 @@ class User(UserBase, table=True):
# deny be default
return False
def can_issue(self, device: Device) -> bool:
@property
def can_issue(self) -> bool:
"""
Check if this user can issue a certificate without approval.
"""
return (
device.approved in (None, False)
and (self.is_admin or self.has_tag(TagValue.issue))
self.is_admin
or self.has_tag(TagValue.issue)
)
def can_renew(self, device: Device) -> bool:
@property
def can_renew(self) -> bool:
"""
Check if this user can renew a certificate without approval.
"""
return (
device.approved is True
and (self.is_admin or self.has_tag(TagValue.renew))
self.is_admin
or self.has_tag(TagValue.renew)
)

View file

@ -4,12 +4,12 @@ Python interface to EasyRSA CA.
from __future__ import annotations
import functools
import subprocess
from datetime import datetime
from enum import Enum, auto
from pathlib import Path
from OpenSSL import crypto
from cryptography import x509
from passlib import pwd
from pydantic import BaseModel
@ -140,6 +140,20 @@ class EasyRSA:
None: {},
}
@classmethod
@functools.lru_cache
def _load(cls) -> EasyRSA:
return cls()
@classmethod
@property
def _(cls) -> EasyRSA:
"""
Get the singleton
"""
return cls._load()
@property
def output_directory(self) -> Path:
"""
@ -196,7 +210,7 @@ class EasyRSA:
cert_filename: Path,
*easyrsa_cmd: str,
**easyrsa_env: str,
) -> crypto.X509:
) -> x509.Certificate:
"""
Create an X.509 certificate
"""
@ -231,8 +245,8 @@ class EasyRSA:
with open(
self.output_directory.joinpath(cert_filename), "rb"
) as cert_file:
return crypto.load_certificate(
crypto.FILETYPE_PEM, cert_file.read()
return x509.load_pem_x509_certificate(
cert_file.read()
)
def init_pki(self) -> None:
@ -242,7 +256,7 @@ class EasyRSA:
self.__easyrsa("init-pki")
def build_ca(self) -> crypto.X509:
def build_ca(self) -> x509.Certificate:
"""
Build the CA certificate
"""
@ -263,7 +277,7 @@ class EasyRSA:
self,
cert_type: CertificateType = CertificateType.client,
dn: DistinguishedName | None = None,
) -> crypto.X509 | None:
) -> x509.Certificate | None:
"""
Issue a client or server certificate
"""
@ -302,18 +316,12 @@ if __name__ == "__main__":
Connection.connect(current_config.db.uri)
if (device := Device.get(1)) is not None:
with Connection.session as db:
db.add(device)
dn = DistinguishedName.build(device)
client = easy_rsa.issue(dn=dn)
date_format, encoding = "%Y%m%d%H%M%SZ", "ascii"
client = easy_rsa.issue(
dn=DistinguishedName.build(device)
)
for cert in (ca, server, client):
if cert is not None:
print(cert.get_subject().CN)
print(cert.get_signature_algorithm().decode(encoding))
assert (na := cert.get_notAfter()) is not None
print(datetime.strptime(na.decode(encoding), date_format))
print(cert.subject)
print(cert.signature_hash_algorithm)
print(cert.not_valid_after)

View file

@ -2,12 +2,10 @@
/device endpoints.
"""
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, status
from ..db import Connection, Device, DeviceCreate, DeviceRead, User
from ..easyrsa import CertificateType, DistinguishedName, EasyRSA
from ..db import Device, DeviceCreate, DeviceRead, User
from ..easyrsa import DistinguishedName, EasyRSA
from ._common import (Responses, get_current_user, get_device_by_id,
get_user_by_name)
@ -81,47 +79,43 @@ async def remove_device(
@router.post(
"/{device_id}/csr",
"/{device_id}/issue",
responses={
status.HTTP_200_OK: Responses.OK,
status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
status.HTTP_403_FORBIDDEN: Responses.NEEDS_PERMISSION,
status.HTTP_404_NOT_FOUND: Responses.ENTRY_DOESNT_EXIST,
status.HTTP_409_CONFLICT: Responses.ENTRY_EXISTS,
},
response_model=DeviceRead,
)
async def request_certificate(
current_user: User = Depends(get_current_user),
device: Device = Depends(get_device_by_id),
):
) -> Device:
"""
POST ./{device_id}/csr: Request certificate for a device.
POST ./{device_id}/issue: Request certificate for a device.
"""
# check permission
if not current_user.can_edit(device):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
easy_rsa = EasyRSA()
# cannot request for a newly created device
if device.approved is not None:
raise HTTPException(status_code=status.HTTP_409_CONFLICT)
with Connection.session as db:
db.add(device)
dn = DistinguishedName.build(device)
# check if we must wait for approval
device.approved = current_user.can_issue
if current_user.can_issue(device):
device.approved = True
if (cert := easy_rsa.issue(
dn=dn,
cert_type=CertificateType.server,
if device.approved:
# issue the certificate immediately
if (certificate := EasyRSA._.issue(
dn=DistinguishedName.build(device)
)) is not None:
assert (expiry := cert.get_notAfter()) is not None
device.expiry = certificate.not_valid_after
date_format, encoding = "%Y%m%d%H%M%SZ", "ascii"
expiry = datetime.strptime(
expiry.decode(encoding),
date_format,
)
device.expiry = expiry
db.commit()
# return updated device
device.update()
return device

21
api/poetry.lock generated
View file

@ -292,21 +292,6 @@ typing-extensions = ">=3.7.4.3"
dotenv = ["python-dotenv (>=0.10.4)"]
email = ["email-validator (>=1.0.3)"]
[[package]]
name = "pyopenssl"
version = "22.0.0"
description = "Python wrapper module around the OpenSSL library"
category = "main"
optional = false
python-versions = ">=3.6"
[package.dependencies]
cryptography = ">=35.0"
[package.extras]
docs = ["sphinx", "sphinx-rtd-theme"]
test = ["flaky", "pretend", "pytest (>=3.0.1)"]
[[package]]
name = "pyparsing"
version = "3.0.7"
@ -501,7 +486,7 @@ standard = ["websockets (>=10.0)", "httptools (>=0.4.0)", "watchgod (>=0.6)", "p
[metadata]
lock-version = "1.1"
python-versions = "^3.10"
content-hash = "ec07664a3624e6204beb2371bccc164ca1029f6e80663a9bd5946f4eaea04ca1"
content-hash = "36a56b6982734607590597302276605f8977119869934f35116e72377905b6b5"
[metadata.files]
anyio = [
@ -790,10 +775,6 @@ pydantic = [
{file = "pydantic-1.9.0-py3-none-any.whl", hash = "sha256:085ca1de245782e9b46cefcf99deecc67d418737a1fd3f6a4f511344b613a5b3"},
{file = "pydantic-1.9.0.tar.gz", hash = "sha256:742645059757a56ecd886faf4ed2441b9c0cd406079c2b4bee51bcc3fbcd510a"},
]
pyopenssl = [
{file = "pyOpenSSL-22.0.0-py2.py3-none-any.whl", hash = "sha256:ea252b38c87425b64116f808355e8da644ef9b07e429398bfece610f893ee2e0"},
{file = "pyOpenSSL-22.0.0.tar.gz", hash = "sha256:660b1b1425aac4a1bea1d94168a85d99f0b3144c869dd4390d27629d0087f1bf"},
]
pyparsing = [
{file = "pyparsing-3.0.7-py3-none-any.whl", hash = "sha256:a6c06a88f252e6c322f65faf8f418b16213b51bdfaece0524c1c1bc30c63c484"},
{file = "pyparsing-3.0.7.tar.gz", hash = "sha256:18ee9022775d270c55187733956460083db60b37d0d0fb357445f3094eed3eea"},

View file

@ -9,11 +9,11 @@ python = "^3.10"
fastapi = "^0.75.0"
passlib = {extras = ["argon2", "bcrypt"], version = "^1.7.4"}
pyOpenSSL = "^22.0.0"
python-jose = {extras = ["cryptography"], version = "^3.3.0"}
python-multipart = "^0.0.5"
sqlmodel = "^0.0.6"
uvicorn = "^0.17.6"
cryptography = "^36.0.2"
[tool.poetry.dev-dependencies]
pytest = "^7.1.0"