kiwi-vpn/api/kiwi_vpn_api/db/schemas.py

163 lines
3.4 KiB
Python
Raw Normal View History

2022-03-18 23:45:09 +00:00
from __future__ import annotations
2022-03-17 17:06:00 +00:00
from datetime import datetime
2022-03-19 18:31:03 +00:00
from enum import Enum
2022-03-18 23:45:09 +00:00
from passlib.context import CryptContext
from pydantic import BaseModel, validator
2022-03-19 18:06:17 +00:00
from sqlalchemy.exc import IntegrityError
2022-03-18 23:45:09 +00:00
from sqlalchemy.orm import Session
from . import models
2022-03-17 17:06:00 +00:00
class CertificateBase(BaseModel):
    """Fields common to all certificate schemas."""

    # presumably the moment the certificate stops being valid -- TODO confirm
    expiry: datetime
class CertificateCreate(CertificateBase):
    """Input schema carrying the data needed to create a certificate."""

    # name of the owner; presumably refers to a user's `name` -- TODO confirm
    owner_name: str

    # presumably the id of the associated distinguished name -- TODO confirm
    dn_id: int
class Certificate(CertificateBase):
    """Certificate schema that can be populated from an ORM object."""

    id: int

    class Config:
        # allow construction from ORM objects via `from_orm`
        orm_mode = True
2022-03-19 18:31:03 +00:00
class UserCapability(Enum):
    """Capabilities that can be attached to a user."""

    admin = "admin"

    @classmethod
    def from_value(cls, value) -> UserCapability:
        """Coerce *value* into a member of this enum.

        Accepts an existing member, a `models.UserCapability` database
        object, or any other object whose string representation equals a
        member's value.

        Raises:
            ValueError: if no member matches the resulting value.
        """

        # already a member: hand it back untouched
        if isinstance(value, cls):
            return value

        # database object: its `capability` attribute holds the raw value
        if isinstance(value, models.UserCapability):
            return cls(value.capability)

        # anything else is looked up by its string representation
        return cls(str(value))
2022-03-19 19:24:43 +00:00
2022-03-19 18:31:03 +00:00
2022-03-17 17:06:00 +00:00
class UserBase(BaseModel):
    """Fields common to all user schemas."""

    # name of the user; also used as the lookup key when loading users
    name: str
class UserCreate(UserBase):
    """Input schema carrying the data needed to create a user."""

    # plain-text password; it is hashed before being stored (see `User.create`)
    password: str
class User(UserBase):
    """User schema including certificates and capabilities.

    Besides describing the data, this class bundles the database operations
    for loading, authenticating, creating and modifying users.
    """

    certificates: list[Certificate]
    capabilities: list[UserCapability]

    class Config:
        # allow construction from ORM objects via `from_orm`
        orm_mode = True

    @validator("capabilities", pre=True)
    @classmethod
    def unify_capabilities(
        cls,
        value: list[models.UserCapability | UserCapability | str]
    ) -> list[UserCapability]:
        """Convert every incoming capability to a `UserCapability` member.

        Runs before pydantic's own validation (`pre=True`), so database
        objects and plain strings are accepted alongside enum members.
        """

        return [
            UserCapability.from_value(capability)
            for capability in value
        ]

    @classmethod
    def from_db(
        cls,
        db: Session,
        name: str,
    ) -> User | None:
        """Load the user called *name* from the database.

        Returns:
            The user, or `None` if `models.User.load` finds no such user.
        """

        if (db_user := models.User.load(db, name)) is None:
            return None

        return cls.from_orm(db_user)

    @classmethod
    def login(
        cls,
        db: Session,
        name: str,
        password: str,
        crypt_context: CryptContext,
    ) -> User | None:
        """Authenticate a user by name and password.

        Returns:
            The user if *name* exists and *password* matches the stored
            hash, otherwise `None`.
        """

        if (db_user := models.User.load(db, name)) is None:
            # inexistent user, fake doing password verification so this
            # path does not return noticeably faster than a real check
            crypt_context.dummy_verify()
            return None

        if not crypt_context.verify(password, db_user.password):
            # password hash mismatch
            return None

        return cls.from_orm(db_user)

    @classmethod
    def create(
        cls,
        db: Session,
        user: UserCreate,
        crypt_context: CryptContext,
    ) -> User | None:
        """Create a new user in the database.

        The password from *user* is hashed with *crypt_context* before it
        is stored; the new user starts without capabilities.

        Returns:
            The created user, or `None` if the database rejected the
            insert with an `IntegrityError`.
        """

        # separate name for the ORM object; do not shadow the `user` argument
        db_user = models.User(
            name=user.name,
            password=crypt_context.hash(user.password),
            capabilities=[],
        )

        try:
            db.add(db_user)
            db.commit()

        except IntegrityError:
            # discard the failed transaction so the session stays usable
            db.rollback()
            return None

        db.refresh(db_user)
        return cls.from_orm(db_user)

    def add_capabilities(
        self,
        db: Session,
        capabilities: list[UserCapability],
    ) -> None:
        """Store the capabilities this user does not have yet.

        Capabilities already listed in `self.capabilities`, as well as
        repeated entries within *capabilities*, are added at most once.
        This object's own `capabilities` list is not modified.
        """

        # everything already present or already queued by this call
        known = set(self.capabilities)

        for capability in capabilities:
            if capability in known:
                continue

            known.add(capability)
            db.add(models.UserCapability(
                user_name=self.name,
                capability=capability.value,
            ))

        db.commit()
2022-03-19 19:24:43 +00:00
2022-03-17 17:06:00 +00:00
class DistinguishedNameBase(BaseModel):
    """Fields common to all distinguished name schemas."""

    # NOTE(review): presumably selects a subject consisting of the common
    # name only -- confirm against the code consuming this flag
    cn_only: bool

    # individual name components
    country: str
    state: str
    city: str
    organization: str
    organizational_unit: str
    email: str
    common_name: str
class DistinguishedNameCreate(DistinguishedNameBase):
    """Input schema for creating a distinguished name; adds no fields."""
class DistinguishedName(DistinguishedNameBase):
    """Distinguished name schema that can be populated from an ORM object."""

    id: int

    # certificates associated with this distinguished name
    certificates: list[Certificate]

    class Config:
        # allow construction from ORM objects via `from_orm`
        orm_mode = True