import json from pathlib import Path from secrets import token_hex from fastapi import APIRouter, Depends, HTTPException, status from peewee import Database from ..config import BaseConfig from ..db import Certificate, DistinguishedName, User, UserCapability router = APIRouter(prefix="/install") CONFIG_FILE = "tmp/config.json" async def has_config() -> bool: return Path(CONFIG_FILE).is_file() async def load_config() -> BaseConfig: try: with open(CONFIG_FILE, "r") as kv: return BaseConfig.parse_obj(json.load(kv)) except FileNotFoundError: return BaseConfig() @router.get( "/config", response_model=BaseConfig, responses={ status.HTTP_403_FORBIDDEN: { "description": "Must be admin", "content": None, }, }, ) async def get_config( config: BaseConfig = Depends(load_config), has_config: bool = Depends(has_config), ): if has_config: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) return config @router.put( "/config", responses={ status.HTTP_200_OK: { "content": None, }, status.HTTP_403_FORBIDDEN: { "description": "Must be admin", "content": None, }, }, ) async def set_config( config: BaseConfig, has_config: bool = Depends(has_config), ): if has_config: raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) if config.jwt.secret is None: config.jwt.secret = token_hex(32) with open(CONFIG_FILE, "w") as kv: kv.write(config.json(indent=2)) async def connect_db(config: BaseConfig = Depends(load_config)) -> Database: db = await config.db.database db.connect() return db async def has_tables(db: Database = Depends(connect_db)): return db.table_exists(User) @router.get("/db", responses={ status.HTTP_200_OK: { "model": bool, }, }) async def check_db( has_tables: bool = Depends(has_tables), ): return has_tables @router.put( "/db", responses={ status.HTTP_200_OK: { "content": None, }, status.HTTP_400_BAD_REQUEST: { "description": "Database exists", "content": None, }, }, ) async def create_db( admin_name: str, admin_password: str, config: BaseConfig = Depends(load_config), db: Database = Depends(connect_db), ): if await has_tables(db): raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST) db.create_tables([Certificate, DistinguishedName, User, UserCapability]) cryptContext = await config.crypto.cryptContext admin = User.create( name=admin_name, password=cryptContext.hash(admin_password), ) UserCapability.create(user=admin, capability="admin")