""" /user endpoints. """ from fastapi import APIRouter, Depends, HTTPException, status from fastapi.security import OAuth2PasswordRequestForm from pydantic import BaseModel from sqlalchemy.orm import Session from ..config import Config from ..db import Connection from ..db.schemas import User, UserCapability, UserCreate from ._common import Responses, get_current_user, get_current_user_if_admin router = APIRouter(prefix="/user") class Token(BaseModel): """ Response model for issuing tokens. """ access_token: str token_type: str @router.post("/authenticate", response_model=Token) async def login( form_data: OAuth2PasswordRequestForm = Depends(), current_config: Config | None = Depends(Config.load), db: Session | None = Depends(Connection.get), ): """ POST ./authenticate: Authenticate a user. Issues a bearer token. """ # fail if not installed if current_config is None: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST) # try logging in user = User(name=form_data.username) if not user.authenticate( db=db, password=form_data.password, crypt_context=await current_config.crypto.crypt_context, ): # authentication failed raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) # authentication succeeded access_token = await current_config.jwt.create_token(user.name) return {"access_token": access_token, "token_type": "bearer"} @router.get("/current", response_model=User) async def get_current_user( current_user: User | None = Depends(get_current_user), ): """ GET ./current: Respond with the currently logged-in user. """ return current_user @router.post( "", responses={ status.HTTP_200_OK: Responses.OK, status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED, status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER, status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN, status.HTTP_409_CONFLICT: Responses.ENTRY_EXISTS, }, response_model=User, ) async def add_user( user: UserCreate, current_config: Config | None = Depends(Config.load), _: User = Depends(get_current_user_if_admin), db: Session | None = Depends(Connection.get), ): """ POST ./: Create a new user in the database. """ # actually create the new user new_user = User.create( db=db, user=user, crypt_context=await current_config.crypto.crypt_context, ) # fail if creation was unsuccessful if new_user is None: raise HTTPException(status_code=status.HTTP_409_CONFLICT) # return the created user on success return new_user @router.delete( "/{user_name}", responses={ status.HTTP_200_OK: Responses.OK, status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED, status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER, status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN, status.HTTP_404_NOT_FOUND: Responses.ENTRY_DOESNT_EXIST, }, response_model=User, ) async def remove_user( user_name: str, _: User = Depends(get_current_user_if_admin), db: Session | None = Depends(Connection.get), ): """ DELETE ./{user_name}: Remove a user from the database. """ # get the user user = User.from_db( db=db, name=user_name, ) # fail if deletion was unsuccessful if user is None or not user.delete(db): raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) @router.post( "/{user_name}/capabilities", responses={ status.HTTP_200_OK: Responses.OK, status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED, status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER, status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN, }, ) async def extend_capabilities( user_name: str, capabilities: list[UserCapability], _: User = Depends(get_current_user_if_admin), db: Session | None = Depends(Connection.get), ): """ POST ./{user_name}/capabilities: Add capabilities to a user. """ # get and change the user user = User.from_db( db=db, name=user_name, ) user.capabilities.extend(capabilities) user.update(db) @router.delete( "/{user_name}/capabilities", responses={ status.HTTP_200_OK: Responses.OK, status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED, status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER, status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN, }, ) async def remove_capabilities( user_name: str, capabilities: list[UserCapability], _: User = Depends(get_current_user_if_admin), db: Session | None = Depends(Connection.get), ): """ DELETE ./{user_name}/capabilities: Remove capabilities from a user. """ # get and change the user user = User.from_db( db=db, name=user_name, ) for capability in capabilities: user.capabilities.remove(capability) user.update(db)