# kiwi-vpn/api/kiwi_vpn_api/routers/user.py
#
# (repository viewer metadata: 115 lines, 3.1 KiB, Python)

"""
/user endpoints.
"""
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from pydantic import BaseModel
from sqlalchemy.orm import Session
from ..config import Config
from ..db import Connection
from ..db.schemas import User, UserCapability, UserCreate
from ._common import Responses, get_current_user
# Router for this module; every route registered on it below is served
# under the "/user" path prefix.
router = APIRouter(prefix="/user")
class Token(BaseModel):
    """
    Response model for issuing tokens.
    """

    # the issued access token string
    access_token: str
    # token scheme; the "/authenticate" endpoint sets this to "bearer"
    token_type: str
@router.post("/authenticate", response_model=Token)
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    current_config: Config | None = Depends(Config.load),
    db: Session | None = Depends(Connection.get),
):
    """
    POST ./authenticate: Authenticate a user. Issues a bearer token.

    Responds with 400 if no configuration could be loaded and with 401 if
    the submitted username/password pair is rejected.
    """

    # no configuration loaded: the app is not installed yet
    if current_config is None:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST)

    # check the submitted credentials
    crypt_context = await current_config.crypto.crypt_context
    account = User.authenticate(
        db=db,
        name=form_data.username,
        password=form_data.password,
        crypt_context=crypt_context,
    )

    # a user was returned: issue a token carrying the user's name
    if account is not None:
        token = await current_config.jwt.create_token(account.name)
        return {"access_token": token, "token_type": "bearer"}

    # no user was returned: the login attempt failed
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
@router.get("/current", response_model=User)
async def get_current_user(
    # resolved by the `get_current_user` dependency imported from `._common`
    current_user: User | None = Depends(get_current_user),
):
    """
    GET ./current: Respond with the currently logged-in user.
    """

    # NOTE(review): this handler has the same name as the dependency imported
    # from `._common` and rebinds that name at module level. The `Depends(...)`
    # default above is evaluated before the rebinding, so it still refers to
    # the import; any later `Depends(get_current_user)` in this module
    # resolves to this route handler instead. Consider renaming the handler.

    # the dependency's result is passed through unchanged
    return current_user
@router.post(
    "/new",
    responses={
        status.HTTP_200_OK: Responses.OK,
        status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
        status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
        status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN,
        status.HTTP_409_CONFLICT: Responses.ENTRY_EXISTS,
    },
    response_model=User,
)
async def add_user(
    user: UserCreate,
    current_config: Config | None = Depends(Config.load),
    current_user: User | None = Depends(get_current_user),
    db: Session | None = Depends(Connection.get),
):
    """
    POST ./new: Create a new user in the database.

    Status codes:

    - 400: no configuration could be loaded (not installed)
    - 401: the request was not made by a logged-in user
    - 403: the requesting user lacks the admin capability
    - 409: the user could not be created
    """

    # fail if not installed
    if current_config is None:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST)

    # fail if not requested by a logged-in user; kept separate from the
    # admin check so the status matches the documented 401 response
    if current_user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            headers={"WWW-Authenticate": "Bearer"},
        )

    # fail if not requested by an admin
    if UserCapability.admin not in current_user.capabilities:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)

    # actually create the new user
    new_user = User.create(
        db=db,
        user=user,
        crypt_context=await current_config.crypto.crypt_context,
    )

    # fail if creation was unsuccessful
    if new_user is None:
        raise HTTPException(status_code=status.HTTP_409_CONFLICT)

    # return the created user on success
    return new_user