"approved: bool | None" -> "status: DeviceStatus"

This commit is contained in:
Jörn-Michael Miehe 2022-04-07 08:00:41 +00:00
parent 96a3aed24e
commit aa7becf057
2 changed files with 35 additions and 16 deletions

View file

@ -5,6 +5,7 @@ Python representation of `device` table.
from __future__ import annotations from __future__ import annotations
from datetime import datetime from datetime import datetime
from enum import Enum
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from sqlalchemy.exc import IntegrityError from sqlalchemy.exc import IntegrityError
@ -16,6 +17,15 @@ if TYPE_CHECKING:
from .user import User from .user import User
class DeviceStatus(Enum):
uncertified = "uncertified"
pending = "pending"
certified = "certified"
def __repr__(self) -> str:
return self.value
class DeviceBase(SQLModel): class DeviceBase(SQLModel):
""" """
Common to all representations of devices Common to all representations of devices
@ -37,10 +47,18 @@ class DeviceRead(DeviceBase):
""" """
id: int | None = Field(primary_key=True) id: int | None = Field(primary_key=True)
approved: bool | None = Field(default=None) status_str: str = Field(default=repr(DeviceStatus.uncertified))
expiry: datetime | None = Field(default=None) expiry: datetime | None = Field(default=None)
owner_name: str = Field(foreign_key="user.name") owner_name: str = Field(foreign_key="user.name")
@property
def status(self) -> DeviceStatus:
return DeviceStatus(self.status_str)
# property setters don't work with sqlmodel
def set_status(self, status: DeviceStatus) -> None:
self.status_str = repr(status)
class Device(DeviceRead, table=True): class Device(DeviceRead, table=True):
""" """

View file

@ -3,6 +3,7 @@
""" """
from fastapi import APIRouter, Depends, HTTPException, status from fastapi import APIRouter, Depends, HTTPException, status
from kiwi_vpn_api.db.device import DeviceStatus
from ..db import Device, DeviceCreate, DeviceRead, User from ..db import Device, DeviceCreate, DeviceRead, User
from ..easyrsa import EASYRSA, DistinguishedName from ..easyrsa import EASYRSA, DistinguishedName
@ -116,18 +117,18 @@ async def request_certificate_issuance(
if not current_user.can_edit(device): if not current_user.can_edit(device):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
# can only request for a newly created device # can only "request" on an uncertified device
if device.approved is not None: if device.status is not DeviceStatus.uncertified:
raise HTTPException(status_code=status.HTTP_409_CONFLICT) raise HTTPException(status_code=status.HTTP_409_CONFLICT)
# check if we must wait for approval device.set_status(DeviceStatus.pending)
device.approved = current_user.can_issue
if device.approved: # check if we can issue the certificate immediately
# issue the certificate immediately if current_user.can_issue:
if (certificate := EASYRSA.issue( if (certificate := EASYRSA.issue(
dn=DistinguishedName.build(device) dn=DistinguishedName.build(device)
)) is not None: )) is not None:
device.set_status(DeviceStatus.certified)
device.expiry = certificate.not_valid_after device.expiry = certificate.not_valid_after
# return updated device # return updated device
@ -164,18 +165,18 @@ async def request_certificate_renewal(
if not current_user.can_edit(device): if not current_user.can_edit(device):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
# can only renew an already certified device # can only "renew" on an already certified device
if device.approved is not True: if device.status is not DeviceStatus.certified:
raise HTTPException(status_code=status.HTTP_409_CONFLICT) raise HTTPException(status_code=status.HTTP_409_CONFLICT)
# check if we must wait for approval device.set_status(DeviceStatus.pending)
device.approved = current_user.can_renew
if device.approved: # check if we can renew the certificate immediately
# renew the certificate immediately if current_user.can_renew:
if (certificate := EASYRSA.renew( if (certificate := EASYRSA.renew(
dn=DistinguishedName.build(device) dn=DistinguishedName.build(device)
)) is not None: )) is not None:
device.set_status(DeviceStatus.certified)
device.expiry = certificate.not_valid_after device.expiry = certificate.not_valid_after
# return updated device # return updated device
@ -212,15 +213,15 @@ async def revoke_certificate(
if not current_user.can_edit(device): if not current_user.can_edit(device):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
# can only revoke a currently certified device # can only "revoke" on a currently certified device
if device.approved is not True: if device.status is not DeviceStatus.certified:
raise HTTPException(status_code=status.HTTP_409_CONFLICT) raise HTTPException(status_code=status.HTTP_409_CONFLICT)
# revoke the device certificate # revoke the device certificate
EASYRSA.revoke(dn=DistinguishedName.build(device)) EASYRSA.revoke(dn=DistinguishedName.build(device))
# reset the device # reset the device
device.approved = None device.set_status(DeviceStatus.uncertified)
device.expiry = None device.expiry = None
# return updated device # return updated device