diff --git a/api/kiwi_vpn_api/db/__init__.py b/api/kiwi_vpn_api/db/__init__.py index ae088cd..14006e1 100644 --- a/api/kiwi_vpn_api/db/__init__.py +++ b/api/kiwi_vpn_api/db/__init__.py @@ -4,8 +4,8 @@ Package `db`: ORM and schemas for database content. from .connection import Connection from .device import Device, DeviceBase, DeviceCreate, DeviceRead +from .tag import TagValue from .user import User, UserBase, UserCreate, UserRead -from .user_capability import UserCapabilityType __all__ = [ "Connection", @@ -17,5 +17,5 @@ __all__ = [ "UserBase", "UserCreate", "UserRead", - "UserCapabilityType", + "TagValue", ] diff --git a/api/kiwi_vpn_api/db/tag.py b/api/kiwi_vpn_api/db/tag.py new file mode 100644 index 0000000..b29622b --- /dev/null +++ b/api/kiwi_vpn_api/db/tag.py @@ -0,0 +1,56 @@ +""" +Python representation of `tag` table. +""" + +from enum import Enum +from typing import TYPE_CHECKING + +from sqlmodel import Field, Relationship, SQLModel + +if TYPE_CHECKING: + from .user import User + + +class TagValue(Enum): + """ + Allowed values for tags + """ + + admin = "admin" + login = "login" + issue = "issue" + renew = "renew" + + def __repr__(self) -> str: + return self.value + + +class TagBase(SQLModel): + """ + Common to all representations of tags + """ + + tag_value: str = Field(primary_key=True) + + @property + def _(self) -> TagValue: + """ + Transform into a `TagValue`. + """ + + return TagValue(self.tag_value) + + def __repr__(self) -> str: + return self.tag_value + + +class Tag(TagBase, table=True): + """ + Representation of `tag` table + """ + + user_name: str = Field(primary_key=True, foreign_key="user.name") + + user: "User" = Relationship( + back_populates="tags", + ) diff --git a/api/kiwi_vpn_api/db/user.py b/api/kiwi_vpn_api/db/user.py index eac3c03..997dab0 100644 --- a/api/kiwi_vpn_api/db/user.py +++ b/api/kiwi_vpn_api/db/user.py @@ -13,7 +13,7 @@ from sqlmodel import Field, Relationship, SQLModel from ..config import Config from .connection import Connection from .device import Device -from .user_capability import UserCapability, UserCapabilityType +from .tag import Tag, TagValue class UserBase(SQLModel): @@ -77,7 +77,7 @@ class User(UserBase, table=True): password: str - capabilities: list[UserCapability] = Relationship( + tags: list[Tag] = Relationship( back_populates="user", sa_relationship_kwargs={ "lazy": "joined", @@ -164,43 +164,43 @@ class User(UserBase, table=True): db.delete(self) db.commit() - def get_capabilities(self) -> set[UserCapabilityType]: + def get_tags(self) -> set[TagValue]: """ - Return the capabilities of this user. + Return the tags of this user. """ return set( - capability._ - for capability in self.capabilities + tag._ + for tag in self.tags ) - def set_capabilities( + def set_tags( self, - capabilities: Sequence[UserCapabilityType], + tags: Sequence[TagValue], ) -> None: """ - Change the capabilities of this user. + Change the tags of this user. """ - self.capabilities = [ - UserCapability( + self.tags = [ + Tag( user_name=self.name, - capability_name=capability.value, - ) for capability in capabilities + tag_value=tag.value, + ) for tag in tags ] def _can( self, - capability: UserCapabilityType, + tag: TagValue, ) -> bool: """ - Check if this user has a capability. + Check if this user has a tag. """ return ( - capability in self.get_capabilities() + tag in self.get_tags() # admin can do everything - or UserCapabilityType.admin in self.get_capabilities() + or TagValue.admin in self.get_tags() ) def can_edit( @@ -214,7 +214,7 @@ class User(UserBase, table=True): return ( user.name == self.name # admin can edit everything - or self._can(UserCapabilityType.admin) + or self._can(TagValue.admin) ) def is_admin( @@ -224,8 +224,8 @@ class User(UserBase, table=True): Check if this user is an admin. """ - # is admin with "admin" capability - return self._can(UserCapabilityType.admin) + # is admin with "admin" tag + return self._can(TagValue.admin) def can_login( self, @@ -235,8 +235,8 @@ class User(UserBase, table=True): """ return ( - # can login with "login" capability - self._can(UserCapabilityType.login) + # can login with "login" tag + self._can(TagValue.login) # admins can always login or self.is_admin() ) @@ -253,7 +253,7 @@ class User(UserBase, table=True): # user can edit itself self.name == user.name # admin can edit every user - or user._can(UserCapabilityType.admin) + or user._can(TagValue.admin) ) def can_be_deleted_by( @@ -266,7 +266,7 @@ class User(UserBase, table=True): return ( # only admin can delete users - user._can(UserCapabilityType.admin) + user._can(TagValue.admin) # even admin cannot delete itself and self.name != user.name ) @@ -282,5 +282,5 @@ class User(UserBase, table=True): return ( device.owner_name == self.name # admin owns everything - or self._can(UserCapabilityType.admin) + or self._can(TagValue.admin) ) diff --git a/api/kiwi_vpn_api/db/user_capability.py b/api/kiwi_vpn_api/db/user_capability.py deleted file mode 100644 index 479fec4..0000000 --- a/api/kiwi_vpn_api/db/user_capability.py +++ /dev/null @@ -1,56 +0,0 @@ -""" -Python representation of `user_capability` table. -""" - -from enum import Enum -from typing import TYPE_CHECKING - -from sqlmodel import Field, Relationship, SQLModel - -if TYPE_CHECKING: - from .user import User - - -class UserCapabilityType(Enum): - """ - Allowed values for capabilities - """ - - admin = "admin" - login = "login" - issue = "issue" - renew = "renew" - - def __repr__(self) -> str: - return self.value - - -class UserCapabilityBase(SQLModel): - """ - Common to all representations of capabilities - """ - - capability_name: str = Field(primary_key=True) - - @property - def _(self) -> UserCapabilityType: - """ - Transform into a `Capability`. - """ - - return UserCapabilityType(self.capability_name) - - def __repr__(self) -> str: - return self.capability_name - - -class UserCapability(UserCapabilityBase, table=True): - """ - Representation of `user_capability` table - """ - - user_name: str = Field(primary_key=True, foreign_key="user.name") - - user: "User" = Relationship( - back_populates="capabilities", - ) diff --git a/api/kiwi_vpn_api/routers/admin.py b/api/kiwi_vpn_api/routers/admin.py index 29c442f..e5e3941 100644 --- a/api/kiwi_vpn_api/routers/admin.py +++ b/api/kiwi_vpn_api/routers/admin.py @@ -7,7 +7,7 @@ from fastapi import APIRouter, Depends, HTTPException, status from sqlmodel import select from ..config import Config -from ..db import Connection, User, UserCapabilityType, UserCreate +from ..db import Connection, TagValue, User, UserCreate from ._common import Responses, get_current_user_if_admin router = APIRouter(prefix="/admin", tags=["admin"]) @@ -64,7 +64,7 @@ async def create_initial_admin( # create an administrative user new_user = User.create(user=admin_user) - new_user.set_capabilities([UserCapabilityType.admin]) + new_user.set_tags([TagValue.admin]) new_user.update() diff --git a/api/kiwi_vpn_api/routers/user.py b/api/kiwi_vpn_api/routers/user.py index 5772d5b..35346c6 100644 --- a/api/kiwi_vpn_api/routers/user.py +++ b/api/kiwi_vpn_api/routers/user.py @@ -7,7 +7,7 @@ from fastapi.security import OAuth2PasswordRequestForm from pydantic import BaseModel from ..config import Config -from ..db import User, UserCapabilityType, UserCreate, UserRead +from ..db import TagValue, User, UserCreate, UserRead from ._common import (Responses, get_current_user_if_admin, get_current_user_if_exists, get_user_by_name) @@ -94,7 +94,7 @@ async def add_user( if new_user is None: raise HTTPException(status_code=status.HTTP_409_CONFLICT) - new_user.set_capabilities([UserCapabilityType.login]) + new_user.set_tags([TagValue.login]) new_user.update() # return the created user on success @@ -130,7 +130,7 @@ async def remove_user( @router.post( - "/{user_name}/capabilities", + "/{user_name}/tags", responses={ status.HTTP_200_OK: Responses.OK, status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED, @@ -138,22 +138,22 @@ async def remove_user( status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN, }, ) -async def extend_capabilities( - capabilities: list[UserCapabilityType], +async def extend_tags( + tags: list[TagValue], _: User = Depends(get_current_user_if_admin), user: User = Depends(get_user_by_name), ): """ - POST ./{user_name}/capabilities: Add capabilities to a user. + POST ./{user_name}/tags: Add tags to a user. """ - user.set_capabilities(user.get_capabilities() | set(capabilities)) + user.set_tags(user.get_tags() | set(tags)) user.update() @router.delete( - "/{user_name}/capabilities", + "/{user_name}/tags", responses={ status.HTTP_200_OK: Responses.OK, status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED, @@ -161,15 +161,15 @@ async def extend_capabilities( status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN, }, ) -async def remove_capabilities( - capabilities: list[UserCapabilityType], +async def remove_tags( + tags: list[TagValue], _: User = Depends(get_current_user_if_admin), user: User = Depends(get_user_by_name), ): """ - DELETE ./{user_name}/capabilities: Remove capabilities from a user. + DELETE ./{user_name}/tags: Remove tags from a user. """ - user.set_capabilities(user.get_capabilities() - set(capabilities)) + user.set_tags(user.get_tags() - set(tags)) user.update()