diff --git a/api/kiwi_vpn_api/db/user.py b/api/kiwi_vpn_api/db/user.py index ccc4d5e..3ba5072 100644 --- a/api/kiwi_vpn_api/db/user.py +++ b/api/kiwi_vpn_api/db/user.py @@ -220,67 +220,71 @@ class User(UserBase, table=True): for tag in (set(self.__tags) - set(tags)) ] - def can_edit( + def check_edit( self, target: User | Device, - ) -> bool: + ) -> None: """ Check if this user can edit another user or a device. """ # admin can "edit" everything if self.is_admin: - return True + return None - # user can "edit" itself - if isinstance(target, User): - return target == self + # user can only "edit" itself + if isinstance(target, User) and target == self: + return None # user can edit its owned devices - return target.owner == self + if isinstance(target, Device) and target.owner == self: + return None - def can_admin( + # deny by default + raise PermissionError() + + def check_admin( self, target: User | Device, - ) -> bool: + ) -> None: """ Check if this user can administer another user or a device. """ # only admin can "admin" anything if not self.is_admin: - return False + raise PermissionError("Must be admin") - # admin canot "admin itself"! + # admin cannot "admin" itself! if isinstance(target, User) and target == self: - return False + raise PermissionError("Can't administer self") # admin can "admin" everything else - return True + return None - def can_create( + def check_create( self, target: type, owner: User | None = None, - ) -> bool: + ) -> None: """ Check if this user can create another user or a device. """ # can never create anything but users or devices if not issubclass(target, (User, Device)): - return False + raise PermissionError(f"Cannot create target type {target}") # admin can "create" everything if self.is_admin: - return True + return None # user can only create devices for itself if target is Device and owner == self: - return True + return None - # deny be default - return False + # deny by default + raise PermissionError() @property def can_issue(self) -> bool: diff --git a/api/kiwi_vpn_api/routers/device.py b/api/kiwi_vpn_api/routers/device.py index 28b0450..a270dd0 100644 --- a/api/kiwi_vpn_api/routers/device.py +++ b/api/kiwi_vpn_api/routers/device.py @@ -39,9 +39,12 @@ async def add_device( - 409: device creation unsuccessful """ - # check permission - if not current_user.can_create(Device, owner): - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) + # check permissions + try: + current_user.check_create(Device, owner) + + except PermissionError as e: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) from e # create the new device new_device = Device.create( @@ -80,9 +83,12 @@ async def remove_device( - 403: no user permission to edit device """ - # check permission - if not current_user.can_edit(device): - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) + # check permissions + try: + current_user.check_edit(device) + + except PermissionError as e: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) from e # delete device device.delete() @@ -113,9 +119,12 @@ async def request_certificate_issuance( - 409: device certificate cannot be "issued" """ - # check permission - if not current_user.can_edit(device): - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) + # check permissions + try: + current_user.check_edit(device) + + except PermissionError as e: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) from e # can only "request" on an uncertified device if device.status is not DeviceStatus.uncertified: @@ -161,9 +170,12 @@ async def request_certificate_renewal( - 409: device certificate cannot be "renewed" """ - # check permission - if not current_user.can_edit(device): - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) + # check permissions + try: + current_user.check_edit(device) + + except PermissionError as e: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) from e # can only "renew" on an already certified device if device.status is not DeviceStatus.certified: @@ -209,9 +221,12 @@ async def revoke_certificate( - 409: device certificate cannot be "revoked" """ - # check permission - if not current_user.can_edit(device): - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) + # check permissions + try: + current_user.check_edit(device) + + except PermissionError as e: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) from e # can only "revoke" on a currently certified device if device.status is not DeviceStatus.certified: diff --git a/api/kiwi_vpn_api/routers/user.py b/api/kiwi_vpn_api/routers/user.py index b75e441..f015fc9 100644 --- a/api/kiwi_vpn_api/routers/user.py +++ b/api/kiwi_vpn_api/routers/user.py @@ -107,8 +107,11 @@ async def add_user( """ # check permissions - if not current_user.can_create(User): - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) + try: + current_user.check_create(User) + + except PermissionError as e: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) from e # create the new user new_user = User.create(user=user) @@ -147,8 +150,11 @@ async def remove_user( """ # check permissions - if not current_user.can_admin(user): - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) + try: + current_user.check_admin(user) + + except PermissionError as e: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) from e # delete user user.delete() @@ -178,8 +184,11 @@ async def extend_tags( """ # check permissions - if not current_user.can_admin(user): - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) + try: + current_user.check_admin(user) + + except PermissionError as e: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) from e # change user user.add_tags(tags) @@ -209,8 +218,11 @@ async def remove_tags( """ # check permissions - if not current_user.can_admin(user): - raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) + try: + current_user.check_admin(user) + + except PermissionError as e: + raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) from e # change user user.remove_tags(tags)