""" Pydantic representation of database contents. """ from __future__ import annotations from typing import Any from passlib.context import CryptContext from pydantic import BaseModel, Field, validator from sqlalchemy.exc import IntegrityError, InvalidRequestError from sqlalchemy.orm import Session from .. import models from .device import Device from .user_capability import UserCapability class UserBase(BaseModel): name: str country: str state: str city: str organization: str organizational_unit: str email: str class UserCreate(UserBase): password: str class User(UserBase): capabilities: list[UserCapability] = [] devices: list[Device] = Field( default=[], repr=False ) class Config: orm_mode = True @validator("capabilities", pre=True) @classmethod def unify_capabilities(cls, value: list[Any]) -> list[UserCapability]: """ Import the capabilities from various formats """ return [ UserCapability.from_value(capability) for capability in value ] @classmethod def from_db( cls, db: Session, name: str, ) -> User | None: """ Load user from database by name. """ try: db_user = models.User(name=name) db.refresh(db_user) return cls.from_orm(db_user) except InvalidRequestError: return None @classmethod def create( cls, db: Session, user: UserCreate, crypt_context: CryptContext, ) -> User | None: """ Create a new user in the database. """ try: db_user = models.User( name=user.name, password=crypt_context.hash(user.password), capabilities=[ capability.model for capability in user.capabilities ], ) db.add(db_user) db.commit() db.refresh(db_user) return cls.from_orm(db_user) except IntegrityError: # user already existed return None def is_admin(self) -> bool: return UserCapability.admin in self.capabilities def authenticate( self, db: Session, password: str, crypt_context: CryptContext, ) -> User | None: """ Authenticate with name/password against users in database. """ db_user = models.User(name=self.name) db.refresh(db_user) if db_user is None: # nonexistent user, fake doing password verification crypt_context.dummy_verify() return False if not crypt_context.verify(password, db_user.password): # password hash mismatch return False self.from_orm(db_user) return True def update( self, db: Session, ) -> None: """ Update this user in the database. """ db_user = models.User(name=self.name) db.refresh(db_user) for capability in db_user.capabilities: db.delete(capability) db_user.capabilities = [ capability.model for capability in self.capabilities ] db.commit() def delete( self, db: Session, ) -> bool: """ Delete this user from the database. """ db_user = models.User(name=self.name) db.refresh(db_user) if db_user is None: # nonexistent user return False db.delete(db_user) db.commit() return True