2021-12-02 16:19:14 +00:00
|
|
|
import logging
|
|
|
|
import sys
|
|
|
|
from enum import Enum, auto
|
|
|
|
from typing import TypeVar, Iterable, Type, Optional, List
|
|
|
|
|
|
|
|
import click
|
|
|
|
|
2021-12-02 16:33:07 +00:00
|
|
|
from ..instance import Instance
|
|
|
|
from ..project import Project
|
|
|
|
from ..services import Services
|
|
|
|
from ..wstring import WParagraph, WAlignment
|
2021-12-02 16:19:14 +00:00
|
|
|
|
|
|
|
# module-level logger, named after this module
_logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
|
|
class KiwiCommandType(Enum):
    """Enumeration of command kinds.

    Member values are assigned automatically by ``auto()`` in declaration
    order. Presumably each member names the scope a command operates on --
    verify against the code that reads ``KiwiCommand.type``.
    """

    INSTANCE = auto()

    PROJECT = auto()

    PROJECTS = auto()

    SERVICES = auto()
|
|
|
|
|
|
|
|
|
|
|
|
# generic type of the value queried and returned by `KiwiCommand.user_query`
T = TypeVar("T")
|
|
|
|
|
|
|
|
|
|
|
|
class KiwiCommand:
    """Base class for commands that run against an instance, projects or services.

    ``run`` is the entry point: it resolves the requested projects and
    dispatches to one of the ``run_for_*`` hooks. Every hook eventually
    funnels into ``run_for_filtered_services``, which subclasses must
    implement (or they override a hook further up the chain).
    """

    # kind of this command; defaults to SERVICES.
    # NOTE(review): not read inside this class -- presumably consumed by the
    # CLI wiring elsewhere; confirm.
    type: KiwiCommandType = KiwiCommandType.SERVICES

    # when True, disabled projects are rejected (explicit selection) or
    # skipped (whole-instance run)
    enabled_only: bool = False

    @staticmethod
    def print_header(header: str) -> None:
        """Print ``header`` in bold green."""
        click.secho(header, fg="green", bold=True)

    @staticmethod
    def print_error(error: str) -> None:
        """Print ``error`` in bold red to stderr."""
        click.secho(error, file=sys.stderr, fg="red", bold=True)

    @staticmethod
    def print_list(content: Iterable[str]) -> None:
        """Print each item of ``content`` on its own line as a colored bullet."""
        for item in content:
            click.echo(click.style(" - ", fg="green") + click.style(item, fg="blue"))

    @staticmethod
    def user_query(description: str, default: T, cast_to: Type[T] = str) -> T:
        """Interactively ask the user for a value.

        :param description: text inserted into the "Enter ... [default]" prompt
        :param default: value returned when the user enters nothing or
            aborts the input (EOF)
        :param cast_to: callable applied to the stripped, non-empty input
        :return: the converted input, or ``default``

        Re-prompts until ``cast_to`` accepts the input.
        """
        # Build the prompt once, outside the retry loop: a failure here must
        # not be swallowed by the "invalid input" handler below (which would
        # retry forever). ``str(default)`` keeps non-string defaults safe.
        prompt = (
            click.style(f"Enter {description} [", fg="green")
            + click.style(str(default), fg="blue")
            + click.style("] ", fg="green")
        )

        while True:
            try:
                str_value = input(prompt).strip()
                if str_value:
                    return cast_to(str_value)
                return default

            except EOFError:
                click.echo("Input aborted.")
                return default

            except Exception as e:
                # cast_to is an arbitrary callable, so any failure counts as
                # invalid input; report it and ask again
                click.echo(f"Invalid input: {e}")

    @staticmethod
    def danger_confirm(*prompt_lines: str, default: Optional[bool] = None) -> bool:
        """Show a warning banner and ask for an explicit yes/no confirmation.

        :param prompt_lines: lines of the message displayed below the banner
        :param default: answer assumed when the user just presses enter;
            ``None`` means there is no default and an explicit answer is required
        :return: ``True`` if the final answer is "yes", ``False`` for "no"

        Any ``EOFError`` raised by ``input`` is not handled here and
        propagates to the caller.
        """
        if default is True:
            suffix = "[YES|no]"
            default_answer = "yes"

        elif default is False:
            suffix = "[yes|NO]"
            default_answer = "no"

        else:
            suffix = "[yes|no]"
            default_answer = None

        dumb = WParagraph.from_strings(
            click.style("WARNING", bold=True, underline=True, blink=True, fg="red"),
            click.style("ここにゴミ", fg="cyan"),
            click.style("を捨てないで下さい", fg="cyan"),
            click.style("DO NOT DUMB HERE", fg="yellow"),
            click.style("NO DUMB AREA", fg="yellow"),
        ).align(WAlignment.CENTER).surround("!")

        prompt = WParagraph.from_strings(*prompt_lines).align(WAlignment.LEFT).emphasize(3)

        answer = input(
            f"{dumb}\n\n"
            f"{prompt}\n\n"
            f"Are you sure you want to proceed? {suffix}: "
        ).strip().lower()

        if not answer:
            # empty input selects the default (None keeps us in the loop below)
            answer = default_answer

        while answer not in ["yes", "no"]:
            answer = input("Please type 'yes' or 'no' explicitly: ").strip().lower()

        return answer == "yes"

    @classmethod
    def _reject_disabled(cls, project: Project, project_name: str) -> bool:
        """Print an error and return True if this command must not touch ``project``."""
        if cls.enabled_only and not project.config.enabled:
            cls.print_error(f"Can't interact with disabled project {project_name}!")
            return True
        return False

    @classmethod
    def run(cls, instance: Instance, project_names: List[str], service_names: List[str], **kwargs) -> None:
        """Dispatch the command according to the given project and service names.

        - no projects resolved: run for the whole instance
        - projects but no service names: run for each project in turn
        - service names given: run for those services in the first project only

        Hitting a disabled project while ``enabled_only`` is set prints an
        error and stops the run; later projects are not processed.
        """
        _logger.debug("%r: %r, %r", instance.directory, project_names, service_names)

        projects = instance.get_projects(project_names)

        if not projects:
            # run for whole instance
            _logger.debug("running for instance, kwargs=%s", kwargs)
            cls.run_for_instance(instance, **kwargs)

        elif not service_names:
            # run for entire project(s)
            for project_name, project in projects.items():
                if project is None:
                    # name did not resolve to a project in this instance
                    _logger.debug("running for new project %s, kwargs=%s", project_name, kwargs)
                    cls.run_for_new_project(instance, project_name, **kwargs)
                    continue

                if cls._reject_disabled(project, project_name):
                    return

                _logger.debug("running for project %s, kwargs=%s", project.name, kwargs)
                cls.run_for_project(instance, project, **kwargs)

        else:
            # run for some services; only the first project is considered
            project_name = next(iter(projects))
            project = projects[project_name]

            if project is None:
                cls.print_error(f"Project '{project_name}' not in kiwi-scp instance at '{instance.directory}'!")
                return

            if cls._reject_disabled(project, project_name):
                return

            _logger.debug(
                "running for services %s in project %s, kwargs=%s",
                service_names, project_name, kwargs,
            )
            cls.run_for_services(instance, project, service_names, **kwargs)

    @classmethod
    def run_for_instance(cls, instance: Instance, **kwargs) -> None:
        """Run the command for every project of ``instance``.

        Disabled projects are skipped with a notice when ``enabled_only`` is set.
        """
        for project in instance.projects:
            if cls.enabled_only and not project.config.enabled:
                cls.print_header(f"Skipping disabled project {project.name}")
                continue

            cls.run_for_project(instance, project, **kwargs)

    @classmethod
    def run_for_project(cls, instance: Instance, project: Project, **kwargs) -> None:
        """Run the command for all services of ``project``."""
        service_names = [service.name for service in project.services.content]
        cls.run_for_services(instance, project, service_names, **kwargs)

    @classmethod
    def run_for_new_project(cls, instance: Instance, project_name: str, **kwargs) -> None:
        """Handle a project name that is not part of ``instance``.

        The default implementation only reports the missing project as an error.
        """
        cls.print_error(f"Project '{project_name}' not in kiwi-scp instance at '{instance.directory}'!")

    @classmethod
    def run_for_services(cls, instance: Instance, project: Project, service_names: List[str], **kwargs) -> None:
        """Split ``service_names`` into existing and unknown services, then delegate.

        Existing services are passed on as the filtered ``Services`` object,
        names without a match as ``new_service_names`` (original order kept).
        """
        services = project.services.filter_existing(service_names)

        # materialize the known names once instead of once per membership test
        existing_names = set(services.names)
        new_service_names = [
            service_name
            for service_name in service_names
            if service_name not in existing_names
        ]

        cls.run_for_filtered_services(instance, project, services, new_service_names, **kwargs)

    @classmethod
    def run_for_filtered_services(cls, instance: Instance, project: Project, services: Services,
                                  new_service_names: List[str], **kwargs) -> None:
        """Do the actual work for the selected services; must be overridden.

        :param services: the services that exist in ``project``
        :param new_service_names: requested names without an existing service
        :raises NotImplementedError: always, in this base implementation
        """
        raise NotImplementedError(f"{cls.__name__} does not implement run_for_filtered_services()")
|