import asyncio
import functools
import logging
from io import BytesIO
from threading import Lock
from typing import Optional

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from webdav3.client import Resource

_logger = logging.getLogger(__name__)


def run_in_executor(f):
    """
    Decorator to make blocking function call asyncio compatible
    https://stackoverflow.com/questions/41063331/how-to-use-asyncio-with-existing-blocking-library/

    The wrapped callable is submitted to the running loop's default executor
    (``None``) and the resulting awaitable future is returned, so the wrapper
    must be called while an event loop is running.
    """

    @functools.wraps(f)
    def inner(*args, **kwargs):
        loop = asyncio.get_running_loop()
        return loop.run_in_executor(
            None,
            functools.partial(f, *args, **kwargs),
        )

    return inner


class DavFile:
    """In-memory copy of a WebDAV resource that can be refreshed periodically.

    Instances created with ``refresh=True`` are registered in a class-level
    list; :meth:`refresh` starts (or reschedules) a single scheduler job that
    re-downloads every registered instance.
    """

    # Instances that take part in the periodic refresh (created lazily).
    __instances: Optional[list["DavFile"]] = None
    # Scheduler shared by all instances; created on the first refresh() call.
    __scheduler = None

    def __init__(self, resource: Resource, refresh: bool = True) -> None:
        """Wrap *resource*; register for periodic refresh when *refresh* is true.

        No download happens here: the content stays empty until
        :meth:`download` has completed once.
        """
        self.__resource: Resource = resource
        self.__buffer = BytesIO()
        self.__lock = Lock()

        # register
        if DavFile.__instances is None:
            DavFile.__instances = []
        if refresh:
            DavFile.__instances.append(self)

    async def download(self) -> None:
        """Fetch the resource and atomically replace the cached content.

        The data is written into a private buffer in the default executor and
        only swapped in once the transfer has finished. The lock is therefore
        never held across an ``await`` (which could block the event loop if
        ``bytes`` were read meanwhile), readers never observe a partially
        written buffer, and a failed download leaves the previous content
        untouched. Exceptions from the transfer propagate to the caller.
        """
        fresh = BytesIO()

        @run_in_executor
        def download_inner() -> None:
            # Blocking call; presumably streams the remote content into the
            # given buffer (webdav3 Resource.write_to) -- runs in a worker thread.
            self.__resource.write_to(fresh)

        _logger.info("updating %s", self.__resource)
        await download_inner()

        # Publish the finished buffer; the critical section is a plain swap.
        with self.__lock:
            self.__buffer = fresh

    @classmethod
    def refresh(cls, refresh_interval: int = 60) -> None:
        """Start the periodic refresh, or change its interval if already running.

        On the first call a scheduler is created and started, every registered
        instance is downloaded once via a one-off job, and an interval job
        repeats that every *refresh_interval* seconds. Later calls only
        reschedule the interval job.

        NOTE(review): ``AsyncIOScheduler`` presumably needs an asyncio event
        loop to run its jobs -- confirm callers invoke this from loop context.
        """
        if cls.__scheduler is not None:
            cls.__scheduler.reschedule_job(
                job_id=cls.__name__,
                trigger="interval",
                seconds=refresh_interval,
            )
            return

        async def tick() -> None:
            # The registry is still None when no instance was ever created.
            for davfile in DavFile.__instances or ():
                try:
                    await davfile.download()
                except Exception:
                    # One failing resource must not prevent the remaining
                    # files from being refreshed in this pass.
                    _logger.exception(
                        "failed to update %s", davfile.__resource
                    )

        cls.__scheduler = AsyncIOScheduler()
        cls.__scheduler.start()
        # One-off job without a trigger: performs the initial download.
        cls.__scheduler.add_job(tick)
        cls.__scheduler.add_job(
            tick,
            id=cls.__name__,
            trigger="interval",
            seconds=refresh_interval,
        )

    @property
    def bytes(self) -> bytes:
        """Return the complete cached content (empty before the first download)."""
        with self.__lock:
            # Published buffers are never mutated, so getvalue() returns the
            # whole content regardless of the stream position.
            return self.__buffer.getvalue()

    def __str__(self) -> str:
        """Return the cached content decoded as UTF-8."""
        return self.bytes.decode(encoding="utf-8")