improved typing

This commit is contained in:
Jörn-Michael Miehe 2022-08-31 02:22:57 +00:00
parent 3ffc72f065
commit 6859e29986

View file

@ -1,7 +1,7 @@
import logging
from io import BytesIO
from threading import Lock
from typing import Any
from typing import Any, Optional
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from webdav3.client import Client, Resource
@ -10,9 +10,8 @@ _logger = logging.getLogger(__name__)
class DavFile:
__instances = None
__instances: Optional[list["DavFile"]] = None
__scheduler = None
__job = None
def __init__(self, client: Client, path: Any) -> None:
self.__resource: Resource = client.resource(path)
@ -25,7 +24,7 @@ class DavFile:
DavFile.__instances.append(self)
def update(self) -> None:
def download(self) -> None:
_logger.info(f"updating {self.__resource}")
with self.__lock:
self.__buffer.seek(0)
@ -33,8 +32,8 @@ class DavFile:
self.__resource.write_to(self.__buffer)
@classmethod
def refresh(cls, refresh_interval: int = 60):
if cls.__job is not None:
def refresh(cls, refresh_interval: int = 60) -> None:
if cls.__scheduler is not None:
cls.__scheduler.reschedule_job(
job_id=cls.__name__,
trigger="interval",
@ -44,13 +43,13 @@ class DavFile:
def tick() -> None:
for davfile in DavFile.__instances:
davfile.update()
davfile.download()
cls.__scheduler = AsyncIOScheduler()
cls.__scheduler.start()
cls.__scheduler.add_job(tick)
cls.__job = cls.__scheduler.add_job(
cls.__scheduler.add_job(
tick,
id=cls.__name__,
trigger="interval",