kiwi-vpn/api/kiwi_vpn_api/routers/_common.py

125 lines
3 KiB
Python

"""
Common dependencies for routers.
"""
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from kiwi_vpn_api.db.tag import TagValue
from ..config import Config, Settings
from ..db import Device, User
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl=f"{Settings._.api_v1_prefix}/user/authenticate"
)
class Responses:
"""
Just a namespace.
Describes API response status codes.
"""
OK = {
"content": None,
}
INSTALLED = {
"description": "kiwi-vpn already installed",
"content": None,
}
NOT_INSTALLED = {
"description": "kiwi-vpn not installed",
"content": None,
}
NEEDS_USER = {
"description": "Must be logged in",
"content": None,
}
NEEDS_ADMIN = {
"description": "Must be admin",
"content": None,
}
PERMISSION_ERROR = {
"description": "You're not allowed that action",
"content": None,
}
ENTRY_EXISTS = {
"description": "Entry exists in database",
"content": None,
}
ENTRY_DOESNT_EXIST = {
"description": "Entry does not exist in database",
"content": None,
}
CANT_TARGET_SELF = {
"description": "Operation can't target yourself",
"content": None,
}
async def get_current_user(
token: str = Depends(oauth2_scheme),
current_config: Config | None = Depends(Config.load),
) -> User:
"""
Get the currently logged-in user if it exists.
"""
# can't connect to an unconfigured database
if current_config is None:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST)
username = await current_config.jwt.decode_token(token)
# fail if not requested by a user
if (user := User.get(username)) is None:
# don't use error 404 here: possible user enumeration
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
return user
async def get_current_user_if_admin(
current_user: User = Depends(get_current_user),
) -> User:
"""
Fail if the currently logged-in user is not an admin.
"""
if current_user.has_tag(TagValue.admin):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
return current_user
async def get_user_by_name(
user_name: str,
current_config: Config | None = Depends(Config.load),
) -> User | None:
"""
Get a user by name.
"""
# can't connect to an unconfigured database
if current_config is None:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST)
return User.get(user_name)
async def get_device_by_id(
device_id: int,
current_config: Config | None = Depends(Config.load),
) -> Device | None:
# can't connect to an unconfigured database
if current_config is None:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST)
# fail if device doesn't exist
if (device := Device.get(device_id)) is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
return device