ovdashboard/api/ovdashboard_api/async_helpers.py
2022-09-05 23:30:06 +02:00

54 lines
1.2 KiB
Python

"""
Some useful helpers for working in async contexts.
"""
from asyncio import get_running_loop
from functools import partial, wraps
from time import time
from async_lru import alru_cache
def run_in_executor(f):
    """
    Decorator that makes a blocking function call asyncio compatible.

    Calling the decorated function hands `f` (with the given arguments) to
    the running event loop's `run_in_executor` and returns whatever that
    returns, so the result can be awaited. A running event loop is required
    at call time, since the loop is looked up via `get_running_loop()`.

    https://stackoverflow.com/questions/41063331/how-to-use-asyncio-with-existing-blocking-library/
    """

    @wraps(f)
    def inner(*args, **kwargs):
        event_loop = get_running_loop()
        # Bind all arguments up front so keyword arguments reach `f` too.
        bound_call = partial(f, *args, **kwargs)
        # Passing `None` as executor leaves the choice to the event loop.
        return event_loop.run_in_executor(None, bound_call)

    return inner
def get_ttl_hash(seconds: int = 20) -> int:
    """
    Return an integer that stays the same for `seconds`-long stretches
    of time.

    The current timestamp is divided by `seconds` and rounded to the
    nearest integer.

    https://stackoverflow.com/a/55900800
    """
    elapsed_windows = time() / seconds
    return round(elapsed_windows)
def timed_alru_cache(**decorator_kwargs):
    """
    Decorator factory which wraps an async function with `@alru_cache`
    and puts an (unused) leading param `ttl_hash` in front of its
    own arguments.

    All `decorator_kwargs` are forwarded to `alru_cache` unchanged.
    The resulting function must be called as `func(ttl_hash, *args, **kwargs)`;
    `ttl_hash` itself never reaches the decorated function.
    """

    def decorate(f):
        async def wrapper(ttl_hash: int, *args, **kwargs):
            # `ttl_hash` is accepted only as a call argument; drop it
            # before delegating to the real function.
            del ttl_hash
            result = await f(*args, **kwargs)
            return result

        cached = alru_cache(**decorator_kwargs)
        # Copy the metadata of `f` onto the wrapper first, then cache it.
        return cached(wraps(f)(wrapper))

    return decorate