2022-03-20 03:45:40 +00:00
|
|
|
"""
|
|
|
|
Pydantic representation of database contents.
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
2022-03-18 23:45:09 +00:00
|
|
|
from __future__ import annotations
|
|
|
|
|
2022-03-17 17:06:00 +00:00
|
|
|
from datetime import datetime
|
2022-03-19 18:31:03 +00:00
|
|
|
from enum import Enum
|
2022-03-20 03:45:40 +00:00
|
|
|
from typing import Any
|
2022-03-18 23:45:09 +00:00
|
|
|
|
|
|
|
from passlib.context import CryptContext
|
2022-03-23 15:44:35 +00:00
|
|
|
from pydantic import BaseModel, Field, constr, validator
|
2022-03-19 18:06:17 +00:00
|
|
|
from sqlalchemy.exc import IntegrityError
|
2022-03-18 23:45:09 +00:00
|
|
|
from sqlalchemy.orm import Session
|
|
|
|
|
|
|
|
from . import models
|
2022-03-17 17:06:00 +00:00
|
|
|
|
2022-03-22 16:29:02 +00:00
|
|
|
##########
|
|
|
|
# table: distinguished_names
|
|
|
|
##########
|
|
|
|
|
|
|
|
|
|
|
|
class DistinguishedNameBase(BaseModel):
    """
    Fields shared by every representation of a distinguished name.
    """

    # flag stored with the name; presumably means "use the common name
    # only" -- TODO confirm against the code that consumes it
    cn_only: bool

    # optional, at most two characters long
    country: constr(max_length=2) | None

    # optional free-text components
    state: str | None
    city: str | None
    organization: str | None
    organizational_unit: str | None
    email: str | None

    # the only mandatory text component
    common_name: str
|
|
|
|
|
|
|
|
|
|
|
|
class DistinguishedNameCreate(DistinguishedNameBase):
    """
    Input data for creating a distinguished name.

    Adds no fields to `DistinguishedNameBase`.
    """
|
|
|
|
|
|
|
|
|
|
|
|
class DistinguishedName(DistinguishedNameBase):
    """
    Distinguished name as loaded from the database.
    """

    class Config:
        # allow construction from ORM objects via `from_orm`
        orm_mode = True

    @classmethod
    def create(
        cls,
        db: Session,
        dn: DistinguishedNameCreate,
        owner: User,
    ) -> DistinguishedName | None:
        """
        Create a new distinguished name in the database.

        The owner is looked up in the database by `owner.name`.

        Returns the stored distinguished name, or None if the owner is
        not in the database or the insert violated a database
        constraint.
        """

        try:
            db_owner = models.User.load(
                db=db,
                name=owner.name,
            )

            if db_owner is None:
                # owner is not in the database; do not store a name
                # without the owner the caller asked for
                return None

            # separate local name: do not shadow the `dn` parameter
            db_dn = models.DistinguishedName(
                cn_only=dn.cn_only,
                country=dn.country,
                state=dn.state,
                city=dn.city,
                organization=dn.organization,
                organizational_unit=dn.organizational_unit,
                email=dn.email,
                common_name=dn.common_name,
                owner=db_owner,
            )

            db.add(db_dn)
            db.commit()
            db.refresh(db_dn)

            return cls.from_orm(db_dn)

        except IntegrityError:
            # distinguished name already existed;
            # the failed transaction must be rolled back before the
            # session can be used again
            db.rollback()
            return None
|
|
|
|
|
2022-03-20 03:45:40 +00:00
|
|
|
##########
|
|
|
|
# table: certificates
|
|
|
|
##########
|
|
|
|
|
2022-03-17 17:06:00 +00:00
|
|
|
|
|
|
|
class CertificateBase(BaseModel):
    """
    Fields shared by every representation of a certificate.
    """

    # expiry timestamp of the certificate
    expiry: datetime
|
|
|
|
|
|
|
|
|
|
|
|
class CertificateCreate(CertificateBase):
    """
    Input data for creating a certificate.

    Adds no fields to `CertificateBase`.
    """
|
2022-03-17 17:06:00 +00:00
|
|
|
|
|
|
|
|
|
|
|
class Certificate(CertificateBase):
    """
    Certificate as loaded from the database.
    """

    # the distinguished name associated with this certificate
    distinguished_name: DistinguishedName

    class Config:
        # allow construction from ORM objects via `from_orm`
        orm_mode = True
|
|
|
|
|
2022-03-20 03:45:40 +00:00
|
|
|
##########
|
|
|
|
# table: user_capabilities
|
|
|
|
##########
|
|
|
|
|
2022-03-17 17:06:00 +00:00
|
|
|
|
2022-03-19 18:31:03 +00:00
|
|
|
class UserCapability(Enum):
    """
    Capabilities that can be assigned to a user.
    """

    admin = "admin"

    def __repr__(self) -> str:
        # represent a member by its bare value
        return self.value

    @classmethod
    def from_value(cls, value) -> UserCapability:
        """
        Build a UserCapability from one of several input formats.

        Accepts an existing UserCapability (returned unchanged), a
        database `models.UserCapability` row (its `capability`
        attribute is used), or any other object (its `str()` is used).
        """

        if isinstance(value, cls):
            # nothing to convert
            return value

        if isinstance(value, models.UserCapability):
            # database row: the enum value is stored in `capability`
            raw = value.capability
        else:
            # anything else: fall back to the string representation
            raw = str(value)

        return cls(raw)
|
2022-03-19 19:24:43 +00:00
|
|
|
|
2022-03-20 03:45:40 +00:00
|
|
|
##########
|
|
|
|
# table: users
|
|
|
|
##########
|
|
|
|
|
2022-03-19 18:31:03 +00:00
|
|
|
|
2022-03-17 17:06:00 +00:00
|
|
|
class UserBase(BaseModel):
    """
    Fields shared by every representation of a user.
    """

    # name used to look the user up in the database
    name: str
|
|
|
|
|
|
|
|
|
|
|
|
class UserCreate(UserBase):
    """
    Input data for creating a user.
    """

    # password as supplied by the caller; `User.create` hashes it
    # before it is stored
    password: str
|
|
|
|
|
|
|
|
|
|
|
|
class User(UserBase):
    """
    User as loaded from the database, including capabilities, owned
    distinguished names and certificates.
    """

    capabilities: list[UserCapability] = []

    # the two collections below are excluded from `repr()` output
    distinguished_names: list[DistinguishedName] = Field(
        default=[], repr=False
    )

    certificates: list[Certificate] = Field(
        default=[], repr=False
    )

    class Config:
        # allow construction from ORM objects via `from_orm`
        orm_mode = True

    @validator("capabilities", pre=True)
    @classmethod
    def unify_capabilities(cls, value: list[Any]) -> list[UserCapability]:
        """
        Import the capabilities from various formats.

        Every element is converted with `UserCapability.from_value`.
        """

        return [
            UserCapability.from_value(capability)
            for capability in value
        ]

    @classmethod
    def from_db(
        cls,
        db: Session,
        name: str,
    ) -> User | None:
        """
        Load user from database by name.

        Returns None if no user with that name exists.
        """

        if (db_user := models.User.load(db, name)) is None:
            return None

        return cls.from_orm(db_user)

    @classmethod
    def create(
        cls,
        db: Session,
        user: UserCreate,
        crypt_context: CryptContext,
    ) -> User | None:
        """
        Create a new user in the database.

        The password is stored as a hash produced by `crypt_context`.
        The new user starts without capabilities.

        Returns the stored user, or None if the insert violated a
        database constraint.
        """

        try:
            # separate local name: do not shadow the `user` parameter
            db_user = models.User(
                name=user.name,
                password=crypt_context.hash(user.password),
                capabilities=[],
            )

            db.add(db_user)
            db.commit()
            db.refresh(db_user)

            return cls.from_orm(db_user)

        except IntegrityError:
            # user already existed;
            # the failed transaction must be rolled back before the
            # session can be used again
            db.rollback()
            return None

    def is_admin(self) -> bool:
        """
        Tell whether this user holds the admin capability.
        """

        return UserCapability.admin in self.capabilities

    def authenticate(
        self,
        db: Session,
        password: str,
        crypt_context: CryptContext,
    ) -> bool:
        """
        Authenticate with name/password against users in database.

        Returns True if a user with this name exists and the password
        matches the stored hash, False otherwise.  On success the
        fields of this instance are refreshed from the database.
        """

        if (db_user := models.User.load(db, self.name)) is None:
            # nonexistent user, fake doing password verification
            crypt_context.dummy_verify()
            return False

        if not crypt_context.verify(password, db_user.password):
            # password hash mismatch
            return False

        # `from_orm` is a classmethod that returns a NEW instance, so
        # its result has to be copied onto this instance explicitly
        refreshed = self.from_orm(db_user)
        for field_name in refreshed.__fields__:
            setattr(self, field_name, getattr(refreshed, field_name))

        return True

    def update(
        self,
        db: Session,
    ) -> None:
        """
        Update this user in the database.

        Synchronizes the stored capabilities with `self.capabilities`:
        missing ones are added, surplus ones are deleted.
        """

        # NOTE(review): `load` can return None (see `from_db`); a
        # missing user is not handled here -- confirm that callers only
        # update users that exist
        old_dbuser = models.User.load(db, self.name)
        old_user = self.from_orm(old_dbuser)

        # add capabilities that are not stored yet
        for capability in self.capabilities:
            if capability not in old_user.capabilities:
                old_dbuser.capabilities.append(
                    models.UserCapability(capability=capability.value)
                )

        # delete stored capabilities this user no longer has
        for capability in old_dbuser.capabilities:
            if UserCapability.from_value(capability) not in self.capabilities:
                db.delete(capability)

        db.commit()

    def delete(
        self,
        db: Session,
    ) -> bool:
        """
        Delete this user from the database.

        Returns True if the user was deleted, False if no user with
        this name exists.
        """

        if (db_user := models.User.load(db, self.name)) is None:
            # nonexistent user
            return False

        db.delete(db_user)
        db.commit()
        return True
|