1
0
Fork 0
mirror of https://github.com/yavook/kiwi-scp.git synced 2024-11-21 20:33:00 +00:00

Fix "Executable" class

This commit is contained in:
Jörn-Michael Miehe 2021-11-03 21:06:02 +01:00
parent a784475479
commit bf70db567a

View file

@ -1,74 +1,60 @@
# system
import functools
import logging
import os
import subprocess
from pathlib import Path
from typing import Optional, List, Any
import attr
_logger = logging.getLogger(__name__)
def _is_executable(filename):
if filename is None:
return False
return os.path.isfile(filename) and os.access(filename, os.X_OK)
def _find_exe_file(exe_name):
for path in os.environ['PATH'].split(os.pathsep):
exe_file = os.path.join(path, exe_name)
if _is_executable(exe_file):
return exe_file
raise FileNotFoundError(f"Executable '{exe_name}' not found in $PATH!")
@attr.s
class Executable:
class __Executable:
__exe_path = None
exe_name: str = attr.ib()
def __init__(self, exe_name):
self.__exe_path = _find_exe_file(exe_name)
@staticmethod
@functools.lru_cache(maxsize=None)
def __find_exe_file(exe_name: str) -> Optional[Path]:
for path in os.environ['PATH'].split(os.pathsep):
exe_file = Path(path).joinpath(exe_name)
if os.path.isfile(exe_file) and os.access(exe_file, os.X_OK):
return exe_file
def __build_cmd(self, args, kwargs):
cmd = [self.__exe_path, *args]
raise FileNotFoundError(f"Executable '{exe_name}' not found in $PATH!")
logging.debug(f"Executable cmd{cmd}, kwargs{kwargs}")
return cmd
@property
def exe_file(self) -> Optional[Path]:
return self.__find_exe_file(self.exe_name)
def run(self, process_args, **kwargs):
return subprocess.run(
self.__build_cmd(process_args, kwargs),
**kwargs
)
def __build_cmd(self, args, kwargs) -> List[Path, Any, ...]:
cmd = [self.exe_file, *args]
def Popen(self, process_args, **kwargs):
return subprocess.Popen(
self.__build_cmd(process_args, kwargs),
**kwargs
)
_logger.debug(f"Executable cmd{cmd}, kwargs{kwargs}")
return cmd
def run_less(self, process_args, **kwargs):
kwargs['stdout'] = subprocess.PIPE
kwargs['stderr'] = subprocess.DEVNULL
def run(self, process_args, **kwargs) -> Optional[subprocess.CompletedProcess]:
return subprocess.run(
self.__build_cmd(process_args, kwargs),
**kwargs
)
process = self.Popen(
process_args,
**kwargs
)
def Popen(self, process_args, **kwargs) -> subprocess.Popen[str]:
return subprocess.Popen(
self.__build_cmd(process_args, kwargs),
**kwargs
)
def run_less(self, process_args, **kwargs) -> Optional[subprocess.CompletedProcess]:
kwargs['stdout'] = subprocess.PIPE
kwargs['stderr'] = subprocess.DEVNULL
with self.Popen(process_args, **kwargs) as process:
less_process = Executable('less').run([
'-R', '+G'
], stdin=process.stdout)
process.communicate()
return less_process
__exe_name = None
__instances = {}
def __init__(self, exe_name):
self.__exe_name = exe_name
if exe_name not in Executable.__instances:
Executable.__instances[exe_name] = Executable.__Executable(exe_name)
def __getattr__(self, item):
return getattr(self.__instances[self.__exe_name], item)
return less_process