kiwi-simple-metrics/kiwi_simple_metrics/metrics/_report.py

133 lines
3.1 KiB
Python
Raw Normal View History

2023-09-01 22:56:20 +00:00
import urllib.parse
from dataclasses import dataclass
2023-08-31 22:10:33 +00:00
from typing import Any, Callable, Iterator, Self
2023-08-31 11:17:19 +00:00
2023-09-01 22:56:20 +00:00
import requests
2023-08-31 22:25:18 +00:00
from ..settings import SETTINGS, MetricSettings
2023-08-31 11:17:19 +00:00
@dataclass(slots=True, kw_only=True)
class ReportData:
name: str
value: float
threshold: float
inverted: bool
format: str
@classmethod
def from_settings(
cls, *,
name: str,
value: float,
settings: MetricSettings,
) -> Self:
return cls(
name=name,
value=value,
threshold=settings.threshold,
inverted=settings.inverted,
format=settings.report,
)
@classmethod
def from_free_total(
cls, *,
name: str,
free: float,
total: float,
settings: MetricSettings,
) -> Self:
return cls.from_settings(
name=name,
value=(total - free) / total * 100,
settings=settings,
)
@property
def report(self) -> "Report":
return Report(
result=self.format.format(
name=self.name,
value=self.value,
),
failed=(
self.value > self.threshold and not self.inverted
or self.value < self.threshold and self.inverted
),
)
@dataclass(slots=True, kw_only=True, frozen=True)
2023-08-31 11:17:19 +00:00
class Report:
result: str
failed: bool = False
2023-08-31 11:17:19 +00:00
2023-08-31 23:09:50 +00:00
def __str__(self) -> str:
state = "OK" if not self.failed else "FAIL"
2023-08-31 23:49:02 +00:00
return SETTINGS.log.format.format(
2023-08-31 23:09:50 +00:00
state=state,
result=self.result,
)
2023-08-31 22:10:33 +00:00
@classmethod
2023-08-31 23:49:02 +00:00
def summary(cls, *_reports: Any) -> Self:
2023-08-31 22:10:33 +00:00
reports = [
report
for report in _reports
if isinstance(report, Report)
]
return cls(
2023-08-31 22:25:18 +00:00
result=SETTINGS.separator.join(
2023-08-31 22:10:33 +00:00
report.result
for report in reports
),
failed=any(
report.failed
for report in reports
),
)
2023-08-31 11:17:19 +00:00
@classmethod
def aggregate(
2023-08-31 11:17:19 +00:00
cls, *,
settings: MetricSettings,
get_data: Callable[[], Iterator[ReportData]],
) -> Self | None:
if not settings.enabled:
return None
reports = [data.report for data in get_data()]
2023-08-31 11:17:19 +00:00
if not reports:
return None
return cls(
result=settings.report_outer.format(
name=settings.name,
2023-08-31 22:25:18 +00:00
inner=SETTINGS.separator.join(
report.result
for report in reports[:settings.count]
),
),
failed=any(
report.failed
for report in reports
),
)
2023-09-01 22:56:20 +00:00
def push_webhook(self) -> None:
if (url := SETTINGS.webhook.url if not self.failed
else SETTINGS.webhook.fail) is None:
return
requests.get(
url=str(url).format(
2023-09-02 16:30:54 +00:00
result=urllib.parse.quote_plus(self.result)
2023-09-01 22:56:20 +00:00
),
verify=not SETTINGS.webhook.insecure,
)