151 lines
3.2 KiB
Python
151 lines
3.2 KiB
Python
"""
|
|
Common dependencies for routers.
|
|
"""
|
|
|
|
from fastapi import Depends, HTTPException, status
|
|
from fastapi.security import OAuth2PasswordBearer
|
|
|
|
from ..config import SETTINGS, Config
|
|
from ..db import Device, User
|
|
from ..easyrsa import EASYRSA, CertificateType, EasyRSA
|
|
|
|
oauth2_scheme = OAuth2PasswordBearer(
|
|
tokenUrl=f"{SETTINGS.api_v1_prefix}/user/authenticate"
|
|
)
|
|
|
|
|
|
class Responses:
|
|
"""
|
|
Just a namespace.
|
|
|
|
Describes API response status codes.
|
|
"""
|
|
|
|
OK = {
|
|
"content": None,
|
|
}
|
|
NOT_INSTALLED = {
|
|
"description": "kiwi-vpn not installed",
|
|
"content": None,
|
|
}
|
|
NEEDS_USER = {
|
|
"description": "Not logged in",
|
|
"content": None,
|
|
}
|
|
NEEDS_PERMISSION = {
|
|
"description": "Operation not permitted",
|
|
"content": None,
|
|
}
|
|
NEEDS_PKI = {
|
|
"description": "PKI hasn't been initialized",
|
|
"content": None,
|
|
}
|
|
ENTRY_ADDED = {
|
|
"description": "Entry added to database",
|
|
"content": None,
|
|
}
|
|
ENTRY_EXISTS = {
|
|
"description": "Entry exists in database",
|
|
"content": None,
|
|
}
|
|
ENTRY_DOESNT_EXIST = {
|
|
"description": "Entry does not exist in database",
|
|
"content": None,
|
|
}
|
|
|
|
|
|
async def get_current_config(
|
|
current_config: Config | None = Depends(Config.load),
|
|
) -> Config:
|
|
"""
|
|
Get the current configuration if it exists.
|
|
|
|
Status:
|
|
|
|
- 400: `kiwi-vpn` not installed
|
|
"""
|
|
|
|
# fail if not configured
|
|
if current_config is None:
|
|
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST)
|
|
|
|
return current_config
|
|
|
|
|
|
async def get_current_user(
|
|
token: str = Depends(oauth2_scheme),
|
|
current_config: Config = Depends(get_current_config),
|
|
) -> User:
|
|
"""
|
|
Get the currently logged-in user if it exists.
|
|
|
|
Status:
|
|
|
|
- (400: `kiwi-vpn` not installed)
|
|
- 401: No auth token provided/not logged in
|
|
- 403: invalid auth token, or user not found
|
|
"""
|
|
|
|
# don't use error 404 here - possible user enumeration
|
|
|
|
# fail if not requested by a user
|
|
if (username := await current_config.jwt.decode_token(token)) is None:
|
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
|
|
|
|
if (user := User.get(username)) is None:
|
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
|
|
|
|
return user
|
|
|
|
|
|
async def get_user_by_name(
|
|
user_name: str,
|
|
) -> User:
|
|
"""
|
|
Get a user by name.
|
|
|
|
Status:
|
|
|
|
- 403: user not found
|
|
"""
|
|
|
|
# don't use error 404 here - possible user enumeration
|
|
|
|
# fail if user doesn't exist
|
|
if (user := User.get(user_name)) is None:
|
|
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
|
|
|
|
return user
|
|
|
|
|
|
async def get_device_by_id(
|
|
device_id: int,
|
|
) -> Device:
|
|
"""
|
|
Get a device by ID.
|
|
|
|
Status:
|
|
|
|
- 404: device not found
|
|
"""
|
|
|
|
# fail if device doesn't exist
|
|
if (device := Device.get(device_id)) is None:
|
|
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
|
|
|
|
return device
|
|
|
|
|
|
async def get_pki() -> EasyRSA:
|
|
"""
|
|
Get the EasyRSA object if the CA has been built.
|
|
|
|
Status:
|
|
|
|
- 425: EasyRSA not initialized
|
|
"""
|
|
|
|
if EASYRSA.get_certificate(CertificateType.ca) is None:
|
|
raise HTTPException(status_code=status.HTTP_425_TOO_EARLY)
|
|
|
|
return EASYRSA
|