Compare commits
No commits in common. "e2f916debcd669d38213ad7fdbbbaa65013e61ec" and "ae3627cbe0e055693a2bb7241d9f6378003aac84" have entirely different histories.
e2f916debc
...
ae3627cbe0
3 changed files with 3 additions and 130 deletions
|
|
@ -1,32 +0,0 @@
|
||||||
from enum import Enum
|
|
||||||
from typing import TYPE_CHECKING
|
|
||||||
|
|
||||||
from sqlmodel import Field, Relationship, SQLModel
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
|
||||||
from .user import User
|
|
||||||
|
|
||||||
|
|
||||||
class Capability(Enum):
|
|
||||||
admin = "admin"
|
|
||||||
login = "login"
|
|
||||||
issue = "issue"
|
|
||||||
renew = "renew"
|
|
||||||
|
|
||||||
|
|
||||||
class UserCapabilityBase(SQLModel):
|
|
||||||
user_name: str = Field(primary_key=True, foreign_key="user.name")
|
|
||||||
capability_name: str = Field(primary_key=True)
|
|
||||||
|
|
||||||
@property
|
|
||||||
def _(self) -> Capability:
|
|
||||||
return Capability(self.capability_name)
|
|
||||||
|
|
||||||
def __repr__(self) -> str:
|
|
||||||
return self.capability_name
|
|
||||||
|
|
||||||
|
|
||||||
class UserCapability(UserCapabilityBase, table=True):
|
|
||||||
user: "User" = Relationship(
|
|
||||||
back_populates="capabilities"
|
|
||||||
)
|
|
||||||
|
|
@ -1,13 +1,12 @@
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
from typing import Any, Sequence
|
from typing import Any
|
||||||
|
|
||||||
from pydantic import root_validator
|
from pydantic import root_validator
|
||||||
from sqlalchemy.exc import IntegrityError
|
from sqlalchemy.exc import IntegrityError
|
||||||
from sqlmodel import Field, Relationship, SQLModel
|
from sqlmodel import Field, SQLModel
|
||||||
|
|
||||||
from ..config import Config
|
from ..config import Config
|
||||||
from .capabilities import Capability, UserCapability
|
|
||||||
from .connection import Connection
|
from .connection import Connection
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -25,14 +24,6 @@ class UserBase(SQLModel):
|
||||||
class User(UserBase, table=True):
|
class User(UserBase, table=True):
|
||||||
password: str
|
password: str
|
||||||
|
|
||||||
capabilities: list[UserCapability] = Relationship(
|
|
||||||
back_populates="user",
|
|
||||||
sa_relationship_kwargs={
|
|
||||||
"lazy": "joined",
|
|
||||||
"cascade": "all, delete-orphan",
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def create(cls, **kwargs) -> User | None:
|
def create(cls, **kwargs) -> User | None:
|
||||||
"""
|
"""
|
||||||
|
|
@ -55,84 +46,9 @@ class User(UserBase, table=True):
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def get(cls, name: str) -> User | None:
|
def get(cls, name: str) -> User | None:
|
||||||
"""
|
|
||||||
Load user from database by name.
|
|
||||||
"""
|
|
||||||
|
|
||||||
with Connection.session as db:
|
with Connection.session as db:
|
||||||
return db.get(cls, name)
|
return db.get(cls, name)
|
||||||
|
|
||||||
@classmethod
|
|
||||||
def authenticate(
|
|
||||||
cls,
|
|
||||||
name: str,
|
|
||||||
password: str,
|
|
||||||
) -> User | None:
|
|
||||||
"""
|
|
||||||
Authenticate with name/password against users in database.
|
|
||||||
"""
|
|
||||||
|
|
||||||
crypt_context = Config.load_sync().crypto.crypt_context_sync
|
|
||||||
|
|
||||||
if (user := cls.get(name)) is None:
|
|
||||||
# nonexistent user, fake doing password verification
|
|
||||||
crypt_context.dummy_verify()
|
|
||||||
return None
|
|
||||||
|
|
||||||
if not crypt_context.verify(password, user.password):
|
|
||||||
# password hash mismatch
|
|
||||||
return None
|
|
||||||
|
|
||||||
return user
|
|
||||||
|
|
||||||
def update(self) -> None:
|
|
||||||
"""
|
|
||||||
Update this user in the database.
|
|
||||||
"""
|
|
||||||
|
|
||||||
with Connection.session as db:
|
|
||||||
db.add(self)
|
|
||||||
db.commit()
|
|
||||||
db.refresh(self)
|
|
||||||
|
|
||||||
def delete(self) -> bool:
|
|
||||||
"""
|
|
||||||
Delete this user from the database.
|
|
||||||
"""
|
|
||||||
|
|
||||||
with Connection.session as db:
|
|
||||||
db.delete(self)
|
|
||||||
db.commit()
|
|
||||||
|
|
||||||
def extend_capabilities(self, capabilities: Sequence[Capability]) -> None:
|
|
||||||
"""
|
|
||||||
Extend this user's capabilities
|
|
||||||
"""
|
|
||||||
|
|
||||||
for capability in capabilities:
|
|
||||||
user_capability = UserCapability(
|
|
||||||
user_name=self.name,
|
|
||||||
capability_name=capability.value,
|
|
||||||
)
|
|
||||||
|
|
||||||
if user_capability not in self.capabilities:
|
|
||||||
self.capabilities.append(user_capability)
|
|
||||||
|
|
||||||
def remove_capabilities(self, capabilities: Sequence[Capability]) -> None:
|
|
||||||
"""
|
|
||||||
Remove from this user's capabilities
|
|
||||||
"""
|
|
||||||
|
|
||||||
for capability in capabilities:
|
|
||||||
try:
|
|
||||||
self.capabilities.remove(UserCapability(
|
|
||||||
user_name=self.name,
|
|
||||||
capability_name=capability.value,
|
|
||||||
))
|
|
||||||
|
|
||||||
except ValueError:
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
class UserCreate(UserBase):
|
class UserCreate(UserBase):
|
||||||
password: str | None = Field(default=None)
|
password: str | None = Field(default=None)
|
||||||
|
|
|
||||||
|
|
@ -15,7 +15,7 @@ from fastapi import FastAPI
|
||||||
from .config import Config, Settings
|
from .config import Config, Settings
|
||||||
from .db import Connection
|
from .db import Connection
|
||||||
from .db.schemata import User
|
from .db.schemata import User
|
||||||
from .db_new import capabilities, connection, user
|
from .db_new import connection, user
|
||||||
from .routers import main_router
|
from .routers import main_router
|
||||||
|
|
||||||
settings = Settings.get()
|
settings = Settings.get()
|
||||||
|
|
@ -61,17 +61,6 @@ async def on_startup() -> None:
|
||||||
)
|
)
|
||||||
|
|
||||||
print(user.User.get("Uwe"))
|
print(user.User.get("Uwe"))
|
||||||
print(user.User.authenticate("Uwe", "uwe"))
|
|
||||||
|
|
||||||
uwe = user.User.authenticate("Uwe", "ulf")
|
|
||||||
|
|
||||||
uwe.extend_capabilities([capabilities.Capability.admin])
|
|
||||||
uwe.update()
|
|
||||||
print(uwe)
|
|
||||||
|
|
||||||
uwe.remove_capabilities([capabilities.Capability.admin])
|
|
||||||
uwe.update()
|
|
||||||
print(uwe)
|
|
||||||
|
|
||||||
|
|
||||||
def main() -> None:
|
def main() -> None:
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue