refac: _common.ListManager

This commit is contained in:
Jörn-Michael Miehe 2023-10-26 15:58:02 +02:00
parent 7fb3aa0f42
commit c47773e70d
5 changed files with 101 additions and 56 deletions

View file

@ -7,7 +7,7 @@ from dataclasses import dataclass
from logging import getLogger
from typing import Awaitable, Callable, Generic, ParamSpec, TypeVar
from fastapi import HTTPException, status
from fastapi import Depends, HTTPException, status
from webdav3.exceptions import RemoteResourceNotFound
from ...core.config import get_config
@ -27,50 +27,97 @@ _RESPONSE_OK = {
Params = ParamSpec("Params")
Return = TypeVar("Return")
type _DepCallable[**Params, Return] = Callable[Params, Awaitable[Return]]
@dataclass(slots=True, frozen=True)
class Dependable(Generic[Params, Return]):
func: Callable[Params, Awaitable[Return]]
func: _DepCallable[Params, Return]
responses: dict
type _NDependable[Return] = Dependable[[], Return]
@dataclass(init=False, slots=True, frozen=True)
class ListManager:
lister: Dependable[[], list[str]]
filter: Dependable[[str], list[str]]
getter: Dependable[[str], str]
def __init__(
self,
lister: Dependable[[], list[str]],
) -> None:
object.__setattr__(self, "lister", lister)
async def _filter(
prefix: str,
names: list[str] = Depends(self.lister.func),
) -> list[str]:
return [item for item in names if item.lower().startswith(prefix.lower())]
object.__setattr__(
self, "filter", Dependable(func=_filter, responses=_RESPONSE_OK)
)
async def _getter(
names: list[str] = Depends(self.filter.func),
) -> str:
match names:
case [name]:
return name
case []:
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
case _:
raise HTTPException(status_code=status.HTTP_409_CONFLICT)
object.__setattr__(
self,
"getter",
Dependable(
func=_getter,
responses={
**_RESPONSE_OK,
status.HTTP_404_NOT_FOUND: {
"description": "Prefix not found",
"content": None,
},
status.HTTP_409_CONFLICT: {
"description": "Ambiguous prefix",
"content": None,
},
},
),
)
def get_remote_path(
path_name: str,
) -> _NDependable[str]:
) -> _DepCallable[[], str]:
async def _get_remote_path() -> str:
cfg = await get_config()
return getattr(cfg, path_name)
return Dependable(
func=_get_remote_path,
responses={**_RESPONSE_OK},
)
return _get_remote_path
def list_files(
rp: _DepCallable[[], str],
*,
path_name: str,
re: re.Pattern[str],
) -> _NDependable[list[str]]:
) -> Dependable[[], list[str]]:
"""
List files in remote `path` matching the RegEx `re`
"""
async def _list_files() -> list[str]:
cfg = await get_config()
path = getattr(cfg, path_name)
async def _list_files(
remote_path: str = Depends(rp),
) -> list[str]:
try:
return await WebDAV.list_files(path, regex=re)
return await WebDAV.list_files(remote_path, regex=re)
except RemoteResourceNotFound:
_logger.error(
"WebDAV path %s lost!",
repr(path),
)
_logger.error("WebDAV path %s lost!", repr(remote_path))
raise HTTPException(status_code=status.HTTP_404_NOT_FOUND)
return Dependable(
@ -78,7 +125,7 @@ def list_files(
responses={
**_RESPONSE_OK,
status.HTTP_404_NOT_FOUND: {
"description": f"{path_name!r} not found",
"description": "Remote path not found",
"content": None,
},
},
@ -102,7 +149,7 @@ def list_files(
def filter_prefix(
src: _NDependable[list[str]],
src: Dependable[[], list[str]],
) -> Dependable[[str], list[str]]:
"""
Filter names from an async source `src` for names starting with a given prefix.
@ -112,7 +159,9 @@ def filter_prefix(
prefix: str,
) -> list[str]:
return list(
item for item in (await src.func()) if item.lower().startswith(prefix.lower())
item
for item in (await src.func())
if item.lower().startswith(prefix.lower())
)
return Dependable(

View file

@ -16,32 +16,30 @@ from magic import Magic
from ...core.dav_common import webdav_ensure_files, webdav_ensure_path
from ...core.webdav import WebDAV
from ._common import filter_prefix, filter_prefix_unique, get_remote_path, list_files
from ._common import ListManager, get_remote_path, list_files
_logger = getLogger(__name__)
_magic = Magic(mime=True)
_PATH_NAME = "file_dir"
router = APIRouter(prefix="/file", tags=["file"])
_ls = list_files(
path_name=_PATH_NAME,
_rp = get_remote_path("file_dir")
_files = ListManager(
list_files(
rp=_rp,
re=re.compile(
r"[^/]$",
flags=re.IGNORECASE,
),
)
_rp = get_remote_path(path_name=_PATH_NAME)
_fp = filter_prefix(_ls)
_fpu = filter_prefix_unique(_fp)
)
@router.on_event("startup")
async def start_router() -> None:
_logger.debug(f"{router.prefix} router starting.")
remote_path = await _rp.func()
remote_path = await _rp()
if not webdav_ensure_path(remote_path):
webdav_ensure_files(
remote_path,
@ -52,33 +50,32 @@ async def start_router() -> None:
@router.get(
"/list",
responses=_ls.responses,
responses=_files.lister.responses,
)
async def list_all_files(
names: list[str] = Depends(_ls.func),
names: list[str] = Depends(_files.lister.func),
) -> list[str]:
return names
@router.get(
"/find/{prefix}",
responses=_fp.responses,
responses=_files.filter.responses,
)
async def find_files_by_prefix(
names: list[str] = Depends(_fp.func),
names: list[str] = Depends(_files.filter.func),
) -> list[str]:
return names
@router.get(
"/get/{prefix}",
responses=_fpu.responses,
responses=_files.getter.responses,
response_class=StreamingResponse,
)
async def get_file_by_prefix(
prefix: str,
remote_path: str = Depends(_rp.func),
name: str = Depends(_fpu.func),
remote_path: str = Depends(_rp),
name: str = Depends(_files.getter.func),
) -> StreamingResponse:
buffer = BytesIO(await WebDAV.read_bytes(f"{remote_path}/{name}"))
@ -88,5 +85,5 @@ async def get_file_by_prefix(
return StreamingResponse(
content=buffer,
media_type=mime,
headers={"Content-Disposition": f"filename={prefix}"},
headers={"Content-Disposition": f"filename={name}"},
)

View file

@ -24,15 +24,15 @@ _PATH_NAME = "image_dir"
router = APIRouter(prefix="/image", tags=["image"])
_rp = get_remote_path(path_name=_PATH_NAME)
_ls = list_files(
path_name=_PATH_NAME,
rp=_rp,
re=re.compile(
r"\.(gif|jpe?g|tiff?|png|bmp)$",
flags=re.IGNORECASE,
),
)
_rp = get_remote_path(path_name=_PATH_NAME)
_fp = filter_prefix(_ls)
_fpu = filter_prefix_unique(_fp)
@ -41,7 +41,7 @@ _fpu = filter_prefix_unique(_fp)
async def start_router() -> None:
_logger.debug(f"{router.prefix} router starting.")
remote_path = await _rp.func()
remote_path = await _rp()
if not webdav_ensure_path(remote_path):
webdav_ensure_files(
remote_path,
@ -77,8 +77,7 @@ async def find_images_by_prefix(
response_class=StreamingResponse,
)
async def get_image_by_prefix(
prefix: str,
remote_path: str = Depends(_rp.func),
remote_path: str = Depends(_rp),
name: str = Depends(_fpu.func),
) -> StreamingResponse:
cfg = await get_config()
@ -91,7 +90,7 @@ async def get_image_by_prefix(
return StreamingResponse(
content=img_buffer,
media_type="image/jpeg",
headers={"Content-Disposition": f"filename={prefix}.jpg"},
headers={"Content-Disposition": f"filename={name}.jpg"},
)

View file

@ -22,15 +22,15 @@ _PATH_NAME = "text_dir"
router = APIRouter(prefix="/text", tags=["text"])
_rp = get_remote_path(path_name=_PATH_NAME)
_ls = list_files(
path_name=_PATH_NAME,
rp=_rp,
re=re.compile(
r"\.(txt|md)$",
flags=re.IGNORECASE,
),
)
_rp = get_remote_path(path_name=_PATH_NAME)
_fp = filter_prefix(_ls)
_fpu = filter_prefix_unique(_fp)
@ -39,7 +39,7 @@ _fpu = filter_prefix_unique(_fp)
async def start_router() -> None:
_logger.debug(f"{router.prefix} router starting.")
remote_path = await _rp.func()
remote_path = await _rp()
if not webdav_ensure_path(remote_path):
webdav_ensure_files(
remote_path,
@ -70,7 +70,7 @@ async def find_texts_by_prefix(
async def _get_raw_text_by_prefix(
remote_path: str = Depends(_rp.func),
remote_path: str = Depends(_rp),
name: str = Depends(_fpu.func),
) -> str:
return await WebDAV.read_str(f"{remote_path}/{name}")

View file

@ -26,7 +26,7 @@ router = APIRouter(prefix="/ticker", tags=["text"])
async def start_router() -> None:
_logger.debug(f"{router.prefix} router starting.")
remote_path = await _rp.func()
remote_path = await _rp()
if not webdav_ensure_path(remote_path):
webdav_ensure_files(
remote_path,
@ -37,7 +37,7 @@ async def start_router() -> None:
async def get_ticker_lines() -> Iterator[str]:
cfg = await get_config()
file_name = await _fpu.func(cfg.ticker.file_name)
remote_path = await _rp.func()
remote_path = await _rp()
ticker = await WebDAV.read_str(f"{remote_path}/{file_name}")