import asyncio import functools import logging import time from io import BytesIO from typing import Any, Optional from async_lru import alru_cache from webdav3.client import Resource from . import webdav_resource _logger = logging.getLogger(__name__) def _run_in_executor(f): """ Decorator to make blocking function call asyncio compatible https://stackoverflow.com/questions/41063331/how-to-use-asyncio-with-existing-blocking-library/ """ @functools.wraps(f) def inner(*args, **kwargs): loop = asyncio.get_running_loop() return loop.run_in_executor( None, functools.partial(f, *args, **kwargs), ) return inner def _get_ttl_hash(seconds: int = 20) -> int: """ Return the same value within `seconds` time period https://stackoverflow.com/a/55900800 """ return round(time.time() / seconds) @alru_cache(maxsize=20) async def _get_buffer( remote_path: Any, ttl_hash: Optional[int] = None, ) -> BytesIO: del ttl_hash @_run_in_executor def buffer_inner(resource: Resource) -> BytesIO: _logger.info(f"updating {resource}") print(f"updating {resource}") buffer = BytesIO() resource.write_to(buffer) return buffer resource = webdav_resource(remote_path) return await buffer_inner(resource) class DavFile: def __init__(self, remote_path: Any) -> None: self.__remote_path = remote_path @property async def __buffer(self) -> BytesIO: return await _get_buffer( remote_path=self.__remote_path, ttl_hash=_get_ttl_hash(20), ) @property async def bytes(self) -> bytes: buffer = await self.__buffer buffer.seek(0) return buffer.read() @property async def string(self) -> str: bytes = await self.bytes return bytes.decode(encoding="utf-8")