Created
July 11, 2019 08:53
-
-
Save ReallyLiri/18eb1306936da50253937166cc490843 to your computer and use it in GitHub Desktop.
Run external processes from Python.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import gzip
import logging
import os
import shutil
from subprocess import Popen, PIPE, TimeoutExpired
# Exit status that run_process treats as a successful run.
SUCCESS_CODE = 0
# Upper bound, in seconds, passed to Popen.communicate() by run_process.
TIMEOUT_SECONDS = 600
class ProcessSetupError(Exception):
    """Signals that an external process could not be started."""
class ProcessTimeoutError(Exception):
    """Signals that an external process exceeded its time limit."""
def pipe_to_str(bytes_str):
    """Convert data read from a process pipe into text.

    Args:
        bytes_str: Raw pipe content. ``bytes``/``bytearray`` values are
            decoded as UTF-8; ``None`` (no data captured) yields an empty
            string; anything else is passed through ``str()``.

    Returns:
        The decoded text. Literal backslash-n sequences (two characters) in
        decoded byte input are turned into real newlines.
    """
    if bytes_str is None:
        # str(None) would yield the truthy text 'None' and be logged as output.
        return ''
    if isinstance(bytes_str, (bytes, bytearray)):
        # Process output is not guaranteed to be valid UTF-8; substitute the
        # replacement character instead of raising UnicodeDecodeError.
        return bytes_str.decode(errors='replace').replace('\\n', '\n')
    return str(bytes_str)
def run_process(args, cwd=None, read_stdin_path=None, plot_stdout_path=None, append=False, env=None):
    """Run an external command and capture its output.

    Args:
        args: Command and arguments as a list of strings; ``args[0]`` is used
            as the process name in log messages.
        cwd: Working directory for the child. A relative path is resolved
            against the current working directory.
        read_stdin_path: Optional path of a gzip file whose decompressed
            content is fed to the child's stdin.
        plot_stdout_path: Optional path of a gzip file that receives the
            child's stdout (compressed).
        append: When true, append to ``plot_stdout_path`` instead of
            overwriting it.
        env: Optional environment mapping passed to ``Popen``.

    Returns:
        A ``(success, output)`` tuple. ``success`` is true only when the child
        exits with ``SUCCESS_CODE``; ``output`` is the decoded stdout that was
        still unread when the process was collected (empty when stdout was
        streamed to ``plot_stdout_path``). Any exception, including a timeout
        or a failure to start the process, is logged and reported as
        ``(False, None)``.

    Note:
        ``TIMEOUT_SECONDS`` bounds only the final ``communicate()`` call, not
        the stdin feeding or stdout streaming phases. stderr is not drained
        while stdout is being streamed to ``plot_stdout_path``.
    """
    process_name = args[0]
    logging.debug("Running '%s'", ' '.join(args))
    try:
        stdin = PIPE if read_stdin_path else None
        if cwd and not os.path.isabs(cwd):
            cwd = os.path.join(os.getcwd(), cwd)
        try:
            process = Popen(args, stdout=PIPE, stderr=PIPE, stdin=stdin, cwd=cwd, shell=False, env=env)
        except OSError as err:
            # Popen never returns None; a failed launch surfaces as OSError.
            raise ProcessSetupError(
                "An Error occurred while trying to start process '{}'".format(process_name)
            ) from err
        try:
            if read_stdin_path:
                try:
                    with gzip.open(read_stdin_path, 'rb') as f:
                        shutil.copyfileobj(f, process.stdin)
                finally:
                    # The child must see EOF on stdin before its stdout is
                    # drained below, otherwise both sides wait on each other.
                    try:
                        process.stdin.close()
                    except BrokenPipeError:
                        pass  # child exited without consuming all of its input
                    # Stop communicate() from touching the closed stream.
                    process.stdin = None
            if plot_stdout_path:
                with gzip.open(plot_stdout_path, 'ab' if append else 'wb') as f:
                    shutil.copyfileobj(process.stdout, f)
            try:
                output, errors = process.communicate(timeout=TIMEOUT_SECONDS)
            except TimeoutExpired:
                logging.exception("Timeout of process '%s' after %d seconds", process_name, TIMEOUT_SECONDS)
                raise ProcessTimeoutError(
                    "Process '{}' timed out after {} seconds".format(process_name, TIMEOUT_SECONDS)
                )
        finally:
            # On timeout or any other failure, do not leave the child running
            # with open pipes: kill it and collect it.
            if process.poll() is None:
                process.kill()
                process.communicate()
        output = pipe_to_str(output)
        if output:
            logging.info("Process '%s' output:\n%s", process_name, output)
        errors = pipe_to_str(errors)
        if errors:
            logging.info("Process '%s' errors:\n%s", process_name, errors)
        if process.returncode != SUCCESS_CODE:
            logging.error("process '%s' run failed, return code: %s, stderr: %s", process_name, process.returncode, errors)
            return False, output
        return True, output
    except Exception:
        logging.exception("Process '%s' run failed with exception", process_name)
        return False, None
def exec_system(command):
    """Run ``command`` through the system shell and fail loudly on error.

    Args:
        command: Shell command line passed verbatim to ``os.system``. It is
            interpreted by the shell, so it must not be built from untrusted
            input.

    Raises:
        RuntimeError: If the command does not finish with status 0. The
            message carries the command's exit code, or the negated signal
            number when the command was killed by a signal.
    """
    status = os.system(command)
    if status == 0:
        return
    code = status
    if os.name == 'posix':
        # On POSIX os.system returns an encoded wait status (exit code 1 is
        # reported as 256); decode it so the message shows the real code.
        if os.WIFEXITED(status):
            code = os.WEXITSTATUS(status)
        elif os.WIFSIGNALED(status):
            code = -os.WTERMSIG(status)
    raise RuntimeError(f"Command '{command}' failed with code {code}")
def wrap_args_with_ssh_access(args, ssh_access):
    """Build an ``ssh`` argument list that carries ``args`` as its command.

    Args:
        args: Command and arguments as a list of strings.
        ssh_access: Destination placed directly after ``ssh`` in the result.

    Returns:
        A new three-element list: ``'ssh'``, ``ssh_access`` and the arguments
        joined by single spaces into one command string.

    Note:
        The arguments are joined without any quoting, so an argument that
        contains spaces or shell metacharacters does not stay a single
        argument in the resulting command string.
    """
    remote_command = " ".join(args)
    return ['ssh', ssh_access, remote_command]
def wrap_args_with_pod_access(args, pod):
    """Build a ``kubectl exec`` argument list that carries ``args``.

    Args:
        args: Command and arguments to append after the ``--`` separator.
        pod: Pod identifier placed after the ``-it`` flag.

    Returns:
        A new list: ``kubectl exec -it <pod> --`` followed by every element
        of ``args``; the input list is not modified.
    """
    return ['kubectl', 'exec', '-it', pod, '--', *args]
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment