Compare commits

...

4 commits

4 changed files with 179 additions and 18 deletions

View file

@ -20,9 +20,15 @@ class User(ORMBaseModel):
name = Column(String, primary_key=True, index=True) name = Column(String, primary_key=True, index=True)
password = Column(String) password = Column(String)
capabilities = relationship("UserCapability", lazy="joined") capabilities: list[UserCapability] = relationship(
certificates = relationship("Certificate", lazy="select") "UserCapability", lazy="joined", cascade="all, delete-orphan"
distinguished_names = relationship("DistinguishedName", lazy="select") )
certificates: list[Certificate] = relationship(
"Certificate", lazy="select"
)
distinguished_names: list[DistinguishedName] = relationship(
"DistinguishedName", lazy="select"
)
@classmethod @classmethod
def load(cls, db: Session, name: str) -> User | None: def load(cls, db: Session, name: str) -> User | None:

View file

@ -161,6 +161,9 @@ class User(UserBase):
# user already existed # user already existed
pass pass
def is_admin(self) -> bool:
return UserCapability.admin in self.capabilities
def authenticate( def authenticate(
self, self,
db: Session, db: Session,
@ -201,4 +204,24 @@ class User(UserBase):
models.UserCapability(capability=capability.value) models.UserCapability(capability=capability.value)
) )
for capability in old_dbuser.capabilities:
if UserCapability.from_value(capability) not in self.capabilities:
db.delete(capability)
db.commit() db.commit()
def delete(
self,
db: Session,
) -> bool:
"""
Delete this user from the database.
"""
if (db_user := models.User.load(db, self.name)) is None:
# nonexistent user
return False
db.delete(db_user)
db.commit()
return True

View file

@ -3,7 +3,7 @@ Common dependencies for routers.
""" """
from fastapi import Depends from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session from sqlalchemy.orm import Session
@ -44,13 +44,17 @@ class Responses:
"description": "Entry exists in database", "description": "Entry exists in database",
"content": None, "content": None,
} }
ENTRY_DOESNT_EXIST = {
"description": "Entry does not exist in database",
"content": None,
}
async def get_current_user( async def get_current_user(
token: str = Depends(oauth2_scheme), token: str = Depends(oauth2_scheme),
db: Session | None = Depends(Connection.get), db: Session | None = Depends(Connection.get),
current_config: Config | None = Depends(Config.load), current_config: Config | None = Depends(Config.load),
): ) -> User | None:
""" """
Get the currently logged-in user from the database. Get the currently logged-in user from the database.
""" """
@ -63,3 +67,49 @@ async def get_current_user(
user = User.from_db(db, username) user = User.from_db(db, username)
return user return user
async def get_current_user_if_admin(
current_config: Config | None = Depends(Config.load),
current_user: User | None = Depends(get_current_user),
) -> User:
"""
Get the currently logged-in user if it is an admin.
"""
# fail if not installed
if current_config is None:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST)
# fail if not requested by a user
if current_user is None:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
# fail if not requested by an admin
if not current_user.is_admin():
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
async def get_current_user_if_admin_or_self(
user_name: str,
current_config: Config | None = Depends(Config.load),
current_user: User | None = Depends(get_current_user),
) -> User:
"""
Get the currently logged-in user.
Fails a) if the currently logged-in user is not the requested user,
and b) if it is not an admin.
"""
# fail if not installed
if current_config is None:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST)
# fail if not requested by a user
if current_user is None:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
# fail if not requested by an admin or self
if not (current_user.is_admin() or current_user.name == user_name):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)

View file

@ -10,7 +10,7 @@ from sqlalchemy.orm import Session
from ..config import Config from ..config import Config
from ..db import Connection from ..db import Connection
from ..db.schemas import User, UserCapability, UserCreate from ..db.schemas import User, UserCapability, UserCreate
from ._common import Responses, get_current_user from ._common import Responses, get_current_user, get_current_user_if_admin
router = APIRouter(prefix="/user") router = APIRouter(prefix="/user")
@ -69,7 +69,7 @@ async def get_current_user(
@router.post( @router.post(
"/new", "/",
responses={ responses={
status.HTTP_200_OK: Responses.OK, status.HTTP_200_OK: Responses.OK,
status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED, status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
@ -82,22 +82,13 @@ async def get_current_user(
async def add_user( async def add_user(
user: UserCreate, user: UserCreate,
current_config: Config | None = Depends(Config.load), current_config: Config | None = Depends(Config.load),
current_user: User | None = Depends(get_current_user), _: User = Depends(get_current_user_if_admin),
db: Session | None = Depends(Connection.get), db: Session | None = Depends(Connection.get),
): ):
""" """
POST ./new: Create a new user in the database. POST ./: Create a new user in the database.
""" """
# fail if not installed
if current_config is None:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST)
# fail if not requested by an admin
if (current_user is None
or UserCapability.admin not in current_user.capabilities):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
# actually create the new user # actually create the new user
new_user = User.create( new_user = User.create(
db=db, db=db,
@ -111,3 +102,94 @@ async def add_user(
# return the created user on success # return the created user on success
return new_user return new_user
@router.delete(
"/{user_name}",
responses={
status.HTTP_200_OK: Responses.OK,
status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN,
status.HTTP_409_CONFLICT: Responses.ENTRY_DOESNT_EXIST,
},
response_model=User,
)
async def remove_user(
user_name: str,
_: User = Depends(get_current_user_if_admin),
db: Session | None = Depends(Connection.get),
):
"""
DELETE ./{user_name}: Remove a user from the database.
"""
# get the user
user = User.from_db(
db=db,
name=user_name,
)
# fail if deletion was unsuccessful
if not user.delete():
raise HTTPException(status_code=status.HTTP_409_CONFLICT)
@router.post(
"/{user_name}/capabilities",
responses={
status.HTTP_200_OK: Responses.OK,
status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN,
},
)
async def extend_capabilities(
user_name: str,
capabilities: list[UserCapability],
_: User = Depends(get_current_user_if_admin),
db: Session | None = Depends(Connection.get),
):
"""
POST ./{user_name}/capabilities: Add capabilities to a user.
"""
# get and change the user
user = User.from_db(
db=db,
name=user_name,
)
user.capabilities.extend(capabilities)
user.update(db)
@router.delete(
"/{user_name}/capabilities",
responses={
status.HTTP_200_OK: Responses.OK,
status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN,
},
)
async def remove_capabilities(
user_name: str,
capabilities: list[UserCapability],
_: User = Depends(get_current_user_if_admin),
db: Session | None = Depends(Connection.get),
):
"""
DELETE ./{user_name}/capabilities: Remove capabilities from a user.
"""
# get and change the user
user = User.from_db(
db=db,
name=user_name,
)
for capability in capabilities:
user.capabilities.remove(capability)
user.update(db)