basic permissions system

This commit is contained in:
Jörn-Michael Miehe 2022-03-29 23:36:23 +00:00
parent 0d02c24b64
commit 03d3a86668
7 changed files with 99 additions and 185 deletions

View file

@ -143,6 +143,10 @@ class User(UserBase, table=True):
# password hash mismatch
return None
if TagValue.login in user.get_tags():
# no login permission
return None
return user
def update(self) -> None:
@ -189,98 +193,64 @@ class User(UserBase, table=True):
) for tag in tags
]
def _can(
def may_edit(
self,
tag: TagValue,
target: User | Device,
) -> bool:
"""
Check if this user has a tag.
Check if this user can edit another user or a device.
"""
return (
tag in self.get_tags()
# admin can do everything
or TagValue.admin in self.get_tags()
)
# admin can "edit" everything
if TagValue.admin in self.get_tags():
return True
def can_edit(
# user can "edit" itself
if isinstance(target, User) and target != self:
return False
# user can edit its owned devices
return target.owner == self
def may_admin(
self,
user: User,
target: User | Device,
) -> bool:
"""
Check if this user can edit another user.
Check if this user can administer another user or a device.
"""
return (
user.name == self.name
# admin can edit everything
or self._can(TagValue.admin)
)
# only admin can "admin" anything
if TagValue.admin not in self.get_tags():
return False
def is_admin(
# admin canot "admin itself"!
if isinstance(target, User) and target == self:
return False
# admin can "admin" everything else
return True
def may_create(
self,
target: type,
owner: User | None = None,
) -> bool:
"""
Check if this user is an admin.
Check if this user can create another user or a device.
"""
# is admin with "admin" tag
return self._can(TagValue.admin)
# can never create anything but users or devices
if not issubclass(target, (User, Device)):
return False
def can_login(
self,
) -> bool:
"""
Check if this user can log in.
"""
# admin can "create" everything
if TagValue.admin in self.get_tags():
return True
return (
# can login with "login" tag
self._can(TagValue.login)
# admins can always login
or self.is_admin()
)
# user can only create devices for itself
if target is Device and owner == self:
return True
def can_be_edited_by(
self,
user: User,
) -> bool:
"""
Check if this user can be edited by another user.
"""
return (
# user can edit itself
self.name == user.name
# admin can edit every user
or user._can(TagValue.admin)
)
def can_be_deleted_by(
self,
user: User,
) -> bool:
"""
Check if this user can be deleted by another user.
"""
return (
# only admin can delete users
user._can(TagValue.admin)
# even admin cannot delete itself
and self.name != user.name
)
def owns(
self,
device: Device,
) -> bool:
"""
Check if this user owns a device.
"""
return (
device.owner_name == self.name
# admin owns everything
or self._can(TagValue.admin)
)
# deny be default
return False

View file

@ -14,7 +14,6 @@ from fastapi import FastAPI
from .config import Config, Settings
from .db import Connection, User
from .permission import Permission
from .routers import main_router
app = FastAPI(
@ -44,11 +43,9 @@ async def on_startup() -> None:
Connection.connect(current_config.db.uri)
# some testing
print(admin := User.get("admin"))
print(User.get("admin"))
print(User.get("nonexistent"))
print(Permission._(admin, admin))
def main() -> None:
uvicorn.run(

View file

@ -1,35 +0,0 @@
from __future__ import annotations
from enum import Enum, auto
from .db import User
class Permission(Enum):
tag = auto()
untag = auto()
edit = auto()
delete = auto()
def __repr__(self) -> str:
return self.name
@classmethod
def _(
cls,
actor: User | None,
target: User,
) -> set[Permission]:
result = set()
if actor is None:
return result
if isinstance(target, User):
if actor.is_admin():
if target != actor:
result |= set([cls.tag, cls.untag, cls.delete])
result.add(cls.edit)
return result

View file

@ -5,6 +5,7 @@ Common dependencies for routers.
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from kiwi_vpn_api.db.tag import TagValue
from ..config import Config, Settings
from ..db import Device, User
@ -40,8 +41,8 @@ class Responses:
"description": "Must be admin",
"content": None,
}
NEEDS_REQUESTED_USER = {
"description": "Must be the requested user",
PERMISSION_ERROR = {
"description": "You're not allowed that action",
"content": None,
}
ENTRY_EXISTS = {
@ -61,9 +62,9 @@ class Responses:
async def get_current_user(
token: str = Depends(oauth2_scheme),
current_config: Config | None = Depends(Config.load),
) -> User | None:
) -> User:
"""
Get the currently logged-in user from the database.
Get the currently logged-in user if it exists.
"""
# can't connect to an unconfigured database
@ -72,32 +73,22 @@ async def get_current_user(
username = await current_config.jwt.decode_token(token)
return User.get(username)
async def get_current_user_if_exists(
current_user: User | None = Depends(get_current_user),
) -> User:
"""
Get the currently logged-in user if it exists.
"""
# fail if not requested by a user
if current_user is None:
if (user := User.get(username)) is None:
# don't use error 404 here: possible user enumeration
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
return current_user
return user
async def get_current_user_if_admin(
current_user: User = Depends(get_current_user_if_exists),
current_user: User = Depends(get_current_user),
) -> User:
"""
Fail if the currently logged-in user is not an admin.
"""
if not current_user.is_admin():
if TagValue.admin not in current_user.get_tags():
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
return current_user
@ -118,25 +109,6 @@ async def get_user_by_name(
return User.get(user_name)
async def get_user_by_name_if_editable(
user: User | None = Depends(get_user_by_name),
current_user: User = Depends(get_current_user_if_exists),
) -> User:
"""
Get a user by name if it can be edited by the current user.
"""
# fail if user doesn't exist
if user is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
# fail if user isn't editable by the current user
if not current_user.can_edit(user):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
return user
async def get_device_by_id(
device_id: int,
current_config: Config | None = Depends(Config.load),
@ -146,20 +118,8 @@ async def get_device_by_id(
if current_config is None:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST)
return Device.get(device_id)
async def get_device_by_id_if_editable(
device: Device | None = Depends(get_device_by_id),
current_user: User = Depends(get_current_user_if_exists),
) -> Device:
# fail if device doesn't exist
if device is None:
if (device := Device.get(device_id)) is None:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
# fail if device is not owned by current user
if not current_user.owns(device):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
return device

View file

@ -5,19 +5,19 @@
from fastapi import APIRouter, Depends, HTTPException, status
from ..db import Device, DeviceCreate, DeviceRead, User
from ._common import (Responses, get_device_by_id_if_editable,
get_user_by_name_if_editable)
from ._common import (Responses, get_current_user, get_device_by_id,
get_user_by_name)
router = APIRouter(prefix="/device", tags=["device"])
@router.post(
"",
"/{user_name}",
responses={
status.HTTP_200_OK: Responses.OK,
status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
status.HTTP_403_FORBIDDEN: Responses.NEEDS_REQUESTED_USER,
status.HTTP_403_FORBIDDEN: Responses.PERMISSION_ERROR,
status.HTTP_404_NOT_FOUND: Responses.ENTRY_DOESNT_EXIST,
status.HTTP_409_CONFLICT: Responses.ENTRY_EXISTS,
},
@ -25,15 +25,20 @@ router = APIRouter(prefix="/device", tags=["device"])
)
async def add_device(
device: DeviceCreate,
user: User = Depends(get_user_by_name_if_editable),
current_user: User = Depends(get_current_user),
owner: User = Depends(get_user_by_name),
) -> Device:
"""
POST ./: Create a new device in the database.
"""
# check permission
if not current_user.may_create(Device, owner):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
# create the new device
new_device = Device.create(
owner=user,
owner=current_user,
device=device,
)
@ -57,11 +62,16 @@ async def add_device(
response_model=User,
)
async def remove_device(
device: Device = Depends(get_device_by_id_if_editable),
current_user: User = Depends(get_current_user),
device: Device = Depends(get_device_by_id),
):
"""
DELETE ./{device_id}: Remove a device from the database.
"""
# check permission
if not current_user.may_edit(device):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
# delete device
device.delete()

View file

@ -9,7 +9,7 @@ from pydantic import BaseModel
from ..config import Config
from ..db import TagValue, User, UserCreate, UserRead
from ._common import (Responses, get_current_user_if_admin,
get_current_user_if_exists, get_user_by_name)
get_current_user, get_user_by_name)
router = APIRouter(prefix="/user", tags=["user"])
@ -37,10 +37,10 @@ async def login(
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST)
# try logging in
if not (user := User.authenticate(
if (user := User.authenticate(
name=form_data.username,
password=form_data.password,
)):
)) is None:
# authentication failed
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
@ -48,10 +48,6 @@ async def login(
headers={"WWW-Authenticate": "Bearer"},
)
if not user.can_login():
# user cannot login
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
# authentication succeeded
access_token = await current_config.jwt.create_token(user.name)
return {"access_token": access_token, "token_type": "bearer"}
@ -59,7 +55,7 @@ async def login(
@router.get("/current", response_model=UserRead)
async def get_current_user(
current_user: User = Depends(get_current_user_if_exists),
current_user: User = Depends(get_current_user),
):
"""
GET ./current: Respond with the currently logged-in user.

View file

@ -4,22 +4,38 @@
- flag: use client-to-client
- force cipher, tls-cipher, auth params
- server name
- default certification length
- default certification duration
- default certificate algo
## User props
- username
- username (CN part)
- password
- custom DN parts: country, state, city, org, OU
- email
- email (DN part)
- tags
## User tags
- admin: administrator
- login: can log into the web interface
- issue: can certify own devices without approval
- renew: can renew certificates for own devices
- issue: can certify own devices (without approval)
- renew: can renew certificates for own devices (without approval)
## Device props
- name
- name (CN part)
- type (icon)
- approved: bool
- expiry
## Permissions
- admin cannot "admin" itself (to prevent self decapitation)
- admin can "edit", "admin" and "create" everything else
- user can "edit" itself and its devices
- user can "create" devices for itself
### User
- edit: change DN parts, password
- admin: add or remove tag, delete, generate password
### Device
- edit: change type, delete
- admin: approve