"""Cached, awaitable access to files on a WebDAV server."""

import logging
import re
from dataclasses import dataclass
from io import BytesIO

from asyncify import asyncify
from cachetools import cachedmethod
from redis import Redis

from .helpers import RedisCache, WebDAVclient, davkey

_logger = logging.getLogger(__name__)


@dataclass(kw_only=True, frozen=True, slots=True)
class Settings:
    """Connection settings for the WebDAV server (keyword-only, immutable)."""

    # passed to the client as "webdav_hostname"
    url: str
    # NOTE(review): the defaults below look like placeholders -- confirm that
    # real deployments always override them.
    username: str = "johndoe"
    password: str = "s3cr3t!"


class WebDAV:
    """
    Thin wrapper around a WebDAV client with results cached in Redis.

    The decorated methods are wrapped by `asyncify` and must be awaited.
    """

    _webdav_client: WebDAVclient
    _cache: RedisCache

    def __init__(self, settings: Settings, redis: Redis, ttl_sec: int) -> None:
        """
        Connect to the WebDAV server and set up the cache.

        `settings` provides the server URL and credentials, `redis` is the
        connection handed to `RedisCache`, `ttl_sec` is passed on as its `ttl`.

        Raises `RuntimeError` if the connection check does not succeed.
        """

        try:
            self._webdav_client = WebDAVclient(
                {
                    "webdav_hostname": settings.url,
                    "webdav_login": settings.username,
                    "webdav_password": settings.password,
                }
            )
            connected = self._webdav_client.check() is True

        except AssertionError as err:
            # an AssertionError escaping the client is still reported as a
            # failed connection, now with the original cause attached
            raise RuntimeError("WebDAV connection failed!") from err

        # explicit check instead of `assert`: asserts are removed when Python
        # runs with `-O`, which would let a failed connection pass unnoticed
        if not connected:
            raise RuntimeError("WebDAV connection failed!")

        self._cache = RedisCache(cache=redis, ttl=ttl_sec)

    @asyncify
    @cachedmethod(cache=lambda self: self._cache, key=davkey("list_files"))
    def _list_files(self, directory: str = "") -> list[str]:
        """
        List all entries in directory `directory` (unfiltered, cached)
        """

        return self._webdav_client.list(directory)

    async def list_files(
        self,
        directory: str = "",
        *,
        regex: re.Pattern[str] = re.compile(""),
    ) -> list[str]:
        """
        List files in directory `directory` matching RegEx `regex`

        An entry is kept if `regex.search` finds a match anywhere in it;
        the default empty pattern keeps every entry.
        """

        _logger.debug(f"list_files {directory!r} ({regex!r})")

        # the listing itself is cached; filtering is done on every call
        ls = await self._list_files(directory)
        return [path for path in ls if regex.search(path)]

    @asyncify
    @cachedmethod(cache=lambda self: self._cache, key=davkey("exists"))
    def exists(self, path: str) -> bool:
        """
        `True` iff there is a WebDAV resource at `path`
        """

        _logger.debug(f"file_exists {path!r}")
        return self._webdav_client.check(path)

    @asyncify
    @cachedmethod(cache=lambda self: self._cache, key=davkey("read_bytes"))
    def read_bytes(self, path: str) -> bytes:
        """
        Load WebDAV file from `path` as bytes
        """

        _logger.debug(f"read_bytes {path!r}")

        # download into an in-memory buffer, then hand back its full content
        buffer = BytesIO()
        self._webdav_client.download_from(buffer, path)
        buffer.seek(0)

        return buffer.read()

    async def read_str(self, path: str, encoding: str = "utf-8") -> str:
        """
        Load WebDAV file from `path` as string

        The content is decoded using `encoding`; leading and trailing
        whitespace is stripped.
        """

        _logger.debug(f"read_str {path!r}")

        return (await self.read_bytes(path)).decode(encoding=encoding).strip()

    @asyncify
    def write_bytes(self, path: str, buffer: bytes) -> None:
        """
        Write bytes from `buffer` into WebDAV file at `path`

        Afterwards, the cached `read_bytes` result for `path` is dropped.
        Cached `exists` and `list_files` results are not touched here.
        """

        _logger.debug(f"write_bytes {path!r}")

        self._webdav_client.upload_to(buffer, path)

        # invalidate cache entry
        # begin slice at 0 (there is no "self" argument)
        # NOTE(review): whether this raises for a `path` that was never cached
        # depends on `RedisCache.__delitem__` -- confirm.
        del self._cache[davkey("read_bytes", slice(0, None))(path)]

    async def write_str(self, path: str, content: str, encoding: str = "utf-8") -> None:
        """
        Write string from `content` into WebDAV file at `path`

        The string is encoded using `encoding` before it is written.
        """

        _logger.debug(f"write_str {path!r}")

        await self.write_bytes(path, content.encode(encoding=encoding))