import logging
import threading
from io import BytesIO
from typing import Any

from webdav3.client import Client, Resource

_logger = logging.getLogger(__name__)


class DavFile:
    """In-memory cache of the content of a single WebDAV resource.

    The content is fetched on demand with :meth:`update`, or periodically by
    the background thread started with :meth:`refresh`, and is exposed through
    the :attr:`bytes` property and ``str()``. Until the first successful
    update the cached content is empty.
    """

    def __init__(self, client: Client, path: Any) -> None:
        """Bind to the resource at *path* on *client*; nothing is downloaded yet.

        Args:
            client: WebDAV client used to look up the resource.
            path: Remote path handed unchanged to ``client.resource``.
        """
        self.__resource: Resource = client.resource(path)
        self.__buffer = BytesIO()
        # Guards reads and replacement of ``__buffer``; held only briefly.
        self.__lock = threading.Lock()
        # Serializes downloads so concurrent ``update`` calls do not overlap,
        # without making readers wait on the transfer.
        self.__update_lock = threading.Lock()

    def update(self) -> None:
        """Download the resource and replace the cached content.

        The download goes into a fresh buffer that is swapped in only once
        ``write_to`` has returned, so if the download raises, the previously
        cached content is left untouched and the exception propagates.
        """
        _logger.debug("updating %s", self.__resource)
        with self.__update_lock:
            fresh = BytesIO()
            self.__resource.write_to(fresh)
            with self.__lock:
                self.__buffer = fresh

    def refresh(self, refresh_interval: int = 5) -> threading.Event:
        """Start a daemon thread that calls :meth:`update` periodically.

        The first update happens after one full interval has elapsed, not
        immediately.

        Args:
            refresh_interval: Seconds to wait between updates.

        Returns:
            An event; setting it makes the background loop exit at its next
            wake-up.
        """
        stop_handle = threading.Event()

        def refresh_loop() -> None:
            # ``wait`` returns True once the event is set, which ends the loop.
            while not stop_handle.wait(refresh_interval):
                try:
                    self.update()
                except Exception:
                    # A single failed download must not end periodic
                    # refreshing; record it and retry on the next tick.
                    _logger.exception("failed to refresh %s", self.__resource)

        thread = threading.Thread(target=refresh_loop, daemon=True)
        thread.start()
        return stop_handle

    @property
    def bytes(self) -> bytes:
        """Return a copy of the currently cached content."""
        with self.__lock:
            return self.__buffer.getvalue()

    def __str__(self) -> str:
        """Return the cached content decoded as UTF-8."""
        return self.bytes.decode(encoding="utf-8")