import logging
from io import BytesIO
from threading import Lock
from typing import Optional

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from webdav3.client import Resource

_logger = logging.getLogger(__name__)


class DavFile:
    """In-memory copy of a WebDAV resource, optionally refreshed periodically.

    Constructing an instance does not download anything: the content is
    empty until :meth:`download` is called, either directly or by the
    shared job installed with :meth:`refresh`.
    """

    # Instances that take part in the periodic refresh; created lazily by
    # the first constructor call, so it may still be None when refresh() runs.
    __instances: Optional[list["DavFile"]] = None
    # Scheduler shared by all instances; created on the first refresh() call.
    __scheduler = None

    def __init__(self, resource: Resource, refresh: bool = True) -> None:
        """Wrap *resource* and optionally register it for periodic refresh.

        Args:
            resource: Remote resource whose content is mirrored in memory.
            refresh: When true, the instance is added to the registry that
                the job installed by :meth:`refresh` iterates over.
        """
        self.__resource: Resource = resource
        self.__buffer = BytesIO()
        self.__lock = Lock()
        # register
        if DavFile.__instances is None:
            DavFile.__instances = []
        if refresh:
            DavFile.__instances.append(self)

    def download(self) -> None:
        """Fetch the resource and replace the cached content.

        The transfer goes into a fresh buffer that is swapped in only once
        ``write_to`` has returned, so a failed download keeps the previously
        cached content instead of leaving it empty or partial. Readers are
        blocked only for the swap, not for the duration of the transfer.

        Raises:
            Exception: Whatever ``Resource.write_to`` raises is propagated.
        """
        _logger.info("updating %s", self.__resource)
        fresh = BytesIO()
        self.__resource.write_to(fresh)
        with self.__lock:
            self.__buffer = fresh

    @classmethod
    def refresh(cls, refresh_interval: int = 60) -> None:
        """Start, or re-time, the periodic download of registered instances.

        The first call creates and starts an ``AsyncIOScheduler``, queues one
        job without a trigger (APScheduler runs such a job once, right away)
        and one interval job. Later calls only reschedule the interval job.

        NOTE(review): ``AsyncIOScheduler`` presumably needs an asyncio event
        loop to be available when this is first called - confirm with callers.

        Args:
            refresh_interval: Seconds between two refresh runs.
        """
        if cls.__scheduler is not None:
            cls.__scheduler.reschedule_job(
                job_id=cls.__name__,
                trigger="interval",
                seconds=refresh_interval,
            )
            return

        def tick() -> None:
            # Iterate over a snapshot: the registry is None until the first
            # instance is constructed, and it may grow while a run is active.
            for davfile in tuple(DavFile.__instances or ()):
                try:
                    davfile.download()
                except Exception:
                    # Job boundary: log with traceback and carry on, so one
                    # unreachable resource does not stop the others from
                    # being refreshed.
                    _logger.exception(
                        "failed to update %s", davfile.__resource
                    )

        cls.__scheduler = AsyncIOScheduler()
        cls.__scheduler.start()
        # Initial run; the interval job below only fires after the first
        # interval has elapsed.
        cls.__scheduler.add_job(tick)
        cls.__scheduler.add_job(
            tick,
            id=cls.__name__,
            trigger="interval",
            seconds=refresh_interval,
        )

    @property
    def bytes(self) -> bytes:
        """Return the cached content (empty before the first download)."""
        with self.__lock:
            return self.__buffer.getvalue()

    def __str__(self) -> str:
        """Return the cached content decoded as UTF-8.

        Raises:
            UnicodeDecodeError: If the content is not valid UTF-8.
        """
        return self.bytes.decode(encoding="utf-8")