kiwi-vpn/api/kiwi_vpn_api/routers/user.py


from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from pydantic import BaseModel
from sqlalchemy.orm import Session
from ..config import Config
from ..db import Connection, schemas
from . import _deps
# Every route registered on this router is served under the "/user" prefix.
router = APIRouter(prefix="/user")
# Response body of POST /user/auth. Documented with comments rather than a
# docstring so the generated schema description stays as it was.
class Token(BaseModel):
    # Token string returned by current_config.jwt.create_token() in login().
    access_token: str
    # Authorization scheme for the token; login() always sets "bearer".
    token_type: str
@router.post("/auth", response_model=Token)
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    current_config: Config | None = Depends(Config.load),
    db: Session | None = Depends(Connection.get),
):
    # Password login: exchanges the submitted username/password form fields
    # for a bearer token. Notes are kept as comments (not a docstring) so the
    # published OpenAPI description is unchanged.

    # Without a loaded configuration there is no crypt context or JWT
    # settings to work with, so the request is rejected with 400.
    if current_config is None:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST)

    # NOTE(review): db is typed Session | None and passed through unchecked;
    # presumably schemas.User.login copes with None -- verify.
    # NOTE(review): whether an unknown user and a wrong password take
    # comparable time depends on schemas.User.login, which is not visible
    # here. No attempt limiting is visible in this handler either; confirm it
    # is enforced upstream.
    authenticated = schemas.User.login(
        db=db,
        name=form_data.username,
        password=form_data.password,
        crypt_context=await current_config.crypto.crypt_context,
    )

    if authenticated is not None:
        token = await current_config.jwt.create_token(authenticated.name)
        return {"access_token": token, "token_type": "bearer"}

    # Every failed login gets the same generic 401, so the response body does
    # not reveal whether the username exists.
    raise HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
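@router.get("/current", response_model=schemas.User)
async def get_current_user(
    current_user: schemas.User | None = Depends(_deps.get_current_user),
):
    """Return the user the request is authenticated as.

    Raises:
        HTTPException: 401 when no authenticated user could be resolved.
    """
    # The dependency is annotated as possibly returning None, and add_user in
    # this module checks for exactly that. Returning None here would not
    # satisfy response_model=schemas.User, so an unauthenticated request is
    # answered with an explicit 401 instead of a response-validation failure.
    if current_user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return current_user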
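@router.post(
    "/new",
    responses={
        status.HTTP_200_OK: _deps.Responses.ok,
        status.HTTP_400_BAD_REQUEST: _deps.Responses.not_installed,
        status.HTTP_401_UNAUTHORIZED: _deps.Responses.needs_user,
        status.HTTP_403_FORBIDDEN: _deps.Responses.needs_admin,
        status.HTTP_409_CONFLICT: _deps.Responses.entry_exists,
    },
    response_model=schemas.User,
)
async def add_user(
    user: schemas.UserCreate,
    current_config: Config | None = Depends(Config.load),
    current_user: schemas.User | None = Depends(_deps.get_current_user),
    db: Session | None = Depends(Connection.get),
):
    """Create a new user; only an authenticated admin may do so.

    Raises:
        HTTPException: 400 when no configuration is loaded, 401 when the
            caller is not authenticated, 403 when the caller lacks the
            "admin" capability, 409 when the user could not be created.
    """
    if current_config is None:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST)

    # The route declares 401 (needs_user) separately from 403 (needs_admin),
    # but a missing user used to fall into the 403 branch. Answer "not
    # authenticated" and "authenticated but not allowed" distinctly so the
    # handler matches its own declared responses.
    if current_user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            headers={"WWW-Authenticate": "Bearer"},
        )

    # NOTE(review): presumably capabilities are loaded server-side by
    # _deps.get_current_user rather than trusted from the token -- verify.
    if "admin" not in current_user.capabilities:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)

    new_user = schemas.User.create(
        db=db,
        user=user,
        crypt_context=await current_config.crypto.crypt_context,
    )

    # create() returning None is reported as a conflict (entry_exists).
    if new_user is None:
        raise HTTPException(status_code=status.HTTP_409_CONFLICT)

    return new_user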