kiwi-vpn/api/kiwi_vpn_api/routers/_common.py

83 lines
2 KiB
Python

"""
Common dependencies for routers.
"""
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from ..config import Config
from ..db import Connection
from ..db.schemas import User, UserCapability
# OAuth2 bearer-token scheme; `get_current_user` depends on it for the token.
# `tokenUrl` is the relative URL where clients obtain a token.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="user/authenticate")
class Responses:
    """
    Namespace of reusable response descriptions for API status codes.

    Every entry is a plain dict whose "content" is `None`; all entries
    except `OK` also carry a human-readable "description".
    """

    # NOTE(review): presumably used as values in the routers' `responses=`
    # maps - confirm against the route definitions.

    # success, no extra description
    OK = {"content": None}

    # installation state
    INSTALLED = {"description": "kiwi-vpn already installed", "content": None}
    NOT_INSTALLED = {"description": "kiwi-vpn not installed", "content": None}

    # authentication / authorization
    NEEDS_USER = {"description": "Must be logged in", "content": None}
    NEEDS_ADMIN = {"description": "Must be admin", "content": None}

    # database conflicts
    ENTRY_EXISTS = {"description": "Entry exists in database", "content": None}
async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: Session | None = Depends(Connection.get),
    current_config: Config | None = Depends(Config.load),
) -> User | None:
    """
    Resolve the bearer token to the matching user in the database.

    Returns `None` when no configuration is loaded; otherwise returns
    whatever `User.from_db` yields for the username decoded from the token.
    """

    # without a configuration there is no database to look users up in
    if current_config is not None:
        decoded_name = await current_config.jwt.decode_token(token)
        return User.from_db(db, decoded_name)

    return None
async def get_current_admin_user(
    current_config: Config | None = Depends(Config.load),
    current_user: User | None = Depends(get_current_user),
) -> User:
    """
    Check if the currently logged-in user is an admin and return them.

    Raises `HTTPException` with status 400 if no configuration is loaded,
    and with status 403 if there is no current user or the user lacks the
    admin capability.
    """

    # fail if not installed
    if current_config is None:
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST)

    # fail if not requested by an admin
    if (current_user is None
            or UserCapability.admin not in current_user.capabilities):
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)

    # hand the verified admin back so the declared `-> User` actually holds
    # for routes that inject this dependency as a parameter
    return current_user