kiwi-scp/kiwi_scp/executable.py

65 lines
1.8 KiB
Python
Raw Normal View History

2021-11-03 20:06:02 +00:00
import functools
import logging
2020-08-12 15:46:50 +00:00
import os
import subprocess
2021-11-03 20:06:02 +00:00
from pathlib import Path
2021-11-13 02:26:32 +00:00
from typing import Optional, List
2020-08-12 14:43:13 +00:00
2021-11-03 20:06:02 +00:00
import attr
2020-08-17 09:57:08 +00:00
2021-11-03 20:06:02 +00:00
# Module-level logger, named after this module via ``__name__``.
_logger = logging.getLogger(__name__)
2021-11-03 20:06:02 +00:00
@attr.s
class Executable:
    """Wrapper around an external program that is looked up on the search path.

    Only the program name is stored. The file system is searched when
    ``exe_file`` is read, which happens on every ``run``/``Popen`` call;
    successful lookups are cached per program name.
    """

    # Bare program name to search for, e.g. "docker".
    exe_name: str = attr.ib()

    @staticmethod
    @functools.lru_cache(maxsize=None)
    def __find_exe_file(exe_name: str) -> Path:
        """Return the first executable regular file named *exe_name* on the search path.

        Results are memoised per *exe_name*, so later changes to ``PATH`` do
        not affect a name that has already been resolved. Failed lookups are
        not cached (``lru_cache`` does not store raised exceptions).

        Raises:
            FileNotFoundError: no directory on the search path contains an
                executable file with that name.
        """
        # An unset PATH previously escaped as a bare KeyError; fall back to
        # the platform default search path so lookups either succeed or fail
        # with the FileNotFoundError below.
        search_path = os.environ.get("PATH", os.defpath)

        for directory in search_path.split(os.pathsep):
            candidate = Path(directory).joinpath(exe_name)
            if os.path.isfile(candidate) and os.access(candidate, os.X_OK):
                return candidate

        raise FileNotFoundError(f"Executable '{exe_name}' not found in $PATH!")

    @property
    def exe_file(self) -> Optional[Path]:
        """Full path of the program named ``exe_name``.

        Raises:
            FileNotFoundError: the program cannot be found.
        """
        return self.__find_exe_file(self.exe_name)

    def __build_cmd(self, args, kwargs) -> List:
        """Return the argv list: resolved executable path followed by *args*.

        *kwargs* is only written to the debug log; it is not part of the
        returned command.
        """
        cmd = [self.exe_file, *args]

        # Lazy %-formatting: the message is only rendered when DEBUG is enabled.
        _logger.debug("Executable cmd%s, kwargs%s", cmd, kwargs)
        return cmd

    def run(self, process_args, **kwargs) -> Optional[subprocess.CompletedProcess]:
        """Run the program with *process_args* via ``subprocess.run``.

        Args:
            process_args: iterable of command-line arguments.
            **kwargs: forwarded unchanged to ``subprocess.run``.

        Returns:
            The ``CompletedProcess`` produced by ``subprocess.run``.
        """
        return subprocess.run(
            self.__build_cmd(process_args, kwargs),
            **kwargs
        )

    def Popen(self, process_args, **kwargs) -> subprocess.Popen:
        """Start the program with *process_args* via ``subprocess.Popen``.

        Args:
            process_args: iterable of command-line arguments.
            **kwargs: forwarded unchanged to ``subprocess.Popen``.

        Returns:
            The started ``Popen`` object; the caller is responsible for it.
        """
        return subprocess.Popen(
            self.__build_cmd(process_args, kwargs),
            **kwargs
        )

    def run_with_pager(self, process_args, **kwargs) -> Optional[subprocess.CompletedProcess]:
        """Run the program and show its standard output in ``less``.

        Any ``stdout``/``stderr`` supplied by the caller is overridden:
        stdout is piped into the pager and stderr is discarded.

        Args:
            process_args: iterable of command-line arguments.
            **kwargs: forwarded to ``Popen`` (after the overrides above).

        Returns:
            The ``CompletedProcess`` of the ``less`` invocation.
        """
        kwargs["stdout"] = subprocess.PIPE
        kwargs["stderr"] = subprocess.DEVNULL

        with self.Popen(process_args, **kwargs) as process:
            # less reads directly from the child's stdout pipe.
            less_process = Executable("less").run([
                "-R", "+G"
            ], stdin=process.stdout)

            # Consume whatever the pager left unread and wait for the child
            # to terminate.
            process.communicate()

        return less_process
2021-11-13 02:26:32 +00:00
# Shared module-level wrappers. Creating them only stores the program name;
# the executable is searched for when `exe_file`, `run` or `Popen` is used.
DOCKER_EXE = Executable("docker")
COMPOSE_EXE = Executable("docker-compose")