ovdashboard/api/ovdashboard_api/dav_calendar.py

83 lines
2 KiB
Python
Raw Normal View History

2022-09-04 18:48:46 +00:00
import logging
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Iterator
from caldav import Calendar
from pydantic import BaseModel
2022-09-04 20:50:11 +00:00
from vobject.icalendar import VEvent
2022-09-04 18:48:46 +00:00
from . import caldav_principal, get_ttl_hash, run_in_executor, timed_alru_cache
_logger = logging.getLogger(__name__)
class CalEvent(BaseModel):
summary: str
description: str
dtstart: datetime
dtend: datetime
@timed_alru_cache(maxsize=20)
async def _get_calendar(
calendar_name: str,
) -> Calendar:
@run_in_executor
def _get_calendar_inner() -> Calendar:
2022-09-04 20:50:11 +00:00
return caldav_principal().calendar(calendar_name)
return await _get_calendar_inner()
@timed_alru_cache(maxsize=20)
async def _get_calendar_events(
calendar_name: str,
) -> list[CalEvent]:
@run_in_executor
def _inner() -> Iterator[VEvent]:
2022-09-04 18:48:46 +00:00
_logger.info(f"updating {calendar_name}")
print(f"updating {calendar_name}")
2022-09-04 20:50:11 +00:00
calendar = caldav_principal().calendar(calendar_name)
return (
vevent
for event in calendar.date_search(
start=datetime.now(),
end=datetime.now() + timedelta(days=365),
expand=True,
)
for vevent in event.vobject_instance.contents["vevent"]
)
2022-09-04 18:48:46 +00:00
2022-09-04 20:50:11 +00:00
return [
CalEvent(
summary=vevent.summary.value,
description=vevent.description.value,
dtstart=vevent.dtstart.value,
dtend=vevent.dtend.value,
)
for vevent in await _inner()
]
2022-09-04 18:48:46 +00:00
@dataclass(frozen=True)
class DavCalendar:
calendar_name: str
@property
async def calendar(self) -> Calendar:
return await _get_calendar(
ttl_hash=get_ttl_hash(20),
calendar_name=self.calendar_name,
)
@property
2022-09-04 20:50:11 +00:00
async def events(self) -> list[CalEvent]:
return await _get_calendar_events(
ttl_hash=get_ttl_hash(20),
calendar_name=self.calendar_name,
2022-09-04 18:48:46 +00:00
)