import logging import sys from enum import Enum, auto from typing import TypeVar, Iterable, Type, Optional, List import click from ..instance import Instance from ..project import Project from ..services import Services from ..wstring import WParagraph, WAlignment _logger = logging.getLogger(__name__) class KiwiCommandType(Enum): INSTANCE = auto() PROJECT = auto() PROJECTS = auto() SERVICES = auto() T = TypeVar("T") class KiwiCommandNotImplementedError(Exception): pass class KiwiCommand: type: KiwiCommandType = KiwiCommandType.SERVICES enabled_only: bool = False @staticmethod def print_header(header: str) -> None: click.secho(header, fg="green", bold=True) @staticmethod def print_error(error: str) -> None: click.secho(error, file=sys.stderr, fg="red", bold=True) @staticmethod def print_list(content: Iterable[str]) -> None: for item in content: click.echo(click.style(" - ", fg="green") + click.style(item, fg="blue")) @staticmethod def user_query(description: str, default: T, cast_to: Type[T] = str) -> T: # prompt user as per argument while True: try: prompt = \ click.style(f"Enter {description} [", fg="green") + \ click.style(default, fg="blue") + \ click.style("] ", fg="green") str_value = input(prompt).strip() if str_value: return cast_to(str_value) else: return default except EOFError: click.echo("Input aborted.") return default except Exception as e: click.echo(f"Invalid input: {e}") @staticmethod def danger_confirm(*prompt_lines: str, default: Optional[bool] = None) -> bool: if default is True: suffix = "[YES|no]" default_answer = "yes" elif default is False: suffix = "[yes|NO]" default_answer = "no" else: suffix = "[yes|no]" default_answer = None dumb = WParagraph.from_strings( click.style("WARNING", bold=True, underline=True, blink=True, fg="red"), click.style("ここにゴミ", fg="cyan"), click.style("を捨てないで下さい", fg="cyan"), click.style("DO NOT DUMB HERE", fg="yellow"), click.style("NO DUMB AREA", fg="yellow"), ).align(WAlignment.CENTER).surround("!") prompt = WParagraph.from_strings(*prompt_lines).align(WAlignment.LEFT).emphasize(3) answer = input( f"{dumb}\n\n" f"{prompt}\n\n" f"Are you sure you want to proceed? {suffix}: " ).strip().lower() if not answer: answer = default_answer while answer not in ["yes", "no"]: answer = input("Please type 'yes' or 'no' explicitly: ").strip().lower() return answer == "yes" @classmethod def run(cls, instance: Instance, project_names: List[str], service_names: List[str], **kwargs) -> None: _logger.debug(f"{instance.directory!r}: {project_names!r}, {service_names!r}") projects = instance.get_projects(project_names) if not projects: # run for whole instance _logger.debug(f"running for instance, kwargs={kwargs}") cls.run_for_instance(instance, **kwargs) elif not service_names: # run for entire project(s) for project_name, project in projects.items(): if project is None: _logger.debug(f"running for new project {project_name}, kwargs={kwargs}") cls.run_for_new_project(instance, project_name, **kwargs) else: if cls.enabled_only and not project.config.enabled: cls.print_error(f"Can't interact with disabled project {project_name}!") return _logger.debug(f"running for project {project.name}, kwargs={kwargs}") cls.run_for_project(instance, project, **kwargs) else: # run for some services project_name = list(projects)[0] project = projects[project_name] if project is None: cls.print_error(f"Project '{project_name}' not in kiwi-scp instance at '{instance.directory}'!") else: if cls.enabled_only and not project.config.enabled: cls.print_error(f"Can't interact with disabled project {project_name}!") return _logger.debug(f"running for services {service_names} in project {project_name}, kwargs={kwargs}") cls.run_for_services(instance, project, service_names, **kwargs) @classmethod def run_for_instance(cls, instance: Instance, **kwargs) -> None: for project in instance.projects: if cls.enabled_only and not project.config.enabled: cls.print_header(f"Skipping disabled project {project.name}") continue cls.run_for_project(instance, project, **kwargs) @classmethod def run_for_project(cls, instance: Instance, project: Project, **kwargs) -> None: service_names = [service.name for service in project.services.content] cls.run_for_services(instance, project, service_names, **kwargs) @classmethod def run_for_new_project(cls, instance: Instance, project_name: str, **kwargs) -> None: cls.print_error(f"Project '{project_name}' not in kiwi-scp instance at '{instance.directory}'!") @classmethod def run_for_services(cls, instance: Instance, project: Project, service_names: List[str], **kwargs) -> None: services = project.services.filter_existing(service_names) new_service_names = [ service_name for service_name in service_names if service_name not in list(services.names) ] cls.run_for_filtered_services(instance, project, services, new_service_names, **kwargs) @classmethod def run_for_filtered_services(cls, instance: Instance, project: Project, services: Services, new_service_names: List[str], **kwargs) -> None: raise KiwiCommandNotImplementedError()