286 lines
6.4 KiB
Python
286 lines
6.4 KiB
Python
"""
|
|
Python representation of `user` table.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import Any, Sequence
|
|
|
|
from pydantic import root_validator
|
|
from sqlalchemy.exc import IntegrityError
|
|
from sqlmodel import Field, Relationship, SQLModel
|
|
|
|
from ..config import Config
|
|
from .connection import Connection
|
|
from .device import Device
|
|
from .user_capability import UserCapability, UserCapabilityType
|
|
|
|
|
|
class UserBase(SQLModel):
    """
    Fields shared by every representation of a user.

    `name` is the primary key; every other field is optional and
    defaults to `None`.
    """

    # unique user name, used as primary key
    name: str = Field(primary_key=True)
    email: str | None = None

    # optional descriptive fields
    country: str | None = None
    state: str | None = None
    city: str | None = None
    organization: str | None = None
    organizational_unit: str | None = None
|
|
|
|
|
|
class UserCreate(UserBase):
    """
    Payload describing a user that is about to be created.

    Either an already hashed `password` or a clear text `password_clear`
    has to be supplied; the latter is hashed during validation.
    """

    # already hashed password; takes precedence over `password_clear`
    password: str | None = None
    # clear text password, only consulted when `password` is unset
    password_clear: str | None = None

    @root_validator
    @classmethod
    def hash_password(cls, values: dict[str, Any]) -> dict[str, Any]:
        """
        Make sure `password` is populated for this user.

        An existing `password` is left untouched. Otherwise
        `password_clear` is hashed with the crypt context of the current
        configuration and the result is stored as `password`.

        Raises `ValueError` if neither password field is set, or if there
        is no current configuration.
        """

        if values.get("password") is None:
            clear_text = values.get("password_clear")
            if clear_text is None:
                raise ValueError("No password to hash")

            config = Config._
            if config is None:
                raise ValueError("Not configured")

            hasher = config.crypto.crypt_context
            values["password"] = hasher.hash(clear_text)

        return values
|
|
|
|
|
|
class UserRead(UserBase):
    """
    User as returned when read via the API.

    Adds nothing to `UserBase`, so no password field is part of it.
    """
|
|
|
|
|
|
class User(UserBase, table=True):
    """
    Representation of `user` table

    Extends `UserBase` with the stored password hash and the
    relationships to the user's capabilities and devices, and bundles the
    database operations and permission checks for a user.
    """

    # password hash; compared via `crypt_context.verify` in `authenticate`
    password: str

    # NOTE: per SQLAlchemy, "joined" loads the capabilities together with
    # the user and "delete-orphan" removes capabilities that are dropped
    # from this list.
    capabilities: list[UserCapability] = Relationship(
        back_populates="user",
        sa_relationship_kwargs={
            "lazy": "joined",
            "cascade": "all, delete-orphan",
        },
    )

    devices: list[Device] = Relationship(
        back_populates="owner",
    )

    @classmethod
    def create(
        cls,
        *,
        user: UserCreate,
    ) -> User | None:
        """
        Create a new user in the database.

        Returns the freshly stored user, or `None` if the database
        rejected it with an `IntegrityError`.
        """

        try:
            with Connection.session as db:
                new_user = cls.from_orm(user)

                db.add(new_user)
                db.commit()
                db.refresh(new_user)

                return new_user

        except IntegrityError:
            # user already existed
            return None

    @classmethod
    def get(cls, name: str) -> User | None:
        """
        Load user from database by name.

        Returns `None` if there is no user with that name.
        """

        with Connection.session as db:
            return db.get(cls, name)

    @classmethod
    def authenticate(
        cls,
        name: str,
        password: str,
    ) -> User | None:
        """
        Authenticate with name/password against users in database.

        Returns the matching user, or `None` if the user does not exist
        or the password does not match.

        Raises `ValueError` if there is no current configuration.
        """

        # same guard as `UserCreate.hash_password`: fail with a clear
        # message instead of an attribute error on `None`
        if (current_config := Config._) is None:
            raise ValueError("Not configured")

        crypt_context = current_config.crypto.crypt_context

        if (user := cls.get(name)) is None:
            # nonexistent user, fake doing password verification
            # (presumably so unknown names are not faster to reject)
            crypt_context.dummy_verify()
            return None

        if not crypt_context.verify(password, user.password):
            # password hash mismatch
            return None

        return user

    def update(self) -> None:
        """
        Update this user in the database.

        Commits the current state and refreshes this object afterwards.
        """

        with Connection.session as db:
            db.add(self)
            db.commit()
            db.refresh(self)

    def delete(self) -> None:
        """
        Delete this user from the database.
        """

        with Connection.session as db:
            db.delete(self)
            db.commit()

    def get_capabilities(self) -> set[UserCapabilityType]:
        """
        Return the capabilities of this user.
        """

        return {capability._ for capability in self.capabilities}

    def set_capabilities(
        self,
        capabilities: Sequence[UserCapabilityType],
    ) -> None:
        """
        Change the capabilities of this user.

        Replaces the current capabilities; repeated entries in
        `capabilities` are only stored once. The change is not written to
        the database by this method.
        """

        # capabilities are read back as a set, so collapse duplicates
        # (keeping first-seen order) instead of creating identical rows
        self.capabilities = [
            UserCapability(
                user_name=self.name,
                capability_name=capability.value,
            ) for capability in dict.fromkeys(capabilities)
        ]

    def _can(
        self,
        capability: UserCapabilityType,
    ) -> bool:
        """
        Check if this user has a capability.

        Admins implicitly have every capability.
        """

        # build the set once for both membership tests
        granted = self.get_capabilities()

        return (
            capability in granted
            # admin can do everything
            or UserCapabilityType.admin in granted
        )

    def can_edit(
        self,
        user: User,
    ) -> bool:
        """
        Check if this user can edit another user.
        """

        return (
            user.name == self.name
            # admin can edit everything
            or self.is_admin()
        )

    def is_admin(
        self,
    ) -> bool:
        """
        Check if this user is an admin.
        """

        # is admin with "admin" capability
        return self._can(UserCapabilityType.admin)

    def can_login(
        self,
    ) -> bool:
        """
        Check if this user can log in.
        """

        # can login with "login" capability; admins can always login,
        # which `_can` already takes care of
        return self._can(UserCapabilityType.login)

    def can_be_edited_by(
        self,
        user: User,
    ) -> bool:
        """
        Check if this user can be edited by another user.
        """

        # mirror image of `can_edit`: a user can edit itself, an admin
        # can edit every user
        return user.can_edit(self)

    def can_be_deleted_by(
        self,
        user: User,
    ) -> bool:
        """
        Check if this user can be deleted by another user.
        """

        return (
            # only admin can delete users
            user.is_admin()
            # even admin cannot delete itself
            and self.name != user.name
        )

    def owns(
        self,
        device: Device,
    ) -> bool:
        """
        Check if this user owns a device.
        """

        return (
            device.owner_name == self.name
            # admin owns everything
            or self.is_admin()
        )
|