Compare commits

..

3 commits

11 changed files with 172 additions and 149 deletions

View file

@ -1,4 +1,4 @@
from . import models, schemas from . import models, schemata
from .connection import Connection from .connection import Connection
__all__ = ["Connection", "models", "schemas"] __all__ = ["Connection", "models", "schemata"]

View file

@ -7,7 +7,7 @@ from __future__ import annotations
from sqlalchemy import (Column, DateTime, ForeignKey, Integer, String, from sqlalchemy import (Column, DateTime, ForeignKey, Integer, String,
UniqueConstraint) UniqueConstraint)
from sqlalchemy.ext.declarative import declarative_base from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship from sqlalchemy.orm import Session, relationship
ORMBaseModel = declarative_base() ORMBaseModel = declarative_base()
@ -29,6 +29,7 @@ class User(ORMBaseModel):
name = Column(String, primary_key=True, index=True) name = Column(String, primary_key=True, index=True)
password = Column(String, nullable=False) password = Column(String, nullable=False)
email = Column(String)
country = Column(String(2)) country = Column(String(2))
state = Column(String) state = Column(String)
@ -36,8 +37,6 @@ class User(ORMBaseModel):
organization = Column(String) organization = Column(String)
organizational_unit = Column(String) organizational_unit = Column(String)
email = Column(String)
capabilities: list[UserCapability] = relationship( capabilities: list[UserCapability] = relationship(
"UserCapability", lazy="joined", cascade="all, delete-orphan" "UserCapability", lazy="joined", cascade="all, delete-orphan"
) )
@ -45,6 +44,23 @@ class User(ORMBaseModel):
"Device", lazy="select", back_populates="owner" "Device", lazy="select", back_populates="owner"
) )
@classmethod
def from_db(
cls,
db: Session,
name: str,
) -> User | None:
"""
Load user from database by name.
"""
return (
db
.query(cls)
.filter(cls.name == name)
.first()
)
class Device(ORMBaseModel): class Device(ORMBaseModel):
__tablename__ = "devices" __tablename__ = "devices"
@ -57,7 +73,7 @@ class Device(ORMBaseModel):
expiry = Column(DateTime) expiry = Column(DateTime)
owner: User = relationship( owner: User = relationship(
"User", lazy="joined", back_populates="distinguished_names" "User", lazy="joined", back_populates="devices"
) )
UniqueConstraint( UniqueConstraint(

View file

@ -0,0 +1,6 @@
from .device import Device, DeviceBase, DeviceCreate
from .user import User, UserBase, UserCreate
from .user_capability import UserCapability
__all__ = ["Device", "DeviceBase", "DeviceCreate",
"User", "UserBase", "UserCreate", "UserCapability"]

View file

@ -0,0 +1,79 @@
"""
Pydantic representation of database contents.
"""
from __future__ import annotations
from datetime import datetime
from pydantic import BaseModel
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session
from .. import models
class DeviceBase(BaseModel):
name: str
type: str
expiry: datetime
class DeviceCreate(DeviceBase):
owner_name: str
class Device(DeviceBase):
class Config:
orm_mode = True
@classmethod
def create(
cls,
db: Session,
device: DeviceCreate,
) -> Device | None:
"""
Create a new device in the database.
"""
try:
db_device = models.Device(
owner_name=device.owner_name,
name=device.name,
type=device.type,
expiry=device.expiry,
)
db.add(db_device)
db.commit()
db.refresh(db_device)
return cls.from_orm(db_device)
except IntegrityError:
# device already existed
return None
def delete(
self,
db: Session,
) -> bool:
"""
Delete this device from the database.
"""
db_device = models.Device(
# owner_name=
name=self.name,
)
db.refresh(db_device)
if db_device is None:
# nonexistent device
return False
db.delete(db_device)
db.commit()
return True

View file

@ -2,11 +2,8 @@
Pydantic representation of database contents. Pydantic representation of database contents.
""" """
from __future__ import annotations from __future__ import annotations
from datetime import datetime
from enum import Enum
from typing import Any from typing import Any
from passlib.context import CryptContext from passlib.context import CryptContext
@ -14,65 +11,23 @@ from pydantic import BaseModel, Field, validator
from sqlalchemy.exc import IntegrityError from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from . import models from .. import models
from .device import Device
########## from .user_capability import UserCapability
# table: user_capabilities
##########
class UserCapability(Enum):
admin = "admin"
login = "login"
issue = "issue"
renew = "renew"
def __repr__(self) -> str:
return self.value
@classmethod
def from_value(cls, value) -> UserCapability:
"""
Create UserCapability from various formats
"""
if isinstance(value, cls):
# value is already a UserCapability, use that
return value
elif isinstance(value, models.UserCapability):
# create from db format
return cls(value.capability)
else:
# create from string representation
return cls(str(value))
@property
def model(self) -> models.UserCapability:
return models.UserCapability(
capability=self.value,
)
##########
# table: users
##########
class UserBase(BaseModel): class UserBase(BaseModel):
name: str name: str
country: str
state: str
city: str
organization: str
organizational_unit: str
email: str email: str
capabilities: list[UserCapability] = [] capabilities: list[UserCapability] = []
country: str | None = Field(default=None, repr=False)
state: str | None = Field(default=None, repr=False)
city: str | None = Field(default=None, repr=False)
organization: str | None = Field(default=None, repr=False)
organizational_unit: str | None = Field(default=None, repr=False)
class UserCreate(UserBase): class UserCreate(UserBase):
password: str password: str
@ -108,8 +63,8 @@ class User(UserBase):
Load user from database by name. Load user from database by name.
""" """
db_user = models.User(name=name) if (db_user := models.User.from_db(db, name)) is None:
db.refresh(db_user) return None
return cls.from_orm(db_user) return cls.from_orm(db_user)
@ -128,6 +83,7 @@ class User(UserBase):
db_user = models.User( db_user = models.User(
name=user.name, name=user.name,
password=crypt_context.hash(user.password), password=crypt_context.hash(user.password),
email=user.email,
capabilities=[ capabilities=[
capability.model capability.model
for capability in user.capabilities for capability in user.capabilities
@ -142,7 +98,7 @@ class User(UserBase):
except IntegrityError: except IntegrityError:
# user already existed # user already existed
pass return None
def is_admin(self) -> bool: def is_admin(self) -> bool:
return UserCapability.admin in self.capabilities return UserCapability.admin in self.capabilities
@ -157,10 +113,7 @@ class User(UserBase):
Authenticate with name/password against users in database. Authenticate with name/password against users in database.
""" """
db_user = models.User(name=self.name) if (db_user := models.User.from_db(db, self.name)) is None:
db.refresh(db_user)
if db_user is None:
# nonexistent user, fake doing password verification # nonexistent user, fake doing password verification
crypt_context.dummy_verify() crypt_context.dummy_verify()
return False return False
@ -181,8 +134,8 @@ class User(UserBase):
Update this user in the database. Update this user in the database.
""" """
db_user = models.User(name=self.name) if (db_user := models.User.from_db(db, self.name)) is None:
db.refresh(db_user) return None
for capability in db_user.capabilities: for capability in db_user.capabilities:
db.delete(capability) db.delete(capability)
@ -202,84 +155,10 @@ class User(UserBase):
Delete this user from the database. Delete this user from the database.
""" """
db_user = models.User(name=self.name) if (db_user := models.User.from_db(db, self.name)) is None:
db.refresh(db_user)
if db_user is None:
# nonexistent user # nonexistent user
return False return False
db.delete(db_user) db.delete(db_user)
db.commit() db.commit()
return True return True
##########
# table: devices
##########
class DeviceBase(BaseModel):
name: str
type: str
expiry: datetime
class DeviceCreate(DeviceBase):
owner_name: str
class Device(DeviceBase):
class Config:
orm_mode = True
@classmethod
def create(
cls,
db: Session,
device: DeviceCreate,
) -> Device | None:
"""
Create a new device in the database.
"""
try:
db_device = models.Device(
owner_name=device.owner_name,
name=device.name,
type=device.type,
expiry=device.expiry,
)
db.add(db_device)
db.commit()
db.refresh(db_device)
return cls.from_orm(db_device)
except IntegrityError:
# device already existed
pass
def delete(
self,
db: Session,
) -> bool:
"""
Delete this device from the database.
"""
db_device = models.Device(
# owner_name=
name=self.name,
)
db.refresh(db_device)
if db_device is None:
# nonexistent device
return False
db.delete(db_device)
db.commit()
return True

View file

@ -0,0 +1,43 @@
"""
Pydantic representation of database contents.
"""
from __future__ import annotations
from enum import Enum
from .. import models
class UserCapability(Enum):
admin = "admin"
login = "login"
issue = "issue"
renew = "renew"
def __repr__(self) -> str:
return self.value
@classmethod
def from_value(cls, value) -> UserCapability:
"""
Create UserCapability from various formats
"""
if isinstance(value, cls):
# value is already a UserCapability, use that
return value
elif isinstance(value, models.UserCapability):
# create from db format
return cls(value.capability)
else:
# create from string representation
return cls(str(value))
@property
def model(self) -> models.UserCapability:
return models.UserCapability(
capability=self.value,
)

View file

@ -14,7 +14,7 @@ from fastapi import FastAPI
from .config import Config, Settings from .config import Config, Settings
from .db import Connection from .db import Connection
from .db.schemas import User from .db.schemata import User
from .routers import main_router from .routers import main_router
settings = Settings.get() settings = Settings.get()

View file

@ -9,7 +9,7 @@ from sqlalchemy.orm import Session
from ..config import Config from ..config import Config
from ..db import Connection from ..db import Connection
from ..db.schemas import User from ..db.schemata import User
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="user/authenticate") oauth2_scheme = OAuth2PasswordBearer(tokenUrl="user/authenticate")

View file

@ -7,7 +7,7 @@ from fastapi import APIRouter, Depends, HTTPException, status
from ..config import Config from ..config import Config
from ..db import Connection from ..db import Connection
from ..db.schemas import User, UserCapability, UserCreate from ..db.schemata import User, UserCapability, UserCreate
from ._common import Responses, get_current_user from ._common import Responses, get_current_user
router = APIRouter(prefix="/admin", tags=["admin"]) router = APIRouter(prefix="/admin", tags=["admin"])

View file

@ -7,7 +7,7 @@ from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
from ..db import Connection from ..db import Connection
from ..db.schemas import DistinguishedName, DistinguishedNameCreate, User from ..db.schemata import DistinguishedName, DistinguishedNameCreate, User
from ._common import Responses, get_current_user_if_admin_or_self from ._common import Responses, get_current_user_if_admin_or_self
router = APIRouter(prefix="/dn") router = APIRouter(prefix="/dn")

View file

@ -9,7 +9,7 @@ from sqlalchemy.orm import Session
from ..config import Config from ..config import Config
from ..db import Connection from ..db import Connection
from ..db.schemas import User, UserCapability, UserCreate from ..db.schemata import User, UserCapability, UserCreate
from ._common import Responses, get_current_user, get_current_user_if_admin from ._common import Responses, get_current_user, get_current_user_if_admin
router = APIRouter(prefix="/user", tags=["user"]) router = APIRouter(prefix="/user", tags=["user"])