"""Remote execution utilities."""
import getpass
import os
import platform
import shlex
import socket
import subprocess
import sys
IS_WIN = platform.system() == "Windows"
USER = getpass.getuser()
DEFAULT_SSH_OPT = "-p {port} {user}@{host}"
[docs]
def worker_is_remote(remote_host):
"""
Check remote_host is not the same as current.
"""
try:
socket.inet_aton(remote_host)
except socket.error:
remote_ip = socket.gethostbyname(remote_host)
else:
remote_ip = remote_host
local_ip = socket.gethostbyname(socket.gethostname())
return local_ip != remote_ip
[docs]
def ssh_bin():
try:
binary = os.environ["SSH_BINARY"]
except KeyError:
if IS_WIN:
raise Exception("SSH binary not provided.")
else:
binary = (
subprocess.check_output("which ssh", shell=True)
.decode(sys.stdout.encoding)
.strip()
)
return binary
[docs]
def ssh_cmd(ssh_cfg, command):
"""
Prefix command with ssh binary and option.
:param ssh_cfg: dict with "host" and "port" (optional) keys
:param command: command to execute on remote host
:return: full cmd list
"""
full_cmd = [ssh_bin()]
ssh_opt = os.environ.get("SSH_OPT") or DEFAULT_SSH_OPT
full_cmd.extend(
ssh_opt.format(
port=ssh_cfg.get("port", 22),
user=USER,
host=ssh_cfg["host"],
).split(" ")
)
full_cmd.append(command)
return full_cmd
[docs]
def copy_cmd(source, target, exclude=None, port=None, deref_links=False):
"""Returns remote copy command."""
try:
binary = os.environ["RSYNC_BINARY"]
except KeyError:
if IS_WIN:
binary = None
else:
binary = (
subprocess.check_output("which rsync", shell=True)
.decode(sys.stdout.encoding)
.strip()
)
if binary:
full_cmd = [binary, "-r", "-L" if deref_links else "-l"]
if exclude is not None:
for item in exclude:
full_cmd.extend(["--exclude", item])
if port is not None:
# Add '-e "ssh -p "' option to rsync command
ssh = "{} -p {}".format(ssh_bin(), port)
full_cmd.extend(["-e", ssh])
full_cmd.extend([source, target])
return full_cmd
else:
# Proceed with SCP
try:
binary = os.environ["SCP_BINARY"]
except KeyError:
if not IS_WIN:
binary = (
subprocess.check_output("which scp", shell=True)
.decode(sys.stdout.encoding)
.strip()
)
else:
raise Exception("SCP binary not provided.")
full_cmd = [binary, "-r"]
if port is not None:
full_cmd.extend(["-P", port])
full_cmd.extend([source, target])
return full_cmd
[docs]
def link_cmd(path, link):
"""Returns link creation command."""
return " ".join(["/bin/ln", "-sfn", shlex.quote(path), shlex.quote(link)])
[docs]
def mkdir_cmd(path):
"""Return mkdir command"""
return " ".join(["/bin/mkdir", "-p", shlex.quote(path)])
[docs]
def rm_cmd(path):
"""Return rm command"""
return " ".join(["/bin/rm", "-rf", shlex.quote(path)])
[docs]
def filepath_exist_cmd(path):
"""Checks if filepath exists."""
return " ".join(["/bin/test", "-e", shlex.quote(path)])