split db.schemas -> db.schemata package

This commit is contained in:
Jörn-Michael Miehe 2022-03-25 23:54:19 +00:00
parent 02225cdf09
commit c47fa5a89b
10 changed files with 149 additions and 132 deletions

View file

@ -1,4 +1,4 @@
from . import models, schemas from . import models, schemata
from .connection import Connection from .connection import Connection
__all__ = ["Connection", "models", "schemas"] __all__ = ["Connection", "models", "schemata"]

View file

@ -0,0 +1,6 @@
from .device import Device, DeviceBase, DeviceCreate
from .user import User, UserBase, UserCreate
from .user_capability import UserCapability
__all__ = ["Device", "DeviceBase", "DeviceCreate",
"User", "UserBase", "UserCreate", "UserCapability"]

View file

@ -0,0 +1,79 @@
"""
Pydantic representation of database contents.
"""
from __future__ import annotations
from datetime import datetime
from pydantic import BaseModel
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session
from .. import models
class DeviceBase(BaseModel):
name: str
type: str
expiry: datetime
class DeviceCreate(DeviceBase):
owner_name: str
class Device(DeviceBase):
class Config:
orm_mode = True
@classmethod
def create(
cls,
db: Session,
device: DeviceCreate,
) -> Device | None:
"""
Create a new device in the database.
"""
try:
db_device = models.Device(
owner_name=device.owner_name,
name=device.name,
type=device.type,
expiry=device.expiry,
)
db.add(db_device)
db.commit()
db.refresh(db_device)
return cls.from_orm(db_device)
except IntegrityError:
# device already existed
return None
def delete(
self,
db: Session,
) -> bool:
"""
Delete this device from the database.
"""
db_device = models.Device(
# owner_name=
name=self.name,
)
db.refresh(db_device)
if db_device is None:
# nonexistent device
return False
db.delete(db_device)
db.commit()
return True

View file

@ -2,62 +2,18 @@
Pydantic representation of database contents. Pydantic representation of database contents.
""" """
from __future__ import annotations from __future__ import annotations
from datetime import datetime
from enum import Enum
from typing import Any from typing import Any
from passlib.context import CryptContext from passlib.context import CryptContext
from pydantic import BaseModel, Field, validator from pydantic import BaseModel, Field, validator
from sqlalchemy.exc import IntegrityError from sqlalchemy.exc import IntegrityError, InvalidRequestError
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from . import models from .. import models
from .device import Device
########## from .user_capability import UserCapability
# table: user_capabilities
##########
class UserCapability(Enum):
admin = "admin"
login = "login"
issue = "issue"
renew = "renew"
def __repr__(self) -> str:
return self.value
@classmethod
def from_value(cls, value) -> UserCapability:
"""
Create UserCapability from various formats
"""
if isinstance(value, cls):
# value is already a UserCapability, use that
return value
elif isinstance(value, models.UserCapability):
# create from db format
return cls(value.capability)
else:
# create from string representation
return cls(str(value))
@property
def model(self) -> models.UserCapability:
return models.UserCapability(
capability=self.value,
)
##########
# table: users
##########
class UserBase(BaseModel): class UserBase(BaseModel):
@ -71,14 +27,14 @@ class UserBase(BaseModel):
email: str email: str
capabilities: list[UserCapability] = []
class UserCreate(UserBase): class UserCreate(UserBase):
password: str password: str
class User(UserBase): class User(UserBase):
capabilities: list[UserCapability] = []
devices: list[Device] = Field( devices: list[Device] = Field(
default=[], repr=False default=[], repr=False
) )
@ -108,10 +64,14 @@ class User(UserBase):
Load user from database by name. Load user from database by name.
""" """
db_user = models.User(name=name) try:
db.refresh(db_user) db_user = models.User(name=name)
db.refresh(db_user)
return cls.from_orm(db_user) return cls.from_orm(db_user)
except InvalidRequestError:
return None
@classmethod @classmethod
def create( def create(
@ -142,7 +102,7 @@ class User(UserBase):
except IntegrityError: except IntegrityError:
# user already existed # user already existed
pass return None
def is_admin(self) -> bool: def is_admin(self) -> bool:
return UserCapability.admin in self.capabilities return UserCapability.admin in self.capabilities
@ -212,74 +172,3 @@ class User(UserBase):
db.delete(db_user) db.delete(db_user)
db.commit() db.commit()
return True return True
##########
# table: devices
##########
class DeviceBase(BaseModel):
name: str
type: str
expiry: datetime
class DeviceCreate(DeviceBase):
owner_name: str
class Device(DeviceBase):
class Config:
orm_mode = True
@classmethod
def create(
cls,
db: Session,
device: DeviceCreate,
) -> Device | None:
"""
Create a new device in the database.
"""
try:
db_device = models.Device(
owner_name=device.owner_name,
name=device.name,
type=device.type,
expiry=device.expiry,
)
db.add(db_device)
db.commit()
db.refresh(db_device)
return cls.from_orm(db_device)
except IntegrityError:
# device already existed
pass
def delete(
self,
db: Session,
) -> bool:
"""
Delete this device from the database.
"""
db_device = models.Device(
# owner_name=
name=self.name,
)
db.refresh(db_device)
if db_device is None:
# nonexistent device
return False
db.delete(db_device)
db.commit()
return True

View file

@ -0,0 +1,43 @@
"""
Pydantic representation of database contents.
"""
from __future__ import annotations
from enum import Enum
from .. import models
class UserCapability(Enum):
admin = "admin"
login = "login"
issue = "issue"
renew = "renew"
def __repr__(self) -> str:
return self.value
@classmethod
def from_value(cls, value) -> UserCapability:
"""
Create UserCapability from various formats
"""
if isinstance(value, cls):
# value is already a UserCapability, use that
return value
elif isinstance(value, models.UserCapability):
# create from db format
return cls(value.capability)
else:
# create from string representation
return cls(str(value))
@property
def model(self) -> models.UserCapability:
return models.UserCapability(
capability=self.value,
)

View file

@ -14,7 +14,7 @@ from fastapi import FastAPI
from .config import Config, Settings from .config import Config, Settings
from .db import Connection from .db import Connection
from .db.schemas import User from .db.schemata import User
from .routers import main_router from .routers import main_router
settings = Settings.get() settings = Settings.get()

View file

@ -9,7 +9,7 @@ from sqlalchemy.orm import Session
from ..config import Config from ..config import Config
from ..db import Connection from ..db import Connection
from ..db.schemas import User from ..db.schemata import User
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="user/authenticate") oauth2_scheme = OAuth2PasswordBearer(tokenUrl="user/authenticate")

View file

@ -7,7 +7,7 @@ from fastapi import APIRouter, Depends, HTTPException, status
from ..config import Config from ..config import Config
from ..db import Connection from ..db import Connection
from ..db.schemas import User, UserCapability, UserCreate from ..db.schemata import User, UserCapability, UserCreate
from ._common import Responses, get_current_user from ._common import Responses, get_current_user
router = APIRouter(prefix="/admin", tags=["admin"]) router = APIRouter(prefix="/admin", tags=["admin"])

View file

@ -7,7 +7,7 @@ from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from ..db import Connection from ..db import Connection
from ..db.schemas import DistinguishedName, DistinguishedNameCreate, User from ..db.schemata import DistinguishedName, DistinguishedNameCreate, User
from ._common import Responses, get_current_user_if_admin_or_self from ._common import Responses, get_current_user_if_admin_or_self
router = APIRouter(prefix="/dn") router = APIRouter(prefix="/dn")

View file

@ -9,7 +9,7 @@ from sqlalchemy.orm import Session
from ..config import Config from ..config import Config
from ..db import Connection from ..db import Connection
from ..db.schemas import User, UserCapability, UserCreate from ..db.schemata import User, UserCapability, UserCreate
from ._common import Responses, get_current_user, get_current_user_if_admin from ._common import Responses, get_current_user, get_current_user_if_admin
router = APIRouter(prefix="/user", tags=["user"]) router = APIRouter(prefix="/user", tags=["user"])