1
0
Fork 0
mirror of https://github.com/yavook/kiwi-scp.git synced 2024-11-22 21:03:00 +00:00
kiwi-scp/kiwi_scp/commands/cli.py
2021-12-02 17:08:14 +01:00

213 lines
7.2 KiB
Python

import importlib
import logging
import os
import sys
from enum import Enum, auto
from typing import List, Iterable, Type, Optional, TypeVar
import click
from ..instance import Instance
from ..project import Project
from ..services import Services
from ..wstring import WParagraph, WAlignment
_logger = logging.getLogger(__name__)
class KiwiCLI(click.MultiCommand):
"""Command Line Interface spread over multiple files in this directory"""
def list_commands(self, ctx: click.Context) -> List[str]:
"""list all the commands defined by cmd_*.py files in this directory"""
return [
filename[4:-3]
for filename in os.listdir(os.path.abspath(os.path.dirname(__file__)))
if filename.startswith("cmd_") and filename.endswith(".py")
]
def get_command(self, ctx: click.Context, cmd_name: str) -> Optional[click.Command]:
"""import and return a specific command"""
try:
cmd_module = importlib.import_module(f"kiwi_scp.commands.cmd_{cmd_name}")
except ImportError:
return
for cmd_name in dir(cmd_module):
member = getattr(cmd_module, cmd_name)
if isinstance(member, click.Command):
return member
class KiwiCommandType(Enum):
INSTANCE = auto()
PROJECT = auto()
PROJECTS = auto()
SERVICES = auto()
T = TypeVar("T")
class KiwiCommand:
type: KiwiCommandType = KiwiCommandType.SERVICES
enabled_only: bool = False
@staticmethod
def print_header(header: str) -> None:
click.secho(header, fg="green", bold=True)
@staticmethod
def print_error(error: str) -> None:
click.secho(error, file=sys.stderr, fg="red", bold=True)
@staticmethod
def print_list(content: Iterable[str]) -> None:
for item in content:
click.echo(click.style(" - ", fg="green") + click.style(item, fg="blue"))
@staticmethod
def user_query(description: str, default: T, cast_to: Type[T] = str) -> T:
# prompt user as per argument
while True:
try:
prompt = \
click.style(f"Enter {description} [", fg="green") + \
click.style(default, fg="blue") + \
click.style("] ", fg="green")
str_value = input(prompt).strip()
if str_value:
return cast_to(str_value)
else:
return default
except EOFError:
click.echo("Input aborted.")
return default
except Exception as e:
click.echo(f"Invalid input: {e}")
@staticmethod
def danger_confirm(*prompt_lines: str, default: Optional[bool] = None) -> bool:
if default is True:
suffix = "[YES|no]"
default_answer = "yes"
elif default is False:
suffix = "[yes|NO]"
default_answer = "no"
else:
suffix = "[yes|no]"
default_answer = None
dumb = WParagraph.from_strings(
click.style("WARNING", bold=True, underline=True, blink=True, fg="red"),
click.style("ここにゴミ", fg="cyan"),
click.style("を捨てないで下さい", fg="cyan"),
click.style("DO NOT DUMB HERE", fg="yellow"),
click.style("NO DUMB AREA", fg="yellow"),
).align(WAlignment.CENTER).surround("!")
prompt = WParagraph.from_strings(*prompt_lines).align(WAlignment.LEFT).emphasize(3)
answer = input(
f"{dumb}\n\n"
f"{prompt}\n\n"
f"Are you sure you want to proceed? {suffix}: "
).strip().lower()
if not answer:
answer = default_answer
while answer not in ["yes", "no"]:
answer = input("Please type 'yes' or 'no' explicitly: ").strip().lower()
return answer == "yes"
@classmethod
def run(cls, instance: Instance, project_names: List[str], service_names: List[str], **kwargs):
_logger.debug(f"{instance.directory!r}: {project_names!r}, {service_names!r}")
projects = [
project
for project in instance.projects
if project.name in project_names
]
if not projects:
# run for whole instance
_logger.debug(f"running for instance, kwargs={kwargs}")
cls.run_for_instance(instance, **kwargs)
elif not service_names:
# run for entire project(s)
for project_name, project in zip(project_names, projects):
if project is None:
_logger.debug(f"running for new project {project_name}, kwargs={kwargs}")
cls.run_for_new_project(instance, project_name, **kwargs)
else:
if cls.enabled_only and not project.config.enabled:
cls.print_error(f"Can't interact with disabled project {project_name}!")
return
_logger.debug(f"running for project {project.name}, kwargs={kwargs}")
cls.run_for_project(instance, project, **kwargs)
else:
# run for some services
project_name = project_names[0]
project = projects[0]
if project is None:
cls.print_error(f"Project '{project_name}' not in kiwi-scp instance at '{instance.directory}'!")
else:
if cls.enabled_only and not project.config.enabled:
cls.print_error(f"Can't interact with disabled project {project_name}!")
return
_logger.debug(f"running for services {service_names} in project {project_name}, kwargs={kwargs}")
cls.run_for_services(instance, project, service_names, **kwargs)
@classmethod
def run_for_instance(cls, instance: Instance, **kwargs) -> None:
for project in instance.projects:
if cls.enabled_only and not project.config.enabled:
cls.print_header(f"Skipping disabled project {project.name}")
continue
cls.run_for_project(instance, project, **kwargs)
@classmethod
def run_for_project(cls, instance: Instance, project: Project, **kwargs) -> None:
service_names = [service.name for service in project.services.content]
cls.run_for_services(instance, project, service_names, **kwargs)
@classmethod
def run_for_new_project(cls, instance: Instance, project_name: str, **kwargs) -> None:
cls.print_error(f"Project '{project_name}' not in kiwi-scp instance at '{instance.directory}'!")
@classmethod
def run_for_services(cls, instance: Instance, project: Project, service_names: List[str], **kwargs) -> None:
services = project.services.filter_existing(service_names)
new_service_names = [
service_name
for service_name
in service_names
if service_name not in list(services.names)
]
cls.run_for_filtered_services(instance, project, services, new_service_names, **kwargs)
@classmethod
def run_for_filtered_services(cls, instance: Instance, project: Project, services: Services,
new_service_names: List[str], **kwargs) -> None:
raise Exception