45 lines
1 KiB
Python
45 lines
1 KiB
Python
|
from asyncio import get_running_loop
|
||
|
from functools import partial, wraps
|
||
|
from time import time
|
||
|
|
||
|
from async_lru import alru_cache
|
||
|
|
||
|
|
||
|
def run_in_executor(f):
    """
    Turn a blocking callable into one that can be awaited from asyncio code.

    The returned wrapper is a plain (non-async) function: calling it hands
    ``f`` and its arguments to the running loop's ``run_in_executor`` with
    ``None`` as the executor, and returns the resulting awaitable without
    waiting for it.

    Must be called while an event loop is running, because the wrapper looks
    the loop up with ``get_running_loop()`` at call time.

    https://stackoverflow.com/questions/41063331/how-to-use-asyncio-with-existing-blocking-library/
    """

    @wraps(f)
    def inner(*call_args, **call_kwargs):
        event_loop = get_running_loop()
        # run_in_executor only forwards positional arguments, so freeze the
        # whole call (keywords included) into a zero-argument callable.
        bound_call = partial(f, *call_args, **call_kwargs)
        return event_loop.run_in_executor(None, bound_call)

    return inner
|
||
|
|
||
|
|
||
|
def get_ttl_hash(seconds: int = 20) -> int:
    """
    Map the current time onto a counter that changes once per `seconds`.

    Calls made inside the same `seconds`-long window get the same integer,
    which makes the value usable as an extra cache-key argument that expires
    cached entries over time.

    https://stackoverflow.com/a/55900800

    :param seconds: length of one window, in seconds
    :return: the current epoch time divided by `seconds`, rounded to an int
    """
    elapsed_windows = time() / seconds
    return round(elapsed_windows)
|
||
|
|
||
|
|
||
|
def timed_alru_cache(**decorator_kwargs):
    """
    Build an ``alru_cache`` decorator whose cache key includes a TTL value.

    The decorated coroutine function gains a leading ``ttl_hash`` argument.
    It is never forwarded to ``f``; it only takes part in the cache lookup,
    so calling with a new value (for example the result of ``get_ttl_hash``)
    does not reuse a result stored under an earlier value.

    :param decorator_kwargs: passed through unchanged to ``alru_cache``
    :return: a decorator to apply to a coroutine function
    """

    def decorate(f):
        apply_cache = alru_cache(**decorator_kwargs)

        @wraps(f)
        async def cached_call(ttl_hash: int, *call_args, **call_kwargs):
            # ttl_hash exists purely for the cache key; discard it before
            # delegating to the wrapped coroutine function.
            del ttl_hash

            result = await f(*call_args, **call_kwargs)
            return result

        return apply_cache(cached_call)

    return decorate
|