""" /device endpoints. """ from fastapi import APIRouter, Depends, HTTPException, status from ..db import Device, DeviceCreate, DeviceRead, DeviceStatus, User from ..easyrsa import DistinguishedName, EasyRSA from ._common import (Responses, get_current_user, get_device_by_id, get_pki, get_user_by_name) router = APIRouter(prefix="/device", tags=["device"]) @router.post( "/{user_name}", responses={ status.HTTP_201_CREATED: Responses.ENTRY_ADDED, status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED, status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER, status.HTTP_403_FORBIDDEN: Responses.NEEDS_PERMISSION, status.HTTP_409_CONFLICT: Responses.ENTRY_EXISTS, }, response_model=DeviceRead, status_code=status.HTTP_201_CREATED, ) async def add_device( device: DeviceCreate, current_user: User = Depends(get_current_user), owner: User = Depends(get_user_by_name), ) -> Device: """ POST ./: Create a new device in the database. Status: - 403: no user permission to create device - 409: device creation unsuccessful """ # check permission if not current_user.can_create(Device, owner): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) # create the new device new_device = Device.create( owner=owner, device=device, ) # fail if creation was unsuccessful if new_device is None: raise HTTPException(status_code=status.HTTP_409_CONFLICT) # return the created device on success return new_device @router.delete( "/{device_id}", responses={ status.HTTP_200_OK: Responses.OK, status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED, status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER, status.HTTP_403_FORBIDDEN: Responses.NEEDS_PERMISSION, status.HTTP_404_NOT_FOUND: Responses.ENTRY_DOESNT_EXIST, }, response_model=User, ) async def remove_device( current_user: User = Depends(get_current_user), device: Device = Depends(get_device_by_id), ): """ DELETE ./{device_id}: Remove a device from the database. Status: - 403: no user permission to edit device """ # check permission if not current_user.can_edit(device): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) # delete device device.delete() @router.post( "/{device_id}/issue", responses={ status.HTTP_200_OK: Responses.OK, status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED, status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER, status.HTTP_403_FORBIDDEN: Responses.NEEDS_PERMISSION, status.HTTP_404_NOT_FOUND: Responses.ENTRY_DOESNT_EXIST, status.HTTP_409_CONFLICT: Responses.ENTRY_EXISTS, status.HTTP_425_TOO_EARLY: Responses.NEEDS_PKI, }, response_model=DeviceRead, ) async def request_certificate_issuance( current_user: User = Depends(get_current_user), device: Device = Depends(get_device_by_id), pki: EasyRSA = Depends(get_pki), ) -> Device: """ POST ./{device_id}/issue: Request certificate issuance for a device. Status: - 403: no user permission to edit device - 409: device certificate cannot be "issued" """ # check permission if not current_user.can_edit(device): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) # can only "request" on an uncertified device if device.status is not DeviceStatus.uncertified: raise HTTPException(status_code=status.HTTP_409_CONFLICT) device.set_status(DeviceStatus.pending) # check if we can issue the certificate immediately if current_user.can_issue: if (certificate := pki.issue( dn=DistinguishedName.build(device) )) is not None: device.set_status(DeviceStatus.certified) device.expiry = certificate.not_valid_after # return updated device device.update() return device @router.post( "/{device_id}/renew", responses={ status.HTTP_200_OK: Responses.OK, status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED, status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER, status.HTTP_403_FORBIDDEN: Responses.NEEDS_PERMISSION, status.HTTP_404_NOT_FOUND: Responses.ENTRY_DOESNT_EXIST, status.HTTP_409_CONFLICT: Responses.ENTRY_EXISTS, status.HTTP_425_TOO_EARLY: Responses.NEEDS_PKI, }, response_model=DeviceRead, ) async def request_certificate_renewal( current_user: User = Depends(get_current_user), device: Device = Depends(get_device_by_id), pki: EasyRSA = Depends(get_pki), ) -> Device: """ POST ./{device_id}/renew: Request certificate renewal for a device. Status: - 403: no user permission to edit device - 409: device certificate cannot be "renewed" """ # check permission if not current_user.can_edit(device): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) # can only "renew" on an already certified device if device.status is not DeviceStatus.certified: raise HTTPException(status_code=status.HTTP_409_CONFLICT) device.set_status(DeviceStatus.pending) # check if we can renew the certificate immediately if current_user.can_renew: if (certificate := pki.renew( dn=DistinguishedName.build(device) )) is not None: device.set_status(DeviceStatus.certified) device.expiry = certificate.not_valid_after # return updated device device.update() return device @router.post( "/{device_id}/revoke", responses={ status.HTTP_200_OK: Responses.OK, status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED, status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER, status.HTTP_403_FORBIDDEN: Responses.NEEDS_PERMISSION, status.HTTP_404_NOT_FOUND: Responses.ENTRY_DOESNT_EXIST, status.HTTP_409_CONFLICT: Responses.ENTRY_EXISTS, status.HTTP_425_TOO_EARLY: Responses.NEEDS_PKI, }, response_model=DeviceRead, ) async def revoke_certificate( current_user: User = Depends(get_current_user), device: Device = Depends(get_device_by_id), pki: EasyRSA = Depends(get_pki), ) -> Device: """ POST ./{device_id}/revoke: Revoke a device certificate. Status: - 403: no user permission to edit device - 409: device certificate cannot be "revoked" """ # check permission if not current_user.can_edit(device): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) # can only "revoke" on a currently certified device if device.status is not DeviceStatus.certified: raise HTTPException(status_code=status.HTTP_409_CONFLICT) # revoke the device certificate pki.revoke(dn=DistinguishedName.build(device)) # reset the device device.set_status(DeviceStatus.uncertified) device.expiry = None # return updated device device.update() return device