import asyncio import functools from typing import Any import caldav from webdav3 import client as WebDAVclient from .config import SETTINGS _WEBDAV_CLIENT = WebDAVclient.Client({ "webdav_hostname": SETTINGS.webdav_url, "webdav_login": SETTINGS.dav_username, "webdav_password": SETTINGS.dav_password, }) def run_in_executor(f): """ Decorator to make blocking function call asyncio compatible https://stackoverflow.com/questions/41063331/how-to-use-asyncio-with-existing-blocking-library/ """ @functools.wraps(f) def inner(*args, **kwargs): loop = asyncio.get_running_loop() return loop.run_in_executor( None, functools.partial(f, *args, **kwargs), ) return inner @functools.lru_cache def webdav_resource(remote_path: Any) -> WebDAVclient.Resource: return _WEBDAV_CLIENT.resource(remote_path) @run_in_executor def webdav_list(remote_path: str) -> list: return _WEBDAV_CLIENT.list(remote_path) _CALDAV_CLIENT = caldav.DAVClient( url=SETTINGS.caldav_url, username=SETTINGS.dav_username, password=SETTINGS.dav_password, ) def caldav_principal() -> caldav.Principal: return _CALDAV_CLIENT.principal()