175 lines
4.7 KiB
Python
175 lines
4.7 KiB
Python
"""
|
|
/user endpoints.
|
|
"""
|
|
|
|
from fastapi import APIRouter, Depends, HTTPException, status
|
|
from fastapi.security import OAuth2PasswordRequestForm
|
|
from pydantic import BaseModel
|
|
|
|
from ..config import Config
|
|
from ..db import User, UserCapabilityType, UserCreate, UserRead
|
|
from ._common import (Responses, current_user_is_admin,
|
|
get_current_user_if_exists, get_user_by_name)
|
|
|
|
# Router that groups every endpoint of this module under the "/user" prefix
# and tags them "user".
router = APIRouter(prefix="/user", tags=["user"])
|
|
|
|
|
|
class Token(BaseModel):
    """
    Schema of the body returned when a token is issued.
    """

    # the issued token string
    access_token: str
    # kind of token; the authenticate endpoint fills in "bearer"
    token_type: str
|
|
|
|
|
|
@router.post("/authenticate", response_model=Token)
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    current_config: Config | None = Depends(Config.load),
):
    """
    POST ./authenticate: Exchange a username and password for a bearer token.

    Responds with 400 when no configuration could be loaded, 401 when
    `User.authenticate` yields no user for the submitted credentials, and
    403 when the authenticated user lacks the `login` capability.
    """

    # without a loaded configuration (not installed) nothing can be issued
    if current_config is None:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST)

    # look the user up with the submitted form credentials
    user = User.authenticate(
        name=form_data.username,
        password=form_data.password,
    )

    # a falsy result means the credentials were rejected
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # valid credentials, but the account is not allowed to log in
    if not user.can(UserCapabilityType.login):
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)

    # all checks passed: issue the token for this user name
    issued_token = await current_config.jwt.create_token(user.name)
    return {"access_token": issued_token, "token_type": "bearer"}
|
|
|
|
|
|
@router.get("/current", response_model=UserRead)
async def get_current_user(
    current_user: User = Depends(get_current_user_if_exists),
):
    """
    GET ./current: Return the user resolved for the current request.

    The lookup is delegated entirely to the `get_current_user_if_exists`
    dependency; this handler passes its result through unchanged and the
    response is serialized as `UserRead`.
    """

    return current_user
|
|
|
|
|
|
@router.post(
    "",
    responses={
        status.HTTP_200_OK: Responses.OK,
        status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
        status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
        status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN,
        status.HTTP_409_CONFLICT: Responses.ENTRY_EXISTS,
    },
    response_model=UserRead,
)
async def add_user(
    user: UserCreate,
    _: User = Depends(current_user_is_admin),
) -> User:
    """
    POST ./: Create a new user in the database.

    The new user is given the `login` capability and saved before being
    returned. Responds with 409 when `User.create` returns `None`.
    """

    # a `None` result from the creation call is reported as a conflict
    if (created := User.create(user=user)) is None:
        raise HTTPException(status_code=status.HTTP_409_CONFLICT)

    # every freshly created user starts out with exactly the login capability
    created.set_capabilities([UserCapabilityType.login])
    created.update()

    return created
|
|
|
|
|
|
@router.delete(
    "/{user_name}",
    responses={
        status.HTTP_200_OK: Responses.OK,
        status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
        status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
        status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN,
        status.HTTP_404_NOT_FOUND: Responses.ENTRY_DOESNT_EXIST,
        status.HTTP_406_NOT_ACCEPTABLE: Responses.CANT_TARGET_SELF,
    },
    response_model=User,
)
async def remove_user(
    current_user: User = Depends(current_user_is_admin),
    user: User = Depends(get_user_by_name),
):
    """
    DELETE ./{user_name}: Remove a user from the database.

    Args:
        current_user: Result of the `current_user_is_admin` dependency;
            presumably the authenticated admin making the request
            (confirm against `_common`).
        user: The target user as resolved by `get_user_by_name`.

    Raises:
        HTTPException: 406 when the requesting user and the target user
            share the same name, i.e. the caller tries to delete itself.
    """

    # The admin dependency must be bound to a real name: the check below
    # reads `current_user`, which previously did not exist in any scope and
    # made every call fail with a NameError.

    # stop users from deleting their own account
    if current_user.name == user.name:
        raise HTTPException(status_code=status.HTTP_406_NOT_ACCEPTABLE)

    # NOTE(review): the route declares `response_model=User`, yet nothing is
    # returned after the deletion -- confirm whether the deleted user should
    # be returned or the response model dropped.
    user.delete()
|
|
|
|
|
|
@router.post(
    "/{user_name}/capabilities",
    responses={
        status.HTTP_200_OK: Responses.OK,
        status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
        status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
        status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN,
    },
)
async def extend_capabilities(
    capabilities: list[UserCapabilityType],
    _: User = Depends(current_user_is_admin),
    user: User = Depends(get_user_by_name),
):
    """
    POST ./{user_name}/capabilities: Add capabilities to a user.

    The submitted capabilities are merged into the ones the user already
    has, and the user is then saved.
    """

    # union of what the user holds and what was requested
    held = user.get_capabilities()
    requested = set(capabilities)
    user.set_capabilities(held | requested)

    # persist the change
    user.update()
|
|
|
|
|
|
@router.delete(
    "/{user_name}/capabilities",
    responses={
        status.HTTP_200_OK: Responses.OK,
        status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
        status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
        status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN,
    },
)
async def remove_capabilities(
    capabilities: list[UserCapabilityType],
    _: User = Depends(current_user_is_admin),
    user: User = Depends(get_user_by_name),
):
    """
    DELETE ./{user_name}/capabilities: Remove capabilities from a user.

    The submitted capabilities are subtracted from the ones the user
    already has, and the user is then saved.
    """

    # what the user holds minus what should be revoked
    held = user.get_capabilities()
    revoked = set(capabilities)
    user.set_capabilities(held - revoked)

    # persist the change
    user.update()
|