""" Common dependencies for routers. """ from fastapi import Depends, HTTPException, status from fastapi.security import OAuth2PasswordBearer from sqlalchemy.orm import Session from ..config import Config from ..db import Connection from ..db.schemata import User oauth2_scheme = OAuth2PasswordBearer(tokenUrl="user/authenticate") class Responses: """ Just a namespace. Describes API response status codes. """ OK = { "content": None, } INSTALLED = { "description": "kiwi-vpn already installed", "content": None, } NOT_INSTALLED = { "description": "kiwi-vpn not installed", "content": None, } NEEDS_USER = { "description": "Must be logged in", "content": None, } NEEDS_ADMIN = { "description": "Must be admin", "content": None, } NEEDS_ADMIN_OR_SELF = { "description": "Must be the requested user", "content": None, } ENTRY_EXISTS = { "description": "Entry exists in database", "content": None, } ENTRY_DOESNT_EXIST = { "description": "Entry does not exist in database", "content": None, } async def get_current_user( token: str = Depends(oauth2_scheme), db: Session | None = Depends(Connection.get), current_config: Config | None = Depends(Config.load), ) -> User | None: """ Get the currently logged-in user from the database. """ # can't connect to an unconfigured database if current_config is None: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST) username = await current_config.jwt.decode_token(token) user = User.from_db(db, username) return user async def get_current_user_if_exists( current_config: Config | None = Depends(Config.load), current_user: User | None = Depends(get_current_user), ) -> User: """ Get the currently logged-in user if it exists. """ # fail if not requested by a user if current_user is None: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) return current_user async def get_current_user_if_admin( current_config: Config | None = Depends(Config.load), current_user: User = Depends(get_current_user_if_exists), ) -> User: """ Get the currently logged-in user if it is an admin. """ # fail if not requested by an admin if not current_user.is_admin(): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) return current_user async def get_current_user_if_admin_or_self( user_name: str, current_config: Config | None = Depends(Config.load), current_user: User = Depends(get_current_user_if_exists), ) -> User: """ Get the currently logged-in user. Fails a) if the currently logged-in user is not the requested user, and b) if it is not an admin. """ # fail if not requested by an admin or self if not (current_user.is_admin() or current_user.name == user_name): raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) return current_user