config handling

This commit is contained in:
Jörn-Michael Miehe 2022-03-16 00:23:57 +00:00
parent 112d86d827
commit d136363256
2 changed files with 123 additions and 11 deletions

View file

@ -1,5 +1,13 @@
from peewee import SqliteDatabase from __future__ import annotations
from enum import Enum
from pathlib import Path
from typing import Optional
from jose.constants import ALGORITHMS
from passlib.context import CryptContext from passlib.context import CryptContext
from peewee import Database, SqliteDatabase
from pydantic import BaseModel, Field
DB = SqliteDatabase("tmp/vpn.db") DB = SqliteDatabase("tmp/vpn.db")
PRODUCTION_MODE = False PRODUCTION_MODE = False
@ -11,3 +19,42 @@ ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30 ACCESS_TOKEN_EXPIRE_MINUTES = 30
CRYPT_CONTEXT = CryptContext(schemes=["bcrypt"], deprecated="auto") CRYPT_CONTEXT = CryptContext(schemes=["bcrypt"], deprecated="auto")
CONFIG_FILE = "tmp/config.json"
class DBType(Enum):
sqlite = "sqlite"
mysql = "mysql"
class DBConfig(BaseModel):
db_type: DBType = "sqlite"
@property
def database(self) -> Database:
return SqliteDatabase("tmp/vpn.db")
class JWTConfig(BaseModel):
secret: Optional[str] = None
hash_algorithm: str = ALGORITHMS.HS256
expiry_minutes: int = 30
class CryptoConfig(BaseModel):
schemes: list[str] = ["bcrypt"]
@property
def cryptContext(self) -> CryptContext:
return CryptContext(schemes=self.schemes, deprecated="auto")
class BaseConfig(BaseModel):
db: DBConfig = Field(default_factory=DBConfig)
jwt: JWTConfig = Field(default_factory=JWTConfig)
crypto: CryptoConfig = Field(default_factory=CryptoConfig)
def save(self, filename: Path) -> None:
with open(filename, "w") as kv:
kv.write(self.json(indent=2))

View file

@ -1,12 +1,75 @@
import json
from pathlib import Path
from fastapi import APIRouter, Depends, status from fastapi import APIRouter, Depends, status
from fastapi.responses import JSONResponse from fastapi.responses import JSONResponse
from ..config import CRYPT_CONTEXT, DB from ..config import CONFIG_FILE, CRYPT_CONTEXT, DB, BaseConfig
from ..db import Certificate, DistinguishedName, User, UserCapability from ..db import Certificate, DistinguishedName, User, UserCapability
router = APIRouter(prefix="/install") router = APIRouter(prefix="/install")
async def get_default_config() -> BaseConfig:
return BaseConfig()
async def get_config() -> BaseConfig:
with open(CONFIG_FILE, "r") as kv:
return BaseConfig.parse_obj(json.load(kv))
async def is_configured() -> bool:
return Path(CONFIG_FILE).is_file()
@router.get("/config/get_default", response_model=BaseConfig)
async def config_get_default(config: BaseConfig = Depends(get_default_config)):
return config
@router.get(
"/config/get",
response_model=BaseConfig,
responses={
status.HTTP_404_NOT_FOUND: {
"description": "Not configured",
"content": None,
},
},
)
async def config_get(
config: BaseConfig = Depends(get_config),
is_configured: bool = Depends(is_configured),
):
if not is_configured:
return JSONResponse(status_code=status.HTTP_404_NOT_FOUND)
return config
@router.post(
"/config/set",
responses={
status.HTTP_200_OK: {
"content": None,
},
status.HTTP_403_FORBIDDEN: {
"description": "Must be admin",
"content": None,
},
},
)
async def config_set(
config: BaseConfig,
is_configured: bool = Depends(is_configured),
):
if is_configured:
return JSONResponse(status_code=status.HTTP_403_FORBIDDEN)
config.save(CONFIG_FILE)
async def is_installed(): async def is_installed():
return DB.table_exists(User) return DB.table_exists(User)
@ -20,16 +83,18 @@ async def check_installed(is_installed: bool = Depends(is_installed)):
return is_installed return is_installed
@router.get("/create_db", responses={ @router.get(
"/create_db",
responses={
status.HTTP_200_OK: { status.HTTP_200_OK: {
"description": "Database created",
"content": None, "content": None,
}, },
status.HTTP_400_BAD_REQUEST: { status.HTTP_400_BAD_REQUEST: {
"description": "Could not create Database", "description": "Could not create Database",
"content": None, "content": None,
} },
}) },
)
async def create_db(is_installed: bool = Depends(is_installed)): async def create_db(is_installed: bool = Depends(is_installed)):
if is_installed: if is_installed:
return JSONResponse( return JSONResponse(