kiwi-vpn/api/kiwi_vpn_api/db/schemas.py

144 lines
3 KiB
Python

from __future__ import annotations
from datetime import datetime
from passlib.context import CryptContext
from pydantic import BaseModel, validator
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session
from . import models
class CertificateBase(BaseModel):
expiry: datetime
class CertificateCreate(CertificateBase):
owner_name: str
dn_id: int
class Certificate(CertificateBase):
id: int
class Config:
orm_mode = True
class UserBase(BaseModel):
name: str
capabilities: list[str]
@validator("capabilities", pre=True)
@classmethod
def unify_capabilities(
cls,
value: list[models.UserCapability | str]
) -> list[str]:
return [
capability.capability
if isinstance(capability, models.UserCapability)
else str(capability)
for capability in value
]
class UserCreate(UserBase):
password: str
class User(UserBase):
certificates: list[Certificate]
class Config:
orm_mode = True
@classmethod
def from_db(
cls,
db: Session,
name: str,
) -> User | None:
user = (db
.query(models.User)
.filter(models.User.name == name)
.first())
if user is None:
return None
return cls.from_orm(user)
@classmethod
def login(
cls,
db: Session,
name: str,
password: str,
crypt_context: CryptContext,
) -> User | None:
user = (db
.query(models.User)
.filter(models.User.name == name)
.first())
if user is None:
# inexistent user, fake doing password verification
crypt_context.dummy_verify()
return None
if not crypt_context.verify(password, user.password):
# password hash mismatch
return None
return cls.from_orm(user)
@classmethod
def create(
cls,
db: Session,
user: UserCreate,
crypt_context: CryptContext,
) -> User | None:
try:
user = models.User(
name=user.name,
password=crypt_context.hash(user.password),
capabilities=[
models.UserCapability(capability=capability)
for capability in user.capabilities
]
)
db.add(user)
db.commit()
db.refresh(user)
return cls.from_orm(user)
except IntegrityError:
pass
class DistinguishedNameBase(BaseModel):
cn_only: bool
country: str
state: str
city: str
organization: str
organizational_unit: str
email: str
common_name: str
class DistinguishedNameCreate(DistinguishedNameBase):
pass
class DistinguishedName(DistinguishedNameBase):
id: int
certificates: list[Certificate]
class Config:
orm_mode = True