Compare commits

...

3 commits

Author SHA1 Message Date
e6fe35d14e EASYRSA global object 2022-04-05 22:39:09 +00:00
c0388d58c1 few renames 2022-04-05 22:34:25 +00:00
bca5b2b55c global object SETTINGS 2022-04-05 21:33:48 +00:00
6 changed files with 35 additions and 65 deletions

View file

@ -9,7 +9,6 @@ Pydantic models might have convenience methods attached.
from __future__ import annotations
import functools
import json
from datetime import datetime, timedelta
from enum import Enum
@ -36,25 +35,14 @@ class Settings(BaseSettings):
docs_url: str | None = "/docs"
redoc_url: str | None = "/redoc"
@classmethod
@functools.lru_cache
def load(cls) -> Settings:
return cls()
@classmethod
@property
def _(cls) -> Settings:
"""
Shorthand for load()
"""
return cls.load()
@property
def config_file(self) -> Path:
return self.data_dir.joinpath(self.config_file_name)
SETTINGS = Settings()
class DBType(Enum):
"""
Supported database types
@ -73,7 +61,7 @@ class DBConfig(BaseModel):
user: str | None = None
password: str | None = None
host: str | None = None
database: str | None = str(Settings._.data_dir.joinpath("kiwi-vpn.db"))
database: str | Path | None = SETTINGS.data_dir.joinpath("kiwi-vpn.db")
mysql_driver: str = "pymysql"
mysql_args: list[str] = ["charset=utf8mb4"]
@ -253,7 +241,7 @@ class Config(BaseModel):
crypto: CryptoConfig
server_dn: ServerDN
__singleton: Config | None = None
__instance: Config | None = None
@classmethod
def load(cls) -> Config | None:
@ -261,16 +249,15 @@ class Config(BaseModel):
Load configuration from config file
"""
if cls.__singleton is not None:
return cls.__singleton
if cls.__instance is None:
try:
with open(SETTINGS.config_file, "r") as config_file:
cls.__instance = cls.parse_obj(json.load(config_file))
try:
with open(Settings._.config_file, "r") as config_file:
cls.__singleton = Config.parse_obj(json.load(config_file))
return cls.__singleton
except FileNotFoundError:
pass
except FileNotFoundError:
return None
return cls.__instance
@classmethod
@property
@ -280,7 +267,7 @@ class Config(BaseModel):
"""
if (config := cls.load()) is None:
raise FileNotFoundError(Settings._.config_file)
raise FileNotFoundError(SETTINGS.config_file)
return config
@ -289,5 +276,5 @@ class Config(BaseModel):
Save configuration to config file
"""
with open(Settings._.config_file, "w") as config_file:
with open(SETTINGS.config_file, "w") as config_file:
config_file.write(self.json(indent=2))

View file

@ -4,7 +4,6 @@ Python interface to EasyRSA CA.
from __future__ import annotations
import functools
import subprocess
from enum import Enum, auto
from pathlib import Path
@ -13,7 +12,7 @@ from cryptography import x509
from passlib import pwd
from pydantic import BaseModel
from .config import Config, KeyAlgorithm, Settings
from .config import SETTINGS, Config, KeyAlgorithm
from .db import Connection, Device
@ -140,27 +139,13 @@ class EasyRSA:
None: {},
}
@classmethod
@functools.lru_cache
def _load(cls) -> EasyRSA:
return cls()
@classmethod
@property
def _(cls) -> EasyRSA:
"""
Get the singleton
"""
return cls._load()
@property
def output_directory(self) -> Path:
"""
Where certificates are stored
"""
return Settings._.data_dir.joinpath("pki")
return SETTINGS.data_dir.joinpath("pki")
@property
def ca_password(self) -> str:
@ -300,14 +285,14 @@ class EasyRSA:
)
EASYRSA = EasyRSA()
# some basic test
if __name__ == "__main__":
easy_rsa = EasyRSA()
easy_rsa.init_pki()
ca = easy_rsa.build_ca()
server = easy_rsa.issue(CertificateType.server)
ca = EASYRSA.build_ca()
server = EASYRSA.issue(CertificateType.server)
client = None
# check if configured
@ -316,7 +301,7 @@ if __name__ == "__main__":
Connection.connect(current_config.db.uri)
if (device := Device.get(1)) is not None:
client = easy_rsa.issue(
client = EASYRSA.issue(
dn=DistinguishedName.build(device)
)

View file

@ -12,12 +12,10 @@ If run directly, uses `uvicorn` to run the app.
import uvicorn
from fastapi import FastAPI
from .config import Config, Settings
from .config import SETTINGS, Config
from .db import Connection, User
from .routers import main_router
settings = Settings._
app = FastAPI(
title="kiwi-vpn API",
description="This API enables the `kiwi-vpn` service.",
@ -29,9 +27,9 @@ app = FastAPI(
"name": "MIT License",
"url": "https://opensource.org/licenses/mit-license.php",
},
openapi_url=settings.openapi_url,
docs_url=settings.docs_url if not settings.production_mode else None,
redoc_url=settings.redoc_url if not settings.production_mode else None,
openapi_url=SETTINGS.openapi_url,
docs_url=SETTINGS.docs_url if not SETTINGS.production_mode else None,
redoc_url=SETTINGS.redoc_url if not SETTINGS.production_mode else None,
)
app.include_router(main_router)

View file

@ -6,10 +6,10 @@ This file: Main API router definition.
from fastapi import APIRouter
from ..config import Settings
from ..config import SETTINGS
from . import admin, device, service, user
main_router = APIRouter(prefix=f"/{Settings._.api_v1_prefix}")
main_router = APIRouter(prefix=f"/{SETTINGS.api_v1_prefix}")
main_router.include_router(admin.router)
main_router.include_router(service.router)

View file

@ -5,11 +5,11 @@ Common dependencies for routers.
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from ..config import Config, Settings
from ..config import SETTINGS, Config
from ..db import Device, User
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl=f"{Settings._.api_v1_prefix}/user/authenticate"
tokenUrl=f"{SETTINGS.api_v1_prefix}/user/authenticate"
)

View file

@ -5,7 +5,7 @@
from fastapi import APIRouter, Depends, HTTPException, status
from ..db import Device, DeviceCreate, DeviceRead, User
from ..easyrsa import DistinguishedName, EasyRSA
from ..easyrsa import EASYRSA, DistinguishedName
from ._common import (Responses, get_current_user, get_device_by_id,
get_user_by_name)
@ -90,19 +90,19 @@ async def remove_device(
},
response_model=DeviceRead,
)
async def request_certificate(
async def request_certificate_issuance(
current_user: User = Depends(get_current_user),
device: Device = Depends(get_device_by_id),
) -> Device:
"""
POST ./{device_id}/issue: Request certificate for a device.
POST ./{device_id}/issue: Request certificate issuance for a device.
"""
# check permission
if not current_user.can_edit(device):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
# cannot request for a newly created device
# can only request for a newly created device
if device.approved is not None:
raise HTTPException(status_code=status.HTTP_409_CONFLICT)
@ -111,7 +111,7 @@ async def request_certificate(
if device.approved:
# issue the certificate immediately
if (certificate := EasyRSA._.issue(
if (certificate := EASYRSA.issue(
dn=DistinguishedName.build(device)
)) is not None:
device.expiry = certificate.not_valid_after