"""
User table model and the request/response schemas derived from it.
"""
from __future__ import annotations

from typing import Any, Sequence

from pydantic import root_validator
from sqlalchemy.exc import IntegrityError
from sqlmodel import Field, Relationship, SQLModel

from ..config import Config
from .capabilities import Capability, UserCapability
from .connection import Connection


class UserBase(SQLModel):
    """
    Fields shared by the database model and the create/read schemas.

    Only `name` and `email` are required; every other field defaults to None.
    """

    # unique user name, used as the primary key of the user table
    name: str = Field(primary_key=True)
    email: str

    # NOTE(review): these look like certificate subject attributes -- confirm
    country: str | None = Field(default=None)
    state: str | None = Field(default=None)
    city: str | None = Field(default=None)
    organization: str | None = Field(default=None)
    organizational_unit: str | None = Field(default=None)


class User(UserBase, table=True):
    """
    Database model of a user.

    The methods open a session via `Connection.session` for each operation
    that touches the database.
    """

    # password hash (see `UserCreate.hash_password` and `authenticate`),
    # never the clear-text password
    password: str

    # Capabilities assigned to this user. They are loaded together with the
    # user ("joined"); presumably so that they stay accessible after the
    # session has been closed. The cascade setting ties the lifetime of the
    # capability rows to the user and to membership in this list.
    capabilities: list[UserCapability] = Relationship(
        back_populates="user",
        sa_relationship_kwargs={
            "lazy": "joined",
            "cascade": "all, delete-orphan",
        },
    )

    @classmethod
    def create(cls, **kwargs: Any) -> User | None:
        """
        Create a new user in the database.

        The keyword arguments are validated through `UserCreate`, which also
        takes care of hashing `password_clear` if no `password` hash is given.

        Returns the stored user, or None if the database rejected the insert
        with an `IntegrityError`. Validation errors raised while building the
        `UserCreate` object are not caught here and propagate to the caller.
        """

        try:
            with Connection.session as db:
                user = cls.from_orm(UserCreate(**kwargs))

                db.add(user)
                db.commit()
                # reload the stored state before the session is closed
                db.refresh(user)

                return user

        except IntegrityError:
            # user already existed
            return None

    @classmethod
    def get(cls, name: str) -> User | None:
        """
        Load user from database by name.

        Returns None if there is no user with the given name.
        """

        with Connection.session as db:
            return db.get(cls, name)

    @classmethod
    def authenticate(
        cls,
        name: str,
        password: str,
    ) -> User | None:
        """
        Authenticate with name/password against users in database.

        Returns the matching user if the clear-text `password` verifies
        against the stored hash, otherwise None. An unknown user name and a
        wrong password are both reported as None.
        """

        crypt_context = Config.load_sync().crypto.crypt_context_sync

        if (user := cls.get(name)) is None:
            # nonexistent user, fake doing password verification
            # (presumably to keep the response time similar to that of an
            # existing user -- confirm against the crypt context in use)
            crypt_context.dummy_verify()
            return None

        if not crypt_context.verify(password, user.password):
            # password hash mismatch
            return None

        return user

    def update(self) -> None:
        """
        Update this user in the database.

        Adds this object to a fresh session, commits and refreshes it.
        """

        with Connection.session as db:
            db.add(self)
            db.commit()
            db.refresh(self)

    def delete(self) -> None:
        """
        Delete this user from the database.

        Nothing is returned; the annotation used to claim `bool`, although no
        value was ever returned.
        """

        with Connection.session as db:
            db.delete(self)
            db.commit()

    def extend_capabilities(self, capabilities: Sequence[Capability]) -> None:
        """
        Extend this user's capabilities

        Capabilities the user already has are skipped. Only the `capabilities`
        list of this object is changed; no database session is opened here.
        """

        for capability in capabilities:
            user_capability = UserCapability(
                user_name=self.name,
                capability_name=capability.value,
            )

            # avoid duplicate entries for the same capability
            if user_capability not in self.capabilities:
                self.capabilities.append(user_capability)

    def remove_capabilities(self, capabilities: Sequence[Capability]) -> None:
        """
        Remove from this user's capabilities

        Capabilities the user does not have are ignored. Only the
        `capabilities` list of this object is changed; no database session is
        opened here.
        """

        for capability in capabilities:
            try:
                self.capabilities.remove(UserCapability(
                    user_name=self.name,
                    capability_name=capability.value,
                ))
            except ValueError:
                # capability was not assigned, nothing to remove
                pass


class UserCreate(UserBase):
    """
    Schema for creating a user.

    Accepts either a ready-made password hash in `password` or a clear-text
    password in `password_clear`, which is hashed during validation.
    """

    password: str | None = Field(default=None)
    password_clear: str | None = Field(default=None)

    @root_validator
    @classmethod
    def hash_password(cls, values: dict[str, Any]) -> dict[str, Any]:
        """
        Make sure `values` contains a password hash.

        If `password` is already set, the values are returned unchanged.
        Otherwise `password_clear` is hashed with the configured crypt context
        and the result is stored as `password`.

        Raises ValueError if neither `password` nor `password_clear` is given,
        or if no configuration could be loaded.
        """

        if (values.get("password")) is not None:
            # password is set
            return values

        if (password_clear := values.get("password_clear")) is None:
            raise ValueError("No password to hash")

        if (current_config := Config.load_sync()) is None:
            raise ValueError("Not configured")

        values["password"] = current_config.crypto.crypt_context_sync.hash(
            password_clear)

        return values


class UserRead(UserBase):
    """
    Schema for returning a user: the base fields, without any password.
    """

    pass