ovdashboard/api/ovdashboard_api/routers/v1/aggregate.py

65 lines
1.7 KiB
Python

"""
Router "aggregate" provides:
- listing aggregate calendars
- finding aggregate calendars by name prefix
- getting aggregate calendar events by name prefix
"""
from logging import getLogger
from typing import Iterator
from fastapi import APIRouter, Depends
from ...core.caldav import CalDAV
from ...core.calevent import CalEvent
from ...core.config import Config
from ._common import AggregateNameLister, PrefixFinder, PrefixUnique
from .calendar import calendar_unique
_logger = getLogger(__name__)
router = APIRouter(prefix="/aggregate", tags=["calendar"])
aggregate_lister = AggregateNameLister()
aggregate_finder = PrefixFinder(aggregate_lister)
aggregate_unique = PrefixUnique(aggregate_finder)
@router.on_event("startup")
async def start_router() -> None:
_logger.debug(f"{router.prefix} router starting.")
@router.get("/list", response_model=list[str])
async def list_aggregate_calendars(
names: Iterator[str] = Depends(aggregate_lister),
) -> list[str]:
return list(names)
@router.get("/find/{prefix}", response_model=list[str])
async def find_aggregate_calendars(
names: Iterator[str] = Depends(aggregate_finder),
) -> list[str]:
return list(names)
@router.get("/get/{prefix}", response_model=list[CalEvent])
async def get_aggregate_calendar(
name: str = Depends(aggregate_unique),
) -> list[CalEvent]:
cfg = await Config.get()
aggregate = cfg.calendar.aggregates[name]
calendars = (
DavCalendar(await calendar_unique(cal_prefix)) for cal_prefix in aggregate
)
return sorted(
[
event
async for calendar in calendars # type: ignore
for event in (await calendar.events)
]
)