""" Common dependencies for routers. """ from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer from ..config import SETTINGS, Config from ..db import Device, User from ..easyrsa import EASYRSA, EasyRSA oauth2_scheme = OAuth2PasswordBearer( tokenUrl=f"{SETTINGS.api_v1_prefix}/user/authenticate" ) class Responses: """ Just a namespace. Describes API response status codes. """ OK = { "description": "Operation successful", } OK_NONE = { "description": "Operation successful", "content": None, } OK_WAIT = { "description": "Operation successful, waiting for approval", } NOT_INSTALLED = { "description": "kiwi-vpn not installed", "content": None, } NEEDS_USER = { "description": "Not logged in", "content": None, } NEEDS_PERMISSION = { "description": "Operation not permitted", "content": None, } NEEDS_PKI = { "description": "PKI hasn't been initialized", "content": None, } ENTRY_ADDED = { "description": "Entry added to database", "content": None, } ENTRY_EXISTS = { "description": "Entry exists in database", "content": None, } ENTRY_DOESNT_EXIST = { "description": "Entry does not exist in database", "content": None, } async def get_current_config( current_config: Config | None = Depends(Config.load), ) -> Config: """ Get the current configuration if it exists. Status: - 400: `kiwi-vpn` not installed """ # fail if not configured if current_config is None: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST) return current_config async def get_current_user( token: str = Depends(oauth2_scheme), current_config: Config = Depends(get_current_config), ) -> User: """ Get the currently logged-in user if it exists. Status: - (400: `kiwi-vpn` not installed) - 401: No auth token provided/not logged in - 403: invalid auth token, or user not found """ # don't use error 404 here - possible user enumeration # fail if not requested by a user if (username := await current_config.jwt.decode_token(token)) is None: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) if (user := User.get(username)) is None: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) return user async def get_user_by_name( user_name: str, ) -> User: """ Get a user by name. Status: - 403: user not found """ # don't use error 404 here - possible user enumeration # fail if user doesn't exist if (user := User.get(user_name)) is None: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) return user async def get_device_by_id( device_id: int, ) -> Device: """ Get a device by ID. Status: - 404: device not found """ # fail if device doesn't exist if (device := Device.get(device_id)) is None: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) return device async def get_pki() -> EasyRSA: """ Get the EasyRSA object if the CA has been built. Status: - 425: EasyRSA not initialized """ if EASYRSA.get_certificate() is None: raise HTTPException(status_code=status.HTTP_425_TOO_EARLY) return EASYRSA