Compare commits

..

No commits in common. "f899e0c0dfb17b105911b3493dc5381371e2faf6" and "617ae92d72e13edecfee0d0d49d234cbac02f8a9" have entirely different histories.

9 changed files with 219 additions and 270 deletions

View file

@ -4,8 +4,8 @@ Package `db`: ORM and schemas for database content.
from .connection import Connection from .connection import Connection
from .device import Device, DeviceBase, DeviceCreate, DeviceRead from .device import Device, DeviceBase, DeviceCreate, DeviceRead
from .tag import TagValue
from .user import User, UserBase, UserCreate, UserRead from .user import User, UserBase, UserCreate, UserRead
from .user_capability import UserCapabilityType
__all__ = [ __all__ = [
"Connection", "Connection",
@ -17,5 +17,5 @@ __all__ = [
"UserBase", "UserBase",
"UserCreate", "UserCreate",
"UserRead", "UserRead",
"TagValue", "UserCapabilityType",
] ]

View file

@ -1,68 +0,0 @@
"""
Python representation of `tag` table.
"""
from __future__ import annotations
from enum import Enum
from typing import TYPE_CHECKING
from sqlmodel import Field, Relationship, SQLModel
if TYPE_CHECKING:
from .user import User
class TagValue(Enum):
"""
Allowed values for tags
"""
admin = "admin"
login = "login"
issue = "issue"
renew = "renew"
def __repr__(self) -> str:
return self.value
def _(self, user: User) -> Tag:
"""
Transform into a `Tag`.
"""
return Tag(
user=user,
tag_value=self.value,
)
class TagBase(SQLModel):
"""
Common to all representations of tags
"""
tag_value: str = Field(primary_key=True)
@property
def _(self) -> TagValue:
"""
Transform into a `TagValue`.
"""
return TagValue(self.tag_value)
def __repr__(self) -> str:
return self.tag_value
class Tag(TagBase, table=True):
"""
Representation of `tag` table
"""
user_name: str = Field(primary_key=True, foreign_key="user.name")
user: User = Relationship(
back_populates="tags",
)

View file

@ -4,7 +4,7 @@ Python representation of `user` table.
from __future__ import annotations from __future__ import annotations
from typing import Any, Iterable, Sequence from typing import Any, Sequence
from pydantic import root_validator from pydantic import root_validator
from sqlalchemy.exc import IntegrityError from sqlalchemy.exc import IntegrityError
@ -13,7 +13,7 @@ from sqlmodel import Field, Relationship, SQLModel
from ..config import Config from ..config import Config
from .connection import Connection from .connection import Connection
from .device import Device from .device import Device
from .tag import Tag, TagValue from .user_capability import UserCapability, UserCapabilityType
class UserBase(SQLModel): class UserBase(SQLModel):
@ -77,7 +77,7 @@ class User(UserBase, table=True):
password: str password: str
tags: list[Tag] = Relationship( capabilities: list[UserCapability] = Relationship(
back_populates="user", back_populates="user",
sa_relationship_kwargs={ sa_relationship_kwargs={
"lazy": "joined", "lazy": "joined",
@ -143,11 +143,6 @@ class User(UserBase, table=True):
# password hash mismatch # password hash mismatch
return None return None
if not (user.has_tag(TagValue.login)
or user.has_tag(TagValue.admin)):
# no login permission
return None
return user return user
def update(self) -> None: def update(self) -> None:
@ -169,107 +164,55 @@ class User(UserBase, table=True):
db.delete(self) db.delete(self)
db.commit() db.commit()
def _get_tags(self) -> Iterable[TagValue]: def get_capabilities(self) -> set[UserCapabilityType]:
""" """
Return the tags of this user. Return the capabilities of this user.
"""
return set(
capability._
for capability in self.capabilities
)
def can(
self,
capability: UserCapabilityType,
) -> bool:
"""
Check if this user has a capability.
""" """
return ( return (
tag._ capability in self.get_capabilities()
for tag in self.tags # admin can do everything
or UserCapabilityType.admin in self.get_capabilities()
) )
def has_tag(self, tag: TagValue) -> bool: def set_capabilities(
"""
Check if this user has a tag.
"""
return tag in self._get_tags()
def add_tags(
self, self,
tags: Sequence[TagValue], capabilities: Sequence[UserCapabilityType],
) -> None: ) -> None:
""" """
Add tags to this user. Change the capabilities of this user.
""" """
self.tags = [ self.capabilities = [
tag._(self) UserCapability(
for tag in (set(self._get_tags()) | set(tags)) user_name=self.name,
capability_name=capability.value,
) for capability in capabilities
] ]
def remove_tags( def owns(
self, self,
tags: Sequence[TagValue], device: Device,
) -> None:
"""
remove tags from this user.
"""
self.tags = [
tag._(self)
for tag in (set(self._get_tags()) - set(tags))
]
def can_edit(
self,
target: User | Device,
) -> bool: ) -> bool:
""" """
Check if this user can edit another user or a device. Check if this user owns a device.
""" """
# admin can "edit" everything return (
if self.has_tag(TagValue.admin): device.owner_name == self.name
return True # admin owns everything
or self.can(UserCapabilityType.admin)
# user can "edit" itself )
if isinstance(target, User) and target != self:
return False
# user can edit its owned devices
return target.owner == self
def can_admin(
self,
target: User | Device,
) -> bool:
"""
Check if this user can administer another user or a device.
"""
# only admin can "admin" anything
if not self.has_tag(TagValue.admin):
return False
# admin canot "admin itself"!
if isinstance(target, User) and target == self:
return False
# admin can "admin" everything else
return True
def can_create(
self,
target: type,
owner: User | None = None,
) -> bool:
"""
Check if this user can create another user or a device.
"""
# can never create anything but users or devices
if not issubclass(target, (User, Device)):
return False
# admin can "create" everything
if self.has_tag(TagValue.admin):
return True
# user can only create devices for itself
if target is Device and owner == self:
return True
# deny be default
return False

View file

@ -0,0 +1,56 @@
"""
Python representation of `user_capability` table.
"""
from enum import Enum
from typing import TYPE_CHECKING
from sqlmodel import Field, Relationship, SQLModel
if TYPE_CHECKING:
from .user import User
class UserCapabilityType(Enum):
"""
Allowed values for capabilities
"""
admin = "admin"
login = "login"
issue = "issue"
renew = "renew"
def __repr__(self) -> str:
return self.value
class UserCapabilityBase(SQLModel):
"""
Common to all representations of capabilities
"""
capability_name: str = Field(primary_key=True)
@property
def _(self) -> UserCapabilityType:
"""
Transform into a `Capability`.
"""
return UserCapabilityType(self.capability_name)
def __repr__(self) -> str:
return self.capability_name
class UserCapability(UserCapabilityBase, table=True):
"""
Representation of `user_capability` table
"""
user_name: str = Field(primary_key=True, foreign_key="user.name")
user: "User" = Relationship(
back_populates="capabilities",
)

View file

@ -7,7 +7,7 @@ from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer from fastapi.security import OAuth2PasswordBearer
from ..config import Config, Settings from ..config import Config, Settings
from ..db import Device, User from ..db import Device, User, UserCapabilityType
oauth2_scheme = OAuth2PasswordBearer( oauth2_scheme = OAuth2PasswordBearer(
tokenUrl=f"{Settings._.api_v1_prefix}/user/authenticate" tokenUrl=f"{Settings._.api_v1_prefix}/user/authenticate"
@ -36,8 +36,12 @@ class Responses:
"description": "Must be logged in", "description": "Must be logged in",
"content": None, "content": None,
} }
NEEDS_PERMISSION = { NEEDS_ADMIN = {
"description": "You're not allowed that operation", "description": "Must be admin",
"content": None,
}
NEEDS_REQUESTED_USER = {
"description": "Must be the requested user",
"content": None, "content": None,
} }
ENTRY_EXISTS = { ENTRY_EXISTS = {
@ -48,14 +52,18 @@ class Responses:
"description": "Entry does not exist in database", "description": "Entry does not exist in database",
"content": None, "content": None,
} }
CANT_TARGET_SELF = {
"description": "Operation can't target yourself",
"content": None,
}
async def get_current_user( async def get_current_user(
token: str = Depends(oauth2_scheme), token: str = Depends(oauth2_scheme),
current_config: Config | None = Depends(Config.load), current_config: Config | None = Depends(Config.load),
) -> User: ) -> User | None:
""" """
Get the currently logged-in user if it exists. Get the currently logged-in user from the database.
""" """
# can't connect to an unconfigured database # can't connect to an unconfigured database
@ -64,27 +72,62 @@ async def get_current_user(
username = await current_config.jwt.decode_token(token) username = await current_config.jwt.decode_token(token)
return User.get(username)
async def get_current_user_if_exists(
current_user: User | None = Depends(get_current_user),
) -> User:
"""
Get the currently logged-in user if it exists.
"""
# fail if not requested by a user # fail if not requested by a user
if (user := User.get(username)) is None: if current_user is None:
# don't use error 404 here: possible user enumeration raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
return current_user
async def get_current_user_if_admin(
current_user: User = Depends(get_current_user_if_exists),
) -> User:
"""
Fail if the currently logged-in user is not an admin.
"""
if not current_user.can(UserCapabilityType.admin):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
return user return current_user
async def get_user_by_name( async def get_user_by_name(
user_name: str, user_name: str,
current_config: Config | None = Depends(Config.load), current_user: User = Depends(get_current_user_if_exists),
) -> User | None: ) -> User:
""" """
Get a user by name. Get a user by name.
Works if a) the currently logged-in user is an admin,
or b) if it is the requested user.
""" """
# can't connect to an unconfigured database # check if current user is admin
if current_config is None: if current_user.can(UserCapabilityType.admin):
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST) # fail if requested user doesn't exist
if (user := User.get(user_name)) is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
return User.get(user_name) # check if current user is requested user
elif current_user.name == user_name:
pass
# current user is neither admin nor the requested user
else:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
return user
async def get_device_by_id( async def get_device_by_id(
@ -96,8 +139,20 @@ async def get_device_by_id(
if current_config is None: if current_config is None:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST) raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST)
return Device.get(device_id)
async def get_device_by_id_if_editable(
device: Device | None = Depends(get_device_by_id),
current_user: User = Depends(get_current_user_if_exists),
) -> Device:
# fail if device doesn't exist # fail if device doesn't exist
if (device := Device.get(device_id)) is None: if device is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND) raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
# fail if device is not owned by current user
if not current_user.owns(device):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
return device return device

View file

@ -7,8 +7,8 @@ from fastapi import APIRouter, Depends, HTTPException, status
from sqlmodel import select from sqlmodel import select
from ..config import Config from ..config import Config
from ..db import Connection, TagValue, User, UserCreate from ..db import Connection, User, UserCapabilityType, UserCreate
from ._common import Responses, get_current_user from ._common import Responses, get_current_user_if_admin
router = APIRouter(prefix="/admin", tags=["admin"]) router = APIRouter(prefix="/admin", tags=["admin"])
@ -64,7 +64,7 @@ async def create_initial_admin(
# create an administrative user # create an administrative user
new_user = User.create(user=admin_user) new_user = User.create(user=admin_user)
new_user.add_tags([TagValue.admin]) new_user.set_capabilities([UserCapabilityType.admin])
new_user.update() new_user.update()
@ -74,21 +74,17 @@ async def create_initial_admin(
status.HTTP_200_OK: Responses.OK, status.HTTP_200_OK: Responses.OK,
status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED, status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER, status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
status.HTTP_403_FORBIDDEN: Responses.NEEDS_PERMISSION, status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN,
}, },
) )
async def set_config( async def set_config(
config: Config, config: Config,
current_user: User = Depends(get_current_user), _: User = Depends(get_current_user_if_admin),
): ):
""" """
PUT ./config: Edit `kiwi-vpn` main config. PUT ./config: Edit `kiwi-vpn` main config.
""" """
# check permissions
if not current_user.has_tag(TagValue.admin):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
# update config file, reconnect to database # update config file, reconnect to database
config.save() config.save()
Connection.connect(config.db.uri) Connection.connect(config.db.uri)

View file

@ -5,19 +5,18 @@
from fastapi import APIRouter, Depends, HTTPException, status from fastapi import APIRouter, Depends, HTTPException, status
from ..db import Device, DeviceCreate, DeviceRead, User from ..db import Device, DeviceCreate, DeviceRead, User
from ._common import (Responses, get_current_user, get_device_by_id, from ._common import Responses, get_device_by_id_if_editable, get_user_by_name
get_user_by_name)
router = APIRouter(prefix="/device", tags=["device"]) router = APIRouter(prefix="/device", tags=["device"])
@router.post( @router.post(
"/{user_name}", "",
responses={ responses={
status.HTTP_200_OK: Responses.OK, status.HTTP_200_OK: Responses.OK,
status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED, status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER, status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
status.HTTP_403_FORBIDDEN: Responses.NEEDS_PERMISSION, status.HTTP_403_FORBIDDEN: Responses.NEEDS_REQUESTED_USER,
status.HTTP_404_NOT_FOUND: Responses.ENTRY_DOESNT_EXIST, status.HTTP_404_NOT_FOUND: Responses.ENTRY_DOESNT_EXIST,
status.HTTP_409_CONFLICT: Responses.ENTRY_EXISTS, status.HTTP_409_CONFLICT: Responses.ENTRY_EXISTS,
}, },
@ -25,20 +24,15 @@ router = APIRouter(prefix="/device", tags=["device"])
) )
async def add_device( async def add_device(
device: DeviceCreate, device: DeviceCreate,
current_user: User = Depends(get_current_user), user: User = Depends(get_user_by_name),
owner: User = Depends(get_user_by_name),
) -> Device: ) -> Device:
""" """
POST ./: Create a new device in the database. POST ./: Create a new device in the database.
""" """
# check permission
if not current_user.can_create(Device, owner):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
# create the new device # create the new device
new_device = Device.create( new_device = Device.create(
owner=current_user, owner=user,
device=device, device=device,
) )
@ -56,22 +50,17 @@ async def add_device(
status.HTTP_200_OK: Responses.OK, status.HTTP_200_OK: Responses.OK,
status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED, status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER, status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
status.HTTP_403_FORBIDDEN: Responses.NEEDS_PERMISSION, status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN,
status.HTTP_404_NOT_FOUND: Responses.ENTRY_DOESNT_EXIST, status.HTTP_404_NOT_FOUND: Responses.ENTRY_DOESNT_EXIST,
}, },
response_model=User, response_model=User,
) )
async def remove_device( async def remove_device(
current_user: User = Depends(get_current_user), device: Device = Depends(get_device_by_id_if_editable),
device: Device = Depends(get_device_by_id),
): ):
""" """
DELETE ./{device_id}: Remove a device from the database. DELETE ./{device_id}: Remove a device from the database.
""" """
# check permission
if not current_user.can_edit(device):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
# delete device # delete device
device.delete() device.delete()

View file

@ -7,8 +7,9 @@ from fastapi.security import OAuth2PasswordRequestForm
from pydantic import BaseModel from pydantic import BaseModel
from ..config import Config from ..config import Config
from ..db import TagValue, User, UserCreate, UserRead from ..db import User, UserCapabilityType, UserCreate, UserRead
from ._common import Responses, get_current_user, get_user_by_name from ._common import (Responses, get_current_user_if_admin,
get_current_user_if_exists, get_user_by_name)
router = APIRouter(prefix="/user", tags=["user"]) router = APIRouter(prefix="/user", tags=["user"])
@ -36,10 +37,10 @@ async def login(
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST) raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST)
# try logging in # try logging in
if (user := User.authenticate( if not (user := User.authenticate(
name=form_data.username, name=form_data.username,
password=form_data.password, password=form_data.password,
)) is None: )):
# authentication failed # authentication failed
raise HTTPException( raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, status_code=status.HTTP_401_UNAUTHORIZED,
@ -47,14 +48,18 @@ async def login(
headers={"WWW-Authenticate": "Bearer"}, headers={"WWW-Authenticate": "Bearer"},
) )
if not user.can(UserCapabilityType.login):
# user cannot login
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
# authentication succeeded # authentication succeeded
access_token = await current_config.jwt.create_token(user.name) access_token = await current_config.jwt.create_token(user.name)
return {"access_token": access_token, "token_type": "bearer"} return {"access_token": access_token, "token_type": "bearer"}
@router.get("/current", response_model=UserRead) @router.get("/current", response_model=UserRead)
async def get_current_user_route( async def get_current_user(
current_user: User = Depends(get_current_user), current_user: User = Depends(get_current_user_if_exists),
): ):
""" """
GET ./current: Respond with the currently logged-in user. GET ./current: Respond with the currently logged-in user.
@ -69,31 +74,27 @@ async def get_current_user_route(
status.HTTP_200_OK: Responses.OK, status.HTTP_200_OK: Responses.OK,
status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED, status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER, status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
status.HTTP_403_FORBIDDEN: Responses.NEEDS_PERMISSION, status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN,
status.HTTP_409_CONFLICT: Responses.ENTRY_EXISTS, status.HTTP_409_CONFLICT: Responses.ENTRY_EXISTS,
}, },
response_model=UserRead, response_model=UserRead,
) )
async def add_user( async def add_user(
user: UserCreate, user: UserCreate,
current_user: User = Depends(get_current_user), _: User = Depends(get_current_user_if_admin),
) -> User: ) -> User:
""" """
POST ./: Create a new user in the database. POST ./: Create a new user in the database.
""" """
# check permissions # actually create the new user
if not current_user.can_create(User):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
# create the new user
new_user = User.create(user=user) new_user = User.create(user=user)
# fail if creation was unsuccessful # fail if creation was unsuccessful
if new_user is None: if new_user is None:
raise HTTPException(status_code=status.HTTP_409_CONFLICT) raise HTTPException(status_code=status.HTTP_409_CONFLICT)
new_user.add_tags([TagValue.login]) new_user.set_capabilities([UserCapabilityType.login])
new_user.update() new_user.update()
# return the created user on success # return the created user on success
@ -106,76 +107,69 @@ async def add_user(
status.HTTP_200_OK: Responses.OK, status.HTTP_200_OK: Responses.OK,
status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED, status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER, status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
status.HTTP_403_FORBIDDEN: Responses.NEEDS_PERMISSION, status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN,
status.HTTP_404_NOT_FOUND: Responses.ENTRY_DOESNT_EXIST, status.HTTP_404_NOT_FOUND: Responses.ENTRY_DOESNT_EXIST,
status.HTTP_406_NOT_ACCEPTABLE: Responses.CANT_TARGET_SELF,
}, },
response_model=User, response_model=User,
) )
async def remove_user( async def remove_user(
current_user: User = Depends(get_current_user), current_user: User = Depends(get_current_user_if_admin),
user: User = Depends(get_user_by_name), user: User = Depends(get_user_by_name),
): ):
""" """
DELETE ./{user_name}: Remove a user from the database. DELETE ./{user_name}: Remove a user from the database.
""" """
# check permissions # stop inting
if not current_user.can_admin(user): if current_user.name == user.name:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN) raise HTTPException(status_code=status.HTTP_406_NOT_ACCEPTABLE)
# delete user # delete user
user.delete() user.delete()
@router.post( @router.post(
"/{user_name}/tags", "/{user_name}/capabilities",
responses={ responses={
status.HTTP_200_OK: Responses.OK, status.HTTP_200_OK: Responses.OK,
status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED, status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER, status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
status.HTTP_403_FORBIDDEN: Responses.NEEDS_PERMISSION, status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN,
}, },
) )
async def extend_tags( async def extend_capabilities(
tags: list[TagValue], capabilities: list[UserCapabilityType],
current_user: User = Depends(get_current_user), _: User = Depends(get_current_user_if_admin),
user: User = Depends(get_user_by_name), user: User = Depends(get_user_by_name),
): ):
""" """
POST ./{user_name}/tags: Add tags to a user. POST ./{user_name}/capabilities: Add capabilities to a user.
""" """
# check permissions user.set_capabilities(user.get_capabilities() | set(capabilities))
if not current_user.can_admin(user):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
# change user
user.add_tags(tags)
user.update() user.update()
@router.delete( @router.delete(
"/{user_name}/tags", "/{user_name}/capabilities",
responses={ responses={
status.HTTP_200_OK: Responses.OK, status.HTTP_200_OK: Responses.OK,
status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED, status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER, status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
status.HTTP_403_FORBIDDEN: Responses.NEEDS_PERMISSION, status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN,
}, },
) )
async def remove_tags( async def remove_capabilities(
tags: list[TagValue], capabilities: list[UserCapabilityType],
current_user: User = Depends(get_current_user), _: User = Depends(get_current_user_if_admin),
user: User = Depends(get_user_by_name), user: User = Depends(get_user_by_name),
): ):
""" """
DELETE ./{user_name}/tags: Remove tags from a user. DELETE ./{user_name}/capabilities: Remove capabilities from a user.
""" """
# check permissions user.set_capabilities(user.get_capabilities() - set(capabilities))
if not current_user.can_admin(user):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
# change user
user.remove_tags(tags)
user.update() user.update()

View file

@ -4,38 +4,22 @@
- flag: use client-to-client - flag: use client-to-client
- force cipher, tls-cipher, auth params - force cipher, tls-cipher, auth params
- server name - server name
- default certification duration - default certification length
- default certificate algo - default certificate algo
## User props ## User props
- username (CN part) - username
- password - password
- custom DN parts: country, state, city, org, OU - custom DN parts: country, state, city, org, OU
- email (DN part) - email
- tags
## User tags ## User caps
- admin: administrator - admin: administrator
- login: can log into the web interface - login: can log into the web interface
- issue: can certify own devices (without approval) - issue: can certify own devices without approval
- renew: can renew certificates for own devices (without approval) - renew: can renew certificates for own devices
## Device props ## Device props
- name (CN part) - name
- type (icon) - type (icon)
- approved: bool
- expiry - expiry
## Permissions
- admin cannot "admin" itself (to prevent self decapitation)
- admin can "edit", "admin" and "create" everything else
- user can "edit" itself and its devices
- user can "create" devices for itself
### User
- edit: change DN parts, password
- admin: add or remove tag, delete, generate password
### Device
- edit: change type, delete
- admin: approve