"""
Python representation of `user` table.
"""

from __future__ import annotations

from typing import Any, Iterable, Sequence

from pydantic import root_validator
from sqlalchemy.exc import IntegrityError
from sqlmodel import Field, Relationship, SQLModel

from ..config import Config
from .connection import Connection
from .device import Device
from .tag import Tag, TagValue


class UserBase(SQLModel):
    """
    Common to all representations of users
    """

    # `name` is declared as the primary key
    name: str = Field(primary_key=True)
    email: str

    # optional attributes, all defaulting to `None`
    country: str | None = Field(default=None, max_length=2)
    state: str | None = Field(default=None)
    city: str | None = Field(default=None)
    organization: str | None = Field(default=None)
    organizational_unit: str | None = Field(default=None)


class UserCreate(UserBase):
    """
    Representation of a newly created user
    """

    # Either of these may be given; `hash_password` derives `password`
    # from `password_clear` when `password` itself is missing.
    password: str | None = Field(default=None)
    password_clear: str | None = Field(default=None)

    @root_validator
    @classmethod
    def hash_password(cls, values: dict[str, Any]) -> dict[str, Any]:
        """
        Make sure `values` carries a `password` entry.

        An already present `password` is left untouched. Otherwise the
        `password_clear` entry is hashed using the crypto context of the
        loaded configuration and stored as `password`.

        Raises `ValueError` if there is neither a `password` nor a
        `password_clear`, or if `Config.load()` yields `None`.
        """

        # nothing to do if a password is already present
        if values.get("password") is not None:
            return values

        clear_text = values.get("password_clear")
        if clear_text is None:
            raise ValueError("No password to hash")

        # the configuration is only loaded once there is something to hash
        loaded_config = Config.load()
        if loaded_config is None:
            raise ValueError("Not configured")

        values["password"] = loaded_config.crypto.context.hash(clear_text)
        return values


class UserRead(UserBase):
    """
    Representation of a user read via the API
    """


class User(UserBase, table=True):
    """
    Representation of `user` table
    """

    password: str

    tags: list[Tag] = Relationship(
        back_populates="user",
        sa_relationship_kwargs={
            "lazy": "joined",
            "cascade": "all, delete-orphan",
        },
    )

    devices: list[Device] = Relationship(
        back_populates="owner",
    )

    @classmethod
    def create(
        cls,
        *,
        user: UserCreate,
    ) -> User | None:
        """
        Persist a new user built from a `UserCreate` object.

        Returns the refreshed database object, or `None` if the commit
        raised an `IntegrityError`.
        """

        try:
            with Connection.session as session:
                created = cls.from_orm(user)
                session.add(created)
                session.commit()
                session.refresh(created)

                return created

        except IntegrityError:
            # treated as "user already existed"
            return None

    @classmethod
    def get(cls, name: str) -> User | None:
        """
        Look up a user in the database by its `name`.
        """

        with Connection.session as session:
            return session.get(cls, name)

    @classmethod
    def authenticate(
        cls,
        name: str,
        password: str,
    ) -> User | None:
        """
        Check a name/password pair against the users in the database.

        Returns the matching user, or `None` if the user does not exist,
        the password does not verify, or the user has neither the
        `login` tag nor admin status.
        """

        crypt_context = Config._.crypto.context

        candidate = cls.get(name)
        if candidate is None:
            # nonexistent user, fake doing password verification
            crypt_context.dummy_verify()
            return None

        if not crypt_context.verify(password, candidate.password):
            # password hash mismatch
            return None

        if not candidate.has_tag(TagValue.login) and not candidate.is_admin:
            # no login permission
            return None

        return candidate

    def update(self) -> None:
        """
        Write this user back to the database and refresh it.
        """

        with Connection.session as session:
            session.add(self)
            session.commit()
            session.refresh(self)

    def delete(self) -> None:
        """
        Remove this user from the database.
        """

        with Connection.session as session:
            session.delete(self)
            session.commit()

    @property
    def __tags(self) -> Iterable[TagValue]:
        """
        Iterate over the `_` attribute of every entry in `self.tags`.
        """

        # one-shot generator: callers must not iterate the result twice
        return (entry._ for entry in self.tags)

    def has_tag(self, tag: TagValue) -> bool:
        """
        Tell whether `tag` is among this user's tags.
        """

        return tag in self.__tags

    @property
    def is_admin(self) -> bool:
        """
        Tell whether this user has the `admin` tag.
        """

        return self.has_tag(TagValue.admin)

    def add_tags(
        self,
        tags: Sequence[TagValue],
    ) -> None:
        """
        Replace this user's tags by the union of current and given tags.
        """

        wanted = set(self.__tags) | set(tags)
        self.tags = [value._(self) for value in wanted]

    def remove_tags(
        self,
        tags: Sequence[TagValue],
    ) -> None:
        """
        Replace this user's tags by the current tags minus the given tags.
        """

        remaining = set(self.__tags) - set(tags)
        self.tags = [value._(self) for value in remaining]

    def check_edit(
        self,
        target: User | Device,
    ) -> None:
        """
        Assert that this user may edit `target` (a user or a device).

        Raises `PermissionError` unless this user is admin, `target` is
        this very user, or `target` is a device owned by this user.
        """

        may_edit = (
            # admin can "edit" everything
            self.is_admin
            # user can only "edit" itself
            or (isinstance(target, User) and target == self)
            # user can edit its owned devices
            or (isinstance(target, Device) and target.owner == self)
        )

        # deny by default
        if not may_edit:
            raise PermissionError()

    def check_admin(
        self,
        target: User | Device,
    ) -> None:
        """
        Assert that this user may administer `target` (a user or a device).

        Raises `PermissionError` if this user is not admin, or if
        `target` is this very user.
        """

        # only admin can "admin" anything
        if not self.is_admin:
            raise PermissionError("Must be admin")

        # admin cannot "admin" itself!
        if isinstance(target, User) and target == self:
            raise PermissionError("Can't administer self")

        # admin can "admin" everything else

    def check_create(
        self,
        target: type,
        owner: User | None = None,
    ) -> None:
        """
        Assert that this user may create an object of type `target`.

        Raises `PermissionError` if `target` is neither a `User` nor a
        `Device` subclass, or if this user is not admin and is not
        creating a `Device` with itself as `owner`.
        """

        # can never create anything but users or devices
        if not issubclass(target, (User, Device)):
            raise PermissionError(f"Cannot create target type {target}")

        # admin can "create" everything,
        # user can only create devices for itself
        if self.is_admin or (target is Device and owner == self):
            return

        # deny by default
        raise PermissionError()

    @property
    def can_issue(self) -> bool:
        """
        Tell whether this user is admin or has the `issue` tag.
        """

        if self.is_admin:
            return True

        return self.has_tag(TagValue.issue)

    @property
    def can_renew(self) -> bool:
        """
        Tell whether this user is admin or has the `renew` tag.
        """

        if self.is_admin:
            return True

        return self.has_tag(TagValue.renew)