import logging from io import BytesIO from threading import Lock from typing import Any from apscheduler.schedulers.asyncio import AsyncIOScheduler from webdav3.client import Client, Resource _logger = logging.getLogger(__name__) class DavFile: __instances = None def __init__(self, client: Client, path: Any) -> None: self.__resource: Resource = client.resource(path) self.__buffer = BytesIO() self.__lock = Lock() # register if DavFile.__instances is None: DavFile.__instances = [] DavFile.__instances.append(self) def update(self) -> None: _logger.info(f"updating {self.__resource}") with self.__lock: self.__buffer.seek(0) self.__buffer.truncate(0) self.__resource.write_to(self.__buffer) @classmethod def refresh(cls, refresh_interval: int = 5) -> AsyncIOScheduler: scheduler = AsyncIOScheduler() def tick() -> None: for davfile in DavFile.__instances: davfile.update() scheduler.add_job(tick) scheduler.add_job( tick, "interval", seconds=refresh_interval, ) scheduler.start() return scheduler @property def bytes(self) -> bytes: with self.__lock: self.__buffer.seek(0) return self.__buffer.read() def __str__(self) -> str: return self.bytes.decode(encoding="utf-8")