ovdashboard/api/ovkiosk/dav_file.py

88 lines
2.3 KiB
Python

import asyncio
import functools
import logging
from io import BytesIO
from threading import Lock
from typing import Optional
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from webdav3.client import Resource
# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)
def run_in_executor(f):
    """
    Wrap a blocking callable so it runs on the event loop's default executor.

    Calling the wrapped function does not block: it hands the call to the
    executor and returns the resulting future, which can be awaited for the
    return value of `f`. It must be called while an event loop is running.

    https://stackoverflow.com/questions/41063331/how-to-use-asyncio-with-existing-blocking-library/
    """

    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        # The arguments are bound up front so that keyword arguments
        # reach `f` as well.
        bound_call = functools.partial(f, *args, **kwargs)
        return asyncio.get_running_loop().run_in_executor(None, bound_call)

    return wrapper
class DavFile:
    """
    In-memory cache of the content of one WebDAV resource.

    `download` fetches the resource into memory; `bytes` and `str()` return
    the most recently downloaded content. Instances created with
    `refresh=True` are kept in a class-wide registry, and the scheduler
    started by `refresh` re-downloads all of them periodically.
    """

    # Registry of instances to refresh; created lazily by the first __init__.
    __instances: Optional[list["DavFile"]] = None
    # Scheduler running the periodic refresh; created by the first `refresh`.
    __scheduler = None

    def __init__(self, resource: Resource, refresh: bool = True) -> None:
        """
        :param resource: WebDAV resource whose content is cached
        :param refresh: whether this file takes part in the periodic refresh
        """
        self.__resource: Resource = resource
        self.__buffer = BytesIO()
        # Guards the `__buffer` reference, which is swapped by `download`.
        self.__lock = Lock()

        # register
        if DavFile.__instances is None:
            DavFile.__instances = []

        if refresh:
            DavFile.__instances.append(self)

    async def download(self) -> None:
        """
        Fetch the resource and replace the cached content with it.

        The blocking transfer runs on the event loop's default executor.
        Exceptions raised by the transfer propagate to the caller; in that
        case the previously cached content is kept.
        """
        _logger.info(f"updating {self.__resource}")

        # Download into a private buffer and publish it only once complete:
        # the lock is never held across an `await` (a reader on the event
        # loop thread would otherwise block the loop that has to resume this
        # coroutine), and readers never see an empty or half-written buffer.
        fresh = BytesIO()
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(None, self.__resource.write_to, fresh)

        with self.__lock:
            self.__buffer = fresh

    @classmethod
    def refresh(cls, refresh_interval: int = 60) -> None:
        """
        Start the periodic refresh of all registered files.

        The first call creates and starts the scheduler, queues one
        immediate refresh and one recurring refresh. Later calls only
        change the interval of the recurring refresh.

        :param refresh_interval: seconds between two refresh runs
        """
        if cls.__scheduler is not None:
            cls.__scheduler.reschedule_job(
                job_id=cls.__name__,
                trigger="interval",
                seconds=refresh_interval,
            )
            return

        async def tick() -> None:
            # The registry is still None when no instance was created yet.
            for davfile in DavFile.__instances or ():
                try:
                    await davfile.download()
                except Exception:
                    # One unreachable file must not keep the others stale.
                    _logger.exception("updating %s failed", davfile.__resource)

        cls.__scheduler = AsyncIOScheduler()
        cls.__scheduler.start()
        # Job without a trigger: populate the buffers once, right away.
        cls.__scheduler.add_job(tick)
        cls.__scheduler.add_job(
            tick,
            id=cls.__name__,
            trigger="interval",
            seconds=refresh_interval,
        )

    @property
    def bytes(self) -> bytes:
        """
        Most recently downloaded content; empty before the first download.
        """
        with self.__lock:
            return self.__buffer.getvalue()

    def __str__(self) -> str:
        """
        Cached content decoded as UTF-8.
        """
        return self.bytes.decode(encoding="utf-8")