Compare commits

..

3 commits

11 changed files with 172 additions and 149 deletions

View file

@ -1,4 +1,4 @@
from . import models, schemas
from . import models, schemata
from .connection import Connection
__all__ = ["Connection", "models", "schemas"]
__all__ = ["Connection", "models", "schemata"]

View file

@ -7,7 +7,7 @@ from __future__ import annotations
from sqlalchemy import (Column, DateTime, ForeignKey, Integer, String,
UniqueConstraint)
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import relationship
from sqlalchemy.orm import Session, relationship
ORMBaseModel = declarative_base()
@ -29,6 +29,7 @@ class User(ORMBaseModel):
name = Column(String, primary_key=True, index=True)
password = Column(String, nullable=False)
email = Column(String)
country = Column(String(2))
state = Column(String)
@ -36,8 +37,6 @@ class User(ORMBaseModel):
organization = Column(String)
organizational_unit = Column(String)
email = Column(String)
capabilities: list[UserCapability] = relationship(
"UserCapability", lazy="joined", cascade="all, delete-orphan"
)
@ -45,6 +44,23 @@ class User(ORMBaseModel):
"Device", lazy="select", back_populates="owner"
)
@classmethod
def from_db(
cls,
db: Session,
name: str,
) -> User | None:
"""
Load user from database by name.
"""
return (
db
.query(cls)
.filter(cls.name == name)
.first()
)
class Device(ORMBaseModel):
__tablename__ = "devices"
@ -57,7 +73,7 @@ class Device(ORMBaseModel):
expiry = Column(DateTime)
owner: User = relationship(
"User", lazy="joined", back_populates="distinguished_names"
"User", lazy="joined", back_populates="devices"
)
UniqueConstraint(

View file

@ -0,0 +1,6 @@
from .device import Device, DeviceBase, DeviceCreate
from .user import User, UserBase, UserCreate
from .user_capability import UserCapability
__all__ = ["Device", "DeviceBase", "DeviceCreate",
"User", "UserBase", "UserCreate", "UserCapability"]

View file

@ -0,0 +1,79 @@
"""
Pydantic representation of database contents.
"""
from __future__ import annotations
from datetime import datetime
from pydantic import BaseModel
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session
from .. import models
class DeviceBase(BaseModel):
name: str
type: str
expiry: datetime
class DeviceCreate(DeviceBase):
owner_name: str
class Device(DeviceBase):
class Config:
orm_mode = True
@classmethod
def create(
cls,
db: Session,
device: DeviceCreate,
) -> Device | None:
"""
Create a new device in the database.
"""
try:
db_device = models.Device(
owner_name=device.owner_name,
name=device.name,
type=device.type,
expiry=device.expiry,
)
db.add(db_device)
db.commit()
db.refresh(db_device)
return cls.from_orm(db_device)
except IntegrityError:
# device already existed
return None
def delete(
self,
db: Session,
) -> bool:
"""
Delete this device from the database.
"""
db_device = models.Device(
# owner_name=
name=self.name,
)
db.refresh(db_device)
if db_device is None:
# nonexistent device
return False
db.delete(db_device)
db.commit()
return True

View file

@ -2,11 +2,8 @@
Pydantic representation of database contents.
"""
from __future__ import annotations
from datetime import datetime
from enum import Enum
from typing import Any
from passlib.context import CryptContext
@ -14,65 +11,23 @@ from pydantic import BaseModel, Field, validator
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import Session
from . import models
##########
# table: user_capabilities
##########
class UserCapability(Enum):
admin = "admin"
login = "login"
issue = "issue"
renew = "renew"
def __repr__(self) -> str:
return self.value
@classmethod
def from_value(cls, value) -> UserCapability:
"""
Create UserCapability from various formats
"""
if isinstance(value, cls):
# value is already a UserCapability, use that
return value
elif isinstance(value, models.UserCapability):
# create from db format
return cls(value.capability)
else:
# create from string representation
return cls(str(value))
@property
def model(self) -> models.UserCapability:
return models.UserCapability(
capability=self.value,
)
##########
# table: users
##########
from .. import models
from .device import Device
from .user_capability import UserCapability
class UserBase(BaseModel):
name: str
country: str
state: str
city: str
organization: str
organizational_unit: str
email: str
capabilities: list[UserCapability] = []
country: str | None = Field(default=None, repr=False)
state: str | None = Field(default=None, repr=False)
city: str | None = Field(default=None, repr=False)
organization: str | None = Field(default=None, repr=False)
organizational_unit: str | None = Field(default=None, repr=False)
class UserCreate(UserBase):
password: str
@ -108,8 +63,8 @@ class User(UserBase):
Load user from database by name.
"""
db_user = models.User(name=name)
db.refresh(db_user)
if (db_user := models.User.from_db(db, name)) is None:
return None
return cls.from_orm(db_user)
@ -128,6 +83,7 @@ class User(UserBase):
db_user = models.User(
name=user.name,
password=crypt_context.hash(user.password),
email=user.email,
capabilities=[
capability.model
for capability in user.capabilities
@ -142,7 +98,7 @@ class User(UserBase):
except IntegrityError:
# user already existed
pass
return None
def is_admin(self) -> bool:
return UserCapability.admin in self.capabilities
@ -157,10 +113,7 @@ class User(UserBase):
Authenticate with name/password against users in database.
"""
db_user = models.User(name=self.name)
db.refresh(db_user)
if db_user is None:
if (db_user := models.User.from_db(db, self.name)) is None:
# nonexistent user, fake doing password verification
crypt_context.dummy_verify()
return False
@ -181,8 +134,8 @@ class User(UserBase):
Update this user in the database.
"""
db_user = models.User(name=self.name)
db.refresh(db_user)
if (db_user := models.User.from_db(db, self.name)) is None:
return None
for capability in db_user.capabilities:
db.delete(capability)
@ -202,84 +155,10 @@ class User(UserBase):
Delete this user from the database.
"""
db_user = models.User(name=self.name)
db.refresh(db_user)
if db_user is None:
if (db_user := models.User.from_db(db, self.name)) is None:
# nonexistent user
return False
db.delete(db_user)
db.commit()
return True
##########
# table: devices
##########
class DeviceBase(BaseModel):
name: str
type: str
expiry: datetime
class DeviceCreate(DeviceBase):
owner_name: str
class Device(DeviceBase):
class Config:
orm_mode = True
@classmethod
def create(
cls,
db: Session,
device: DeviceCreate,
) -> Device | None:
"""
Create a new device in the database.
"""
try:
db_device = models.Device(
owner_name=device.owner_name,
name=device.name,
type=device.type,
expiry=device.expiry,
)
db.add(db_device)
db.commit()
db.refresh(db_device)
return cls.from_orm(db_device)
except IntegrityError:
# device already existed
pass
def delete(
self,
db: Session,
) -> bool:
"""
Delete this device from the database.
"""
db_device = models.Device(
# owner_name=
name=self.name,
)
db.refresh(db_device)
if db_device is None:
# nonexistent device
return False
db.delete(db_device)
db.commit()
return True

View file

@ -0,0 +1,43 @@
"""
Pydantic representation of database contents.
"""
from __future__ import annotations
from enum import Enum
from .. import models
class UserCapability(Enum):
admin = "admin"
login = "login"
issue = "issue"
renew = "renew"
def __repr__(self) -> str:
return self.value
@classmethod
def from_value(cls, value) -> UserCapability:
"""
Create UserCapability from various formats
"""
if isinstance(value, cls):
# value is already a UserCapability, use that
return value
elif isinstance(value, models.UserCapability):
# create from db format
return cls(value.capability)
else:
# create from string representation
return cls(str(value))
@property
def model(self) -> models.UserCapability:
return models.UserCapability(
capability=self.value,
)

View file

@ -14,7 +14,7 @@ from fastapi import FastAPI
from .config import Config, Settings
from .db import Connection
from .db.schemas import User
from .db.schemata import User
from .routers import main_router
settings = Settings.get()

View file

@ -9,7 +9,7 @@ from sqlalchemy.orm import Session
from ..config import Config
from ..db import Connection
from ..db.schemas import User
from ..db.schemata import User
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="user/authenticate")

View file

@ -7,7 +7,7 @@ from fastapi import APIRouter, Depends, HTTPException, status
from ..config import Config
from ..db import Connection
from ..db.schemas import User, UserCapability, UserCreate
from ..db.schemata import User, UserCapability, UserCreate
from ._common import Responses, get_current_user
router = APIRouter(prefix="/admin", tags=["admin"])

View file

@ -7,7 +7,7 @@ from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from ..db import Connection
from ..db.schemas import DistinguishedName, DistinguishedNameCreate, User
from ..db.schemata import DistinguishedName, DistinguishedNameCreate, User
from ._common import Responses, get_current_user_if_admin_or_self
router = APIRouter(prefix="/dn")

View file

@ -9,7 +9,7 @@ from sqlalchemy.orm import Session
from ..config import Config
from ..db import Connection
from ..db.schemas import User, UserCapability, UserCreate
from ..db.schemata import User, UserCapability, UserCreate
from ._common import Responses, get_current_user, get_current_user_if_admin
router = APIRouter(prefix="/user", tags=["user"])