kiwi-vpn/api/kiwi_vpn_api/db/schemas.py

286 lines
5.7 KiB
Python
Raw Normal View History

2022-03-20 03:45:40 +00:00
"""
Pydantic representation of database contents.
"""
2022-03-18 23:45:09 +00:00
from __future__ import annotations
2022-03-17 17:06:00 +00:00
from datetime import datetime
2022-03-19 18:31:03 +00:00
from enum import Enum
2022-03-20 03:45:40 +00:00
from typing import Any
2022-03-18 23:45:09 +00:00
from passlib.context import CryptContext
2022-03-25 23:03:56 +00:00
from pydantic import BaseModel, Field, validator
2022-03-19 18:06:17 +00:00
from sqlalchemy.exc import IntegrityError
2022-03-18 23:45:09 +00:00
from sqlalchemy.orm import Session
from . import models
2022-03-17 17:06:00 +00:00
2022-03-20 03:45:40 +00:00
##########
# table: user_capabilities
##########
2022-03-17 17:06:00 +00:00
2022-03-19 18:31:03 +00:00
class UserCapability(Enum):
    """
    Capabilities that can be assigned to a user.
    """

    admin = "admin"
    login = "login"
    issue = "issue"
    renew = "renew"

    def __repr__(self) -> str:
        # show only the bare value instead of the default enum repr
        return self.value

    @classmethod
    def from_value(cls, value) -> UserCapability:
        """
        Create UserCapability from various formats
        """

        # value is already a UserCapability, use that
        if isinstance(value, cls):
            return value

        # db format carries the value in its "capability" attribute,
        # anything else is converted through its string representation
        raw = (
            value.capability
            if isinstance(value, models.UserCapability)
            else str(value)
        )
        return cls(raw)

    @property
    def model(self) -> models.UserCapability:
        """
        Database representation of this capability.
        """

        return models.UserCapability(capability=self.value)
2022-03-20 03:45:40 +00:00
##########
# table: users
##########
2022-03-19 18:31:03 +00:00
2022-03-17 17:06:00 +00:00
class UserBase(BaseModel):
    """
    Fields shared by all user representations.
    """

    name: str

    # NOTE(review): these look like certificate subject fields - confirm
    country: str
    state: str
    city: str
    organization: str
    organizational_unit: str
    email: str

    # no capabilities unless given explicitly
    capabilities: list[UserCapability] = []
2022-03-17 17:06:00 +00:00
class UserCreate(UserBase):
    """
    Data needed to create a user: the base fields plus a password.
    """

    # password as given by the caller; `User.create` hashes it before storing
    password: str
class User(UserBase):
    """
    User as stored in the database.

    NOTE(review): `Device` is declared further down in this module, so the
    `devices` annotation is a forward reference; confirm it gets resolved
    (e.g. via `User.update_forward_refs()`) before instances are created.
    """

    devices: list[Device] = Field(
        default=[], repr=False
    )

    class Config:
        orm_mode = True

    @validator("capabilities", pre=True)
    @classmethod
    def unify_capabilities(cls, value: list[Any]) -> list[UserCapability]:
        """
        Import the capabilities from various formats
        """

        return [
            UserCapability.from_value(capability)
            for capability in value
        ]

    @staticmethod
    def _load(db: Session, name: str) -> models.User | None:
        """
        Fetch the database row for a user name, or None if there is none.
        """

        # `db.refresh()` only works on instances that are already persistent
        # in the session, so the row has to be looked up with a query
        return (
            db.query(models.User)
            .filter(models.User.name == name)
            .first()
        )

    @classmethod
    def from_db(
        cls,
        db: Session,
        name: str,
    ) -> User | None:
        """
        Load user from database by name.

        Returns None if no user with that name exists.
        """

        db_user = cls._load(db, name)

        if db_user is None:
            # nonexistent user
            return None

        return cls.from_orm(db_user)

    @classmethod
    def create(
        cls,
        db: Session,
        user: UserCreate,
        crypt_context: CryptContext,
    ) -> User | None:
        """
        Create a new user in the database.

        The password is stored hashed by `crypt_context`.
        Returns the stored user, or None if the database rejected the
        insert with an IntegrityError (e.g. the user already existed).
        """

        db_user = models.User(
            name=user.name,
            password=crypt_context.hash(user.password),
            # `from_orm` below requires these attributes on the db object,
            # so they must be stored along with the name
            country=user.country,
            state=user.state,
            city=user.city,
            organization=user.organization,
            organizational_unit=user.organizational_unit,
            email=user.email,
            capabilities=[
                capability.model
                for capability in user.capabilities
            ],
        )

        try:
            db.add(db_user)
            db.commit()

        except IntegrityError:
            # user already existed; the session stays unusable after a
            # failed commit until it is rolled back
            db.rollback()
            return None

        db.refresh(db_user)
        return cls.from_orm(db_user)

    def is_admin(self) -> bool:
        """
        Check whether this user has the admin capability.
        """

        return UserCapability.admin in self.capabilities

    def authenticate(
        self,
        db: Session,
        password: str,
        crypt_context: CryptContext,
    ) -> bool:
        """
        Authenticate with name/password against users in database.

        Returns True if a user with this name exists and the password
        matches the stored hash, False otherwise. Does not modify `self`.
        """

        db_user = self._load(db, self.name)

        if db_user is None:
            # nonexistent user, fake doing password verification
            crypt_context.dummy_verify()
            return False

        if not crypt_context.verify(password, db_user.password):
            # password hash mismatch
            return False

        return True

    def update(
        self,
        db: Session,
    ) -> None:
        """
        Update this user in the database.

        Only the capabilities are written; they replace the stored ones.
        Does nothing if the user does not exist in the database.
        """

        db_user = self._load(db, self.name)

        if db_user is None:
            # nonexistent user, nothing to update
            return

        # drop the stored capabilities, then attach the current ones
        for capability in db_user.capabilities:
            db.delete(capability)

        db_user.capabilities = [
            capability.model
            for capability in self.capabilities
        ]

        db.commit()

    def delete(
        self,
        db: Session,
    ) -> bool:
        """
        Delete this user from the database.

        Returns False if the user did not exist, True after deletion.
        """

        db_user = self._load(db, self.name)

        if db_user is None:
            # nonexistent user
            return False

        db.delete(db_user)
        db.commit()
        return True
2022-03-25 23:03:56 +00:00
##########
# table: devices
##########
class DeviceBase(BaseModel):
    """
    Fields shared by all device representations.
    """

    name: str
    type: str

    # NOTE(review): presumably when the device's certificate expires - confirm
    expiry: datetime
class DeviceCreate(DeviceBase):
    """
    Data needed to create a device: the base fields plus the owner's name.
    """

    # stored as `owner_name` on the database object by `Device.create`
    owner_name: str
class Device(DeviceBase):
    """
    Device as stored in the database.
    """

    class Config:
        orm_mode = True

    @classmethod
    def create(
        cls,
        db: Session,
        device: DeviceCreate,
    ) -> Device | None:
        """
        Create a new device in the database.

        Returns the stored device, or None if the database rejected the
        insert with an IntegrityError (e.g. the device already existed).
        """

        db_device = models.Device(
            owner_name=device.owner_name,
            name=device.name,
            type=device.type,
            expiry=device.expiry,
        )

        try:
            db.add(db_device)
            db.commit()

        except IntegrityError:
            # device already existed; the session stays unusable after a
            # failed commit until it is rolled back
            db.rollback()
            return None

        db.refresh(db_device)
        return cls.from_orm(db_device)

    def delete(
        self,
        db: Session,
    ) -> bool:
        """
        Delete this device from the database.

        Returns False if the device did not exist, True after deletion.
        """

        # `db.refresh()` only works on instances that are already persistent
        # in the session, so the row has to be looked up with a query
        # NOTE(review): the lookup uses the device name only, this object
        # does not carry the owner - confirm device names are unique
        db_device = (
            db.query(models.Device)
            .filter(models.Device.name == self.name)
            .first()
        )

        if db_device is None:
            # nonexistent device
            return False

        db.delete(db_device)
        db.commit()
        return True