""" /user endpoints. """ from fastapi import APIRouter, Depends, HTTPException, status from fastapi.security import OAuth2PasswordRequestForm from pydantic import BaseModel from ..config import Config from ..db import Capability, User, UserCreate, UserRead from ._common import Responses, get_current_user, get_current_user_if_admin router = APIRouter(prefix="/user", tags=["user"]) class Token(BaseModel): """ Response model for issuing tokens. """ access_token: str token_type: str @router.post("/authenticate", response_model=Token) async def login( form_data: OAuth2PasswordRequestForm = Depends(), current_config: Config | None = Depends(Config.load), ): """ POST ./authenticate: Authenticate a user. Issues a bearer token. """ # fail if not installed if current_config is None: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST) # try logging in if not (user := User.authenticate( name=form_data.username, password=form_data.password, )): # authentication failed raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) # authentication succeeded access_token = await current_config.jwt.create_token(user.name) return {"access_token": access_token, "token_type": "bearer"} @router.get("/current", response_model=UserRead) async def get_current_user( current_user: User | None = Depends(get_current_user), ): """ GET ./current: Respond with the currently logged-in user. """ return current_user @router.post( "", responses={ status.HTTP_200_OK: Responses.OK, status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED, status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER, status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN, status.HTTP_409_CONFLICT: Responses.ENTRY_EXISTS, }, response_model=User, ) async def add_user( user: UserCreate, _: User = Depends(get_current_user_if_admin), ): """ POST ./: Create a new user in the database. """ # actually create the new user new_user = User.create(**user.dict()) # fail if creation was unsuccessful if new_user is None: raise HTTPException(status_code=status.HTTP_409_CONFLICT) # return the created user on success return new_user @router.delete( "/{user_name}", responses={ status.HTTP_200_OK: Responses.OK, status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED, status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER, status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN, status.HTTP_404_NOT_FOUND: Responses.ENTRY_DOESNT_EXIST, }, response_model=User, ) async def remove_user( user_name: str, _: User = Depends(get_current_user_if_admin), ): """ DELETE ./{user_name}: Remove a user from the database. """ # get the user user = User.get(user_name) # fail if user not found if user is None: raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) # delete user user.delete() @router.post( "/{user_name}/capabilities", responses={ status.HTTP_200_OK: Responses.OK, status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED, status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER, status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN, }, ) async def extend_capabilities( user_name: str, capabilities: list[Capability], _: User = Depends(get_current_user_if_admin), ): """ POST ./{user_name}/capabilities: Add capabilities to a user. """ # get and change the user user = User.get(user_name) user.set_capabilities( user.get_capabilities() | set(capabilities) ) user.update() @router.delete( "/{user_name}/capabilities", responses={ status.HTTP_200_OK: Responses.OK, status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED, status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER, status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN, }, ) async def remove_capabilities( user_name: str, capabilities: list[Capability], _: User = Depends(get_current_user_if_admin), ): """ DELETE ./{user_name}/capabilities: Remove capabilities from a user. """ # get and change the user user = User.get(user_name) user.set_capabilities( user.get_capabilities() - set(capabilities) ) user.update()