import logging
from io import BytesIO
from threading import Lock
from typing import Any, ClassVar, List, Optional

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from webdav3.client import Client, Resource

_logger = logging.getLogger(__name__)


class DavFile:
    """In-memory copy of a WebDAV resource that can be refreshed periodically.

    Every instance registers itself in a class-level list when constructed.
    ``DavFile.refresh()`` starts (or reschedules) a single scheduler job that
    calls ``update()`` on each registered instance.

    NOTE: the registry holds strong references, so instances stay alive (and
    keep being refreshed) for the lifetime of the process.
    """

    # Registry of every constructed instance. Starts as an empty list so that
    # refresh() is safe to call before any DavFile has been created.
    __instances: ClassVar[List["DavFile"]] = []
    # Scheduler and interval job, created lazily by the first refresh() call.
    __scheduler: ClassVar[Optional[AsyncIOScheduler]] = None
    __job: ClassVar[Optional[Any]] = None

    def __init__(self, client: Client, path: Any) -> None:
        """Bind to ``client.resource(path)`` and register for refreshes.

        No download happens here; the content is empty until ``update()``
        runs (directly or through the job started by ``refresh()``).
        """
        self.__resource: Resource = client.resource(path)
        self.__buffer = BytesIO()
        # Guards reads of, and the swap of, ``__buffer``.
        self.__lock = Lock()
        # Serialises downloads for this instance without blocking readers.
        self.__update_lock = Lock()

        # register
        DavFile.__instances.append(self)

    def update(self) -> None:
        """Download the resource and replace the cached content.

        The download goes into a fresh buffer that is swapped in only after
        ``write_to`` returns, so a failed download keeps the previous content
        and readers are never blocked for the duration of the transfer.

        Any exception raised by ``Resource.write_to`` propagates to the caller.
        """
        _logger.info("updating %s", self.__resource)
        with self.__update_lock:
            fresh = BytesIO()
            self.__resource.write_to(fresh)
            with self.__lock:
                self.__buffer = fresh

    @classmethod
    def refresh(cls, refresh_interval: int = 60) -> None:
        """Start, or reschedule, the periodic update of all instances.

        The first call creates and starts an ``AsyncIOScheduler``, queues one
        trigger-less job (APScheduler runs such a job once, as soon as
        possible) and an interval job firing every ``refresh_interval``
        seconds. Later calls only reschedule the interval job.

        Args:
            refresh_interval: Seconds between two update passes.
        """
        if cls.__job is not None:
            cls.__scheduler.reschedule_job(
                job_id=cls.__name__,
                trigger="interval",
                seconds=refresh_interval,
            )
            return

        def tick() -> None:
            # Iterate over a snapshot: instances may register themselves from
            # another thread while a pass is in progress.
            for davfile in tuple(DavFile.__instances):
                try:
                    davfile.update()
                except Exception:
                    # One unreachable resource must not prevent the remaining
                    # files from being refreshed; log it and carry on.
                    _logger.exception(
                        "failed to update %s", davfile.__resource
                    )

        cls.__scheduler = AsyncIOScheduler()
        cls.__scheduler.start()
        # Initial pass, so content is available before the first interval.
        cls.__scheduler.add_job(tick)
        cls.__job = cls.__scheduler.add_job(
            tick,
            id=cls.__name__,
            trigger="interval",
            seconds=refresh_interval,
        )

    @property
    def bytes(self) -> bytes:
        """Return the cached content (empty until the first update)."""
        with self.__lock:
            return self.__buffer.getvalue()

    def __str__(self) -> str:
        """Return the cached content decoded as UTF-8."""
        return self.bytes.decode(encoding="utf-8")