kiwi-vpn/api/kiwi_vpn_api/db/user.py

276 lines
6.2 KiB
Python
Raw Normal View History

2022-03-28 20:58:40 +00:00
"""
2022-03-28 21:54:39 +00:00
Python representation of `user` table.
2022-03-28 20:58:40 +00:00
"""
2022-03-27 01:17:48 +00:00
from __future__ import annotations
2022-03-30 01:51:43 +00:00
from typing import Any, Iterable, Sequence
2022-03-27 01:17:48 +00:00
from pydantic import root_validator
from sqlalchemy.exc import IntegrityError
2022-03-27 13:47:38 +00:00
from sqlmodel import Field, Relationship, SQLModel
2022-03-27 01:17:48 +00:00
from ..config import Config
from .connection import Connection
2022-03-28 00:43:28 +00:00
from .device import Device
2022-03-29 19:57:33 +00:00
from .tag import Tag, TagValue
2022-03-27 01:17:48 +00:00
class UserBase(SQLModel):
    """
    Fields shared by every representation of a user.

    Only `name` is mandatory; it is declared as the primary key.
    All remaining fields are optional and default to `None`.
    """

    # unique identifier of the user (primary key)
    name: str = Field(primary_key=True)

    # optional contact address
    email: str | None = Field(default=None)

    # optional location / organization details
    country: str | None = Field(default=None)
    state: str | None = Field(default=None)
    city: str | None = Field(default=None)
    organization: str | None = Field(default=None)
    organizational_unit: str | None = Field(default=None)
2022-03-28 20:58:40 +00:00
class UserCreate(UserBase):
    """
    Payload used to create a new user.

    Either an already hashed `password` or a `password_clear` to be
    hashed must be supplied.
    """

    # hashed password; derived from `password_clear` when not given
    password: str | None = Field(default=None)
    # clear-text password; only consulted when `password` is missing
    password_clear: str | None = Field(default=None)

    @root_validator
    @classmethod
    def hash_password(cls, values: dict[str, Any]) -> dict[str, Any]:
        """
        Make sure `password` is populated on the validated values.

        An existing `password` is left alone. Otherwise `password_clear`
        is hashed using the crypt context of the current configuration.

        Raises `ValueError` if there is neither a `password` nor a
        `password_clear`, or if no configuration is available.
        """

        # a password is already present: nothing to do
        if values.get("password") is not None:
            return values

        plaintext = values.get("password_clear")
        if plaintext is None:
            raise ValueError("No password to hash")

        config = Config._
        if config is None:
            raise ValueError("Not configured")

        hasher = config.crypto.crypt_context
        values["password"] = hasher.hash(plaintext)
        return values
class UserRead(UserBase):
    """
    Representation of a user as returned by the API.

    Adds nothing to `UserBase`; in particular, it declares no
    password field.
    """
2022-03-27 01:17:48 +00:00
class User(UserBase, table=True):
    """
    Representation of `user` table
    """

    # password hash, verified through the configured crypt context
    password: str

    # tags of this user; configured for joined loading, and the
    # "delete-orphan" cascade applies to tags dropped from this list
    tags: list[Tag] = Relationship(
        back_populates="user",
        sa_relationship_kwargs={
            "lazy": "joined",
            "cascade": "all, delete-orphan",
        },
    )

    # devices owned by this user (other side: `Device.owner`)
    devices: list[Device] = Relationship(
        back_populates="owner",
    )

    @classmethod
    def create(
        cls,
        *,
        user: UserCreate,
    ) -> User | None:
        """
        Create a new user in the database.

        Returns the stored user, or `None` if the database rejected it
        with an `IntegrityError` (i.e. the user already existed).
        """

        try:
            with Connection.session as db:
                new_user = cls.from_orm(user)

                db.add(new_user)
                db.commit()
                db.refresh(new_user)

                return new_user

        except IntegrityError:
            # user already existed
            return None

    @classmethod
    def get(cls, name: str) -> User | None:
        """
        Load user from database by name.

        Returns `None` if there is no user with that name.
        """

        with Connection.session as db:
            return db.get(cls, name)

    @classmethod
    def authenticate(
        cls,
        name: str,
        password: str,
    ) -> User | None:
        """
        Authenticate with name/password against users in database.

        Returns the user if it exists, the password matches and the
        user carries the `login` or `admin` tag; `None` otherwise.
        """

        # NOTE(review): `Config._` is treated as possibly `None` in
        # `UserCreate.hash_password`; here it is dereferenced
        # unconditionally -- confirm a configuration always exists
        # when authentication is attempted.
        crypt_context = Config._.crypto.crypt_context

        if (user := cls.get(name)) is None:
            # nonexistent user, fake doing password verification
            crypt_context.dummy_verify()
            return None

        if not crypt_context.verify(password, user.password):
            # password hash mismatch
            return None

        if not (user.has_tag(TagValue.login)
                or user.has_tag(TagValue.admin)):
            # no login permission
            return None

        return user

    def update(self) -> None:
        """
        Update this user in the database.
        """

        with Connection.session as db:
            db.add(self)
            db.commit()
            db.refresh(self)

    def delete(self) -> None:
        """
        Delete this user from the database.
        """

        with Connection.session as db:
            db.delete(self)
            db.commit()

    def _get_tags(self) -> Iterable[TagValue]:
        """
        Return the tags of this user.

        The result is a lazy, single-use iterable over the `_` value
        of every entry in `self.tags`.
        """

        return (
            tag._
            for tag in self.tags
        )

    def has_tag(self, tag: TagValue) -> bool:
        """
        Check if this user has a tag.
        """

        return tag in self._get_tags()

    def add_tags(
        self,
        tags: Sequence[TagValue],
    ) -> None:
        """
        Add tags to this user.

        The tag list is rebuilt from the union of current and new
        tag values, so duplicates are collapsed.
        """

        self.tags = [
            tag._(self)
            for tag in (set(self._get_tags()) | set(tags))
        ]

    def remove_tags(
        self,
        tags: Sequence[TagValue],
    ) -> None:
        """
        Remove tags from this user.

        The tag list is rebuilt from the current tag values minus the
        given ones; values the user does not have are ignored.
        """

        self.tags = [
            tag._(self)
            for tag in (set(self._get_tags()) - set(tags))
        ]

    def can_edit(
        self,
        target: User | Device,
    ) -> bool:
        """
        Check if this user can edit another user or a device.

        Admins can edit everything. Other users can edit themselves
        and the devices they own.
        """

        # admin can "edit" everything
        if self.has_tag(TagValue.admin):
            return True

        # user can "edit" itself, but no other user; answer for every
        # `User` target here, because users have no `owner` attribute
        # and must not fall through to the device check below
        if isinstance(target, User):
            return target == self

        # user can edit its owned devices
        return target.owner == self

    def can_admin(
        self,
        target: User | Device,
    ) -> bool:
        """
        Check if this user can administer another user or a device.

        Only admins can administer anything, and never themselves.
        """

        # only admin can "admin" anything
        if not self.has_tag(TagValue.admin):
            return False

        # admin cannot "admin" itself!
        if isinstance(target, User) and target == self:
            return False

        # admin can "admin" everything else
        return True

    def can_create(
        self,
        target: type,
        owner: User | None = None,
    ) -> bool:
        """
        Check if this user can create another user or a device.

        `target` is the class to be created (`User` or `Device`);
        `owner` is the intended owner when creating a device.
        """

        # can never create anything but users or devices
        if not issubclass(target, (User, Device)):
            return False

        # admin can "create" everything
        if self.has_tag(TagValue.admin):
            return True

        # user can only create devices for itself
        if target is Device and owner == self:
            return True

        # deny by default
        return False