Compare commits

...

5 commits

6 changed files with 91 additions and 68 deletions

View file

@ -2,6 +2,8 @@
Python representation of `tag` table. Python representation of `tag` table.
""" """
from __future__ import annotations
from enum import Enum from enum import Enum
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
@ -24,6 +26,16 @@ class TagValue(Enum):
def __repr__(self) -> str: def __repr__(self) -> str:
return self.value return self.value
def _(self, user: User) -> Tag:
"""
Transform into a `Tag`.
"""
return Tag(
user=user,
tag_value=self.value,
)
class TagBase(SQLModel): class TagBase(SQLModel):
""" """
@ -51,6 +63,6 @@ class Tag(TagBase, table=True):
user_name: str = Field(primary_key=True, foreign_key="user.name") user_name: str = Field(primary_key=True, foreign_key="user.name")
user: "User" = Relationship( user: User = Relationship(
back_populates="tags", back_populates="tags",
) )

View file

@ -4,7 +4,7 @@ Python representation of `user` table.
from __future__ import annotations from __future__ import annotations
from typing import Any, Sequence from typing import Any, Iterable, Sequence
from pydantic import root_validator from pydantic import root_validator
from sqlalchemy.exc import IntegrityError from sqlalchemy.exc import IntegrityError
@ -143,7 +143,8 @@ class User(UserBase, table=True):
# password hash mismatch # password hash mismatch
return None return None
if TagValue.login in user.get_tags(): if not (user.has_tag(TagValue.login)
or user.has_tag(TagValue.admin)):
# no login permission # no login permission
return None return None
@ -168,32 +169,50 @@ class User(UserBase, table=True):
db.delete(self) db.delete(self)
db.commit() db.commit()
def get_tags(self) -> set[TagValue]: def _get_tags(self) -> Iterable[TagValue]:
""" """
Return the tags of this user. Return the tags of this user.
""" """
return set( return (
tag._ tag._
for tag in self.tags for tag in self.tags
) )
def set_tags( def has_tag(self, tag: TagValue) -> bool:
"""
Check if this user has a tag.
"""
return tag in self._get_tags()
def add_tags(
self, self,
tags: Sequence[TagValue], tags: Sequence[TagValue],
) -> None: ) -> None:
""" """
Change the tags of this user. Add tags to this user.
""" """
self.tags = [ self.tags = [
Tag( tag._(self)
user_name=self.name, for tag in (set(self._get_tags()) | set(tags))
tag_value=tag.value,
) for tag in tags
] ]
def may_edit( def remove_tags(
self,
tags: Sequence[TagValue],
) -> None:
"""
remove tags from this user.
"""
self.tags = [
tag._(self)
for tag in (set(self._get_tags()) - set(tags))
]
def can_edit(
self, self,
target: User | Device, target: User | Device,
) -> bool: ) -> bool:
@ -202,7 +221,7 @@ class User(UserBase, table=True):
""" """
# admin can "edit" everything # admin can "edit" everything
if TagValue.admin in self.get_tags(): if self.has_tag(TagValue.admin):
return True return True
# user can "edit" itself # user can "edit" itself
@ -212,7 +231,7 @@ class User(UserBase, table=True):
# user can edit its owned devices # user can edit its owned devices
return target.owner == self return target.owner == self
def may_admin( def can_admin(
self, self,
target: User | Device, target: User | Device,
) -> bool: ) -> bool:
@ -221,7 +240,7 @@ class User(UserBase, table=True):
""" """
# only admin can "admin" anything # only admin can "admin" anything
if TagValue.admin not in self.get_tags(): if not self.has_tag(TagValue.admin):
return False return False
# admin canot "admin itself"! # admin canot "admin itself"!
@ -231,7 +250,7 @@ class User(UserBase, table=True):
# admin can "admin" everything else # admin can "admin" everything else
return True return True
def may_create( def can_create(
self, self,
target: type, target: type,
owner: User | None = None, owner: User | None = None,
@ -245,7 +264,7 @@ class User(UserBase, table=True):
return False return False
# admin can "create" everything # admin can "create" everything
if TagValue.admin in self.get_tags(): if self.has_tag(TagValue.admin):
return True return True
# user can only create devices for itself # user can only create devices for itself

View file

@ -5,7 +5,6 @@ Common dependencies for routers.
from fastapi import Depends, HTTPException, status from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer from fastapi.security import OAuth2PasswordBearer
from kiwi_vpn_api.db.tag import TagValue
from ..config import Config, Settings from ..config import Config, Settings
from ..db import Device, User from ..db import Device, User
@ -37,12 +36,8 @@ class Responses:
"description": "Must be logged in", "description": "Must be logged in",
"content": None, "content": None,
} }
NEEDS_ADMIN = { NEEDS_PERMISSION = {
"description": "Must be admin", "description": "You're not allowed that operation",
"content": None,
}
PERMISSION_ERROR = {
"description": "You're not allowed that action",
"content": None, "content": None,
} }
ENTRY_EXISTS = { ENTRY_EXISTS = {
@ -53,10 +48,6 @@ class Responses:
"description": "Entry does not exist in database", "description": "Entry does not exist in database",
"content": None, "content": None,
} }
CANT_TARGET_SELF = {
"description": "Operation can't target yourself",
"content": None,
}
async def get_current_user( async def get_current_user(
@ -81,19 +72,6 @@ async def get_current_user(
return user return user
async def get_current_user_if_admin(
current_user: User = Depends(get_current_user),
) -> User:
"""
Fail if the currently logged-in user is not an admin.
"""
if TagValue.admin not in current_user.get_tags():
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
return current_user
async def get_user_by_name( async def get_user_by_name(
user_name: str, user_name: str,
current_config: Config | None = Depends(Config.load), current_config: Config | None = Depends(Config.load),

View file

@ -8,7 +8,7 @@ from sqlmodel import select
from ..config import Config from ..config import Config
from ..db import Connection, TagValue, User, UserCreate from ..db import Connection, TagValue, User, UserCreate
from ._common import Responses, get_current_user_if_admin from ._common import Responses, get_current_user
router = APIRouter(prefix="/admin", tags=["admin"]) router = APIRouter(prefix="/admin", tags=["admin"])
@ -64,7 +64,7 @@ async def create_initial_admin(
# create an administrative user # create an administrative user
new_user = User.create(user=admin_user) new_user = User.create(user=admin_user)
new_user.set_tags([TagValue.admin]) new_user.add_tags([TagValue.admin])
new_user.update() new_user.update()
@ -74,17 +74,21 @@ async def create_initial_admin(
status.HTTP_200_OK: Responses.OK, status.HTTP_200_OK: Responses.OK,
status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED, status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER, status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN, status.HTTP_403_FORBIDDEN: Responses.NEEDS_PERMISSION,
}, },
) )
async def set_config( async def set_config(
config: Config, config: Config,
_: User = Depends(get_current_user_if_admin), current_user: User = Depends(get_current_user),
): ):
""" """
PUT ./config: Edit `kiwi-vpn` main config. PUT ./config: Edit `kiwi-vpn` main config.
""" """
# check permissions
if not current_user.has_tag(TagValue.admin):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
# update config file, reconnect to database # update config file, reconnect to database
config.save() config.save()
Connection.connect(config.db.uri) Connection.connect(config.db.uri)

View file

@ -17,7 +17,7 @@ router = APIRouter(prefix="/device", tags=["device"])
status.HTTP_200_OK: Responses.OK, status.HTTP_200_OK: Responses.OK,
status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED, status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER, status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
status.HTTP_403_FORBIDDEN: Responses.PERMISSION_ERROR, status.HTTP_403_FORBIDDEN: Responses.NEEDS_PERMISSION,
status.HTTP_404_NOT_FOUND: Responses.ENTRY_DOESNT_EXIST, status.HTTP_404_NOT_FOUND: Responses.ENTRY_DOESNT_EXIST,
status.HTTP_409_CONFLICT: Responses.ENTRY_EXISTS, status.HTTP_409_CONFLICT: Responses.ENTRY_EXISTS,
}, },
@ -33,7 +33,7 @@ async def add_device(
""" """
# check permission # check permission
if not current_user.may_create(Device, owner): if not current_user.can_create(Device, owner):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
# create the new device # create the new device
@ -56,7 +56,7 @@ async def add_device(
status.HTTP_200_OK: Responses.OK, status.HTTP_200_OK: Responses.OK,
status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED, status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER, status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN, status.HTTP_403_FORBIDDEN: Responses.NEEDS_PERMISSION,
status.HTTP_404_NOT_FOUND: Responses.ENTRY_DOESNT_EXIST, status.HTTP_404_NOT_FOUND: Responses.ENTRY_DOESNT_EXIST,
}, },
response_model=User, response_model=User,
@ -70,7 +70,7 @@ async def remove_device(
""" """
# check permission # check permission
if not current_user.may_edit(device): if not current_user.can_edit(device):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
# delete device # delete device

View file

@ -8,8 +8,7 @@ from pydantic import BaseModel
from ..config import Config from ..config import Config
from ..db import TagValue, User, UserCreate, UserRead from ..db import TagValue, User, UserCreate, UserRead
from ._common import (Responses, get_current_user_if_admin, from ._common import Responses, get_current_user, get_user_by_name
get_current_user, get_user_by_name)
router = APIRouter(prefix="/user", tags=["user"]) router = APIRouter(prefix="/user", tags=["user"])
@ -54,7 +53,7 @@ async def login(
@router.get("/current", response_model=UserRead) @router.get("/current", response_model=UserRead)
async def get_current_user( async def get_current_user_route(
current_user: User = Depends(get_current_user), current_user: User = Depends(get_current_user),
): ):
""" """
@ -70,27 +69,31 @@ async def get_current_user(
status.HTTP_200_OK: Responses.OK, status.HTTP_200_OK: Responses.OK,
status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED, status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER, status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN, status.HTTP_403_FORBIDDEN: Responses.NEEDS_PERMISSION,
status.HTTP_409_CONFLICT: Responses.ENTRY_EXISTS, status.HTTP_409_CONFLICT: Responses.ENTRY_EXISTS,
}, },
response_model=UserRead, response_model=UserRead,
) )
async def add_user( async def add_user(
user: UserCreate, user: UserCreate,
_: User = Depends(get_current_user_if_admin), current_user: User = Depends(get_current_user),
) -> User: ) -> User:
""" """
POST ./: Create a new user in the database. POST ./: Create a new user in the database.
""" """
# actually create the new user # check permissions
if not current_user.can_create(User):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
# create the new user
new_user = User.create(user=user) new_user = User.create(user=user)
# fail if creation was unsuccessful # fail if creation was unsuccessful
if new_user is None: if new_user is None:
raise HTTPException(status_code=status.HTTP_409_CONFLICT) raise HTTPException(status_code=status.HTTP_409_CONFLICT)
new_user.set_tags([TagValue.login]) new_user.add_tags([TagValue.login])
new_user.update() new_user.update()
# return the created user on success # return the created user on success
@ -103,23 +106,22 @@ async def add_user(
status.HTTP_200_OK: Responses.OK, status.HTTP_200_OK: Responses.OK,
status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED, status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER, status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN, status.HTTP_403_FORBIDDEN: Responses.NEEDS_PERMISSION,
status.HTTP_404_NOT_FOUND: Responses.ENTRY_DOESNT_EXIST, status.HTTP_404_NOT_FOUND: Responses.ENTRY_DOESNT_EXIST,
status.HTTP_406_NOT_ACCEPTABLE: Responses.CANT_TARGET_SELF,
}, },
response_model=User, response_model=User,
) )
async def remove_user( async def remove_user(
current_user: User = Depends(get_current_user_if_admin), current_user: User = Depends(get_current_user),
user: User = Depends(get_user_by_name), user: User = Depends(get_user_by_name),
): ):
""" """
DELETE ./{user_name}: Remove a user from the database. DELETE ./{user_name}: Remove a user from the database.
""" """
# stop inting # check permissions
if current_user.name == user.name: if not current_user.can_admin(user):
raise HTTPException(status_code=status.HTTP_406_NOT_ACCEPTABLE) raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
# delete user # delete user
user.delete() user.delete()
@ -131,20 +133,24 @@ async def remove_user(
status.HTTP_200_OK: Responses.OK, status.HTTP_200_OK: Responses.OK,
status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED, status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER, status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN, status.HTTP_403_FORBIDDEN: Responses.NEEDS_PERMISSION,
}, },
) )
async def extend_tags( async def extend_tags(
tags: list[TagValue], tags: list[TagValue],
_: User = Depends(get_current_user_if_admin), current_user: User = Depends(get_current_user),
user: User = Depends(get_user_by_name), user: User = Depends(get_user_by_name),
): ):
""" """
POST ./{user_name}/tags: Add tags to a user. POST ./{user_name}/tags: Add tags to a user.
""" """
user.set_tags(user.get_tags() | set(tags)) # check permissions
if not current_user.can_admin(user):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
# change user
user.add_tags(tags)
user.update() user.update()
@ -154,18 +160,22 @@ async def extend_tags(
status.HTTP_200_OK: Responses.OK, status.HTTP_200_OK: Responses.OK,
status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED, status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER, status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN, status.HTTP_403_FORBIDDEN: Responses.NEEDS_PERMISSION,
}, },
) )
async def remove_tags( async def remove_tags(
tags: list[TagValue], tags: list[TagValue],
_: User = Depends(get_current_user_if_admin), current_user: User = Depends(get_current_user),
user: User = Depends(get_user_by_name), user: User = Depends(get_user_by_name),
): ):
""" """
DELETE ./{user_name}/tags: Remove tags from a user. DELETE ./{user_name}/tags: Remove tags from a user.
""" """
user.set_tags(user.get_tags() - set(tags)) # check permissions
if not current_user.can_admin(user):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
# change user
user.remove_tags(tags)
user.update() user.update()