""" Common dependencies for routers. """ from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer from ..config import Config from ..db_new import Capability, User oauth2_scheme = OAuth2PasswordBearer(tokenUrl="user/authenticate") class Responses: """ Just a namespace. Describes API response status codes. """ OK = { "content": None, } INSTALLED = { "description": "kiwi-vpn already installed", "content": None, } NOT_INSTALLED = { "description": "kiwi-vpn not installed", "content": None, } NEEDS_USER = { "description": "Must be logged in", "content": None, } NEEDS_ADMIN = { "description": "Must be admin", "content": None, } NEEDS_ADMIN_OR_SELF = { "description": "Must be the requested user", "content": None, } ENTRY_EXISTS = { "description": "Entry exists in database", "content": None, } ENTRY_DOESNT_EXIST = { "description": "Entry does not exist in database", "content": None, } async def get_current_user( token: str = Depends(oauth2_scheme), current_config: Config | None = Depends(Config.load), ) -> User | None: """ Get the currently logged-in user from the database. """ # can't connect to an unconfigured database if current_config is None: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST) username = await current_config.jwt.decode_token(token) return User.get(username) async def get_current_user_if_exists( current_user: User | None = Depends(get_current_user), ) -> User: """ Get the currently logged-in user if it exists. """ # fail if not requested by a user if current_user is None: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) return current_user async def get_current_user_if_admin( current_user: User = Depends(get_current_user_if_exists), ) -> User: """ Get the currently logged-in user if it is an admin. """ # fail if not requested by an admin if not current_user.can(Capability.admin): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) return current_user async def get_current_user_if_admin_or_self( user_name: str, current_user: User = Depends(get_current_user_if_exists), ) -> User: """ Get the currently logged-in user. Fails a) if the currently logged-in user is not the requested user, and b) if it is not an admin. """ # fail if not requested by an admin or self if not (current_user.can(Capability.admin) or current_user.name == user_name): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) return current_user