ovdashboard/api/ovdashboard_api/dav_file.py

81 lines
1.9 KiB
Python

import asyncio
import functools
import logging
import time
from io import BytesIO
from typing import Any, Optional
from async_lru import alru_cache
from webdav3.client import Resource
from . import webdav_resource
_logger = logging.getLogger(__name__)
def _run_in_executor(f):
"""
Decorator to make blocking function call asyncio compatible
https://stackoverflow.com/questions/41063331/how-to-use-asyncio-with-existing-blocking-library/
"""
@functools.wraps(f)
def inner(*args, **kwargs):
loop = asyncio.get_running_loop()
return loop.run_in_executor(
None,
functools.partial(f, *args, **kwargs),
)
return inner
def _get_ttl_hash(seconds: int = 20) -> int:
"""
Return the same value within `seconds` time period
https://stackoverflow.com/a/55900800
"""
return round(time.time() / seconds)
@alru_cache(maxsize=20)
async def _get_buffer(
remote_path: Any,
ttl_hash: Optional[int] = None,
) -> BytesIO:
del ttl_hash
@_run_in_executor
def buffer_inner(resource: Resource) -> BytesIO:
_logger.info(f"updating {resource}")
print(f"updating {resource}")
buffer = BytesIO()
resource.write_to(buffer)
return buffer
resource = webdav_resource(remote_path)
return await buffer_inner(resource)
class DavFile:
def __init__(self, remote_path: Any) -> None:
self.__remote_path = remote_path
@property
async def __buffer(self) -> BytesIO:
return await _get_buffer(
remote_path=self.__remote_path,
ttl_hash=_get_ttl_hash(20),
)
@property
async def bytes(self) -> bytes:
buffer = await self.__buffer
buffer.seek(0)
return buffer.read()
@property
async def string(self) -> str:
bytes = await self.bytes
return bytes.decode(encoding="utf-8")