Compare commits

...

4 commits

4 changed files with 179 additions and 18 deletions

View file

@ -20,9 +20,15 @@ class User(ORMBaseModel):
name = Column(String, primary_key=True, index=True)
password = Column(String)
capabilities = relationship("UserCapability", lazy="joined")
certificates = relationship("Certificate", lazy="select")
distinguished_names = relationship("DistinguishedName", lazy="select")
capabilities: list[UserCapability] = relationship(
"UserCapability", lazy="joined", cascade="all, delete-orphan"
)
certificates: list[Certificate] = relationship(
"Certificate", lazy="select"
)
distinguished_names: list[DistinguishedName] = relationship(
"DistinguishedName", lazy="select"
)
@classmethod
def load(cls, db: Session, name: str) -> User | None:

View file

@ -161,6 +161,9 @@ class User(UserBase):
# user already existed
pass
def is_admin(self) -> bool:
return UserCapability.admin in self.capabilities
def authenticate(
self,
db: Session,
@ -201,4 +204,24 @@ class User(UserBase):
models.UserCapability(capability=capability.value)
)
for capability in old_dbuser.capabilities:
if UserCapability.from_value(capability) not in self.capabilities:
db.delete(capability)
db.commit()
def delete(
self,
db: Session,
) -> bool:
"""
Delete this user from the database.
"""
if (db_user := models.User.load(db, self.name)) is None:
# nonexistent user
return False
db.delete(db_user)
db.commit()
return True

View file

@ -3,7 +3,7 @@ Common dependencies for routers.
"""
from fastapi import Depends
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
@ -44,13 +44,17 @@ class Responses:
"description": "Entry exists in database",
"content": None,
}
ENTRY_DOESNT_EXIST = {
"description": "Entry does not exist in database",
"content": None,
}
async def get_current_user(
token: str = Depends(oauth2_scheme),
db: Session | None = Depends(Connection.get),
current_config: Config | None = Depends(Config.load),
):
) -> User | None:
"""
Get the currently logged-in user from the database.
"""
@ -63,3 +67,49 @@ async def get_current_user(
user = User.from_db(db, username)
return user
async def get_current_user_if_admin(
current_config: Config | None = Depends(Config.load),
current_user: User | None = Depends(get_current_user),
) -> User:
"""
Get the currently logged-in user if it is an admin.
"""
# fail if not installed
if current_config is None:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST)
# fail if not requested by a user
if current_user is None:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
# fail if not requested by an admin
if not current_user.is_admin():
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
async def get_current_user_if_admin_or_self(
user_name: str,
current_config: Config | None = Depends(Config.load),
current_user: User | None = Depends(get_current_user),
) -> User:
"""
Get the currently logged-in user.
Fails a) if the currently logged-in user is not the requested user,
and b) if it is not an admin.
"""
# fail if not installed
if current_config is None:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST)
# fail if not requested by a user
if current_user is None:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
# fail if not requested by an admin or self
if not (current_user.is_admin() or current_user.name == user_name):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)

View file

@ -10,7 +10,7 @@ from sqlalchemy.orm import Session
from ..config import Config
from ..db import Connection
from ..db.schemas import User, UserCapability, UserCreate
from ._common import Responses, get_current_user
from ._common import Responses, get_current_user, get_current_user_if_admin
router = APIRouter(prefix="/user")
@ -69,7 +69,7 @@ async def get_current_user(
@router.post(
"/new",
"/",
responses={
status.HTTP_200_OK: Responses.OK,
status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
@ -82,22 +82,13 @@ async def get_current_user(
async def add_user(
user: UserCreate,
current_config: Config | None = Depends(Config.load),
current_user: User | None = Depends(get_current_user),
_: User = Depends(get_current_user_if_admin),
db: Session | None = Depends(Connection.get),
):
"""
POST ./new: Create a new user in the database.
POST ./: Create a new user in the database.
"""
# fail if not installed
if current_config is None:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST)
# fail if not requested by an admin
if (current_user is None
or UserCapability.admin not in current_user.capabilities):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
# actually create the new user
new_user = User.create(
db=db,
@ -111,3 +102,94 @@ async def add_user(
# return the created user on success
return new_user
@router.delete(
"/{user_name}",
responses={
status.HTTP_200_OK: Responses.OK,
status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN,
status.HTTP_409_CONFLICT: Responses.ENTRY_DOESNT_EXIST,
},
response_model=User,
)
async def remove_user(
user_name: str,
_: User = Depends(get_current_user_if_admin),
db: Session | None = Depends(Connection.get),
):
"""
DELETE ./{user_name}: Remove a user from the database.
"""
# get the user
user = User.from_db(
db=db,
name=user_name,
)
# fail if deletion was unsuccessful
if not user.delete():
raise HTTPException(status_code=status.HTTP_409_CONFLICT)
@router.post(
"/{user_name}/capabilities",
responses={
status.HTTP_200_OK: Responses.OK,
status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN,
},
)
async def extend_capabilities(
user_name: str,
capabilities: list[UserCapability],
_: User = Depends(get_current_user_if_admin),
db: Session | None = Depends(Connection.get),
):
"""
POST ./{user_name}/capabilities: Add capabilities to a user.
"""
# get and change the user
user = User.from_db(
db=db,
name=user_name,
)
user.capabilities.extend(capabilities)
user.update(db)
@router.delete(
"/{user_name}/capabilities",
responses={
status.HTTP_200_OK: Responses.OK,
status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN,
},
)
async def remove_capabilities(
user_name: str,
capabilities: list[UserCapability],
_: User = Depends(get_current_user_if_admin),
db: Session | None = Depends(Connection.get),
):
"""
DELETE ./{user_name}/capabilities: Remove capabilities from a user.
"""
# get and change the user
user = User.from_db(
db=db,
name=user_name,
)
for capability in capabilities:
user.capabilities.remove(capability)
user.update(db)