/{device_id}/issue mostly done

This commit is contained in:
Jörn-Michael Miehe 2022-04-05 01:55:35 +00:00
parent d89409f973
commit 2d755b8e3d

View file

@ -2,12 +2,10 @@
/device endpoints.
"""
from datetime import datetime
from fastapi import APIRouter, Depends, HTTPException, status
from ..db import Connection, Device, DeviceCreate, DeviceRead, User
from ..easyrsa import CertificateType, DistinguishedName, EasyRSA
from ..db import Device, DeviceCreate, DeviceRead, User
from ..easyrsa import DistinguishedName, EasyRSA
from ._common import (Responses, get_current_user, get_device_by_id,
get_user_by_name)
@ -81,47 +79,43 @@ async def remove_device(
@router.post(
"/{device_id}/csr",
"/{device_id}/issue",
responses={
status.HTTP_200_OK: Responses.OK,
status.HTTP_400_BAD_REQUEST: Responses.NOT_INSTALLED,
status.HTTP_401_UNAUTHORIZED: Responses.NEEDS_USER,
status.HTTP_403_FORBIDDEN: Responses.NEEDS_PERMISSION,
status.HTTP_404_NOT_FOUND: Responses.ENTRY_DOESNT_EXIST,
status.HTTP_409_CONFLICT: Responses.ENTRY_EXISTS,
},
response_model=DeviceRead,
)
async def request_certificate(
current_user: User = Depends(get_current_user),
device: Device = Depends(get_device_by_id),
):
) -> Device:
"""
POST ./{device_id}/csr: Request certificate for a device.
POST ./{device_id}/issue: Request certificate for a device.
"""
# check permission
if not current_user.can_edit(device):
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
easy_rsa = EasyRSA()
# cannot request for a newly created device
if device.approved is not None:
raise HTTPException(status_code=status.HTTP_409_CONFLICT)
with Connection.session as db:
db.add(device)
dn = DistinguishedName.build(device)
# check if we must wait for approval
device.approved = current_user.can_issue
if current_user.can_issue(device):
device.approved = True
if device.approved:
# issue the certificate immediately
if (certificate := EasyRSA._.issue(
dn=DistinguishedName.build(device)
)) is not None:
device.expiry = certificate.not_valid_after
if (cert := easy_rsa.issue(
dn=dn,
cert_type=CertificateType.server,
)) is not None:
assert (expiry := cert.get_notAfter()) is not None
date_format, encoding = "%Y%m%d%H%M%SZ", "ascii"
expiry = datetime.strptime(
expiry.decode(encoding),
date_format,
)
device.expiry = expiry
db.commit()
# return updated device
device.update()
return device