"""
Pydantic representation of database contents.
"""

from __future__ import annotations

from typing import Any

from passlib.context import CryptContext
from pydantic import BaseModel, Field, validator
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session

from .. import models
from .device import Device
from .user_capability import UserCapability


# Fields shared by all user schemas in this module.
# NOTE: documented with comments rather than class docstrings on purpose;
# pydantic publishes a model's docstring as its schema description.
class UserBase(BaseModel):
    name: str
    email: str
    capabilities: list[UserCapability] = []

    # optional details, all defaulting to None and excluded from repr()
    country: str | None = Field(default=None, repr=False)
    state: str | None = Field(default=None, repr=False)
    city: str | None = Field(default=None, repr=False)
    organization: str | None = Field(default=None, repr=False)
    organizational_unit: str | None = Field(default=None, repr=False)


# Input schema for creating a user: the base fields plus the plain-text
# password, which `User.create` hashes before it is stored.
class UserCreate(UserBase):
    password: str


# User as loaded from the database (ORM mode), including its devices.
class User(UserBase):
    devices: list[Device] = Field(
        default=[], repr=False
    )

    class Config:
        # allow building the schema from ORM objects via `from_orm`
        orm_mode = True

    @validator("capabilities", pre=True)
    @classmethod
    def unify_capabilities(cls, value: list[Any]) -> list[UserCapability]:
        """
        Import the capabilities from various formats

        Runs before field validation (``pre=True``) and converts every entry
        with `UserCapability.from_value`.
        """

        return [
            UserCapability.from_value(capability)
            for capability in value
        ]

    @classmethod
    def from_db(
        cls,
        db: Session,
        name: str,
    ) -> User | None:
        """
        Load user from database by name.

        Returns None if `models.User.from_db` finds no user with that name.
        """

        if (db_user := models.User.from_db(db, name)) is None:
            return None

        return cls.from_orm(db_user)

    @classmethod
    def create(
        cls,
        db: Session,
        user: UserCreate,
        crypt_context: CryptContext,
    ) -> User | None:
        """
        Create a new user in the database.

        The password is stored as a hash produced by `crypt_context`.
        Returns the created user, or None if the commit failed with an
        IntegrityError; in that case the session is rolled back so that it
        stays usable for the caller.
        """

        try:
            # NOTE(review): only name, password hash, email and capabilities
            # are handed to the model; the optional fields of `UserCreate`
            # are not forwarded here. Confirm this is intended.
            db_user = models.User(
                name=user.name,
                password=crypt_context.hash(user.password),
                email=user.email,
                capabilities=[
                    capability.model
                    for capability in user.capabilities
                ],
            )

            db.add(db_user)
            db.commit()
            db.refresh(db_user)

            return cls.from_orm(db_user)

        except IntegrityError:
            # user already existed
            # A failed commit leaves the session's transaction unusable
            # until it is rolled back explicitly.
            db.rollback()
            return None

    def is_admin(self) -> bool:
        """
        Tell whether this user holds the admin capability.
        """

        return UserCapability.admin in self.capabilities

    def authenticate(
        self,
        db: Session,
        password: str,
        crypt_context: CryptContext,
    ) -> bool:
        """
        Authenticate with name/password against users in database.

        Returns True if a user named `self.name` exists and `password`
        verifies against the stored hash, False otherwise.
        """

        if (db_user := models.User.from_db(db, self.name)) is None:
            # nonexistent user, fake doing password verification
            crypt_context.dummy_verify()
            return False

        if not crypt_context.verify(password, db_user.password):
            # password hash mismatch
            return False

        # NOTE(review): `from_orm` builds a new model and its result is
        # discarded here, so `self` is not refreshed from the database.
        # Kept as-is; confirm whether an in-place refresh was intended.
        self.from_orm(db_user)
        return True

    def update(
        self,
        db: Session,
    ) -> None:
        """
        Update this user in the database.

        Only the capabilities are written: the stored ones are deleted and
        replaced by those of this object. Does nothing if no user named
        `self.name` exists.
        """

        if (db_user := models.User.from_db(db, self.name)) is None:
            return None

        # drop the stored capability rows before attaching the new ones
        for capability in db_user.capabilities:
            db.delete(capability)

        db_user.capabilities = [
            capability.model
            for capability in self.capabilities
        ]

        db.commit()

    def delete(
        self,
        db: Session,
    ) -> bool:
        """
        Delete this user from the database.

        Returns False if no user named `self.name` exists, True once the
        deletion has been committed.
        """

        if (db_user := models.User.from_db(db, self.name)) is None:
            # nonexistent user
            return False

        db.delete(db_user)
        db.commit()
        return True