Compare commits
5 commits
03d3a86668
...
d02239816a
| Author | SHA1 | Date | |
|---|---|---|---|
| d02239816a | |||
| cb3a3fca69 | |||
| 598b0ca2cb | |||
| 3b66565481 | |||
| 9b5a98e0c0 |
6 changed files with 91 additions and 68 deletions
|
|
@ -2,6 +2,8 @@
|
|||
Python representation of `tag` table.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from enum import Enum
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
|
|
@ -24,6 +26,16 @@ class TagValue(Enum):
|
|||
def __repr__(self) -> str:
|
||||
return self.value
|
||||
|
||||
def _(self, user: User) -> Tag:
|
||||
"""
|
||||
Transform into a `Tag`.
|
||||
"""
|
||||
|
||||
return Tag(
|
||||
user=user,
|
||||
tag_value=self.value,
|
||||
)
|
||||
|
||||
|
||||
class TagBase(SQLModel):
|
||||
"""
|
||||
|
|
@ -51,6 +63,6 @@ class Tag(TagBase, table=True):
|
|||
|
||||
user_name: str = Field(primary_key=True, foreign_key="user.name")
|
||||
|
||||
user: "User" = Relationship(
|
||||
user: User = Relationship(
|
||||
back_populates="tags",
|
||||
)
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ Python representation of `user` table.
|
|||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import Any, Sequence
|
||||
from typing import Any, Iterable, Sequence
|
||||
|
||||
from pydantic import root_validator
|
||||
from sqlalchemy.exc import IntegrityError
|
||||
|
|
@ -143,7 +143,8 @@ class User(UserBase, table=True):
|
|||
# password hash mismatch
|
||||
return None
|
||||
|
||||
if TagValue.login in user.get_tags():
|
||||
if not (user.has_tag(TagValue.login)
|
||||
or user.has_tag(TagValue.admin)):
|
||||
# no login permission
|
||||
return None
|
||||
|
||||
|
|
@ -168,32 +169,50 @@ class User(UserBase, table=True):
|
|||
db.delete(self)
|
||||
db.commit()
|
||||
|
||||
def get_tags(self) -> set[TagValue]:
|
||||
def _get_tags(self) -> Iterable[TagValue]:
|
||||
"""
|
||||
Return the tags of this user.
|
||||
"""
|
||||
|
||||
return set(
|
||||
return (
|
||||
tag._
|
||||
for tag in self.tags
|
||||
)
|
||||
|
||||
def set_tags(
|
||||
def has_tag(self, tag: TagValue) -> bool:
|
||||
"""
|
||||
Check if this user has a tag.
|
||||
"""
|
||||
|
||||
return tag in self._get_tags()
|
||||
|
||||
def add_tags(
|
||||
self,
|
||||
tags: Sequence[TagValue],
|
||||
) -> None:
|
||||
"""
|
||||
Change the tags of this user.
|
||||
Add tags to this user.
|
||||
"""
|
||||
|
||||
self.tags = [
|
||||
Tag(
|
||||
user_name=self.name,
|
||||
tag_value=tag.value,
|
||||
) for tag in tags
|
||||
tag._(self)
|
||||
for tag in (set(self._get_tags()) | set(tags))
|
||||
]
|
||||
|
||||
def may_edit(
|
||||
def remove_tags(
|
||||
self,
|
||||
tags: Sequence[TagValue],
|
||||
) -> None:
|
||||
"""
|
||||
remove tags from this user.
|
||||
"""
|
||||
|
||||
self.tags = [
|
||||
tag._(self)
|
||||
for tag in (set(self._get_tags()) - set(tags))
|
||||
]
|
||||
|
||||
def can_edit(
|
||||
self,
|
||||
target: User | Device,
|
||||
) -> bool:
|
||||
|
|
@ -202,7 +221,7 @@ class User(UserBase, table=True):
|
|||
"""
|
||||
|
||||
# admin can "edit" everything
|
||||
if TagValue.admin in self.get_tags():
|
||||
if self.has_tag(TagValue.admin):
|
||||
return True
|
||||
|
||||
# user can "edit" itself
|
||||
|
|
@ -212,7 +231,7 @@ class User(UserBase, table=True):
|
|||
# user can edit its owned devices
|
||||
return target.owner == self
|
||||
|
||||
def may_admin(
|
||||
def can_admin(
|
||||
self,
|
||||
target: User | Device,
|
||||
) -> bool:
|
||||
|
|
@ -221,7 +240,7 @@ class User(UserBase, table=True):
|
|||
"""
|
||||
|
||||
# only admin can "admin" anything
|
||||
if TagValue.admin not in self.get_tags():
|
||||
if not self.has_tag(TagValue.admin):
|
||||
return False
|
||||
|
||||
# admin canot "admin itself"!
|
||||
|
|
@ -231,7 +250,7 @@ class User(UserBase, table=True):
|
|||
# admin can "admin" everything else
|
||||
return True
|
||||
|
||||
def may_create(
|
||||
def can_create(
|
||||
self,
|
||||
target: type,
|
||||
owner: User | None = None,
|
||||
|
|
@ -245,7 +264,7 @@ class User(UserBase, table=True):
|
|||
return False
|
||||
|
||||
# admin can "create" everything
|
||||
if TagValue.admin in self.get_tags():
|
||||
if self.has_tag(TagValue.admin):
|
||||
return True
|
||||
|
||||
# user can only create devices for itself
|
||||
|
|
|
|||
|
|
@ -5,7 +5,6 @@ Common dependencies for routers.
|
|||
|
||||
from fastapi import Depends, HTTPException, status
|
||||
from fastapi.security import OAuth2PasswordBearer
|
||||
from kiwi_vpn_api.db.tag import TagValue
|
||||
|
||||
from ..config import Config, Settings
|
||||
from ..db import Device, User
|
||||
|
|
@ -37,12 +36,8 @@ class Responses:
|
|||
"description": "Must be logged in",
|
||||
"content": None,
|
||||
}
|
||||
NEEDS_ADMIN = {
|
||||
"description": "Must be admin",
|
||||
"content": None,
|
||||
}
|
||||
PERMISSION_ERROR = {
|
||||
"description": "You're not allowed that action",
|
||||
NEEDS_PERMISSION = {
|
||||
"description": "You're not allowed that operation",
|
||||
"content": None,
|
||||
}
|
||||
ENTRY_EXISTS = {
|
||||
|
|
@ -53,10 +48,6 @@ class Responses:
|
|||
"description": "Entry does not exist in database",
|
||||
"content": None,
|
||||
}
|
||||
CANT_TARGET_SELF = {
|
||||
"description": "Operation can't target yourself",
|
||||
"content": None,
|
||||
}
|
||||
|
||||
|
||||
async def get_current_user(
|
||||
|
|
@ -81,19 +72,6 @@ async def get_current_user(
|
|||
return user
|
||||
|
||||
|
||||
async def get_current_user_if_admin(
|
||||
current_user: User = Depends(get_current_user),
|
||||
) -> User:
|
||||
"""
|
||||
Fail if the currently logged-in user is not an admin.
|
||||
"""
|
||||
|
||||
if TagValue.admin not in current_user.get_tags():
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
|
||||
|
||||
return current_user
|
||||
|
||||
|
||||
async def get_user_by_name(
|
||||
user_name: str,
|
||||
current_config: Config | None = Depends(Config.load),
|
||||
|
|
|
|||
|
|
@ -8,7 +8,7 @@ from sqlmodel import select
|
|||
|
||||
from ..config import Config
|
||||
from ..db import Connection, TagValue, User, UserCreate
|
||||
from ._common import Responses, get_current_user_if_admin
|
||||
from ._common import Responses, get_current_user
|
||||
|
||||
router = APIRouter(prefix="/admin", tags=["admin"])
|
||||
|
||||
|
|
@ -64,7 +64,7 @@ async def create_initial_admin(
|
|||
|
||||
# create an administrative user
|
||||
new_user = User.create(user=admin_user)
|
||||
new_user.set_tags([TagValue.admin])
|
||||
new_user.add_tags([TagValue.admin])
|
||||
new_user.update()
|
||||
|
||||
|
||||
|
|
@ -74,17 +74,21 @@ async def create_initial_admin(
|
|||
status.HTTP_200_OK: Responses.OK,
|
||||
status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
|
||||
status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
|
||||
status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN,
|
||||
status.HTTP_403_FORBIDDEN: Responses.NEEDS_PERMISSION,
|
||||
},
|
||||
)
|
||||
async def set_config(
|
||||
config: Config,
|
||||
_: User = Depends(get_current_user_if_admin),
|
||||
current_user: User = Depends(get_current_user),
|
||||
):
|
||||
"""
|
||||
PUT ./config: Edit `kiwi-vpn` main config.
|
||||
"""
|
||||
|
||||
# check permissions
|
||||
if not current_user.has_tag(TagValue.admin):
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
|
||||
|
||||
# update config file, reconnect to database
|
||||
config.save()
|
||||
Connection.connect(config.db.uri)
|
||||
|
|
|
|||
|
|
@ -17,7 +17,7 @@ router = APIRouter(prefix="/device", tags=["device"])
|
|||
status.HTTP_200_OK: Responses.OK,
|
||||
status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
|
||||
status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
|
||||
status.HTTP_403_FORBIDDEN: Responses.PERMISSION_ERROR,
|
||||
status.HTTP_403_FORBIDDEN: Responses.NEEDS_PERMISSION,
|
||||
status.HTTP_404_NOT_FOUND: Responses.ENTRY_DOESNT_EXIST,
|
||||
status.HTTP_409_CONFLICT: Responses.ENTRY_EXISTS,
|
||||
},
|
||||
|
|
@ -33,7 +33,7 @@ async def add_device(
|
|||
"""
|
||||
|
||||
# check permission
|
||||
if not current_user.may_create(Device, owner):
|
||||
if not current_user.can_create(Device, owner):
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
|
||||
|
||||
# create the new device
|
||||
|
|
@ -56,7 +56,7 @@ async def add_device(
|
|||
status.HTTP_200_OK: Responses.OK,
|
||||
status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
|
||||
status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
|
||||
status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN,
|
||||
status.HTTP_403_FORBIDDEN: Responses.NEEDS_PERMISSION,
|
||||
status.HTTP_404_NOT_FOUND: Responses.ENTRY_DOESNT_EXIST,
|
||||
},
|
||||
response_model=User,
|
||||
|
|
@ -70,7 +70,7 @@ async def remove_device(
|
|||
"""
|
||||
|
||||
# check permission
|
||||
if not current_user.may_edit(device):
|
||||
if not current_user.can_edit(device):
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
|
||||
|
||||
# delete device
|
||||
|
|
|
|||
|
|
@ -8,8 +8,7 @@ from pydantic import BaseModel
|
|||
|
||||
from ..config import Config
|
||||
from ..db import TagValue, User, UserCreate, UserRead
|
||||
from ._common import (Responses, get_current_user_if_admin,
|
||||
get_current_user, get_user_by_name)
|
||||
from ._common import Responses, get_current_user, get_user_by_name
|
||||
|
||||
router = APIRouter(prefix="/user", tags=["user"])
|
||||
|
||||
|
|
@ -54,7 +53,7 @@ async def login(
|
|||
|
||||
|
||||
@router.get("/current", response_model=UserRead)
|
||||
async def get_current_user(
|
||||
async def get_current_user_route(
|
||||
current_user: User = Depends(get_current_user),
|
||||
):
|
||||
"""
|
||||
|
|
@ -70,27 +69,31 @@ async def get_current_user(
|
|||
status.HTTP_200_OK: Responses.OK,
|
||||
status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
|
||||
status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
|
||||
status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN,
|
||||
status.HTTP_403_FORBIDDEN: Responses.NEEDS_PERMISSION,
|
||||
status.HTTP_409_CONFLICT: Responses.ENTRY_EXISTS,
|
||||
},
|
||||
response_model=UserRead,
|
||||
)
|
||||
async def add_user(
|
||||
user: UserCreate,
|
||||
_: User = Depends(get_current_user_if_admin),
|
||||
current_user: User = Depends(get_current_user),
|
||||
) -> User:
|
||||
"""
|
||||
POST ./: Create a new user in the database.
|
||||
"""
|
||||
|
||||
# actually create the new user
|
||||
# check permissions
|
||||
if not current_user.can_create(User):
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
|
||||
|
||||
# create the new user
|
||||
new_user = User.create(user=user)
|
||||
|
||||
# fail if creation was unsuccessful
|
||||
if new_user is None:
|
||||
raise HTTPException(status_code=status.HTTP_409_CONFLICT)
|
||||
|
||||
new_user.set_tags([TagValue.login])
|
||||
new_user.add_tags([TagValue.login])
|
||||
new_user.update()
|
||||
|
||||
# return the created user on success
|
||||
|
|
@ -103,23 +106,22 @@ async def add_user(
|
|||
status.HTTP_200_OK: Responses.OK,
|
||||
status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
|
||||
status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
|
||||
status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN,
|
||||
status.HTTP_403_FORBIDDEN: Responses.NEEDS_PERMISSION,
|
||||
status.HTTP_404_NOT_FOUND: Responses.ENTRY_DOESNT_EXIST,
|
||||
status.HTTP_406_NOT_ACCEPTABLE: Responses.CANT_TARGET_SELF,
|
||||
},
|
||||
response_model=User,
|
||||
)
|
||||
async def remove_user(
|
||||
current_user: User = Depends(get_current_user_if_admin),
|
||||
current_user: User = Depends(get_current_user),
|
||||
user: User = Depends(get_user_by_name),
|
||||
):
|
||||
"""
|
||||
DELETE ./{user_name}: Remove a user from the database.
|
||||
"""
|
||||
|
||||
# stop inting
|
||||
if current_user.name == user.name:
|
||||
raise HTTPException(status_code=status.HTTP_406_NOT_ACCEPTABLE)
|
||||
# check permissions
|
||||
if not current_user.can_admin(user):
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
|
||||
|
||||
# delete user
|
||||
user.delete()
|
||||
|
|
@ -131,20 +133,24 @@ async def remove_user(
|
|||
status.HTTP_200_OK: Responses.OK,
|
||||
status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
|
||||
status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
|
||||
status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN,
|
||||
status.HTTP_403_FORBIDDEN: Responses.NEEDS_PERMISSION,
|
||||
},
|
||||
)
|
||||
async def extend_tags(
|
||||
tags: list[TagValue],
|
||||
_: User = Depends(get_current_user_if_admin),
|
||||
current_user: User = Depends(get_current_user),
|
||||
user: User = Depends(get_user_by_name),
|
||||
):
|
||||
"""
|
||||
POST ./{user_name}/tags: Add tags to a user.
|
||||
"""
|
||||
|
||||
user.set_tags(user.get_tags() | set(tags))
|
||||
# check permissions
|
||||
if not current_user.can_admin(user):
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
|
||||
|
||||
# change user
|
||||
user.add_tags(tags)
|
||||
user.update()
|
||||
|
||||
|
||||
|
|
@ -154,18 +160,22 @@ async def extend_tags(
|
|||
status.HTTP_200_OK: Responses.OK,
|
||||
status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
|
||||
status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
|
||||
status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN,
|
||||
status.HTTP_403_FORBIDDEN: Responses.NEEDS_PERMISSION,
|
||||
},
|
||||
)
|
||||
async def remove_tags(
|
||||
tags: list[TagValue],
|
||||
_: User = Depends(get_current_user_if_admin),
|
||||
current_user: User = Depends(get_current_user),
|
||||
user: User = Depends(get_user_by_name),
|
||||
):
|
||||
"""
|
||||
DELETE ./{user_name}/tags: Remove tags from a user.
|
||||
"""
|
||||
|
||||
user.set_tags(user.get_tags() - set(tags))
|
||||
# check permissions
|
||||
if not current_user.can_admin(user):
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
|
||||
|
||||
# change user
|
||||
user.remove_tags(tags)
|
||||
user.update()
|
||||
|
|
|
|||
Loading…
Reference in a new issue