Compare commits

..

No commits in common. "617ae92d72e13edecfee0d0d49d234cbac02f8a9" and "4120a9b71fec3716b57afe543f0e6622472e1ea5" have entirely different histories.

6 changed files with 12 additions and 97 deletions

View file

@ -85,15 +85,6 @@ class Device(DeviceBase, table=True):
# device already existed
return None
@classmethod
def get(cls, id: int) -> Device | None:
"""
Load device from database by id.
"""
with Connection.session as db:
return db.get(cls, id)
def update(self) -> None:
"""
Update this device in the database.

View file

@ -184,7 +184,6 @@ class User(UserBase, table=True):
return (
capability in self.get_capabilities()
# admin can do everything
or UserCapabilityType.admin in self.get_capabilities()
)
@ -202,17 +201,3 @@ class User(UserBase, table=True):
capability_name=capability.value,
) for capability in capabilities
]
def owns(
self,
device: Device,
) -> bool:
"""
Check if this user owns a device.
"""
return (
device.owner_name == self.name
# admin owns everything
or self.can(UserCapabilityType.admin)
)

View file

@ -7,7 +7,7 @@ from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from ..config import Config, Settings
from ..db import Device, User, UserCapabilityType
from ..db import User, UserCapabilityType
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl=f"{Settings._.api_v1_prefix}/user/authenticate"
@ -52,10 +52,6 @@ class Responses:
"description": "Entry does not exist in database",
"content": None,
}
CANT_TARGET_SELF = {
"description": "Operation can't target yourself",
"content": None,
}
async def get_current_user(
@ -84,12 +80,12 @@ async def get_current_user_if_exists(
# fail if not requested by a user
if current_user is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
return current_user
async def get_current_user_if_admin(
async def current_user_is_admin(
current_user: User = Depends(get_current_user_if_exists),
) -> User:
"""
@ -99,8 +95,6 @@ async def get_current_user_if_admin(
if not current_user.can(UserCapabilityType.admin):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
return current_user
async def get_user_by_name(
user_name: str,
@ -128,31 +122,3 @@ async def get_user_by_name(
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
return user
async def get_device_by_id(
device_id: int,
current_config: Config | None = Depends(Config.load),
) -> Device | None:
# can't connect to an unconfigured database
if current_config is None:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST)
return Device.get(device_id)
async def get_device_by_id_if_editable(
device: Device | None = Depends(get_device_by_id),
current_user: User = Depends(get_current_user_if_exists),
) -> Device:
# fail if device doesn't exist
if device is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
# fail if device is not owned by current user
if not current_user.owns(device):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
return device

View file

@ -8,7 +8,7 @@ from sqlmodel import select
from ..config import Config
from ..db import Connection, User, UserCapabilityType, UserCreate
from ._common import Responses, get_current_user_if_admin
from ._common import Responses, current_user_is_admin
router = APIRouter(prefix="/admin", tags=["admin"])
@ -63,7 +63,7 @@ async def create_initial_admin(
raise HTTPException(status_code=status.HTTP_409_CONFLICT)
# create an administrative user
new_user = User.create(user=admin_user)
new_user = User.create(admin_user)
new_user.set_capabilities([UserCapabilityType.admin])
new_user.update()
@ -79,7 +79,7 @@ async def create_initial_admin(
)
async def set_config(
config: Config,
_: User = Depends(get_current_user_if_admin),
_: User = Depends(current_user_is_admin),
):
"""
PUT ./config: Edit `kiwi-vpn` main config.

View file

@ -5,7 +5,7 @@
from fastapi import APIRouter, Depends, HTTPException, status
from ..db import Device, DeviceCreate, DeviceRead, User
from ._common import Responses, get_device_by_id_if_editable, get_user_by_name
from ._common import Responses, get_user_by_name
router = APIRouter(prefix="/device", tags=["device"])
@ -42,25 +42,3 @@ async def add_device(
# return the created device on success
return new_device
@router.delete(
"/{device_id}",
responses={
status.HTTP_200_OK: Responses.OK,
status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN,
status.HTTP_404_NOT_FOUND: Responses.ENTRY_DOESNT_EXIST,
},
response_model=User,
)
async def remove_device(
device: Device = Depends(get_device_by_id_if_editable),
):
"""
DELETE ./{device_id}: Remove a device from the database.
"""
# delete device
device.delete()

View file

@ -8,7 +8,7 @@ from pydantic import BaseModel
from ..config import Config
from ..db import User, UserCapabilityType, UserCreate, UserRead
from ._common import (Responses, get_current_user_if_admin,
from ._common import (Responses, current_user_is_admin,
get_current_user_if_exists, get_user_by_name)
router = APIRouter(prefix="/user", tags=["user"])
@ -81,7 +81,7 @@ async def get_current_user(
)
async def add_user(
user: UserCreate,
_: User = Depends(get_current_user_if_admin),
_: User = Depends(current_user_is_admin),
) -> User:
"""
POST ./: Create a new user in the database.
@ -109,22 +109,17 @@ async def add_user(
status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN,
status.HTTP_404_NOT_FOUND: Responses.ENTRY_DOESNT_EXIST,
status.HTTP_406_NOT_ACCEPTABLE: Responses.CANT_TARGET_SELF,
},
response_model=User,
)
async def remove_user(
current_user: User = Depends(get_current_user_if_admin),
_: User = Depends(current_user_is_admin),
user: User = Depends(get_user_by_name),
):
"""
DELETE ./{user_name}: Remove a user from the database.
"""
# stop inting
if current_user.name == user.name:
raise HTTPException(status_code=status.HTTP_406_NOT_ACCEPTABLE)
# delete user
user.delete()
@ -140,7 +135,7 @@ async def remove_user(
)
async def extend_capabilities(
capabilities: list[UserCapabilityType],
_: User = Depends(get_current_user_if_admin),
_: User = Depends(current_user_is_admin),
user: User = Depends(get_user_by_name),
):
"""
@ -163,7 +158,7 @@ async def extend_capabilities(
)
async def remove_capabilities(
capabilities: list[UserCapabilityType],
_: User = Depends(get_current_user_if_admin),
_: User = Depends(current_user_is_admin),
user: User = Depends(get_user_by_name),
):
"""