# Mirror of https://code.lenaisten.de/Lenaisten/advent22.git
# (synced 2024-11-23 00:03:07 +00:00; 69 lines, 1.5 KiB, Python)
import asyncio
|
|
import functools
|
|
from contextlib import asynccontextmanager, contextmanager
|
|
from io import BytesIO
|
|
from typing import AsyncContextManager, AsyncIterator, ContextManager, Iterator
|
|
|
|
from cache import AsyncLRU
|
|
|
|
#
# synchronous implementation
#
|
|
|
|
|
|
def get_buf_sync() -> ContextManager[BytesIO]:
    """Return a context manager yielding a fresh ``BytesIO`` of this file's first line.

    The first call stores an ``lru_cache``-wrapped, zero-argument reader on
    the function itself as ``get_buf_sync.inner``.  The reader is only
    invoked when the returned context manager is entered, so the file is
    opened (and ``"sync open"`` printed) at most once while the cache holds;
    every later entry wraps the cached bytes in a new buffer.
    """

    @contextmanager
    def fresh_buffer() -> Iterator[BytesIO]:
        # Looked up at entry time, so the reader installed below is used.
        first_line = get_buf_sync.inner()
        yield BytesIO(first_line)

    # Reader already installed by an earlier call: nothing left to set up.
    if getattr(get_buf_sync, "inner", None) is not None:
        return fresh_buffer()

    @functools.lru_cache()
    def inner() -> bytes:
        print("sync open")
        with open(__file__, "rb") as handle:
            return handle.readline()

    setattr(get_buf_sync, "inner", inner)
    return fresh_buffer()
|
|
|
|
|
|
def main_sync() -> None:
    """Obtain the synchronous buffer twice and print its contents each time."""
    remaining = 2
    while remaining > 0:
        with get_buf_sync() as stream:
            print(stream.read())
        remaining -= 1
|
|
|
|
|
|
#
# asynchronous implementation
#
|
|
|
|
|
|
async def get_buf_async() -> AsyncContextManager[BytesIO]:
    """Return an async context manager yielding a fresh ``BytesIO`` of this file's first line.

    The first call stores an ``AsyncLRU``-decorated coroutine function on the
    function itself as ``get_buf_async.inner``.  The reader is only awaited
    when the returned context manager is entered; each entry wraps the
    awaited bytes in a new buffer.
    """

    @asynccontextmanager
    async def fresh_buffer() -> AsyncIterator[BytesIO]:
        # Looked up at entry time, so the reader installed below is used.
        first_line = await get_buf_async.inner()
        yield BytesIO(first_line)

    # Reader already installed by an earlier call: nothing left to set up.
    if getattr(get_buf_async, "inner", None) is not None:
        return fresh_buffer()

    # NOTE(review): AsyncLRU comes from the external ``cache`` module and
    # presumably memoizes the awaited result -- confirm against its docs.
    @AsyncLRU()
    async def inner() -> bytes:
        print("async open")
        with open(__file__, "rb") as handle:
            return handle.readline()

    setattr(get_buf_async, "inner", inner)
    return fresh_buffer()
|
|
|
|
|
|
async def main_async() -> None:
    """Obtain the asynchronous buffer twice and print its contents each time."""
    remaining = 2
    while remaining > 0:
        manager = await get_buf_async()
        async with manager as stream:
            print(stream.read())
        remaining -= 1
|
|
|
|
|
|
if __name__ == "__main__":
    # Run the synchronous demo first, then drive the asynchronous one on a
    # fresh event loop.
    main_sync()
    asyncio.run(main_async())
|