Compare commits
4 commits
3b79efaa80
...
821d72a773
| Author | SHA1 | Date | |
|---|---|---|---|
| 821d72a773 | |||
| 78e0515042 | |||
| 72fc209349 | |||
| b291c20ed6 |
7 changed files with 64 additions and 19 deletions
|
|
@ -143,8 +143,7 @@ class User(UserBase, table=True):
|
|||
# password hash mismatch
|
||||
return None
|
||||
|
||||
if not (user.has_tag(TagValue.login)
|
||||
or user.has_tag(TagValue.admin)):
|
||||
if not (user.has_tag(TagValue.login) or user.is_admin):
|
||||
# no login permission
|
||||
return None
|
||||
|
||||
|
|
@ -169,7 +168,8 @@ class User(UserBase, table=True):
|
|||
db.delete(self)
|
||||
db.commit()
|
||||
|
||||
def _get_tags(self) -> Iterable[TagValue]:
|
||||
@property
|
||||
def __tags(self) -> Iterable[TagValue]:
|
||||
"""
|
||||
Return the tags of this user.
|
||||
"""
|
||||
|
|
@ -184,7 +184,15 @@ class User(UserBase, table=True):
|
|||
Check if this user has a tag.
|
||||
"""
|
||||
|
||||
return tag in self._get_tags()
|
||||
return tag in self.__tags
|
||||
|
||||
@property
|
||||
def is_admin(self) -> bool:
|
||||
"""
|
||||
Shorthand for checking if this user has the `admin` tag.
|
||||
"""
|
||||
|
||||
return TagValue.admin in self.__tags
|
||||
|
||||
def add_tags(
|
||||
self,
|
||||
|
|
@ -196,7 +204,7 @@ class User(UserBase, table=True):
|
|||
|
||||
self.tags = [
|
||||
tag._(self)
|
||||
for tag in (set(self._get_tags()) | set(tags))
|
||||
for tag in (set(self.__tags) | set(tags))
|
||||
]
|
||||
|
||||
def remove_tags(
|
||||
|
|
@ -209,7 +217,7 @@ class User(UserBase, table=True):
|
|||
|
||||
self.tags = [
|
||||
tag._(self)
|
||||
for tag in (set(self._get_tags()) - set(tags))
|
||||
for tag in (set(self.__tags) - set(tags))
|
||||
]
|
||||
|
||||
def can_edit(
|
||||
|
|
@ -221,7 +229,7 @@ class User(UserBase, table=True):
|
|||
"""
|
||||
|
||||
# admin can "edit" everything
|
||||
if self.has_tag(TagValue.admin):
|
||||
if self.is_admin:
|
||||
return True
|
||||
|
||||
# user can "edit" itself
|
||||
|
|
@ -240,7 +248,7 @@ class User(UserBase, table=True):
|
|||
"""
|
||||
|
||||
# only admin can "admin" anything
|
||||
if not self.has_tag(TagValue.admin):
|
||||
if not self.is_admin:
|
||||
return False
|
||||
|
||||
# admin canot "admin itself"!
|
||||
|
|
@ -264,7 +272,7 @@ class User(UserBase, table=True):
|
|||
return False
|
||||
|
||||
# admin can "create" everything
|
||||
if self.has_tag(TagValue.admin):
|
||||
if self.is_admin:
|
||||
return True
|
||||
|
||||
# user can only create devices for itself
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ from __future__ import annotations
|
|||
|
||||
import subprocess
|
||||
from datetime import datetime
|
||||
from enum import Enum, auto
|
||||
from pathlib import Path
|
||||
|
||||
from OpenSSL import crypto
|
||||
|
|
@ -97,6 +98,19 @@ class DistinguishedName(BaseModel):
|
|||
]
|
||||
|
||||
|
||||
class CertificateType(Enum):
|
||||
"""
|
||||
Possible types of certificates
|
||||
"""
|
||||
|
||||
ca = auto()
|
||||
client = auto()
|
||||
server = auto()
|
||||
|
||||
def __str__(self) -> str:
|
||||
return self._name_
|
||||
|
||||
|
||||
class EasyRSA:
|
||||
"""
|
||||
Represents an EasyRSA PKI.
|
||||
|
|
@ -225,6 +239,7 @@ class EasyRSA:
|
|||
Path("ca.crt"),
|
||||
Config._.crypto.ca_expiry_days,
|
||||
|
||||
"--dn-mode=cn_only",
|
||||
"--req-cn=kiwi-vpn-ca",
|
||||
|
||||
"build-ca",
|
||||
|
|
@ -236,13 +251,17 @@ class EasyRSA:
|
|||
|
||||
def issue(
|
||||
self,
|
||||
cert_type: str = "client",
|
||||
cert_type: CertificateType = CertificateType.client,
|
||||
dn: DistinguishedName = DistinguishedName.build(),
|
||||
) -> crypto.X509:
|
||||
) -> crypto.X509 | None:
|
||||
"""
|
||||
Issue a client or server certificate
|
||||
"""
|
||||
|
||||
if not (cert_type is CertificateType.client
|
||||
or cert_type is CertificateType.server):
|
||||
return None
|
||||
|
||||
return self.__build_cert(
|
||||
Path(f"issued/{dn.common_name}.crt"),
|
||||
Config._.crypto.cert_expiry_days,
|
||||
|
|
@ -262,7 +281,7 @@ if __name__ == "__main__":
|
|||
easy_rsa.init_pki()
|
||||
|
||||
ca = easy_rsa.build_ca()
|
||||
server = easy_rsa.issue("server")
|
||||
server = easy_rsa.issue(CertificateType.server)
|
||||
client = None
|
||||
|
||||
# check if configured
|
||||
|
|
@ -275,7 +294,7 @@ if __name__ == "__main__":
|
|||
db.add(device)
|
||||
dn = DistinguishedName.build(device)
|
||||
|
||||
client = easy_rsa.issue("client", dn)
|
||||
client = easy_rsa.issue(dn=dn)
|
||||
|
||||
date_format, encoding = "%Y%m%d%H%M%SZ", "ascii"
|
||||
|
||||
|
|
|
|||
|
|
@ -39,6 +39,10 @@ class Responses:
|
|||
"description": "Operation not permitted",
|
||||
"content": None,
|
||||
}
|
||||
ENTRY_ADDED = {
|
||||
"description": "Entry added to database",
|
||||
"content": None,
|
||||
}
|
||||
ENTRY_EXISTS = {
|
||||
"description": "Entry exists in database",
|
||||
"content": None,
|
||||
|
|
|
|||
|
|
@ -83,7 +83,7 @@ async def set_config(
|
|||
"""
|
||||
|
||||
# check permissions
|
||||
if not current_user.has_tag(TagValue.admin):
|
||||
if not current_user.is_admin:
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
|
||||
|
||||
# update config file, reconnect to database
|
||||
|
|
|
|||
|
|
@ -14,7 +14,7 @@ router = APIRouter(prefix="/device", tags=["device"])
|
|||
@router.post(
|
||||
"/{user_name}",
|
||||
responses={
|
||||
status.HTTP_200_OK: Responses.OK,
|
||||
status.HTTP_201_CREATED: Responses.ENTRY_ADDED,
|
||||
status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
|
||||
status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
|
||||
status.HTTP_403_FORBIDDEN: Responses.NEEDS_PERMISSION,
|
||||
|
|
@ -22,6 +22,7 @@ router = APIRouter(prefix="/device", tags=["device"])
|
|||
status.HTTP_409_CONFLICT: Responses.ENTRY_EXISTS,
|
||||
},
|
||||
response_model=DeviceRead,
|
||||
status_code=status.HTTP_201_CREATED,
|
||||
)
|
||||
async def add_device(
|
||||
device: DeviceCreate,
|
||||
|
|
|
|||
|
|
@ -5,7 +5,9 @@
|
|||
from fastapi import APIRouter, Depends, HTTPException, status
|
||||
|
||||
from ..config import Config
|
||||
from ._common import Responses, get_current_config
|
||||
from ..db import User
|
||||
from ..easyrsa import CertificateType, EasyRSA
|
||||
from ._common import Responses, get_current_config, get_current_user
|
||||
|
||||
router = APIRouter(prefix="/service", tags=["service"])
|
||||
|
||||
|
|
@ -20,5 +22,14 @@ router = APIRouter(prefix="/service", tags=["service"])
|
|||
)
|
||||
async def init_pki(
|
||||
_: Config = Depends(get_current_config),
|
||||
current_user: User = Depends(get_current_user),
|
||||
) -> None:
|
||||
pass
|
||||
|
||||
if not current_user.is_admin:
|
||||
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
|
||||
|
||||
easy_rsa = EasyRSA()
|
||||
|
||||
easy_rsa.init_pki()
|
||||
easy_rsa.build_ca()
|
||||
easy_rsa.issue(CertificateType.server)
|
||||
|
|
|
|||
|
|
@ -63,13 +63,14 @@ async def get_current_user_route(
|
|||
@router.post(
|
||||
"",
|
||||
responses={
|
||||
status.HTTP_200_OK: Responses.OK,
|
||||
status.HTTP_201_CREATED: Responses.ENTRY_ADDED,
|
||||
status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
|
||||
status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
|
||||
status.HTTP_403_FORBIDDEN: Responses.NEEDS_PERMISSION,
|
||||
status.HTTP_409_CONFLICT: Responses.ENTRY_EXISTS,
|
||||
},
|
||||
response_model=UserRead,
|
||||
status_code=status.HTTP_201_CREATED,
|
||||
)
|
||||
async def add_user(
|
||||
user: UserCreate,
|
||||
|
|
@ -127,11 +128,12 @@ async def remove_user(
|
|||
@router.post(
|
||||
"/{user_name}/tags",
|
||||
responses={
|
||||
status.HTTP_200_OK: Responses.OK,
|
||||
status.HTTP_201_CREATED: Responses.ENTRY_ADDED,
|
||||
status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
|
||||
status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
|
||||
status.HTTP_403_FORBIDDEN: Responses.NEEDS_PERMISSION,
|
||||
},
|
||||
status_code=status.HTTP_201_CREATED,
|
||||
)
|
||||
async def extend_tags(
|
||||
tags: list[TagValue],
|
||||
|
|
|
|||
Loading…
Reference in a new issue