kiwi-vpn/api/kiwi_vpn_api/routers/user.py

180 lines
4.5 KiB
Python
Raw Normal View History

2022-03-20 03:45:40 +00:00
"""
/user endpoints.
"""
from fastapi import APIRouter, Depends, HTTPException, status
2022-03-19 04:07:19 +00:00
from fastapi.security import OAuth2PasswordRequestForm
from pydantic import BaseModel
2022-03-19 02:22:49 +00:00
from ..config import Config
2022-03-28 22:07:12 +00:00
from ..db import User, UserCapabilityType, UserCreate, UserRead
2022-03-23 01:14:02 +00:00
from ._common import Responses, get_current_user, get_current_user_if_admin
2022-03-15 16:25:07 +00:00
2022-03-24 23:45:01 +00:00
# router for this module: every route registered on it is served under the
# "/user" prefix and carries the "user" tag
router = APIRouter(prefix="/user", tags=["user"])
class Token(BaseModel):
    """
    Response model for issuing tokens.

    Used as the `response_model` of the "/authenticate" endpoint.
    """

    # the issued token string
    access_token: str

    # the token scheme; the "/authenticate" endpoint sends "bearer"
    token_type: str
2022-03-20 03:45:40 +00:00
@router.post("/authenticate", response_model=Token)
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    current_config: Config | None = Depends(Config.load),
):
    """
    POST ./authenticate: Authenticate a user. Issues a bearer token.

    Responds with 400 if no configuration could be loaded, and with 401
    if the submitted credentials are rejected.
    """

    # nothing to authenticate against without a loaded configuration
    if current_config is None:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST)

    # check the submitted credentials
    user = User.authenticate(
        name=form_data.username,
        password=form_data.password,
    )

    if not user:
        # credentials were rejected
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )

    # credentials were accepted: issue a token for this user's name
    token = await current_config.jwt.create_token(user.name)

    return {"access_token": token, "token_type": "bearer"}
2022-03-28 20:18:19 +00:00
@router.get("/current", response_model=UserRead)
async def get_current_user(
    # NOTE(review): this default is evaluated before the `def` statement
    # rebinds the module-level name, so it refers to the dependency imported
    # from `._common`, not to this route function. Any later use of
    # `get_current_user` in this module would get the route function instead.
    current_user: User | None = Depends(get_current_user),
):
    """
    GET ./current: Respond with the currently logged-in user.

    The user resolved by the dependency is passed through unchanged.
    """

    # NOTE(review): the annotation allows `None` here; what the response
    # looks like in that case is not visible from this file -- TODO confirm
    return current_user
2022-03-19 18:06:28 +00:00
@router.post(
    "",
    responses={
        status.HTTP_200_OK: Responses.OK,
        status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
        status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
        status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN,
        status.HTTP_409_CONFLICT: Responses.ENTRY_EXISTS,
    },
    response_model=User,
)
async def add_user(
    user: UserCreate,
    _: User = Depends(get_current_user_if_admin),
):
    """
    POST ./: Create a new user in the database.

    Responds with the created user, or with 409 if `User.create` did not
    produce one.
    """

    # hand all submitted fields over to the database layer
    created = User.create(**user.dict())

    # success: respond with the newly created user
    if created is not None:
        return created

    # creation yielded nothing: report a conflict
    raise HTTPException(status_code=status.HTTP_409_CONFLICT)
2022-03-23 00:39:19 +00:00
2022-03-23 13:25:00 +00:00
@router.delete(
    "/{user_name}",
    responses={
        status.HTTP_200_OK: Responses.OK,
        status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
        status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
        status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN,
        status.HTTP_404_NOT_FOUND: Responses.ENTRY_DOESNT_EXIST,
    },
    response_model=User,
)
async def remove_user(
    user_name: str,
    _: User = Depends(get_current_user_if_admin),
):
    """
    DELETE ./{user_name}: Remove a user from the database.

    Responds with 404 if no user named `user_name` is found.
    """

    # look up the user, bail out if there is none
    if (doomed := User.get(user_name)) is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)

    # NOTE(review): nothing is returned although `response_model=User` is
    # declared on the route -- confirm which response body is intended
    doomed.delete()
2022-03-23 13:25:00 +00:00
2022-03-23 00:39:19 +00:00
@router.post(
    "/{user_name}/capabilities",
    responses={
        status.HTTP_200_OK: Responses.OK,
        status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
        status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
        status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN,
        status.HTTP_404_NOT_FOUND: Responses.ENTRY_DOESNT_EXIST,
    },
)
async def extend_capabilities(
    user_name: str,
    capabilities: list[UserCapabilityType],
    _: User = Depends(get_current_user_if_admin),
):
    """
    POST ./{user_name}/capabilities: Add capabilities to a user.

    The given capabilities are merged into the ones the user already has;
    capabilities the user already holds are left as they are.

    Responds with 404 if no user named `user_name` is found.
    """

    # get the user
    user = User.get(user_name)

    # fail if user not found, instead of crashing on `None` below
    if user is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)

    # change the user: union of current and requested capabilities
    user.set_capabilities(
        user.get_capabilities() | set(capabilities)
    )

    # persist the change
    user.update()
2022-03-23 00:39:19 +00:00
@router.delete(
    "/{user_name}/capabilities",
    responses={
        status.HTTP_200_OK: Responses.OK,
        status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
        status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
        status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN,
        status.HTTP_404_NOT_FOUND: Responses.ENTRY_DOESNT_EXIST,
    },
)
async def remove_capabilities(
    user_name: str,
    capabilities: list[UserCapabilityType],
    _: User = Depends(get_current_user_if_admin),
):
    """
    DELETE ./{user_name}/capabilities: Remove capabilities from a user.

    The given capabilities are subtracted from the ones the user has;
    capabilities the user does not hold are ignored.

    Responds with 404 if no user named `user_name` is found.
    """

    # get the user
    user = User.get(user_name)

    # fail if user not found, instead of crashing on `None` below
    if user is None:
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)

    # change the user: current capabilities minus the requested ones
    user.set_capabilities(
        user.get_capabilities() - set(capabilities)
    )

    # persist the change
    user.update()