kiwi-scp/kiwi_scp/services.py

76 lines
1.9 KiB
Python
Raw Normal View History

2022-01-27 16:57:15 +00:00
import subprocess
2022-01-27 14:26:10 +00:00
from pathlib import Path
2022-02-22 23:21:55 +00:00
from typing import List, Generator, Optional, TYPE_CHECKING
2021-12-02 16:08:14 +00:00
import attr
2022-01-27 16:57:15 +00:00
from .rootkit import Rootkit
2022-01-24 17:06:11 +00:00
from .yaml import YAML
2021-12-02 16:08:14 +00:00
2022-01-27 14:26:10 +00:00
if TYPE_CHECKING:
2022-01-27 16:57:15 +00:00
from .project import Project
2022-01-27 14:26:10 +00:00
from .service import Service
2021-12-02 16:08:14 +00:00
@attr.s
class Services:
    """Wrapper around a list of ``Service`` objects.

    Offers a YAML rendering, access to the services' names and config
    paths, filtering by name, and copying of the config directory into
    the storage directory of the owning instance.
    """

    # the wrapped services, in their given order
    content: List["Service"] = attr.ib()

    def __str__(self) -> str:
        """Render the services and their config paths as a YAML string."""
        serializer = YAML()

        service_map = {}
        for entry in self.content:
            service_map[entry.name] = entry.content

        config_paths = list(map(str, self.configs))

        document = {
            "services": service_map,
            "configs": config_paths,
        }
        return serializer.dump(document).strip()

    def __bool__(self) -> bool:
        """Truthiness follows the truthiness of the wrapped ``content``."""
        return True if self.content else False

    @property
    def parent_project(self) -> Optional["Project"]:
        """Parent project of the first service; ``None`` if this is falsy."""
        if self:
            first_service = self.content[0]
            return first_service.parent_project

        return None

    @property
    def configs(self) -> Generator[Path, None, None]:
        """Yield every entry of every service's ``configs``, service by service."""
        for entry in self.content:
            for config in entry.configs:
                yield config

    def copy_configs(self) -> None:
        """Copy the instance's config directory into its storage directory.

        Does nothing when there is no parent project or when no service
        has any config. Otherwise two commands are handed to
        ``Rootkit("rsync").run`` with stdout and stderr discarded:
        ``mkdir -p <storage_directory>`` followed by
        ``rsync -rpt <config_directory> <storage_directory>``.
        """
        owner = self.parent_project
        # materialised so that emptiness can be checked
        pending = list(self.configs)

        if not (owner is not None and pending):
            return

        instance = owner.parent_instance
        silenced = {
            "stdout": subprocess.DEVNULL,
            "stderr": subprocess.DEVNULL,
        }

        # make sure the destination directory exists
        mkdir_command = ["mkdir", "-p", instance.storage_directory]
        Rootkit("rsync").run(mkdir_command, **silenced)

        # transfer the config directory into the storage directory
        rsync_command = [
            "rsync", "-rpt",
            instance.config_directory,
            instance.storage_directory,
        ]
        Rootkit("rsync").run(rsync_command, **silenced)

    @property
    def names(self) -> Generator[str, None, None]:
        """Generator over the ``name`` of each wrapped service."""
        return (entry.name for entry in self.content)

    def filter_existing(self, service_names: List[str]) -> "Services":
        """Return a new ``Services`` holding only services named in *service_names*.

        The relative order of ``content`` is kept; names without a matching
        service are ignored.
        """
        matching = []
        for entry in self.content:
            if entry.name in service_names:
                matching.append(entry)

        return Services(matching)