kiwi-scp/kiwi_scp/rootkit.py

93 lines
3.1 KiB
Python
Raw Permalink Normal View History

2021-11-04 20:59:17 +00:00
import functools
import logging
import subprocess
2021-11-04 20:59:17 +00:00
from pathlib import Path
2022-01-27 16:57:15 +00:00
from typing import Optional, TypeVar, Union, Sequence, Any
2021-11-04 20:59:17 +00:00
import attr
2020-08-19 15:15:00 +00:00
from ._constants import IMAGES_DIRECTORY_NAME, LOCAL_IMAGES_NAME, DEFAULT_IMAGE_NAME
2021-11-13 02:26:32 +00:00
from .executable import DOCKER_EXE
2021-11-04 20:59:17 +00:00
# Module-level logger, named after this module.
_logger = logging.getLogger(__name__)
2022-01-27 16:57:15 +00:00
# Path inside the helper container at which Rootkit.run bind-mounts the
# host's root directory ("/"); Path arguments are rewritten to live below it.
ROOTKIT_PREFIX = Path("/mnt")
2021-11-04 20:59:17 +00:00
@attr.s
class Rootkit:
    """Run commands as root inside a helper Docker container.

    The host's root directory is bind-mounted at ``ROOTKIT_PREFIX`` inside the
    container, so ``Path`` arguments handed to :meth:`run` are rewritten to
    point below that mount point.
    """

    # Tag of the helper image. ``None`` selects DEFAULT_IMAGE_NAME, which is
    # pulled; any other value names a LOCAL_IMAGES_NAME image that is built
    # from "<IMAGES_DIRECTORY_NAME>/<image_tag>.Dockerfile".
    image_tag: Optional[str] = attr.ib()

    @staticmethod
    @functools.lru_cache(maxsize=None)
    def __image_name(image_tag: Optional[str]) -> str:
        """Return the full image reference that belongs to *image_tag*."""
        if image_tag is None:
            return DEFAULT_IMAGE_NAME
        return f"{LOCAL_IMAGES_NAME}:{image_tag}"

    @staticmethod
    @functools.lru_cache(maxsize=None)
    def __exists(image_tag: Optional[str]) -> bool:
        """Return whether the image for *image_tag* is known to docker.

        The answer is cached; ``__build_image`` clears the cache after it
        pulled or built an image, because that changes the answer.
        """
        image_name = Rootkit.__image_name(image_tag)
        ps = DOCKER_EXE.run([
            "images",
            "--filter", f"reference={image_name}",
            "--format", "{{.Repository}}:{{.Tag}}"
        ], stdout=subprocess.PIPE)

        # NOTE(review): run() below declares the result of DOCKER_EXE.run as
        # Optional, so a missing result is treated as "image not found"
        # instead of failing while decoding -- confirm against DOCKER_EXE.
        if ps is None or ps.stdout is None:
            return False

        return str(ps.stdout, "utf-8").strip() == image_name

    @staticmethod
    def __flatten_args(argument: Any) -> list:
        """Translate *argument* into a flat list of command line strings.

        * ``str`` values are passed through unchanged.
        * ``Path`` values are made absolute and re-rooted below
          ``ROOTKIT_PREFIX``.
        * Other sequences are converted element by element, and nested
          sequences are flattened into the result.
        * Anything else is converted with ``str()``.

        A list is returned even for a single value, so the caller can always
        unpack the result without splitting a string into its characters.
        """
        if isinstance(argument, str):
            return [argument]

        if isinstance(argument, Path):
            # strip the leading "/" so the path can be joined onto the prefix
            relative = argument.absolute().relative_to("/")
            return [str(ROOTKIT_PREFIX.joinpath(relative))]

        if isinstance(argument, Sequence):
            flat = []
            for item in argument:
                flat.extend(Rootkit.__flatten_args(item))
            return flat

        return [str(argument)]

    def __build_image(self) -> None:
        """Make sure the helper image is available, pulling or building it if needed."""
        image_name = Rootkit.__image_name(self.image_tag)

        if Rootkit.__exists(self.image_tag):
            _logger.info("Using image %s", image_name)
            return

        if self.image_tag is None:
            _logger.info("Pulling image %s", image_name)
            DOCKER_EXE.run([
                "pull", image_name
            ], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

        else:
            _logger.info("Building image %s", image_name)
            DOCKER_EXE.run([
                "build",
                "-t", image_name,
                "-f", f"{IMAGES_DIRECTORY_NAME}/{self.image_tag}.Dockerfile",
                f"{IMAGES_DIRECTORY_NAME}"
            ], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)

        # The cached "image does not exist" answer is stale now; without this,
        # every later run() would pull or build the image again.
        Rootkit.__exists.cache_clear()

    def run(self, process_args, **kwargs) -> Optional[subprocess.CompletedProcess]:
        """Run a command as root inside the helper container.

        :param process_args: a single argument or an arbitrarily nested
            sequence of arguments; see ``__flatten_args`` for how ``str``,
            ``Path`` and other values are translated
        :param kwargs: passed through unchanged to ``DOCKER_EXE.run``
        :return: whatever ``DOCKER_EXE.run`` returns for the ``docker run`` call
        """
        self.__build_image()

        return DOCKER_EXE.run([
            "run", "--rm",
            "-v", f"/:{ROOTKIT_PREFIX!s}",
            "-u", "root",
            Rootkit.__image_name(self.image_tag),
            *Rootkit.__flatten_args(process_args)
        ], **kwargs)