# Source file: kiwi-vpn/api/kiwi_vpn_api/db/schemas.py
# (repository viewer metadata: 206 lines, 4.1 KiB, Python)

"""
Pydantic representation of database contents.
"""
from __future__ import annotations
from datetime import datetime
from enum import Enum
from typing import Any
from passlib.context import CryptContext
from pydantic import BaseModel, validator
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session
from . import models
##########
# table: certificates
##########
class CertificateBase(BaseModel):
    """
    Fields shared by all certificate schemas.
    """

    # expiry timestamp of the certificate
    expiry: datetime
class CertificateCreate(CertificateBase):
    """
    Certificate schema extended by the `owner_name` and `dn_id` fields.
    """

    # NOTE(review): presumably the name of the owning user -- confirm
    owner_name: str
    # NOTE(review): presumably the id of a distinguished name entry -- confirm
    dn_id: int
class Certificate(CertificateBase):
    """
    Certificate schema including its `id`.
    """

    id: int

    class Config:
        # allow building this schema from ORM objects
        orm_mode = True
##########
# table: user_capabilities
##########
class UserCapability(Enum):
    """
    Capabilities a user can hold.
    """

    admin = "admin"

    @classmethod
    def from_value(cls, value) -> UserCapability:
        """
        Build a UserCapability from any of the supported representations.
        """

        # already the schema enum: hand it back untouched
        if isinstance(value, cls):
            return value

        # database format: the enum value is kept in `capability`
        if isinstance(value, models.UserCapability):
            return cls(value.capability)

        # everything else is treated as the string form of the enum value
        return cls(str(value))
##########
# table: users
##########
class UserBase(BaseModel):
    """
    Fields shared by all user schemas.
    """

    name: str
class UserCreate(UserBase):
    """
    User schema extended by the `password` field.
    """

    password: str
class User(UserBase):
    """
    User schema including certificates and capabilities.

    Besides the data fields, this class bundles the database operations
    on users: loading, creating, authenticating and adding capabilities.
    """

    certificates: list[Certificate] = []
    capabilities: list[UserCapability] = []

    class Config:
        # allow building this schema from ORM objects
        orm_mode = True

    @validator("capabilities", pre=True)
    @classmethod
    def unify_capabilities(cls, value: list[Any]) -> list[UserCapability]:
        """
        Import the capabilities from various formats
        """

        return [
            UserCapability.from_value(capability)
            for capability in value
        ]

    @classmethod
    def from_db(
        cls,
        db: Session,
        name: str,
    ) -> User | None:
        """
        Load user from database by name.

        Returns None if `models.User.load` finds no such user.
        """

        if (db_user := models.User.load(db, name)) is None:
            return None

        return cls.from_orm(db_user)

    @classmethod
    def create(
        cls,
        db: Session,
        user: UserCreate,
        crypt_context: CryptContext,
    ) -> User | None:
        """
        Create a new user in the database.

        The password is stored as hashed by `crypt_context`. Returns the
        newly created user, or None if the commit failed with an
        IntegrityError (user already existed).
        """

        db_user = models.User(
            name=user.name,
            password=crypt_context.hash(user.password),
            capabilities=[],
        )

        try:
            db.add(db_user)
            db.commit()

        except IntegrityError:
            # user already existed; roll back the failed transaction so
            # the session stays usable for the caller
            db.rollback()
            return None

        db.refresh(db_user)
        return cls.from_orm(db_user)

    def authenticate(
        self,
        db: Session,
        password: str,
        crypt_context: CryptContext,
    ) -> bool:
        """
        Authenticate with name/password against users in database.

        Returns True if a user named `self.name` exists and `password`
        matches the stored hash, False otherwise. On success, the
        certificates and capabilities of this object are refreshed from
        the database.
        """

        if (db_user := models.User.load(db, self.name)) is None:
            # nonexistent user, fake doing password verification
            crypt_context.dummy_verify()
            return False

        if not crypt_context.verify(password, db_user.password):
            # password hash mismatch
            return False

        # from_orm is a classmethod returning a *new* object, so its result
        # has to be copied over to actually update this instance
        loaded = self.from_orm(db_user)
        self.certificates = loaded.certificates
        self.capabilities = loaded.capabilities

        return True

    def add_capabilities(
        self,
        db: Session,
        capabilities: list[UserCapability],
    ) -> None:
        """
        Add some capabilities to this user.

        Capabilities this user already has, and repetitions within
        `capabilities`, are skipped.
        """

        added: list[UserCapability] = []

        for capability in capabilities:
            if capability in self.capabilities or capability in added:
                continue

            db.add(models.UserCapability(
                user_name=self.name,
                capability=capability.value,
            ))
            added.append(capability)

        db.commit()

        # mirror the stored state only once the commit went through
        self.capabilities.extend(added)
##########
# table: distinguished_names
##########
class DistinguishedNameBase(BaseModel):
    """
    Fields shared by all distinguished name schemas.
    """

    # NOTE(review): presumably "only the common name is used" -- confirm
    cn_only: bool

    country: str
    state: str
    city: str
    organization: str
    organizational_unit: str
    email: str
    common_name: str
class DistinguishedNameCreate(DistinguishedNameBase):
    """
    Distinguished name schema without any fields beyond the base ones.
    """
class DistinguishedName(DistinguishedNameBase):
    """
    Distinguished name schema including its `id` and certificates.
    """

    id: int
    certificates: list[Certificate]

    class Config:
        # allow building this schema from ORM objects
        orm_mode = True