Compare commits

..

No commits in common. "2d755b8e3d8b1cf7ee9cf36e9c5af6d6cd4c8477" and "d8bdb46a5c62f16733fa95c727f417ac5b815840" have entirely different histories.

6 changed files with 73 additions and 61 deletions

View file

@ -56,9 +56,6 @@ class Device(DeviceRead, table=True):
# might be a future problem?
owner: User = Relationship(
back_populates="devices",
sa_relationship_kwargs={
"lazy": "joined",
},
)
@classmethod

View file

@ -282,24 +282,22 @@ class User(UserBase, table=True):
# deny be default
return False
@property
def can_issue(self) -> bool:
def can_issue(self, device: Device) -> bool:
"""
Check if this user can issue a certificate without approval.
"""
return (
self.is_admin
or self.has_tag(TagValue.issue)
device.approved in (None, False)
and (self.is_admin or self.has_tag(TagValue.issue))
)
@property
def can_renew(self) -> bool:
def can_renew(self, device: Device) -> bool:
"""
Check if this user can renew a certificate without approval.
"""
return (
self.is_admin
or self.has_tag(TagValue.renew)
device.approved is True
and (self.is_admin or self.has_tag(TagValue.renew))
)

View file

@ -4,12 +4,12 @@ Python interface to EasyRSA CA.
from __future__ import annotations
import functools
import subprocess
from datetime import datetime
from enum import Enum, auto
from pathlib import Path
from cryptography import x509
from OpenSSL import crypto
from passlib import pwd
from pydantic import BaseModel
@ -140,20 +140,6 @@ class EasyRSA:
None: {},
}
@classmethod
@functools.lru_cache
def _load(cls) -> EasyRSA:
return cls()
@classmethod
@property
def _(cls) -> EasyRSA:
"""
Get the singleton
"""
return cls._load()
@property
def output_directory(self) -> Path:
"""
@ -210,7 +196,7 @@ class EasyRSA:
cert_filename: Path,
*easyrsa_cmd: str,
**easyrsa_env: str,
) -> x509.Certificate:
) -> crypto.X509:
"""
Create an X.509 certificate
"""
@ -245,8 +231,8 @@ class EasyRSA:
with open(
self.output_directory.joinpath(cert_filename), "rb"
) as cert_file:
return x509.load_pem_x509_certificate(
cert_file.read()
return crypto.load_certificate(
crypto.FILETYPE_PEM, cert_file.read()
)
def init_pki(self) -> None:
@ -256,7 +242,7 @@ class EasyRSA:
self.__easyrsa("init-pki")
def build_ca(self) -> x509.Certificate:
def build_ca(self) -> crypto.X509:
"""
Build the CA certificate
"""
@ -277,7 +263,7 @@ class EasyRSA:
self,
cert_type: CertificateType = CertificateType.client,
dn: DistinguishedName | None = None,
) -> x509.Certificate | None:
) -> crypto.X509 | None:
"""
Issue a client or server certificate
"""
@ -316,12 +302,18 @@ if __name__ == "__main__":
Connection.connect(current_config.db.uri)
if (device := Device.get(1)) is not None:
client = easy_rsa.issue(
dn=DistinguishedName.build(device)
)
with Connection.session as db:
db.add(device)
dn = DistinguishedName.build(device)
client = easy_rsa.issue(dn=dn)
date_format, encoding = "%Y%m%d%H%M%SZ", "ascii"
for cert in (ca, server, client):
if cert is not None:
print(cert.subject)
print(cert.signature_hash_algorithm)
print(cert.not_valid_after)
print(cert.get_subject().CN)
print(cert.get_signature_algorithm().decode(encoding))
assert (na := cert.get_notAfter()) is not None
print(datetime.strptime(na.decode(encoding), date_format))

View file

@ -2,10 +2,12 @@
/device endpoints.
"""
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, status
from ..db import Device, DeviceCreate, DeviceRead, User
from ..easyrsa import DistinguishedName, EasyRSA
from ..db import Connection, Device, DeviceCreate, DeviceRead, User
from ..easyrsa import CertificateType, DistinguishedName, EasyRSA
from ._common import (Responses, get_current_user, get_device_by_id,
get_user_by_name)
@ -79,43 +81,47 @@ async def remove_device(
@router.post(
"/{device_id}/issue",
"/{device_id}/csr",
responses={
status.HTTP_200_OK: Responses.OK,
status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
status.HTTP_403_FORBIDDEN: Responses.NEEDS_PERMISSION,
status.HTTP_404_NOT_FOUND: Responses.ENTRY_DOESNT_EXIST,
status.HTTP_409_CONFLICT: Responses.ENTRY_EXISTS,
},
response_model=DeviceRead,
)
async def request_certificate(
current_user: User = Depends(get_current_user),
device: Device = Depends(get_device_by_id),
) -> Device:
):
"""
POST ./{device_id}/issue: Request certificate for a device.
POST ./{device_id}/csr: Request certificate for a device.
"""
# check permission
if not current_user.can_edit(device):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
# cannot request for a newly created device
if device.approved is not None:
raise HTTPException(status_code=status.HTTP_409_CONFLICT)
easy_rsa = EasyRSA()
# check if we must wait for approval
device.approved = current_user.can_issue
with Connection.session as db:
db.add(device)
dn = DistinguishedName.build(device)
if device.approved:
# issue the certificate immediately
if (certificate := EasyRSA._.issue(
dn=DistinguishedName.build(device)
)) is not None:
device.expiry = certificate.not_valid_after
if current_user.can_issue(device):
device.approved = True
# return updated device
device.update()
return device
if (cert := easy_rsa.issue(
dn=dn,
cert_type=CertificateType.server,
)) is not None:
assert (expiry := cert.get_notAfter()) is not None
date_format, encoding = "%Y%m%d%H%M%SZ", "ascii"
expiry = datetime.strptime(
expiry.decode(encoding),
date_format,
)
device.expiry = expiry
db.commit()

21
api/poetry.lock generated
View file

@ -292,6 +292,21 @@ typing-extensions = ">=3.7.4.3"
dotenv = ["python-dotenv (>=0.10.4)"]
email = ["email-validator (>=1.0.3)"]
[[package]]
name = "pyopenssl"
version = "22.0.0"
description = "Python wrapper module around the OpenSSL library"
category = "main"
optional = false
python-versions = ">=3.6"
[package.dependencies]
cryptography = ">=35.0"
[package.extras]
docs = ["sphinx", "sphinx-rtd-theme"]
test = ["flaky", "pretend", "pytest (>=3.0.1)"]
[[package]]
name = "pyparsing"
version = "3.0.7"
@ -486,7 +501,7 @@ standard = ["websockets (>=10.0)", "httptools (>=0.4.0)", "watchgod (>=0.6)", "p
[metadata]
lock-version = "1.1"
python-versions = "^3.10"
content-hash = "36a56b6982734607590597302276605f8977119869934f35116e72377905b6b5"
content-hash = "ec07664a3624e6204beb2371bccc164ca1029f6e80663a9bd5946f4eaea04ca1"
[metadata.files]
anyio = [
@ -775,6 +790,10 @@ pydantic = [
{file = "pydantic-1.9.0-py3-none-any.whl", hash = "sha256:085ca1de245782e9b46cefcf99deecc67d418737a1fd3f6a4f511344b613a5b3"},
{file = "pydantic-1.9.0.tar.gz", hash = "sha256:742645059757a56ecd886faf4ed2441b9c0cd406079c2b4bee51bcc3fbcd510a"},
]
pyopenssl = [
{file = "pyOpenSSL-22.0.0-py2.py3-none-any.whl", hash = "sha256:ea252b38c87425b64116f808355e8da644ef9b07e429398bfece610f893ee2e0"},
{file = "pyOpenSSL-22.0.0.tar.gz", hash = "sha256:660b1b1425aac4a1bea1d94168a85d99f0b3144c869dd4390d27629d0087f1bf"},
]
pyparsing = [
{file = "pyparsing-3.0.7-py3-none-any.whl", hash = "sha256:a6c06a88f252e6c322f65faf8f418b16213b51bdfaece0524c1c1bc30c63c484"},
{file = "pyparsing-3.0.7.tar.gz", hash = "sha256:18ee9022775d270c55187733956460083db60b37d0d0fb357445f3094eed3eea"},

View file

@ -9,11 +9,11 @@ python = "^3.10"
fastapi = "^0.75.0"
passlib = {extras = ["argon2", "bcrypt"], version = "^1.7.4"}
pyOpenSSL = "^22.0.0"
python-jose = {extras = ["cryptography"], version = "^3.3.0"}
python-multipart = "^0.0.5"
sqlmodel = "^0.0.6"
uvicorn = "^0.17.6"
cryptography = "^36.0.2"
[tool.poetry.dev-dependencies]
pytest = "^7.1.0"