kiwi-vpn/api/kiwi_vpn_api/routers/user.py

195 lines
5.1 KiB
Python

"""
/user endpoints.
"""
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from pydantic import BaseModel
from sqlalchemy.orm import Session
from ..config import Config
from ..db import Connection
from ..db.schemata import User, UserCapability, UserCreate
from ._common import Responses, get_current_user, get_current_user_if_admin
router = APIRouter(prefix="/user", tags=["user"])
class Token(BaseModel):
"""
Response model for issuing tokens.
"""
access_token: str
token_type: str
@router.post("/authenticate", response_model=Token)
async def login(
form_data: OAuth2PasswordRequestForm = Depends(),
current_config: Config | None = Depends(Config.load),
db: Session | None = Depends(Connection.get),
):
"""
POST ./authenticate: Authenticate a user. Issues a bearer token.
"""
# fail if not installed
if current_config is None:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST)
# try logging in
user = User(name=form_data.username)
if not user.authenticate(
db=db,
password=form_data.password,
crypt_context=current_config.crypto.crypt_context,
):
# authentication failed
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
# authentication succeeded
access_token = await current_config.jwt.create_token(user.name)
return {"access_token": access_token, "token_type": "bearer"}
@router.get("/current", response_model=User)
async def get_current_user(
current_user: User | None = Depends(get_current_user),
):
"""
GET ./current: Respond with the currently logged-in user.
"""
return current_user
@router.post(
"",
responses={
status.HTTP_200_OK: Responses.OK,
status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN,
status.HTTP_409_CONFLICT: Responses.ENTRY_EXISTS,
},
response_model=User,
)
async def add_user(
user: UserCreate,
current_config: Config | None = Depends(Config.load),
_: User = Depends(get_current_user_if_admin),
db: Session | None = Depends(Connection.get),
):
"""
POST ./: Create a new user in the database.
"""
# actually create the new user
new_user = User.create(
db=db,
user=user,
crypt_context=current_config.crypto.crypt_context,
)
# fail if creation was unsuccessful
if new_user is None:
raise HTTPException(status_code=status.HTTP_409_CONFLICT)
# return the created user on success
return new_user
@router.delete(
"/{user_name}",
responses={
status.HTTP_200_OK: Responses.OK,
status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN,
status.HTTP_404_NOT_FOUND: Responses.ENTRY_DOESNT_EXIST,
},
response_model=User,
)
async def remove_user(
user_name: str,
_: User = Depends(get_current_user_if_admin),
db: Session | None = Depends(Connection.get),
):
"""
DELETE ./{user_name}: Remove a user from the database.
"""
# get the user
user = User.from_db(
db=db,
name=user_name,
)
# fail if deletion was unsuccessful
if user is None or not user.delete(db):
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
@router.post(
"/{user_name}/capabilities",
responses={
status.HTTP_200_OK: Responses.OK,
status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN,
},
)
async def extend_capabilities(
user_name: str,
capabilities: list[UserCapability],
_: User = Depends(get_current_user_if_admin),
db: Session | None = Depends(Connection.get),
):
"""
POST ./{user_name}/capabilities: Add capabilities to a user.
"""
# get and change the user
user = User.from_db(
db=db,
name=user_name,
)
user.capabilities.extend(capabilities)
user.update(db)
@router.delete(
"/{user_name}/capabilities",
responses={
status.HTTP_200_OK: Responses.OK,
status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN,
},
)
async def remove_capabilities(
user_name: str,
capabilities: list[UserCapability],
_: User = Depends(get_current_user_if_admin),
db: Session | None = Depends(Connection.get),
):
"""
DELETE ./{user_name}/capabilities: Remove capabilities from a user.
"""
# get and change the user
user = User.from_db(
db=db,
name=user_name,
)
for capability in capabilities:
user.capabilities.remove(capability)
user.update(db)