ovdashboard/api/ovdashboard_api/__init__.py

53 lines
1.2 KiB
Python
Raw Normal View History

2022-09-03 15:40:46 +00:00
import asyncio
2022-09-02 15:28:52 +00:00
import functools
from typing import Any
2022-09-02 19:21:15 +00:00
import caldav
from webdav3 import client as WebDAVclient
2022-08-29 11:27:18 +00:00
from .config import SETTINGS
2022-09-02 19:21:15 +00:00
# Module-wide WebDAV client, built once at import time from SETTINGS.
# It uses the same DAV username/password as the CalDAV client below.
_WEBDAV_CLIENT = WebDAVclient.Client(
    {
        "webdav_hostname": SETTINGS.webdav_url,
        "webdav_login": SETTINGS.dav_username,
        "webdav_password": SETTINGS.dav_password,
    }
)
2022-09-02 15:28:52 +00:00
2022-09-03 15:40:46 +00:00
def run_in_executor(f):
    """
    Wrap a blocking callable so it can be awaited from asyncio code.

    Calling the wrapped function hands `f` (with the given arguments) to the
    running event loop's default executor and returns the resulting future
    instead of blocking. It must be called while an event loop is running.

    https://stackoverflow.com/questions/41063331/how-to-use-asyncio-with-existing-blocking-library/
    """

    @functools.wraps(f)
    def inner(*args, **kwargs):
        event_loop = asyncio.get_running_loop()
        # loop.run_in_executor() forwards positional arguments only, so the
        # whole call (including keyword arguments) is bound up front.
        bound_call = functools.partial(f, *args, **kwargs)
        # Passing None selects the loop's default executor.
        return event_loop.run_in_executor(None, bound_call)

    return inner
2022-09-02 15:28:52 +00:00
@functools.lru_cache
def webdav_resource(remote_path: Any) -> WebDAVclient.Resource:
    """
    Return the WebDAV resource handle for `remote_path`.

    The lookup goes through the module's WebDAV client and is memoized by
    `functools.lru_cache`: while an entry is still cached, repeated calls
    with the same path return the same object. `remote_path` must therefore
    be hashable.
    """
    resource = _WEBDAV_CLIENT.resource(remote_path)
    return resource
2022-09-03 15:40:46 +00:00
@run_in_executor
def webdav_list(remote_path: str) -> list:
    """
    Return the result of the WebDAV client's `list` call for `remote_path`.

    Because of the `run_in_executor` decorator, callers receive an awaitable
    that resolves to the listing rather than the listing itself.
    """
    entries = _WEBDAV_CLIENT.list(remote_path)
    return entries
2022-09-02 19:21:15 +00:00
# Module-wide CalDAV client, built once at import time from SETTINGS.
# It uses the same DAV username/password as the WebDAV client above.
_CALDAV_CLIENT = caldav.DAVClient(
    url=SETTINGS.caldav_url,
    username=SETTINGS.dav_username,
    password=SETTINGS.dav_password,
)
def caldav_principal() -> caldav.Principal:
    """
    Return the principal reported by the module's CalDAV client.

    NOTE(review): this calls the client synchronously, unlike `webdav_list`,
    which is dispatched through `run_in_executor` - confirm that is intended
    for callers running inside the event loop.
    """
    principal = _CALDAV_CLIENT.principal()
    return principal