""" Some useful helpers for working in async contexts. """ from asyncio import get_running_loop from functools import partial, wraps from typing import Awaitable, Callable, TypeVar RT = TypeVar("RT") def run_in_executor( function: Callable[..., RT] ) -> Callable[..., Awaitable[RT]]: """ Decorator to make blocking a function call asyncio compatible. https://stackoverflow.com/questions/41063331/how-to-use-asyncio-with-existing-blocking-library/ https://stackoverflow.com/a/53719009 """ @wraps(function) async def wrapper(*args, **kwargs) -> RT: loop = get_running_loop() return await loop.run_in_executor( None, partial(function, *args, **kwargs), ) return wrapper