# kiwi-vpn/api/kiwi_vpn_api/db_new/user.py
#
# 77 lines
# 1.9 KiB
# Python

from __future__ import annotations
from typing import Any
from pydantic import root_validator
from sqlalchemy.exc import IntegrityError
from sqlmodel import Field, SQLModel
from ..config import Config
from .connection import Connection
class UserBase(SQLModel):
    # Fields shared by the user models in this module: User, UserCreate and
    # UserRead all derive from this class.

    # user name; serves as the primary key
    name: str = Field(primary_key=True)
    # e-mail address (required, no default)
    email: str

    # Optional attributes, each defaulting to None.
    # NOTE(review): these names resemble X.509 subject components
    # (C, ST, L, O, OU) -- presumably used for certificate subjects;
    # confirm against the callers.
    country: str | None = Field(default=None)
    state: str | None = Field(default=None)
    city: str | None = Field(default=None)
    organization: str | None = Field(default=None)
    organizational_unit: str | None = Field(default=None)
class User(UserBase, table=True):
    # Database table model for a user: the shared UserBase fields plus the
    # stored password value.
    password: str

    @classmethod
    def create(cls, **kwargs) -> User | None:
        """
        Create a new user in the database.

        The keyword arguments are passed through `UserCreate`, whose
        validator fills in `password` from `password_clear` if necessary.
        Errors raised while building `UserCreate` are not caught here and
        propagate to the caller.

        Returns the persisted user, or `None` if the database rejected the
        commit with an `IntegrityError`.
        """

        # Validate (and hash the password) before touching the database:
        # this step needs no session, and only the database operations
        # below are expected to raise IntegrityError.
        user = cls.from_orm(UserCreate(**kwargs))

        try:
            with Connection.session as db:
                db.add(user)
                db.commit()
                # reload the attributes so they stay readable once the
                # session has been closed
                db.refresh(user)

        except IntegrityError:
            # user already existed
            return None

        return user

    @classmethod
    def get(cls, name: str) -> User | None:
        """
        Load a user from the database by name (the primary key).

        Returns the result of the session's primary key lookup, which is
        `None` if no such user exists.
        """

        with Connection.session as db:
            return db.get(cls, name)
class UserCreate(UserBase):
    # Input model for creating a user. Accepts either a ready-to-store
    # `password` or a clear-text `password_clear` that gets hashed below.
    password: str | None = Field(default=None)
    password_clear: str | None = Field(default=None)

    @root_validator
    @classmethod
    def hash_password(cls, values: dict[str, Any]) -> dict[str, Any]:
        """
        Ensure that `password` is populated.

        A `password` that is already set is kept untouched. Otherwise,
        `password_clear` is hashed with the crypt context taken from the
        loaded configuration and the result is stored as `password`.

        Raises `ValueError` if neither password field is set, or if no
        configuration could be loaded.
        """

        preset = values.get("password")

        if preset is None:
            # nothing stored yet, so derive it from the clear-text password
            clear_text = values.get("password_clear")
            if clear_text is None:
                raise ValueError("No password to hash")

            config = Config.load_sync()
            if config is None:
                raise ValueError("Not configured")

            hasher = config.crypto.crypt_context_sync
            values["password"] = hasher.hash(clear_text)

        return values
class UserRead(UserBase):
    # Exposes exactly the UserBase fields and adds none of its own, so the
    # `password` field declared on User and UserCreate is not part of it.
    ...