2022-03-27 01:17:48 +00:00
|
|
|
from __future__ import annotations
|
|
|
|
|
|
|
|
from typing import Any
|
|
|
|
|
|
|
|
from pydantic import root_validator
|
|
|
|
from sqlalchemy.exc import IntegrityError
|
|
|
|
from sqlmodel import Field, SQLModel
|
|
|
|
|
|
|
|
from ..config import Config
|
|
|
|
from .connection import Connection
|
|
|
|
|
|
|
|
|
|
|
|
class UserBase(SQLModel):
    """
    Fields shared by all user models defined in this module.
    """

    # user name; declared as the primary key
    name: str = Field(primary_key=True)
    # required, no default
    email: str

    # optional attributes, each defaulting to None
    country: str | None = None
    state: str | None = None
    city: str | None = None
    organization: str | None = None
    organizational_unit: str | None = None
|
|
|
|
|
|
|
|
|
|
|
|
class User(UserBase, table=True):
    """
    Table model for a user: the shared base fields plus a password.
    """

    # required here; `create` obtains it from a validated UserCreate instance
    password: str

    @classmethod
    def create(cls, **kwargs) -> User | None:
        """
        Create a new user in the database.

        The keyword arguments are passed to `UserCreate`; the resulting
        object is converted with `from_orm`, added to the session, committed
        and refreshed.

        Returns the stored user, or None if an `IntegrityError` was raised.
        """
        try:
            with Connection.session as session:
                validated = UserCreate(**kwargs)
                new_user = cls.from_orm(validated)

                session.add(new_user)
                session.commit()
                session.refresh(new_user)

                return new_user

        except IntegrityError:
            # user already existed
            return None

    @classmethod
    def get(cls, name: str) -> User | None:
        """
        Look up a user by its primary key `name` using the session's `get`.
        """
        with Connection.session as session:
            found = session.get(cls, name)
            return found
|
|
|
|
|
|
|
|
|
|
|
|
class UserCreate(UserBase):
    """
    Input model used when creating a user.

    Either `password` or `password_clear` must be supplied. A given
    `password` is kept as-is; otherwise `password_clear` is hashed into
    `password` during validation.
    """

    password: str | None = Field(default=None)
    password_clear: str | None = Field(default=None)

    # skip_on_failure: do not run when field validation already failed.
    # Otherwise the validator would load the config and hash a password for
    # a model that is rejected anyway, and would add a misleading
    # "No password to hash" error when `password_clear` itself was invalid
    # (and is therefore missing from `values`).
    @root_validator(skip_on_failure=True)
    @classmethod
    def hash_password(cls, values: dict[str, Any]) -> dict[str, Any]:
        """
        Ensure `values["password"]` is populated.

        Returns `values` unchanged when `password` is already set; otherwise
        stores the hash of `password_clear` under `password`.

        Raises:
            ValueError: if neither `password` nor `password_clear` is set,
                or if `Config.load_sync()` returns None.
        """
        if values.get("password") is not None:
            # password is set
            return values

        if (password_clear := values.get("password_clear")) is None:
            raise ValueError("No password to hash")

        if (current_config := Config.load_sync()) is None:
            raise ValueError("Not configured")

        # NOTE(review): `password_clear` is left in `values`, so the clear
        # text stays on the validated model - confirm this is intended.
        values["password"] = current_config.crypto.crypt_context_sync.hash(
            password_clear
        )

        return values
|
|
|
|
|
|
|
|
|
|
|
|
class UserRead(UserBase):
    """
    User model that adds nothing to the shared base fields.
    """
|