""" Definition of an asyncio compatible CalDAV calendar. Caches events using `timed_alru_cache`. """ from dataclasses import dataclass from datetime import datetime, timedelta from functools import total_ordering from logging import getLogger from typing import Iterator from caldav import Calendar from caldav.lib.error import ReportError from isodate import parse_duration from pydantic import BaseModel, validator from vobject.base import Component from .async_helpers import get_ttl_hash, run_in_executor, timed_alru_cache from .config import Config from .dav_common import caldav_principal from .settings import SETTINGS _logger = getLogger(__name__) def _string_strip(in_str: str) -> str: """ Wrapper for str.strip(). Used to define `pydantic` validators. """ return in_str.strip() @total_ordering class CalEvent(BaseModel): """ A CalDAV calendar event. Properties are to be named as in the EVENT component of RFC5545 (iCalendar). https://icalendar.org/iCalendar-RFC-5545/3-6-1-event-component.html """ summary: str = "" description: str = "" dtstart: datetime = datetime.utcnow() dtend: datetime = datetime.utcnow() class Config: frozen = True def __lt__(self, other: "CalEvent") -> bool: """ Order Events by start time. """ return self.dtstart < other.dtstart def __eq__(self, other: "CalEvent") -> bool: """ Compare all properties. """ return self.dict() == other.dict() _validate_summary = validator( "summary", allow_reuse=True, )(_string_strip) _validate_description = validator( "description", allow_reuse=True, )(_string_strip) @classmethod def from_vevent(cls, event: Component) -> "CalEvent": """ Create a CalEvent instance from a `VObject.VEvent` object. """ data = {} keys = ("summary", "description", "dtstart", "dtend", "duration") for key in keys: try: data[key] = event.contents[key][0].value # type: ignore except KeyError: pass if "dtend" not in data: data["dtend"] = data["dtstart"] if "duration" in data: try: dtstart = datetime.fromisoformat(data["dtstart"]) duration = parse_duration(data["duration"]) dtend = dtstart + duration data["dtend"] = dtend.isoformat() except (ValueError, TypeError, AttributeError): _logger.warn( "Could not add duration %s to %s", repr(data["duration"]), repr(data["dtstart"]), ) data["dtend"] = data["dtstart"] return cls.parse_obj(data) @timed_alru_cache(maxsize=SETTINGS.cache_size) async def _get_calendar( calendar_name: str, ) -> Calendar: """ Get a calendar by name using the CalDAV principal object. """ @run_in_executor def _inner() -> Calendar: return caldav_principal().calendar(calendar_name) return await _inner() @timed_alru_cache(maxsize=SETTINGS.cache_size) async def _get_calendar_events( calendar_name: str, ) -> list[CalEvent]: """ Get a sorted list of events by CalDAV calendar name. Do not return an iterator here - this result is cached and an iterator would get consumed. """ cfg = await Config.get() search_span = timedelta(days=cfg.calendar.future_days) @run_in_executor def _inner() -> Iterator[Component]: """ Get events by CalDAV calendar name. This can return an iterator - only the outer function is cached. """ _logger.info(f"downloading {calendar_name!r} ...") calendar = caldav_principal().calendar(calendar_name) date_start = datetime.utcnow().date() time_min = datetime.min.time() dt_start = datetime.combine(date_start, time_min) dt_end = dt_start + search_span try: search_result = calendar.date_search( start=dt_start, end=dt_end, expand=True, verify_expand=True, ) except ReportError: _logger.warning("CalDAV server does not support expanded search") search_result = calendar.date_search( start=dt_start, end=dt_end, expand=False, ) for event in search_result: vobject: Component = event.vobject_instance # type: ignore yield from vobject.vevent_list return sorted([ CalEvent.from_vevent(vevent) for vevent in await _inner() ]) @dataclass(frozen=True) class DavCalendar: """ Object representation of a CalDAV calendar. """ calendar_name: str @property async def calendar(self) -> Calendar: """ Calendar as `caldav` library representation. """ return await _get_calendar( ttl_hash=get_ttl_hash(), # type: ignore calendar_name=self.calendar_name, ) @property async def events(self) -> list[CalEvent]: """ Calendar events in object representation. """ return await _get_calendar_events( ttl_hash=get_ttl_hash(), # type: ignore calendar_name=self.calendar_name, )