ovdashboard/api/ovdashboard_api/async_helpers.py

30 lines
757 B
Python
Raw Normal View History

2022-09-05 12:54:02 +00:00
"""
Some useful helpers for working in async contexts.
"""
2022-09-04 22:30:40 +00:00
from asyncio import get_running_loop
from functools import partial, wraps
2022-09-08 00:24:36 +00:00
from typing import Awaitable, Callable, TypeVar
2022-09-04 22:30:40 +00:00
2022-09-08 00:24:36 +00:00
RT = TypeVar("RT")
2022-09-04 22:30:40 +00:00
2022-09-08 00:24:36 +00:00
def run_in_executor(
function: Callable[..., RT]
) -> Callable[..., Awaitable[RT]]:
2022-09-04 22:30:40 +00:00
"""
2022-09-05 12:54:02 +00:00
Decorator to make blocking a function call asyncio compatible.
2022-09-04 22:30:40 +00:00
https://stackoverflow.com/questions/41063331/how-to-use-asyncio-with-existing-blocking-library/
2022-09-05 13:02:15 +00:00
https://stackoverflow.com/a/53719009
2022-09-04 22:30:40 +00:00
"""
2022-09-08 00:24:36 +00:00
@wraps(function)
async def wrapper(*args, **kwargs) -> RT:
2022-09-04 22:30:40 +00:00
loop = get_running_loop()
2022-09-05 13:02:15 +00:00
return await loop.run_in_executor(
2022-09-04 22:30:40 +00:00
None,
2022-09-08 00:24:36 +00:00
partial(function, *args, **kwargs),
2022-09-04 22:30:40 +00:00
)
2022-09-05 13:02:15 +00:00
return wrapper