kiwi-vpn/api/kiwi_vpn_api/db/user.py

286 lines
6.1 KiB
Python

"""
Python representation of `user` table.
"""
from __future__ import annotations
from typing import Any, Sequence
from pydantic import root_validator
from sqlalchemy.exc import IntegrityError
from sqlmodel import Field, Relationship, SQLModel
from ..config import Config
from .connection import Connection
from .device import Device
from .tag import Tag, TagValue
class UserBase(SQLModel):
    """
    Fields shared by every representation of a user.
    """

    # user name; declared as the primary key
    name: str = Field(primary_key=True)

    # optional descriptive attributes, each defaulting to None
    email: str | None = Field(default=None)
    country: str | None = Field(default=None)
    state: str | None = Field(default=None)
    city: str | None = Field(default=None)
    organization: str | None = Field(default=None)
    organizational_unit: str | None = Field(default=None)
class UserCreate(UserBase):
    """
    Input model for a user that is about to be created.

    Either `password` (used as given) or `password_clear` (hashed by the
    validator below) has to be supplied.
    """

    # value stored as the user's password; filled in by `hash_password`
    # when only `password_clear` was given
    password: str | None = Field(default=None)

    # clear-text password to be hashed
    password_clear: str | None = Field(default=None)

    @root_validator
    @classmethod
    def hash_password(cls, values: dict[str, Any]) -> dict[str, Any]:
        """
        Make sure `values` carries a `password` entry.

        An already present `password` is kept untouched. Otherwise
        `password_clear` is hashed with the configured crypt context and
        the result is stored under `password`.

        Raises `ValueError` if there is neither a `password` nor a
        `password_clear`, or if no configuration is available.
        """

        # nothing to do when a password was supplied directly
        if values.get("password") is not None:
            return values

        clear_text = values.get("password_clear")
        if clear_text is None:
            raise ValueError("No password to hash")

        config = Config._
        if config is None:
            raise ValueError("Not configured")

        hasher = config.crypto.crypt_context
        values["password"] = hasher.hash(clear_text)
        return values
class UserRead(UserBase):
    """
    User as read via the API; adds nothing to the shared base fields.
    """
class User(UserBase, table=True):
    """
    Representation of `user` table
    """

    # stored password value; `authenticate` verifies candidates against it
    password: str

    # tags attached to this user: loaded with a joined eager load and
    # cascaded with the user ("all, delete-orphan")
    tags: list[Tag] = Relationship(
        back_populates="user",
        sa_relationship_kwargs={
            "lazy": "joined",
            "cascade": "all, delete-orphan",
        },
    )

    # devices whose `owner` relationship points back at this user
    devices: list[Device] = Relationship(
        back_populates="owner",
    )

    @classmethod
    def create(
        cls,
        *,
        user: UserCreate,
    ) -> User | None:
        """
        Create a new user in the database.

        Returns the stored user, refreshed from the database, or `None`
        if the insert failed with an `IntegrityError`.
        """

        try:
            with Connection.session as db:
                new_user = cls.from_orm(user)

                db.add(new_user)
                db.commit()
                db.refresh(new_user)

                return new_user

        except IntegrityError:
            # user already existed
            return None

    @classmethod
    def get(cls, name: str) -> User | None:
        """
        Load user from database by name.

        Returns `None` if there is no user with that name.
        """

        with Connection.session as db:
            return db.get(cls, name)

    @classmethod
    def authenticate(
        cls,
        name: str,
        password: str,
    ) -> User | None:
        """
        Authenticate with name/password against users in database.

        Returns the matching user, or `None` if the user does not exist
        or the password does not verify. Raises `ValueError` if no
        configuration is available.
        """

        # fail with a clear message instead of an AttributeError on None,
        # mirroring the check in `UserCreate.hash_password`
        if (current_config := Config._) is None:
            raise ValueError("Not configured")

        crypt_context = current_config.crypto.crypt_context

        if (user := cls.get(name)) is None:
            # nonexistent user, fake doing password verification
            crypt_context.dummy_verify()
            return None

        if not crypt_context.verify(password, user.password):
            # password hash mismatch
            return None

        return user

    def update(self) -> None:
        """
        Update this user in the database.

        Adds this object to a session, commits and refreshes it.
        """

        with Connection.session as db:
            db.add(self)
            db.commit()
            db.refresh(self)

    def delete(self) -> None:
        """
        Delete this user from the database.
        """

        with Connection.session as db:
            db.delete(self)
            db.commit()

    def get_tags(self) -> set[TagValue]:
        """
        Return the tags of this user.

        The result is a freshly built set of the `_` value of every
        entry in `self.tags`.
        """

        return {tag._ for tag in self.tags}

    def set_tags(
        self,
        tags: Sequence[TagValue],
    ) -> None:
        """
        Change the tags of this user.

        Replaces `self.tags` with one new `Tag` per given value; nothing
        is written to the database here.
        """

        self.tags = [
            Tag(
                user_name=self.name,
                tag_value=tag.value,
            ) for tag in tags
        ]

    def _can(
        self,
        tag: TagValue,
    ) -> bool:
        """
        Check if this user has a tag.

        Users carrying the admin tag pass every check.
        """

        # build the tag set once and use it for both membership tests
        own_tags = self.get_tags()

        return (
            tag in own_tags
            # admin can do everything
            or TagValue.admin in own_tags
        )

    def can_edit(
        self,
        user: User,
    ) -> bool:
        """
        Check if this user can edit another user.
        """

        return (
            user.name == self.name
            # admin can edit everything
            or self.is_admin()
        )

    def is_admin(
        self,
    ) -> bool:
        """
        Check if this user is an admin.
        """

        # is admin with "admin" tag
        return self._can(TagValue.admin)

    def can_login(
        self,
    ) -> bool:
        """
        Check if this user can log in.
        """

        # can login with "login" tag; admins can always login, which
        # `_can` already covers
        return self._can(TagValue.login)

    def can_be_edited_by(
        self,
        user: User,
    ) -> bool:
        """
        Check if this user can be edited by another user.
        """

        return (
            # user can edit itself
            self.name == user.name
            # admin can edit every user
            or user.is_admin()
        )

    def can_be_deleted_by(
        self,
        user: User,
    ) -> bool:
        """
        Check if this user can be deleted by another user.
        """

        return (
            # only admin can delete users
            user.is_admin()
            # even admin cannot delete itself
            and self.name != user.name
        )

    def owns(
        self,
        device: Device,
    ) -> bool:
        """
        Check if this user owns a device.
        """

        return (
            device.owner_name == self.name
            # admin owns everything
            or self.is_admin()
        )