ovdashboard/api/ovdashboard_api/async_helpers.py

27 lines
751 B
Python

"""
Some useful helpers for working in async contexts.
"""
from asyncio import get_running_loop
from functools import partial, wraps
from typing import Awaitable, Callable, TypeVar
RT = TypeVar("RT")
def run_in_executor(function: Callable[..., RT]) -> Callable[..., Awaitable[RT]]:
"""
Decorator to make blocking a function call asyncio compatible.
https://stackoverflow.com/questions/41063331/how-to-use-asyncio-with-existing-blocking-library/
https://stackoverflow.com/a/53719009
"""
@wraps(function)
async def wrapper(*args, **kwargs) -> RT:
loop = get_running_loop()
return await loop.run_in_executor(
None,
partial(function, *args, **kwargs),
)
return wrapper