From aa7becf057cf4a030eed85e5bbebd51730afa3a0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn-Michael=20Miehe?= <40151420+ldericher@users.noreply.github.com> Date: Thu, 7 Apr 2022 08:00:41 +0000 Subject: [PATCH] "approved: bool | None" -> "status: DeviceStatus" --- api/kiwi_vpn_api/db/device.py | 20 ++++++++++++++++++- api/kiwi_vpn_api/routers/device.py | 31 +++++++++++++++--------------- 2 files changed, 35 insertions(+), 16 deletions(-) diff --git a/api/kiwi_vpn_api/db/device.py b/api/kiwi_vpn_api/db/device.py index 1b98122..97def63 100644 --- a/api/kiwi_vpn_api/db/device.py +++ b/api/kiwi_vpn_api/db/device.py @@ -5,6 +5,7 @@ Python representation of `device` table. from __future__ import annotations from datetime import datetime +from enum import Enum from typing import TYPE_CHECKING from sqlalchemy.exc import IntegrityError @@ -16,6 +17,15 @@ if TYPE_CHECKING: from .user import User +class DeviceStatus(Enum): + uncertified = "uncertified" + pending = "pending" + certified = "certified" + + def __repr__(self) -> str: + return self.value + + class DeviceBase(SQLModel): """ Common to all representations of devices @@ -37,10 +47,18 @@ class DeviceRead(DeviceBase): """ id: int | None = Field(primary_key=True) - approved: bool | None = Field(default=None) + status_str: str = Field(default=repr(DeviceStatus.uncertified)) expiry: datetime | None = Field(default=None) owner_name: str = Field(foreign_key="user.name") + @property + def status(self) -> DeviceStatus: + return DeviceStatus(self.status_str) + + # property setters don't work with sqlmodel + def set_status(self, status: DeviceStatus) -> None: + self.status_str = repr(status) + class Device(DeviceRead, table=True): """ diff --git a/api/kiwi_vpn_api/routers/device.py b/api/kiwi_vpn_api/routers/device.py index 3f78607..28b0450 100644 --- a/api/kiwi_vpn_api/routers/device.py +++ b/api/kiwi_vpn_api/routers/device.py @@ -3,6 +3,7 @@ """ from fastapi import APIRouter, Depends, HTTPException, status +from kiwi_vpn_api.db.device import DeviceStatus from ..db import Device, DeviceCreate, DeviceRead, User from ..easyrsa import EASYRSA, DistinguishedName @@ -116,18 +117,18 @@ async def request_certificate_issuance( if not current_user.can_edit(device): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) - # can only request for a newly created device - if device.approved is not None: + # can only "request" on an uncertified device + if device.status is not DeviceStatus.uncertified: raise HTTPException(status_code=status.HTTP_409_CONFLICT) - # check if we must wait for approval - device.approved = current_user.can_issue + device.set_status(DeviceStatus.pending) - if device.approved: - # issue the certificate immediately + # check if we can issue the certificate immediately + if current_user.can_issue: if (certificate := EASYRSA.issue( dn=DistinguishedName.build(device) )) is not None: + device.set_status(DeviceStatus.certified) device.expiry = certificate.not_valid_after # return updated device @@ -164,18 +165,18 @@ async def request_certificate_renewal( if not current_user.can_edit(device): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) - # can only renew an already certified device - if device.approved is not True: + # can only "renew" on an already certified device + if device.status is not DeviceStatus.certified: raise HTTPException(status_code=status.HTTP_409_CONFLICT) - # check if we must wait for approval - device.approved = current_user.can_renew + device.set_status(DeviceStatus.pending) - if device.approved: - # renew the certificate immediately + # check if we can renew the certificate immediately + if current_user.can_renew: if (certificate := EASYRSA.renew( dn=DistinguishedName.build(device) )) is not None: + device.set_status(DeviceStatus.certified) device.expiry = certificate.not_valid_after # return updated device @@ -212,15 +213,15 @@ async def revoke_certificate( if not current_user.can_edit(device): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) - # can only revoke a currently certified device - if device.approved is not True: + # can only "revoke" on a currently certified device + if device.status is not DeviceStatus.certified: raise HTTPException(status_code=status.HTTP_409_CONFLICT) # revoke the device certificate EASYRSA.revoke(dn=DistinguishedName.build(device)) # reset the device - device.approved = None + device.set_status(DeviceStatus.uncertified) device.expiry = None # return updated device