Compare commits

..

No commits in common. "e2f916debcd669d38213ad7fdbbbaa65013e61ec" and "ae3627cbe0e055693a2bb7241d9f6378003aac84" have entirely different histories.

3 changed files with 3 additions and 130 deletions

View file

@ -1,32 +0,0 @@
from enum import Enum
from typing import TYPE_CHECKING
from sqlmodel import Field, Relationship, SQLModel
if TYPE_CHECKING:
from .user import User
class Capability(Enum):
admin = "admin"
login = "login"
issue = "issue"
renew = "renew"
class UserCapabilityBase(SQLModel):
user_name: str = Field(primary_key=True, foreign_key="user.name")
capability_name: str = Field(primary_key=True)
@property
def _(self) -> Capability:
return Capability(self.capability_name)
def __repr__(self) -> str:
return self.capability_name
class UserCapability(UserCapabilityBase, table=True):
user: "User" = Relationship(
back_populates="capabilities"
)

View file

@ -1,13 +1,12 @@
from __future__ import annotations
from typing import Any, Sequence
from typing import Any
from pydantic import root_validator
from sqlalchemy.exc import IntegrityError
from sqlmodel import Field, Relationship, SQLModel
from sqlmodel import Field, SQLModel
from ..config import Config
from .capabilities import Capability, UserCapability
from .connection import Connection
@ -25,14 +24,6 @@ class UserBase(SQLModel):
class User(UserBase, table=True):
password: str
capabilities: list[UserCapability] = Relationship(
back_populates="user",
sa_relationship_kwargs={
"lazy": "joined",
"cascade": "all, delete-orphan",
},
)
@classmethod
def create(cls, **kwargs) -> User | None:
"""
@ -55,84 +46,9 @@ class User(UserBase, table=True):
@classmethod
def get(cls, name: str) -> User | None:
"""
Load user from database by name.
"""
with Connection.session as db:
return db.get(cls, name)
@classmethod
def authenticate(
cls,
name: str,
password: str,
) -> User | None:
"""
Authenticate with name/password against users in database.
"""
crypt_context = Config.load_sync().crypto.crypt_context_sync
if (user := cls.get(name)) is None:
# nonexistent user, fake doing password verification
crypt_context.dummy_verify()
return None
if not crypt_context.verify(password, user.password):
# password hash mismatch
return None
return user
def update(self) -> None:
"""
Update this user in the database.
"""
with Connection.session as db:
db.add(self)
db.commit()
db.refresh(self)
def delete(self) -> bool:
"""
Delete this user from the database.
"""
with Connection.session as db:
db.delete(self)
db.commit()
def extend_capabilities(self, capabilities: Sequence[Capability]) -> None:
"""
Extend this user's capabilities
"""
for capability in capabilities:
user_capability = UserCapability(
user_name=self.name,
capability_name=capability.value,
)
if user_capability not in self.capabilities:
self.capabilities.append(user_capability)
def remove_capabilities(self, capabilities: Sequence[Capability]) -> None:
"""
Remove from this user's capabilities
"""
for capability in capabilities:
try:
self.capabilities.remove(UserCapability(
user_name=self.name,
capability_name=capability.value,
))
except ValueError:
pass
class UserCreate(UserBase):
password: str | None = Field(default=None)

View file

@ -15,7 +15,7 @@ from fastapi import FastAPI
from .config import Config, Settings
from .db import Connection
from .db.schemata import User
from .db_new import capabilities, connection, user
from .db_new import connection, user
from .routers import main_router
settings = Settings.get()
@ -61,17 +61,6 @@ async def on_startup() -> None:
)
print(user.User.get("Uwe"))
print(user.User.authenticate("Uwe", "uwe"))
uwe = user.User.authenticate("Uwe", "ulf")
uwe.extend_capabilities([capabilities.Capability.admin])
uwe.update()
print(uwe)
uwe.remove_capabilities([capabilities.Capability.admin])
uwe.update()
print(uwe)
def main() -> None: