rework common and admin router for new db

This commit is contained in:
Jörn-Michael Miehe 2022-03-28 01:31:37 +00:00
parent 6012998ecf
commit 0619f00f6a
4 changed files with 20 additions and 123 deletions

View file

@ -1,10 +1,12 @@
from fastapi import APIRouter
from . import admin, user
from . import admin
# from . import user
main_router = APIRouter(prefix="/api/v1")
main_router.include_router(admin.router)
main_router.include_router(user.router)
# main_router.include_router(user.router)
__all__ = ["main_router"]

View file

@ -5,11 +5,9 @@ Common dependencies for routers.
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from sqlalchemy.orm import Session
from ..config import Config
from ..db import Connection
from ..db.schemata import User
from ..db_new import Capability, User
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="user/authenticate")
@ -56,7 +54,6 @@ class Responses:
async def get_current_user(
token: str = Depends(oauth2_scheme),
db: Session | None = Depends(Connection.get),
current_config: Config | None = Depends(Config.load),
) -> User | None:
"""
@ -68,13 +65,11 @@ async def get_current_user(
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST)
username = await current_config.jwt.decode_token(token)
user = User.from_db(db, username)
return user
return User.get(username)
async def get_current_user_if_exists(
current_config: Config | None = Depends(Config.load),
current_user: User | None = Depends(get_current_user),
) -> User:
"""
@ -89,7 +84,6 @@ async def get_current_user_if_exists(
async def get_current_user_if_admin(
current_config: Config | None = Depends(Config.load),
current_user: User = Depends(get_current_user_if_exists),
) -> User:
"""
@ -97,7 +91,7 @@ async def get_current_user_if_admin(
"""
# fail if not requested by an admin
if not current_user.is_admin():
if not current_user.can(Capability.admin):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
return current_user
@ -105,7 +99,6 @@ async def get_current_user_if_admin(
async def get_current_user_if_admin_or_self(
user_name: str,
current_config: Config | None = Depends(Config.load),
current_user: User = Depends(get_current_user_if_exists),
) -> User:
"""
@ -116,7 +109,8 @@ async def get_current_user_if_admin_or_self(
"""
# fail if not requested by an admin or self
if not (current_user.is_admin() or current_user.name == user_name):
if not (current_user.can(Capability.admin)
or current_user.name == user_name):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
return current_user

View file

@ -6,9 +6,8 @@
from fastapi import APIRouter, Depends, HTTPException, status
from ..config import Config
from ..db import Connection
from ..db.schemata import User, UserCapability, UserCreate
from ._common import Responses, get_current_user
from ..db_new import Capability, Connection, User, UserCreate
from ._common import Responses, get_current_user_if_admin
router = APIRouter(prefix="/admin", tags=["admin"])
@ -22,7 +21,7 @@ router = APIRouter(prefix="/admin", tags=["admin"])
)
async def install(
config: Config,
admin_user: UserCreate,
# admin_user: UserCreate,
current_config: Config | None = Depends(Config.load),
):
"""
@ -35,18 +34,13 @@ async def install(
# create config file, connect to database
await config.save()
Connection.connect(await config.db.db_engine)
Connection.connect("sqlite:///tmp/v2.db")
# create an administrative user
with Connection.use() as db:
new_user = User.create(
db=db,
user=admin_user,
crypt_context=await config.crypto.crypt_context,
)
new_user.capabilities.append(UserCapability.admin)
new_user.update(db)
# # create an administrative user
# new_user = User.create(**admin_user)
# assert new_user is not None
# new_user.set_capabilities([Capability.login, Capability.admin])
# new_user.update()
@router.put(
@ -61,7 +55,7 @@ async def install(
async def set_config(
new_config: Config,
current_config: Config | None = Depends(Config.load),
current_user: User | None = Depends(get_current_user),
_: User | None = Depends(get_current_user_if_admin),
):
"""
PUT ./config: Edit `kiwi-vpn` main config.
@ -71,11 +65,6 @@ async def set_config(
if current_config is None:
raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST)
# fail if not requested by an admin
if (current_user is None
or UserCapability.admin not in current_user.capabilities):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
# update config file, reconnect to database
await new_config.save()
Connection.connect(await new_config.db.db_engine)
Connection.connect("sqlite:///tmp/v2.db")

View file

@ -1,88 +0,0 @@
"""
/dn endpoints.
"""
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from ..db import Connection
from ..db.schemata import DistinguishedName, DistinguishedNameCreate, User
from ._common import Responses, get_current_user_if_admin_or_self
router = APIRouter(prefix="/dn")
@router.post(
"",
responses={
status.HTTP_200_OK: Responses.OK,
status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN,
status.HTTP_404_NOT_FOUND: Responses.ENTRY_DOESNT_EXIST,
status.HTTP_409_CONFLICT: Responses.ENTRY_EXISTS,
},
)
async def add_distinguished_name(
user_name: str,
distinguished_name: DistinguishedNameCreate,
_: User = Depends(get_current_user_if_admin_or_self),
db: Session | None = Depends(Connection.get),
):
"""
POST ./: Create a new distinguished name in the database.
"""
owner = User.from_db(
db=db,
name=user_name,
)
# fail if owner doesn't exist
if owner is None:
raise HTTPException(status_code=status.HTTP_409_CONFLICT)
# actually create the new user
new_dn = DistinguishedName.create(
db=db,
dn=distinguished_name,
owner=owner,
)
# fail if creation was unsuccessful
if new_dn is None:
raise HTTPException(status_code=status.HTTP_409_CONFLICT)
# return the created user on success
return new_dn
# @router.delete(
# "",
# responses={
# status.HTTP_200_OK: Responses.OK,
# status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
# status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
# status.HTTP_403_FORBIDDEN: Responses.NEEDS_ADMIN,
# status.HTTP_404_NOT_FOUND: Responses.ENTRY_DOESNT_EXIST,
# },
# )
# async def remove_distinguished_name(
# user_name: str,
# _: User = Depends(get_current_user_if_admin),
# db: Session | None = Depends(Connection.get),
# ):
# """
# DELETE ./{user_name}: Remove a user from the database.
# """
# # get the user
# user = User.from_db(
# db=db,
# name=user_name,
# )
# # fail if deletion was unsuccessful
# if user is None or not user.delete(db):
# raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)