kiwi-vpn/api/kiwi_vpn_api/routers/install.py

127 lines
2.8 KiB
Python

import json
from pathlib import Path
from secrets import token_hex
from fastapi import APIRouter, Depends, HTTPException, status
from peewee import Database
from ..config import BaseConfig
from ..db import DB, Certificate, DistinguishedName, User, UserCapability
router = APIRouter(prefix="/install")
CONFIG_FILE = "tmp/config.json"
async def has_config() -> bool:
return Path(CONFIG_FILE).is_file()
async def load_config() -> BaseConfig:
try:
with open(CONFIG_FILE, "r") as kv:
return BaseConfig.parse_obj(json.load(kv))
except FileNotFoundError:
return BaseConfig()
async def connect_db(config: BaseConfig = Depends(load_config)) -> Database:
db = await config.db.database
db.connect()
return db
async def has_tables(db: Database = Depends(connect_db)) -> bool:
return db.table_exists(User)
@router.get(
"/config",
response_model=BaseConfig,
responses={
status.HTTP_403_FORBIDDEN: {
"description": "Must be admin",
"content": None,
},
},
)
async def get_config(
config: BaseConfig = Depends(load_config),
has_config: bool = Depends(has_config),
):
if has_config:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
return config
@router.put(
"/config",
responses={
status.HTTP_200_OK: {
"content": None,
},
status.HTTP_403_FORBIDDEN: {
"description": "Must be admin",
"content": None,
},
},
)
async def set_config(
config: BaseConfig,
has_config: bool = Depends(has_config),
):
if has_config:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
if config.jwt.secret is None:
config.jwt.secret = token_hex(32)
DB.initialize(await connect_db(config))
with open(CONFIG_FILE, "w") as kv:
kv.write(config.json(indent=2))
@router.get("/db", responses={
status.HTTP_200_OK: {
"model": bool,
},
})
async def check_db(
has_tables: bool = Depends(has_tables),
):
return has_tables
@router.put(
"/db",
responses={
status.HTTP_200_OK: {
"content": None,
},
status.HTTP_400_BAD_REQUEST: {
"description": "Database exists",
"content": None,
},
},
)
async def create_db(
admin_name: str,
admin_password: str,
config: BaseConfig = Depends(load_config),
db: Database = Depends(connect_db),
):
if await has_tables(db):
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST)
db.create_tables([Certificate, DistinguishedName, User, UserCapability])
cryptContext = await config.crypto.cryptContext
admin = User.create(
name=admin_name,
password=cryptContext.hash(admin_password),
)
UserCapability.create(user=admin, capability="admin")