Compare commits

..

2 commits

Author SHA1 Message Date
e2f916debc Capabilities 2022-03-27 13:47:38 +00:00
9625336df9 User CRUD 2022-03-27 13:47:18 +00:00
3 changed files with 130 additions and 3 deletions

View file

@ -0,0 +1,32 @@
from enum import Enum
from typing import TYPE_CHECKING
from sqlmodel import Field, Relationship, SQLModel
if TYPE_CHECKING:
from .user import User
class Capability(Enum):
admin = "admin"
login = "login"
issue = "issue"
renew = "renew"
class UserCapabilityBase(SQLModel):
user_name: str = Field(primary_key=True, foreign_key="user.name")
capability_name: str = Field(primary_key=True)
@property
def _(self) -> Capability:
return Capability(self.capability_name)
def __repr__(self) -> str:
return self.capability_name
class UserCapability(UserCapabilityBase, table=True):
user: "User" = Relationship(
back_populates="capabilities"
)

View file

@ -1,12 +1,13 @@
from __future__ import annotations from __future__ import annotations
from typing import Any from typing import Any, Sequence
from pydantic import root_validator from pydantic import root_validator
from sqlalchemy.exc import IntegrityError from sqlalchemy.exc import IntegrityError
from sqlmodel import Field, SQLModel from sqlmodel import Field, Relationship, SQLModel
from ..config import Config from ..config import Config
from .capabilities import Capability, UserCapability
from .connection import Connection from .connection import Connection
@ -24,6 +25,14 @@ class UserBase(SQLModel):
class User(UserBase, table=True): class User(UserBase, table=True):
password: str password: str
capabilities: list[UserCapability] = Relationship(
back_populates="user",
sa_relationship_kwargs={
"lazy": "joined",
"cascade": "all, delete-orphan",
},
)
@classmethod @classmethod
def create(cls, **kwargs) -> User | None: def create(cls, **kwargs) -> User | None:
""" """
@ -46,9 +55,84 @@ class User(UserBase, table=True):
@classmethod @classmethod
def get(cls, name: str) -> User | None: def get(cls, name: str) -> User | None:
"""
Load user from database by name.
"""
with Connection.session as db: with Connection.session as db:
return db.get(cls, name) return db.get(cls, name)
@classmethod
def authenticate(
cls,
name: str,
password: str,
) -> User | None:
"""
Authenticate with name/password against users in database.
"""
crypt_context = Config.load_sync().crypto.crypt_context_sync
if (user := cls.get(name)) is None:
# nonexistent user, fake doing password verification
crypt_context.dummy_verify()
return None
if not crypt_context.verify(password, user.password):
# password hash mismatch
return None
return user
def update(self) -> None:
"""
Update this user in the database.
"""
with Connection.session as db:
db.add(self)
db.commit()
db.refresh(self)
def delete(self) -> bool:
"""
Delete this user from the database.
"""
with Connection.session as db:
db.delete(self)
db.commit()
def extend_capabilities(self, capabilities: Sequence[Capability]) -> None:
"""
Extend this user's capabilities
"""
for capability in capabilities:
user_capability = UserCapability(
user_name=self.name,
capability_name=capability.value,
)
if user_capability not in self.capabilities:
self.capabilities.append(user_capability)
def remove_capabilities(self, capabilities: Sequence[Capability]) -> None:
"""
Remove from this user's capabilities
"""
for capability in capabilities:
try:
self.capabilities.remove(UserCapability(
user_name=self.name,
capability_name=capability.value,
))
except ValueError:
pass
class UserCreate(UserBase): class UserCreate(UserBase):
password: str | None = Field(default=None) password: str | None = Field(default=None)

View file

@ -15,7 +15,7 @@ from fastapi import FastAPI
from .config import Config, Settings from .config import Config, Settings
from .db import Connection from .db import Connection
from .db.schemata import User from .db.schemata import User
from .db_new import connection, user from .db_new import capabilities, connection, user
from .routers import main_router from .routers import main_router
settings = Settings.get() settings = Settings.get()
@ -61,6 +61,17 @@ async def on_startup() -> None:
) )
print(user.User.get("Uwe")) print(user.User.get("Uwe"))
print(user.User.authenticate("Uwe", "uwe"))
uwe = user.User.authenticate("Uwe", "ulf")
uwe.extend_capabilities([capabilities.Capability.admin])
uwe.update()
print(uwe)
uwe.remove_capabilities([capabilities.Capability.admin])
uwe.update()
print(uwe)
def main() -> None: def main() -> None: