""" Definition of WebDAV and CalDAV clients. """ from functools import lru_cache from logging import getLogger from time import sleep from typing import Any, Iterator from caldav import DAVClient as CalDAVclient from caldav import Principal as CalDAVPrincipal from webdav3.client import Client as WebDAVclient from webdav3.client import Resource as WebDAVResource from .async_helpers import run_in_executor from .settings import SETTINGS _WEBDAV_CLIENT = WebDAVclient({ "webdav_hostname": SETTINGS.webdav.url, "webdav_login": SETTINGS.webdav.username, "webdav_password": SETTINGS.webdav.password, }) _logger = getLogger(__name__) def webdav_check() -> bool: """ Checks if base resources are available. """ _logger.info( "Production mode is %s.", "enabled" if SETTINGS.production_mode else "disabled", ) if SETTINGS.production_mode: for _ in range(SETTINGS.webdav_retries): if _WEBDAV_CLIENT.check(""): break _logger.warning( "Waiting for WebDAV connection to %s ...", repr(SETTINGS.webdav.url), ) sleep(30) _logger.debug("WebDAV connection OK!") elif _WEBDAV_CLIENT.check(""): _logger.debug("WebDAV connection OK!") else: _logger.error( "WebDAV connection to %s FAILED!", repr(SETTINGS.webdav.url), ) return False if _WEBDAV_CLIENT.check(SETTINGS.webdav_prefix): _logger.debug("WebDAV prefix directory FOUND!") else: _logger.error( "WebDAV prefix directory %s NOT FOUND, please create it!", repr(SETTINGS.webdav_prefix), ) return False return True @lru_cache(maxsize=SETTINGS.cache_size) def webdav_resource(remote_path: Any) -> WebDAVResource: """ Gets a resource using the main WebDAV client. """ return _WEBDAV_CLIENT.resource( f"{SETTINGS.webdav_prefix}/{remote_path}" ) @run_in_executor def webdav_list(remote_path: str) -> list[str]: """ Asynchroneously lists a WebDAV path using the main WebDAV client. """ return _WEBDAV_CLIENT.list( f"{SETTINGS.webdav_prefix}/{remote_path}" ) _CALDAV_CLIENT = CalDAVclient( url=SETTINGS.caldav.url, username=SETTINGS.caldav.username, password=SETTINGS.caldav.password, ) def caldav_principal() -> CalDAVPrincipal: """ Gets the `Principal` object of the main CalDAV client. """ return _CALDAV_CLIENT.principal() @run_in_executor def caldav_list() -> Iterator[str]: """ Asynchroneously lists all calendars using the main WebDAV client. """ return ( cal.name for cal in caldav_principal().calendars() )