76 lines
1.7 KiB
Python
76 lines
1.7 KiB
Python
import asyncio
|
|
import functools
|
|
import time
|
|
from typing import Any
|
|
|
|
import caldav
|
|
from async_lru import alru_cache
|
|
from webdav3 import client as WebDAVclient
|
|
|
|
from .settings import SETTINGS
|
|
|
|
_WEBDAV_CLIENT = WebDAVclient.Client({
|
|
"webdav_hostname": SETTINGS.webdav_url,
|
|
"webdav_login": SETTINGS.dav_username,
|
|
"webdav_password": SETTINGS.dav_password,
|
|
})
|
|
|
|
|
|
def run_in_executor(f):
|
|
"""
|
|
Decorator to make blocking function call asyncio compatible
|
|
https://stackoverflow.com/questions/41063331/how-to-use-asyncio-with-existing-blocking-library/
|
|
"""
|
|
|
|
@functools.wraps(f)
|
|
def inner(*args, **kwargs):
|
|
loop = asyncio.get_running_loop()
|
|
return loop.run_in_executor(
|
|
None,
|
|
functools.partial(f, *args, **kwargs),
|
|
)
|
|
|
|
return inner
|
|
|
|
|
|
def get_ttl_hash(seconds: int = 20) -> int:
|
|
"""
|
|
Return the same value within `seconds` time period
|
|
https://stackoverflow.com/a/55900800
|
|
"""
|
|
return round(time.time() / seconds)
|
|
|
|
|
|
def timed_alru_cache(**decorator_kwargs):
|
|
def decorate(f):
|
|
@alru_cache(**decorator_kwargs)
|
|
@functools.wraps(f)
|
|
async def wrapper(ttl_hash: int, *args, **kwargs):
|
|
del ttl_hash
|
|
|
|
return await f(*args, **kwargs)
|
|
|
|
return wrapper
|
|
|
|
return decorate
|
|
|
|
|
|
@functools.lru_cache
|
|
def webdav_resource(remote_path: Any) -> WebDAVclient.Resource:
|
|
return _WEBDAV_CLIENT.resource(remote_path)
|
|
|
|
|
|
@run_in_executor
|
|
def webdav_list(remote_path: str) -> list:
|
|
return _WEBDAV_CLIENT.list(remote_path)
|
|
|
|
|
|
_CALDAV_CLIENT = caldav.DAVClient(
|
|
url=SETTINGS.caldav_url,
|
|
username=SETTINGS.dav_username,
|
|
password=SETTINGS.dav_password,
|
|
)
|
|
|
|
|
|
def caldav_principal() -> caldav.Principal:
|
|
return _CALDAV_CLIENT.principal()
|