kiwi-scp/kiwi_scp/executable.py

75 lines
1.9 KiB
Python
Raw Permalink Normal View History

2020-08-12 15:46:50 +00:00
# system
import logging
2020-08-12 15:46:50 +00:00
import os
import subprocess
2020-08-12 14:43:13 +00:00
2020-08-17 09:57:08 +00:00
def _is_executable(filename):
    """Tell whether *filename* is an existing regular file with execute permission.

    :param filename: path to check; ``None`` is accepted and yields ``False``
    :return: ``True`` only if the path is a file and ``os.access`` grants ``X_OK``
    """
    if filename is not None and os.path.isfile(filename):
        # a regular file exists, so the answer is whatever the permission check says
        return os.access(filename, os.X_OK)

    return False
2020-08-17 09:57:08 +00:00
def _find_exe_file(exe_name):
    """Locate *exe_name* in the directories of the executable search path.

    Directories are probed in the order they are listed; the first candidate
    accepted by ``_is_executable`` is returned.

    :param exe_name: file name of the program to look for
    :return: the directory entry joined with *exe_name* for the first match
    :raises FileNotFoundError: if no listed directory holds an executable
        file called *exe_name*
    """
    # An unset PATH must not leak out as a KeyError: "not found" is reported
    # as FileNotFoundError below, so fall back to the platform's default
    # search path and let the normal lookup decide.
    search_path = os.environ.get('PATH', os.defpath)

    for directory in search_path.split(os.pathsep):
        candidate = os.path.join(directory, exe_name)
        if _is_executable(candidate):
            return candidate

    raise FileNotFoundError(f"Executable '{exe_name}' not found in $PATH!")
class Executable:
    """Handle for an external program that is looked up by name.

    A handle stores only the executable name. Any attribute that normal
    lookup does not find on the handle is fetched from a worker object kept
    in a class-level dict keyed by that name, so ``Executable('x').run(...)``
    ends up in the worker's ``run``. A worker is created (and the path
    resolved) only the first time a given name is seen.
    """

    # name this handle was created for; key into the worker dict below
    __exe_name = None
    # executable name -> worker object, shared by every handle
    __instances = {}

    class __Executable:
        """Worker that keeps the resolved path and launches the program."""

        # path produced by _find_exe_file for this worker's executable
        __exe_path = None

        def __init__(self, exe_name):
            """Resolve *exe_name* via ``_find_exe_file`` and remember the result."""
            self.__exe_path = _find_exe_file(exe_name)

        def __build_cmd(self, args, kwargs):
            """Return the argument vector: resolved path followed by *args*.

            *kwargs* is not part of the command; it is only written to the
            debug log together with the command.
            """
            cmd = [self.__exe_path]
            cmd.extend(args)
            logging.debug(f"Executable cmd{cmd}, kwargs{kwargs}")
            return cmd

        def run(self, process_args, **kwargs):
            """Call ``subprocess.run`` on the program with *process_args*.

            :param process_args: arguments appended after the executable path
            :param kwargs: passed through to ``subprocess.run`` unchanged
            :return: whatever ``subprocess.run`` returns
            """
            command = self.__build_cmd(process_args, kwargs)
            return subprocess.run(command, **kwargs)

        def Popen(self, process_args, **kwargs):
            """Call ``subprocess.Popen`` on the program with *process_args*.

            :param process_args: arguments appended after the executable path
            :param kwargs: passed through to ``subprocess.Popen`` unchanged
            :return: the ``subprocess.Popen`` object
            """
            command = self.__build_cmd(process_args, kwargs)
            return subprocess.Popen(command, **kwargs)

        def run_less(self, process_args, **kwargs):
            """Start the program and show its standard output in ``less -R +G``.

            Any ``stdout`` / ``stderr`` given by the caller is overridden:
            stdout is piped into the pager and stderr goes to ``DEVNULL``.

            :param process_args: arguments appended after the executable path
            :param kwargs: further keyword arguments for ``subprocess.Popen``
            :return: the result of running ``less``
            """
            kwargs.update(stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
            producer = self.Popen(process_args, **kwargs)

            # the pager reads straight from the producer's stdout pipe
            pager_args = ['-R', '+G']
            pager_result = Executable('less').run(pager_args, stdin=producer.stdout)

            # once the pager has returned, wait for the producer as well
            producer.communicate()
            return pager_result

    def __init__(self, exe_name):
        """Remember *exe_name* and create its worker unless one already exists."""
        self.__exe_name = exe_name

        workers = Executable.__instances
        if exe_name not in workers:
            workers[exe_name] = Executable.__Executable(exe_name)

    def __getattr__(self, item):
        """Forward attribute lookups that miss on the handle to the worker."""
        worker = self.__instances[self.__exe_name]
        return getattr(worker, item)