# Mirror of https://code.lenaisten.de/Lenaisten/advent22.git
# Synced 2024-11-23 08:13:01 +00:00
# 55 lines, 1.5 KiB, Python
import re
|
|
from io import BytesIO, TextIOWrapper
|
|
|
|
from cache import AsyncTTL
|
|
from webdav3.client import Client as WebDAVclient
|
|
|
|
from .settings import SETTINGS
|
|
|
|
# Connection options for the shared WebDAV client, all taken from the
# `webdav` section of the application settings.
_WEBDAV_OPTIONS = {
    "webdav_hostname": SETTINGS.webdav.url,
    "webdav_login": SETTINGS.webdav.username,
    "webdav_password": SETTINGS.webdav.password,
    "disable_check": SETTINGS.webdav.disable_check,
}

# Module-wide client instance used by every helper in this module.
_WEBDAV_CLIENT = WebDAVclient(_WEBDAV_OPTIONS)
|
|
|
|
|
|
@AsyncTTL(time_to_live=SETTINGS.cache_ttl)
async def dav_list_files(regex: re.Pattern[str], directory: str = "") -> list[str]:
    """List the entries of a WebDAV directory whose names match ``regex``.

    Args:
        regex: Compiled pattern; an entry is kept when ``regex.search``
            finds a match anywhere in its name.
        directory: Directory to list on the WebDAV server.

    Returns:
        The matching entries, each prefixed with ``directory`` and a ``/``.
        The call is wrapped in ``AsyncTTL`` using ``SETTINGS.cache_ttl``.
    """
    matches: list[str] = []

    for path in _WEBDAV_CLIENT.list(directory):
        if regex.search(path):
            matches.append(f"{directory}/{path}")

    return matches
|
|
|
|
|
|
@AsyncTTL(time_to_live=SETTINGS.cache_ttl)
async def dav_file_exists(path: str) -> bool:
    """Report whether ``path`` exists on the WebDAV server.

    Args:
        path: Remote path handed to the client's ``check`` call.

    Returns:
        Whatever the WebDAV client's ``check`` reports for ``path``. The
        call is wrapped in ``AsyncTTL`` using ``SETTINGS.cache_ttl``.
    """
    exists = _WEBDAV_CLIENT.check(path)
    return exists
|
|
|
|
|
|
@AsyncTTL(time_to_live=SETTINGS.cache_ttl)
async def dav_get_file(path: str) -> BytesIO:
    """Fetch the WebDAV resource at ``path`` into an in-memory buffer.

    Args:
        path: Remote path of the resource to fetch.

    Returns:
        A ``BytesIO`` that the resource was written into. The stream
        position is not rewound here, so callers that want to read the
        content should ``seek(0)`` first.

    NOTE(review): the function is wrapped in ``AsyncTTL``, so the same
    ``BytesIO`` object is presumably handed to several callers within the
    TTL -- they should not close it.
    """
    content = BytesIO()
    _WEBDAV_CLIENT.resource(path).write_to(content)

    return content
|
|
|
|
|
|
@AsyncTTL(time_to_live=SETTINGS.cache_ttl)
async def dav_get_textfile_content(path: str, encoding: str = "utf-8") -> str:
    """Return the text content of the WebDAV file at ``path``, stripped.

    Args:
        path: Remote path of the file, passed on to ``dav_get_file``.
        encoding: Text encoding used to decode the file's bytes.

    Returns:
        The decoded content with leading and trailing whitespace removed.

    Raises:
        UnicodeDecodeError: If the content is not valid in ``encoding``.
    """
    buffer = await dav_get_file(path)
    tio = TextIOWrapper(buffer, encoding=encoding)

    try:
        # The buffer's position is wherever the last user left it.
        tio.seek(0)
        return tio.read().strip()
    finally:
        # A TextIOWrapper closes its underlying stream when it is finalized.
        # `buffer` comes from the AsyncTTL-decorated dav_get_file and may be
        # handed out again, so detach it to keep it open for later callers.
        tio.detach()
|
|
|
|
|
|
async def dav_write_file(path: str, buffer: BytesIO) -> None:
    """Upload the content of ``buffer`` to the WebDAV resource at ``path``.

    Args:
        path: Remote path of the resource to write.
        buffer: In-memory stream handed to the resource's ``read_from``.

    NOTE(review): nothing here touches the ``AsyncTTL``-decorated readers
    in this module, so they may presumably keep serving older content
    until their TTL expires -- confirm whether that is acceptable.
    """
    _WEBDAV_CLIENT.resource(path).read_from(buffer)
|
|
|
|
|
|
async def dav_write_textfile_content(path: str, content: str, encoding="utf-8") -> None:
    """Encode ``content`` and upload it to the WebDAV file at ``path``.

    Args:
        path: Remote path of the file, passed on to ``dav_write_file``.
        content: Text to store.
        encoding: Text encoding used to turn ``content`` into bytes.
    """
    payload = content.encode(encoding=encoding)

    stream = BytesIO(payload)
    stream.seek(0)

    await dav_write_file(path, stream)
|