""" Pydantic representation of database contents. """ from __future__ import annotations from datetime import datetime from enum import Enum from typing import Any from passlib.context import CryptContext from pydantic import BaseModel, Field, constr, validator from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import Session from . import models ########## # table: distinguished_names ########## class DistinguishedNameBase(BaseModel): cn_only: bool country: constr(max_length=2) | None state: str | None city: str | None organization: str | None organizational_unit: str | None email: str | None common_name: str class DistinguishedNameCreate(DistinguishedNameBase): pass class DistinguishedName(DistinguishedNameBase): class Config: orm_mode = True @classmethod def create( cls, db: Session, dn: DistinguishedNameCreate, owner: User, ) -> User | None: """ Create a new distinguished name in the database. """ try: db_owner = models.User.load( db=db, name=owner.name, ) dn = models.DistinguishedName( cn_only=dn.cn_only, country=dn.country, state=dn.state, city=dn.city, organization=dn.organization, organizational_unit=dn.organizational_unit, email=dn.email, common_name=dn.common_name, owner=db_owner, ) db.add(dn) db.commit() db.refresh(dn) return cls.from_orm(dn) except IntegrityError: # distinguished name already existed pass def delete( self, db: Session, ) -> bool: """ Delete this distinguished name from the database. """ db_dn = models.DistinguishedName(**dict(self)) db.refresh(db_dn) # .load( # db=db, # country=self.country, # state=self.state, # city=self.city, # organization=self.organization, # organizational_unit=self.organizational_unit, # email=self.email, # common_name=self.common_name, # ) if db_dn is None: # nonexistent user return False db.delete(db_dn) db.commit() return True ########## # table: certificates ########## class CertificateBase(BaseModel): expiry: datetime class CertificateCreate(CertificateBase): pass class Certificate(CertificateBase): distinguished_name: DistinguishedName class Config: orm_mode = True ########## # table: user_capabilities ########## class UserCapability(Enum): admin = "admin" def __repr__(self) -> str: return self.value @classmethod def from_value(cls, value) -> UserCapability: """ Create UserCapability from various formats """ if isinstance(value, cls): # value is already a UserCapability, use that return value elif isinstance(value, models.UserCapability): # create from db format return cls(value.capability) else: # create from string representation return cls(str(value)) ########## # table: users ########## class UserBase(BaseModel): name: str class UserCreate(UserBase): password: str class User(UserBase): capabilities: list[UserCapability] = [] distinguished_names: list[DistinguishedName] = Field( default=[], repr=False ) certificates: list[Certificate] = Field( default=[], repr=False ) class Config: orm_mode = True @validator("capabilities", pre=True) @classmethod def unify_capabilities(cls, value: list[Any]) -> list[UserCapability]: """ Import the capabilities from various formats """ return [ UserCapability.from_value(capability) for capability in value ] @classmethod def from_db( cls, db: Session, name: str, ) -> User | None: """ Load user from database by name. """ if (db_user := models.User.load(db, name)) is None: return None return cls.from_orm(db_user) @classmethod def create( cls, db: Session, user: UserCreate, crypt_context: CryptContext, ) -> User | None: """ Create a new user in the database. """ try: user = models.User( name=user.name, password=crypt_context.hash(user.password), capabilities=[], ) db.add(user) db.commit() db.refresh(user) return cls.from_orm(user) except IntegrityError: # user already existed pass def is_admin(self) -> bool: return UserCapability.admin in self.capabilities def authenticate( self, db: Session, password: str, crypt_context: CryptContext, ) -> User | None: """ Authenticate with name/password against users in database. """ if (db_user := models.User.load(db, self.name)) is None: # nonexistent user, fake doing password verification crypt_context.dummy_verify() return False if not crypt_context.verify(password, db_user.password): # password hash mismatch return False self.from_orm(db_user) return True def update( self, db: Session, ) -> None: """ Update this user in the database. """ old_dbuser = models.User.load(db, self.name) old_user = self.from_orm(old_dbuser) for capability in self.capabilities: if capability not in old_user.capabilities: old_dbuser.capabilities.append( models.UserCapability(capability=capability.value) ) for capability in old_dbuser.capabilities: if UserCapability.from_value(capability) not in self.capabilities: db.delete(capability) db.commit() def delete( self, db: Session, ) -> bool: """ Delete this user from the database. """ if (db_user := models.User.load(db, self.name)) is None: # nonexistent user return False db.delete(db_user) db.commit() return True