# kiwi-simple-metrics/kiwi_simple_metrics/metrics/_report.py

from dataclasses import dataclass
from typing import Any, Callable, Iterator, Self
from ..settings import MetricSettings
@dataclass(slots=True, kw_only=True)
class ReportData:
name: str
value: float
@classmethod
def from_free_total(cls, *, name: str, free: float, total: float) -> Self:
return cls(
name=name,
value=(total - free) / total * 100,
)
def report(self, settings: MetricSettings) -> "Report":
return Report(
result=settings.report.format(
name=self.name,
value=self.value,
),
failed=(
self.value > settings.threshold and not settings.inverted
or self.value < settings.threshold and settings.inverted
),
)
@dataclass(slots=True, kw_only=True, frozen=True)
2023-08-31 11:17:19 +00:00
class Report:
result: str
failed: bool = False
2023-08-31 11:17:19 +00:00
2023-08-31 22:10:33 +00:00
@classmethod
def concat(cls, *_reports: Any) -> Self:
reports = [
report
for report in _reports
if isinstance(report, Report)
]
return cls(
result=", ".join(
report.result
for report in reports
),
failed=any(
report.failed
for report in reports
),
)
2023-08-31 11:17:19 +00:00
@classmethod
def aggregate(
2023-08-31 11:17:19 +00:00
cls, *,
settings: MetricSettings,
get_data: Callable[[], Iterator[ReportData]],
) -> Self | None:
if not settings.enabled:
return None
reports = [
data.report(settings)
for data in get_data()
]
2023-08-31 11:17:19 +00:00
return cls(
result=settings.report_outer.format(
name=settings.name,
inner=", ".join(
report.result
for report in reports[:settings.count]
),
),
failed=any(
report.failed
for report in reports
),
)