1
0
Fork 0
mirror of https://github.com/yavook/kiwi-scp.git synced 2024-11-22 04:43:00 +00:00

Fix "Rootkit" class

This commit is contained in:
Jörn-Michael Miehe 2021-11-04 21:59:17 +01:00
parent bf70db567a
commit 52f0e9b8a3
2 changed files with 61 additions and 69 deletions

View file

@ -28,7 +28,7 @@ class Executable:
def exe_file(self) -> Optional[Path]:
return self.__find_exe_file(self.exe_name)
def __build_cmd(self, args, kwargs) -> List[Path, Any, ...]:
def __build_cmd(self, args, kwargs) -> List:
cmd = [self.exe_file, *args]
_logger.debug(f"Executable cmd{cmd}, kwargs{kwargs}")
@ -40,7 +40,7 @@ class Executable:
**kwargs
)
def Popen(self, process_args, **kwargs) -> subprocess.Popen[str]:
def Popen(self, process_args, **kwargs) -> subprocess.Popen:
return subprocess.Popen(
self.__build_cmd(process_args, kwargs),
**kwargs

View file

@ -1,86 +1,78 @@
# system
import functools
import logging
import os
import subprocess
from pathlib import Path
from typing import Optional, TypeVar, Union, List
import attr
# local
from ._constants import IMAGES_DIRECTORY_NAME, LOCAL_IMAGES_NAME, DEFAULT_IMAGE_NAME
from .executable import Executable
_logger = logging.getLogger(__name__)
PSL = TypeVar("PSL", Union[Path, str], List[Union[Path, str]])
def prefix_path(path: PSL, prefix: Path = Path("/mnt")) -> PSL:
if isinstance(path, Path):
return prefix.joinpath(path.absolute())
def _prefix_path(prefix, path):
if isinstance(path, str):
abs_path = os.path.abspath(path)
return os.path.realpath(f"{prefix}/{abs_path}")
return prefix_path(Path(path), prefix)
elif isinstance(path, list):
return [_prefix_path(prefix, p) for p in path]
def prefix_path_mnt(path):
return _prefix_path('/mnt/', path)
def _image_name(image_tag):
if image_tag is not None:
return f"{LOCAL_IMAGES_NAME}:{image_tag}"
else:
return DEFAULT_IMAGE_NAME
return [prefix_path(prefix, p) for p in path]
@attr.s
class Rootkit:
class __Rootkit:
__image_tag = None
image_tag: str = attr.ib()
def __init__(self, image_tag=None):
self.__image_tag = image_tag
@staticmethod
@functools.lru_cache(maxsize=None)
def __image_name(image_tag: Optional[str]) -> str:
if image_tag is not None:
return f"{LOCAL_IMAGES_NAME}:{image_tag}"
else:
return DEFAULT_IMAGE_NAME
def __exists(self):
ps = Executable('docker').run([
'images',
'--filter', f"reference={_image_name(self.__image_tag)}",
'--format', '{{.Repository}}:{{.Tag}}'
], stdout=subprocess.PIPE)
@staticmethod
@functools.lru_cache(maxsize=None)
def __exists(image_tag: str) -> bool:
ps = Executable('docker').run([
'images',
'--filter', f"reference={Rootkit.__image_name(image_tag)}",
'--format', '{{.Repository}}:{{.Tag}}'
], stdout=subprocess.PIPE)
return str(ps.stdout, 'utf-8').strip() == _image_name(self.__image_tag)
return str(ps.stdout, 'utf-8').strip() == Rootkit.__image_name(image_tag)
def __build_image(self) -> None:
if Rootkit.__exists(self.image_tag):
_logger.info(f"Using image {Rootkit.__image_name(self.image_tag)}")
else:
if self.image_tag is None:
_logger.info(f"Pulling image {Rootkit.__image_name(self.image_tag)}")
Executable('docker').run([
'pull', Rootkit.__image_name(self.image_tag)
], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
def __build_image(self):
if self.__exists():
logging.info(f"Using image {_image_name(self.__image_tag)}")
else:
if self.__image_tag is None:
logging.info(f"Pulling image {_image_name(self.__image_tag)}")
Executable('docker').run([
'pull', _image_name(self.__image_tag)
], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
_logger.info(f"Building image {Rootkit.__image_name(self.image_tag)}")
Executable('docker').run([
'build',
'-t', Rootkit.__image_name(self.image_tag),
'-f', f"{IMAGES_DIRECTORY_NAME}/{self.image_tag}.Dockerfile",
f"{IMAGES_DIRECTORY_NAME}"
], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
else:
logging.info(f"Building image {_image_name(self.__image_tag)}")
Executable('docker').run([
'build',
'-t', _image_name(self.__image_tag),
'-f', f"{IMAGES_DIRECTORY_NAME}/{self.__image_tag}.Dockerfile",
f"{IMAGES_DIRECTORY_NAME}"
], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
def run(self, process_args, **kwargs):
self.__build_image()
Executable('docker').run([
'run', '--rm',
'-v', '/:/mnt',
'-u', 'root',
_image_name(self.__image_tag),
*process_args
], **kwargs)
__image_tag = None
__instances = {}
def __init__(self, image_tag=None):
self.__image_tag = image_tag
if _image_name(self.__image_tag) not in Rootkit.__instances:
Rootkit.__instances[_image_name(self.__image_tag)] = Rootkit.__Rootkit(image_tag)
def __getattr__(self, item):
return getattr(self.__instances[_image_name(self.__image_tag)], item)
def run(self, process_args, **kwargs):
self.__build_image()
Executable('docker').run([
'run', '--rm',
'-v', '/:/mnt',
'-u', 'root',
Rootkit.__image_name(self.image_tag),
*process_args
], **kwargs)