check: user can login, "admin" can do everything

This commit is contained in:
Jörn-Michael Miehe 2022-03-28 22:17:31 +00:00
parent a465dba92e
commit 6254daa51d
3 changed files with 19 additions and 8 deletions


@ -4,7 +4,7 @@ Python representation of `user` table.
from __future__ import annotations from __future__ import annotations
from typing import Any from typing import Any, Sequence
from pydantic import root_validator from pydantic import root_validator
from sqlalchemy.exc import IntegrityError from sqlalchemy.exc import IntegrityError
@ -170,14 +170,23 @@ class User(UserBase, table=True):
for capability in self.capabilities for capability in self.capabilities
) )
def can(self, capability: UserCapabilityType) -> bool: def can(
self,
capability: UserCapabilityType,
) -> bool:
""" """
Check if this user has a capability. Check if this user has a capability.
""" """
return capability in self.get_capabilities() return (
capability in self.get_capabilities()
or UserCapabilityType.admin in self.get_capabilities()
)
def set_capabilities(self, capabilities: set[UserCapabilityType]) -> None: def set_capabilities(
self,
capabilities: Sequence[UserCapabilityType],
) -> None:
""" """
Change the capabilities of this user. Change the capabilities of this user.
""" """


@ -63,10 +63,7 @@ async def create_initial_admin(
# create an administrative user # create an administrative user
new_user = User.create(**admin_user.dict()) new_user = User.create(**admin_user.dict())
new_user.set_capabilities([ new_user.set_capabilities((UserCapabilityType.admin))
UserCapabilityType.login,
UserCapabilityType.admin,
])
new_user.update() new_user.update()
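Review bot: `new_user.set_capabilities((UserCapabilityType.admin))` does not pass a one-element sequence — parentheses without a trailing comma are just grouping, so the bare enum member is handed to a parameter now typed `Sequence[UserCapabilityType]`; if `UserCapabilityType` is a `str`-based enum, iterating it yields its characters rather than the member. Use `new_user.set_capabilities([UserCapabilityType.admin])`. Dropping `login` from the initial admin is only safe because `can()` now treats `admin` as implying every capability; a comment such as `# admin implies login via User.can()` would keep that dependency visible. The body of `create_initial_admin` starts and ends outside the hunk.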


@ -47,6 +47,10 @@ async def login(
headers={"WWW-Authenticate": "Bearer"}, headers={"WWW-Authenticate": "Bearer"},
) )
if not user.can(UserCapabilityType.login):
# user cannot login
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
# authentication succeeded # authentication succeeded
access_token = await current_config.jwt.create_token(user.name) access_token = await current_config.jwt.create_token(user.name)
return {"access_token": access_token, "token_type": "bearer"} return {"access_token": access_token, "token_type": "bearer"}
@ -84,6 +88,7 @@ async def add_user(
# actually create the new user # actually create the new user
new_user = User.create(**user.dict()) new_user = User.create(**user.dict())
new_user.set_capabilities((UserCapabilityType.login))
# fail if creation was unsuccessful # fail if creation was unsuccessful
if new_user is None: if new_user is None:
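Review bot: In `add_user`, `new_user.set_capabilities((UserCapabilityType.login))` is placed before the `if new_user is None:` guard two lines below, so when `User.create` returns `None` (the case that guard exists for) the call raises `AttributeError` instead of reaching the intended failure handling; move the call below the guard. The argument is also not a sequence: `(UserCapabilityType.login)` is just the enum member in parentheses, so pass `[UserCapabilityType.login]` to satisfy the new `Sequence[UserCapabilityType]` signature — if the enum is `str`-based, iterating the bare member would yield its characters. The body of `add_user` starts and ends outside the hunk.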