kiwi-vpn/api/kiwi_vpn_api/config.py

271 lines
5.9 KiB
Python
Raw Normal View History

2022-03-20 03:45:40 +00:00
"""
Configuration definition.
Converts per-run (environment) variables and config files into the
"python world" using `pydantic`.
Pydantic models might have convenience methods attached.
"""
2022-03-18 22:43:02 +00:00
from __future__ import annotations
import functools
2022-03-18 18:22:17 +00:00
import json
2022-03-19 02:22:49 +00:00
from datetime import datetime, timedelta
2022-03-18 18:24:09 +00:00
from enum import Enum
2022-03-20 02:25:42 +00:00
from pathlib import Path
from secrets import token_urlsafe
2022-03-30 10:15:24 +00:00
from typing import Any
2022-03-16 00:23:57 +00:00
2022-03-19 02:22:49 +00:00
from jose import JWTError, jwt
2022-03-16 00:23:57 +00:00
from jose.constants import ALGORITHMS
from passlib.context import CryptContext
2022-03-30 10:15:24 +00:00
from pydantic import BaseModel, BaseSettings, constr, validator
2022-03-16 00:23:57 +00:00
2022-03-18 22:43:02 +00:00
class Settings(BaseSettings):
2022-03-20 03:45:40 +00:00
"""
Per-run settings
"""
2022-03-18 22:43:02 +00:00
production_mode: bool = False
2022-03-20 02:25:42 +00:00
data_dir: Path = Path("./tmp")
2022-03-28 20:59:27 +00:00
config_file_name: Path = Path("config.json")
2022-03-28 20:04:19 +00:00
api_v1_prefix: str = "api/v1"
2022-03-18 22:43:02 +00:00
openapi_url: str = "/openapi.json"
docs_url: str | None = "/docs"
2022-03-19 02:23:29 +00:00
redoc_url: str | None = "/redoc"
2022-03-18 22:43:02 +00:00
2022-03-28 20:04:19 +00:00
@classmethod
@property
2022-03-18 22:43:02 +00:00
@functools.lru_cache
2022-03-28 20:04:19 +00:00
def _(cls) -> Settings:
return cls()
2022-03-17 22:47:31 +00:00
2022-03-20 03:45:40 +00:00
@property
def config_file(self) -> Path:
2022-03-28 20:59:27 +00:00
return self.data_dir.joinpath(self.config_file_name)
2022-03-20 03:45:40 +00:00
2022-03-16 00:23:57 +00:00
class DBType(Enum):
2022-03-20 03:45:40 +00:00
"""
Supported database types
"""
2022-03-16 00:23:57 +00:00
sqlite = "sqlite"
mysql = "mysql"
class DBConfig(BaseModel):
2022-03-20 03:45:40 +00:00
"""
Database connection configuration
"""
2022-03-19 03:31:15 +00:00
type: DBType = DBType.sqlite
user: str | None = None
password: str | None = None
host: str | None = None
2022-03-30 10:15:24 +00:00
database: str | None = Settings._.data_dir.joinpath("kiwi-vpn.db")
2022-03-19 03:31:15 +00:00
mysql_driver: str = "pymysql"
mysql_args: list[str] = ["charset=utf8mb4"]
2022-03-16 00:23:57 +00:00
2022-03-18 18:22:17 +00:00
@property
2022-03-28 02:00:58 +00:00
def uri(self) -> str:
2022-03-20 03:45:40 +00:00
"""
2022-03-28 02:00:58 +00:00
Construct a database connection string
2022-03-20 03:45:40 +00:00
"""
2022-03-19 03:31:15 +00:00
if self.type is DBType.sqlite:
# SQLite backend
2022-03-28 02:00:58 +00:00
return f"sqlite:///{self.database}"
2022-03-19 03:31:15 +00:00
elif self.type is DBType.mysql:
# MySQL backend
if self.mysql_args:
args_str = "?" + "&".join(self.mysql_args)
else:
args_str = ""
2022-03-28 02:00:58 +00:00
return (f"mysql+{self.mysql_driver}://"
f"{self.user}:{self.password}@{self.host}"
f"/{self.database}{args_str}")
2022-03-18 18:22:17 +00:00
2022-03-16 00:23:57 +00:00
class JWTConfig(BaseModel):
2022-03-20 03:45:40 +00:00
"""
Configuration for JSON Web Tokens
"""
2022-03-17 23:00:49 +00:00
secret: str | None = None
2022-03-16 00:23:57 +00:00
hash_algorithm: str = ALGORITHMS.HS256
expiry_minutes: int = 30
2022-03-19 16:57:25 +00:00
@validator("secret")
@classmethod
def ensure_secret(cls, value: str | None) -> str:
2022-03-20 03:45:40 +00:00
"""
Generate a per-run secret if `None` was loaded from the config file
"""
2022-03-19 16:57:25 +00:00
if value is None:
2022-03-20 02:25:53 +00:00
return token_urlsafe(128)
2022-03-19 16:57:25 +00:00
return value
2022-03-19 02:38:32 +00:00
async def create_token(
2022-03-19 02:22:49 +00:00
self,
username: str,
expiry_minutes: int | None = None,
) -> str:
2022-03-20 03:45:40 +00:00
"""
Build and sign a JSON Web Token
"""
2022-03-19 02:22:49 +00:00
if expiry_minutes is None:
expiry_minutes = self.expiry_minutes
return jwt.encode(
{
"sub": username,
"exp": datetime.utcnow() + timedelta(minutes=expiry_minutes),
},
self.secret,
algorithm=self.hash_algorithm,
)
2022-03-19 02:38:32 +00:00
async def decode_token(
2022-03-19 02:22:49 +00:00
self,
token: str,
) -> str | None:
2022-03-20 03:45:40 +00:00
"""
Verify a JSON Web Token, then extract the username
"""
2022-03-19 02:22:49 +00:00
# decode JWT token
try:
payload = jwt.decode(
token,
self.secret,
algorithms=[self.hash_algorithm],
)
except JWTError:
return None
# check expiry
expiry = payload.get("exp")
if expiry is None:
return None
if datetime.fromtimestamp(expiry) < datetime.utcnow():
return None
# get username
username = payload.get("sub")
if username is None:
return None
return username
2022-03-16 00:23:57 +00:00
2022-03-30 10:36:14 +00:00
class LockableString(BaseModel):
2022-03-30 10:15:24 +00:00
value: str
2022-03-30 10:36:14 +00:00
locked: bool
2022-03-30 10:15:24 +00:00
2022-03-30 10:36:14 +00:00
class LockableCountry(LockableString):
2022-03-30 10:15:24 +00:00
value: constr(max_length=2)
class DNParts(BaseModel):
"""
This server's "distinguished name"
"""
2022-03-30 10:36:14 +00:00
country: LockableCountry
state: LockableString
city: LockableString
organization: LockableString
organizational_unit: LockableString
2022-03-30 10:15:24 +00:00
class CertificateAlgo(Enum):
rsa2048 = "rsa2048"
rsa4096 = "rsa4096"
secp256r1 = "secp256r1"
secp384r1 = "secp384r1"
ed25519 = "ed25519"
2022-03-16 00:23:57 +00:00
class CryptoConfig(BaseModel):
2022-03-20 03:45:40 +00:00
"""
2022-03-30 10:36:14 +00:00
Configuration for cryptography
2022-03-20 03:45:40 +00:00
"""
2022-03-30 10:36:14 +00:00
# password hash algorithms
2022-03-16 00:23:57 +00:00
schemes: list[str] = ["bcrypt"]
2022-03-30 10:36:14 +00:00
# pki settings
2022-03-30 10:15:24 +00:00
cert_algo: CertificateAlgo
expiry_days: int
2022-03-27 01:17:48 +00:00
@property
2022-03-30 10:15:24 +00:00
def context(self) -> CryptContext:
2022-03-18 17:36:44 +00:00
return CryptContext(
2022-03-18 18:22:17 +00:00
schemes=self.schemes,
2022-03-18 17:36:44 +00:00
deprecated="auto",
)
2022-03-18 22:43:02 +00:00
class Config(BaseModel):
2022-03-20 03:45:40 +00:00
"""
Configuration for `kiwi-vpn-api`
"""
2022-03-30 10:36:14 +00:00
# common name for the server
2022-03-30 10:15:24 +00:00
server_name: str
2022-03-30 10:36:14 +00:00
# may include client-to-client, cipher etc.
openvpn_extra_options: dict[str, Any]
2022-03-30 10:15:24 +00:00
db: DBConfig
jwt: JWTConfig
crypto: CryptoConfig
2022-03-30 10:36:14 +00:00
default_dn: DNParts
2022-03-18 17:36:44 +00:00
2022-03-28 02:15:42 +00:00
__singleton: Config | None = None
@classmethod
2022-03-28 02:23:00 +00:00
def load(cls) -> Config | None:
2022-03-27 01:17:48 +00:00
"""
Load configuration from config file
"""
2022-03-28 02:15:42 +00:00
if cls.__singleton is not None:
return cls.__singleton
2022-03-27 01:17:48 +00:00
try:
2022-03-28 20:04:19 +00:00
with open(Settings._.config_file, "r") as config_file:
2022-03-28 02:15:42 +00:00
cls.__singleton = Config.parse_obj(json.load(config_file))
return cls.__singleton
2022-03-27 01:17:48 +00:00
except FileNotFoundError:
return None
2022-03-28 02:23:00 +00:00
@classmethod
@property
def _(cls) -> Config | None:
2022-03-20 03:45:40 +00:00
"""
2022-03-28 02:23:00 +00:00
Shorthand for load()
2022-03-20 03:45:40 +00:00
"""
2022-03-28 02:23:00 +00:00
return cls.load()
2022-03-18 22:43:02 +00:00
2022-03-28 02:15:42 +00:00
def save(self) -> None:
2022-03-20 03:45:40 +00:00
"""
Save configuration to config file
"""
2022-03-28 20:04:19 +00:00
with open(Settings._.config_file, "w") as config_file:
2022-03-19 02:28:18 +00:00
config_file.write(self.json(indent=2))