implement timeout instead of scheduling

Jörn-Michael Miehe 2022-09-02 12:20:29 +00:00
parent c65aba82a4
commit 72e33238c4
3 changed files with 69 additions and 127 deletions
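The change drops the APScheduler-based background refresh of DavFile buffers and instead caches downloads with async_lru's alru_cache, keyed by the remote path plus a time-bucketed "ttl_hash", so cached content simply expires after roughly 20 seconds. A minimal, self-contained sketch of that pattern (the _fetch function and its payload are stand-ins, not code from this repository):

import asyncio
import time
from typing import Optional

from async_lru import alru_cache


def _get_ttl_hash(seconds: int = 20) -> int:
    # Same value for every call inside one `seconds`-wide time bucket.
    return round(time.time() / seconds)


@alru_cache(maxsize=20)
async def _fetch(remote_path: str, ttl_hash: Optional[int] = None) -> bytes:
    del ttl_hash  # only part of the cache key; a new bucket forces a re-fetch
    print(f"downloading {remote_path}")  # stand-in for the real WebDAV download
    return b"payload"


async def main() -> None:
    # Same bucket: one download serves both awaits.
    await _fetch("ticker.txt", ttl_hash=_get_ttl_hash(20))
    await _fetch("ticker.txt", ttl_hash=_get_ttl_hash(20))


asyncio.run(main())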

View file

@@ -1,17 +1,19 @@
 import asyncio
 import functools
 import logging
+import time
 from io import BytesIO
-from threading import Lock
-from typing import Optional
+from typing import Any, Optional
 
-from apscheduler.schedulers.asyncio import AsyncIOScheduler
+from async_lru import alru_cache
 from webdav3.client import Resource
 
+from . import CLIENT
+
 _logger = logging.getLogger(__name__)
 
 
-def run_in_executor(f):
+def _run_in_executor(f):
     """
     Decorator to make blocking function call asyncio compatible
     https://stackoverflow.com/questions/41063331/how-to-use-asyncio-with-existing-blocking-library/
@@ -28,64 +30,52 @@ def run_in_executor(f):
     return inner
 
 
+def _get_ttl_hash(seconds: int = 20) -> int:
+    """
+    Return the same value within `seconds` time period
+    https://stackoverflow.com/a/55900800
+    """
+    return round(time.time() / seconds)
+
+
+@alru_cache(maxsize=20)
+async def _get_buffer(
+    remote_path: Any,
+    ttl_hash: Optional[int] = None,
+) -> BytesIO:
+    del ttl_hash
+
+    @_run_in_executor
+    def buffer_inner(resource: Resource) -> BytesIO:
+        _logger.info(f"updating {resource}")
+        print(f"updating {resource}")
+        buffer = BytesIO()
+        resource.write_to(buffer)
+        return buffer
+
+    resource = CLIENT.resource(remote_path)
+    return await buffer_inner(resource)
+
+
 class DavFile:
-    __instances: Optional[list["DavFile"]] = None
-    __scheduler = None
+    def __init__(self, remote_path: Any) -> None:
+        self.__remote_path = remote_path
 
-    def __init__(self, resource: Resource, refresh: bool = True) -> None:
-        self.__resource: Resource = resource
-        self.__buffer = BytesIO()
-        self.__lock = Lock()
-
-        # register
-        if DavFile.__instances is None:
-            DavFile.__instances = []
-
-        if refresh:
-            DavFile.__instances.append(self)
-
-    async def download(self) -> None:
-        @run_in_executor
-        def download_inner() -> None:
-            self.__resource.write_to(self.__buffer)
-
-        _logger.info(f"updating {self.__resource}")
-
-        with self.__lock:
-            self.__buffer.seek(0)
-            self.__buffer.truncate(0)
-
-            await download_inner()
-
-    @classmethod
-    def refresh(cls, refresh_interval: int = 60) -> None:
-        if cls.__scheduler is not None:
-            cls.__scheduler.reschedule_job(
-                job_id=cls.__name__,
-                trigger="interval",
-                seconds=refresh_interval,
-            )
-            return
-
-        async def tick() -> None:
-            for davfile in DavFile.__instances:
-                await davfile.download()
-
-        cls.__scheduler = AsyncIOScheduler()
-        cls.__scheduler.start()
-        cls.__scheduler.add_job(tick)
-        cls.__scheduler.add_job(
-            tick,
-            id=cls.__name__,
-            trigger="interval",
-            seconds=refresh_interval,
-        )
+    @property
+    async def __buffer(self) -> BytesIO:
+        return await _get_buffer(
+            remote_path=self.__remote_path,
+            ttl_hash=_get_ttl_hash(20),
+        )
 
     @property
-    def bytes(self) -> bytes:
-        with self.__lock:
-            self.__buffer.seek(0)
-            return self.__buffer.read()
+    async def bytes(self) -> bytes:
+        buffer = await self.__buffer
 
-    def __str__(self) -> str:
-        return self.bytes.decode(encoding="utf-8")
+        buffer.seek(0)
+        return buffer.read()
+
+    @property
+    async def string(self) -> str:
+        bytes = await self.bytes
+        return bytes.decode(encoding="utf-8")
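With the download cached at module level, DavFile boils down to a thin wrapper around a remote path: callers construct it ad hoc and await its properties, as the routers below do. A short call-pattern sketch, assuming it runs inside this package where CLIENT is configured:

from ..dav_file import DavFile


async def read_ticker() -> str:
    # `string` and `bytes` are async properties; every access goes through
    # the cached _get_buffer(), so repeated reads within one 20-second
    # ttl_hash bucket reuse a single WebDAV download.
    return await DavFile("ticker.txt").string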

View file

@@ -1,10 +1,7 @@
-import io
-import logging
 import re
-import time
-from typing import Iterator, Optional
+from io import BytesIO
+from typing import Iterator
 
-from async_lru import alru_cache
 from fastapi import APIRouter, Depends, HTTPException, status
 from fastapi.responses import StreamingResponse
 from PIL import Image
@@ -13,16 +10,9 @@ from webdav3.exceptions import RemoteResourceNotFound
 from .. import CLIENT
 from ..dav_file import DavFile
 
-_logger = logging.getLogger(__name__)
-
 router = APIRouter(prefix="/image", tags=["image"])
 
 
-@router.on_event("startup")
-async def on_startup():
-    _logger.debug("image router started")
-
-
 _re_image_file = re.compile(
     r"\.(gif|jpe?g|tiff?|png|bmp)$",
     flags=re.IGNORECASE,
@@ -68,32 +58,6 @@ async def find_images(
     return list(file_names)
 
 
-async def get_ttl_hash(seconds: int = 20) -> int:
-    """
-    Return the same value withing `seconds` time period
-    https://stackoverflow.com/a/55900800
-    """
-    return round(time.time() / seconds)
-
-
-@alru_cache(maxsize=20)
-async def get_image_by_name(
-    file_name: str,
-    ttl_hash: Optional[int] = None,
-) -> io.BytesIO:
-    del ttl_hash
-
-    file = DavFile(CLIENT.resource(f"img/{file_name}"), refresh=False)
-
-    print(f"Downloading {file_name}")
-    await file.download()
-
-    img = Image.open(io.BytesIO(file.bytes)).convert("RGB")
-
-    img_buffer = io.BytesIO()
-    img.save(img_buffer, format='JPEG', quality=85)
-
-    return img_buffer
-
-
 @router.get(
     "/get/{prefix}",
     response_class=StreamingResponse,
@@ -114,7 +78,6 @@ async def get_image_by_name(
 async def get_image(
     prefix: str,
     file_names: Iterator[str] = Depends(find_file_names),
-    ttl_hash: int = Depends(get_ttl_hash),
 ) -> StreamingResponse:
     file_names = list(file_names)
 
@@ -124,7 +87,11 @@ async def get_image(
     elif len(file_names) > 1:
         raise HTTPException(status_code=status.HTTP_409_CONFLICT)
 
-    img_buffer = await get_image_by_name(file_names[0], ttl_hash)
+    img_file = DavFile(f"img/{file_names[0]}")
+    img = Image.open(BytesIO(await img_file.bytes)).convert("RGB")
+
+    img_buffer = BytesIO()
+    img.save(img_buffer, format='JPEG', quality=85)
 
     img_buffer.seek(0)
     return StreamingResponse(
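The hunk ends at the unchanged `return StreamingResponse(` line; the remainder of the route lies outside this diff. As a rough, generic sketch of how an in-memory JPEG like `img_buffer` is typically streamed with FastAPI (the media_type below is an assumption, not taken from this diff):

from io import BytesIO

from fastapi.responses import StreamingResponse
from PIL import Image


def jpeg_response(raw: bytes) -> StreamingResponse:
    # Re-encode the downloaded bytes to JPEG in memory, then stream the buffer.
    img = Image.open(BytesIO(raw)).convert("RGB")
    buffer = BytesIO()
    img.save(buffer, format="JPEG", quality=85)
    buffer.seek(0)
    return StreamingResponse(buffer, media_type="image/jpeg")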

View file

@@ -1,48 +1,31 @@
-import logging
 import re
 from typing import Iterator
 
 from fastapi import APIRouter, Depends
-from markdown import Markdown
+from markdown import markdown
 from pydantic import BaseModel
 
-from .. import CLIENT
 from ..config import SETTINGS
 from ..dav_file import DavFile
 
 router = APIRouter(prefix="/text", tags=["text"])
 
-_logger = logging.getLogger(__name__)
-_md = Markdown()
-
-_message = ""
-_ticker = ""
-_title = ""
-
-
-@router.on_event("startup")
-async def on_startup() -> None:
-    global _message, _ticker, _title
-
-    _message = DavFile(CLIENT.resource("message.txt"))
-    _ticker = DavFile(CLIENT.resource("ticker.txt"))
-    _title = DavFile(CLIENT.resource("title.txt"))
-
-    DavFile.refresh(60)
-
-    _logger.debug("text router started")
-
 
 @router.get("/message")
 async def get_message() -> str:
-    return _md.convert(
-        str(_message)
+    message = await DavFile("message.txt").string
+
+    return markdown(
+        message
     )
 
 
 async def get_ticker_lines() -> Iterator[str]:
+    ticker = await DavFile("ticker.txt").string
+
     return (
         line.strip()
-        for line in str(_ticker).split("\n")
+        for line in ticker.split("\n")
        if line.strip()
     )
@@ -61,7 +44,7 @@ async def get_ticker_content_lines(
 async def get_ticker_content(
     ticker_content_lines: Iterator[str] = Depends(get_ticker_content_lines),
 ) -> str:
-    return _md.convert(
+    return markdown(
         SETTINGS.ticker_separator.join(ticker_content_lines)
     )
@@ -98,6 +81,8 @@ async def get_ticker_commands(
 @router.get("/title")
 async def get_title() -> str:
-    return _md.convert(
-        str(_title)
+    title = await DavFile("title.txt").string
+
+    return markdown(
+        title
     )
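All three text routes now call python-markdown's module-level markdown() helper, which initializes a fresh converter per call, instead of sharing the old module-wide Markdown instance. A quick illustration of the conversion (the input string is made up):

from markdown import markdown

html = markdown("**Hello** _world_")
print(html)  # <p><strong>Hello</strong> <em>world</em></p>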