kiwi-vpn/api/kiwi_vpn_api/db/schemata/user.py

164 lines
3.8 KiB
Python

"""
Pydantic representation of database contents.
"""
from __future__ import annotations
from typing import Any
from passlib.context import CryptContext
from pydantic import BaseModel, Field, validator
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session
from .. import models
from .device import Device
from .user_capability import UserCapability
class UserBase(BaseModel):
    """
    Fields common to every pydantic representation of a user.
    """

    # identifying data and granted capabilities
    name: str
    email: str
    capabilities: list[UserCapability] = []

    # optional descriptive fields; they default to None and are left out
    # of the model's repr()
    country: str | None = Field(None, repr=False)
    state: str | None = Field(None, repr=False)
    city: str | None = Field(None, repr=False)
    organization: str | None = Field(None, repr=False)
    organizational_unit: str | None = Field(None, repr=False)
class UserCreate(UserBase):
    """
    Representation of a user that is about to be created.

    Extends the common user fields with the password to be set.
    """

    password: str
class User(UserBase):
    """
    Representation of a user backed by the database.

    Besides the common user fields, this carries the user's devices and
    offers helpers to load, create, authenticate, update and delete the
    matching database row.
    """

    devices: list[Device] = Field(
        default=[], repr=False
    )

    class Config:
        # allow building instances from ORM objects via `from_orm`
        orm_mode = True

    @validator("capabilities", pre=True)
    @classmethod
    def unify_capabilities(cls, value: list[Any]) -> list[UserCapability]:
        """
        Import the capabilities from various formats.

        Runs before field validation and passes every element through
        `UserCapability.from_value`.
        """

        return [
            UserCapability.from_value(capability)
            for capability in value
        ]

    @classmethod
    def from_db(
        cls,
        db: Session,
        name: str,
    ) -> User | None:
        """
        Load user from database by name.

        Returns None if `models.User.from_db` finds no such user.
        """

        if (db_user := models.User.from_db(db, name)) is None:
            return None

        return cls.from_orm(db_user)

    @classmethod
    def create(
        cls,
        db: Session,
        user: UserCreate,
        crypt_context: CryptContext,
    ) -> User | None:
        """
        Create a new user in the database.

        The password is hashed with `crypt_context` before it is stored.
        Returns the created user, or None if the database rejected the
        insert with an `IntegrityError`.
        """

        try:
            db_user = models.User(
                name=user.name,
                password=crypt_context.hash(user.password),
                email=user.email,
                capabilities=[
                    capability.model
                    for capability in user.capabilities
                ],
            )

            db.add(db_user)
            db.commit()
            db.refresh(db_user)

            return cls.from_orm(db_user)

        except IntegrityError:
            # user already existed
            # the failed commit leaves the session's transaction unusable
            # until it is rolled back, so reset it for the caller
            db.rollback()
            return None

    def is_admin(self) -> bool:
        """
        Check whether this user holds the admin capability.
        """

        return UserCapability.admin in self.capabilities

    def authenticate(
        self,
        db: Session,
        password: str,
        crypt_context: CryptContext,
    ) -> bool:
        """
        Authenticate with name/password against users in database.

        Returns True if a user named `self.name` exists in the database
        and `password` verifies against its stored hash, False otherwise.
        """

        if (db_user := models.User.from_db(db, self.name)) is None:
            # nonexistent user, fake doing password verification
            crypt_context.dummy_verify()
            return False

        if not crypt_context.verify(password, db_user.password):
            # password hash mismatch
            return False

        # NOTE: `from_orm` builds a new instance and does not modify
        # `self`, so no such call is made here; use `User.from_db` to get
        # a fresh copy of the stored user.
        return True

    def update(
        self,
        db: Session,
    ) -> None:
        """
        Update this user in the database.

        Only the capabilities are written back: the stored ones are
        deleted and replaced by those of this object. Does nothing if the
        user does not exist in the database.
        """

        if (db_user := models.User.from_db(db, self.name)) is None:
            # nonexistent user
            return None

        # drop the old capability rows before attaching the new ones
        for capability in db_user.capabilities:
            db.delete(capability)

        db_user.capabilities = [
            capability.model
            for capability in self.capabilities
        ]

        db.commit()

    def delete(
        self,
        db: Session,
    ) -> bool:
        """
        Delete this user from the database.

        Returns True if the user was deleted, False if it did not exist.
        """

        if (db_user := models.User.from_db(db, self.name)) is None:
            # nonexistent user
            return False

        db.delete(db_user)
        db.commit()
        return True