"""Child worker process module."""
import argparse
import logging
import os
import platform
import shutil
import signal
import socket
import subprocess
import sys
import threading
import time
import traceback
def parse_cmdline():
    """
    Parse the child worker command line.

    :return: parsed command line arguments namespace
    """
    parser = argparse.ArgumentParser(description="Remote runner parser")
    parser.add_argument("--address", action="store")
    parser.add_argument("--index", action="store")
    parser.add_argument("--wd", action="store")
    parser.add_argument("--runpath", action="store", default=None)
    parser.add_argument("--type", action="store")
    parser.add_argument("--log-level", action="store", default=0, type=int)
    parser.add_argument("--remote-pool-type", action="store", default="thread")
    # Coerce to int so downstream code receives a number both when the flag
    # is passed on the command line and when the default is used.
    parser.add_argument(
        "--remote-pool-size", action="store", default=1, type=int
    )
    parser.add_argument("--sys-path-file", action="store")
    parser.add_argument("--resource-monitor-server", action="store")
    return parser.parse_args()
class ChildLoop:
    """
    Child process loop that can be started in a process and starts a local
    thread pool to execute the tasks received.

    :param index: worker index assigned by the parent pool
    :param transport: transport used to exchange messages with the parent
        pool (e.g. a ZMQ client)
    :param pool_type: class of the local pool to instantiate
    :param pool_size: number of local workers, coerced to ``int``
    :param worker_type: class of the local workers
    :param logger: logger instance to use
    :param runpath: runpath of this child; taken from pool metadata if None
    """

    def __init__(
        self,
        index,
        transport,
        pool_type,
        pool_size,
        worker_type,
        logger,
        runpath=None,
    ):
        self._metadata = {"index": index, "pid": os.getpid()}
        self._transport = transport
        self._pool_type = pool_type
        self._pool_size = int(pool_size)
        self._pool_cfg = None
        self._worker_type = worker_type
        self._to_heartbeat = float(0)
        self.runpath = runpath
        self.logger = logger
        # Created in _child_pool(); initialised here so that _handle_abort()
        # can safely test it if an abort signal arrives after the handlers
        # are registered but before the pool has been created.
        self._pool = None

    @property
    def metadata(self):
        """Metadata information."""
        return self._metadata

    def _child_pool(self):
        """Create and return the local pool that executes received tasks."""
        # Local thread pool will not cleanup the previous layer runpath.
        self._pool = self._pool_type(
            name=f"Pool_{self._metadata['pid']}",
            worker_type=self._worker_type,
            size=self._pool_size,
            runpath=self.runpath,
            allow_task_rerun=False,  # always return False
        )
        self._pool.parent = self
        self._pool.cfg.parent = self._pool_cfg
        return self._pool

    def _handle_abort(self, signum, frame):
        """Abort-signal handler: abort the local pool and kill this process."""
        self.logger.debug(
            "Signal handler called for signal %s from %s",
            signum,
            threading.current_thread(),
        )
        if self._pool:
            self._pool.abort()
            # Log before killing - nothing after os.kill() ever runs.
            self.logger.debug("Pool %s aborted.", self._pool)
            os.kill(os.getpid(), 9)

    def _setup_logfiles(self):
        """
        Redirect this process's stdout/stderr to per-worker files under the
        runpath and attach a file handler to the logger.
        """
        from testplan.common.utils.logger import LOGFILE_FORMAT

        if not os.path.exists(self.runpath):
            os.makedirs(self.runpath)
        stdout_file = os.path.join(
            self.runpath, f"{self._metadata['index']}_stdout"
        )
        stderr_file = os.path.join(
            self.runpath, f"{self._metadata['index']}_stderr"
        )
        log_file = os.path.join(self.runpath, f"{self._metadata['index']}_log")

        self.logger.info("stdout file = %s", stdout_file)
        self.logger.info("stderr file = %s", stderr_file)
        self.logger.info(
            "log file = %(file)s (log level = %(lvl)s)",
            {"file": log_file, "lvl": self.logger.level},
        )

        # Redirect stdout/stderr at the file-descriptor level. The dup2'd
        # descriptors 1 and 2 remain valid even after the original file
        # objects are garbage collected.
        stderr_fd = open(stderr_file, "w")
        stdout_fd = open(stdout_file, "w")
        os.close(2)
        os.close(1)
        os.dup2(stderr_fd.fileno(), 2)
        os.dup2(stdout_fd.fileno(), 1)

        fhandler = logging.FileHandler(log_file, encoding="utf-8")
        formatter = logging.Formatter(LOGFILE_FORMAT)
        fhandler.setFormatter(formatter)
        fhandler.setLevel(self.logger.level)
        self.logger.addHandler(fhandler)

    def _send_and_expect(self, message, send, expect):
        """Send a message to the parent pool and await an expected response."""
        try:
            return self._transport.send_and_receive(
                message.make(send), expect=expect
            )
        except AttributeError:
            self.logger.critical("Pool seems dead, child exits.")
            raise

    def _pre_loop_setup(self, message):
        """
        Pull pool configuration and metadata from the parent pool, register
        abort signal handlers and set up log files before entering the loop.
        """
        response = self._send_and_expect(
            message,
            message.ConfigRequest,
            [message.ConfigSending, message.Stop],
        )

        # Process pool might be exiting after worker restarts and tries
        # to connect, at this time worker can gracefully exit.
        if response.cmd == message.Stop:
            print("Stop message received, child exits.")
            os._exit(0)

        # Response.data: [cfg, cfg.parent, cfg.parent.parent, ...]
        # Re-link the config chain that was flattened for transmission.
        pool_cfg = response.data[0]
        for idx, cfg in enumerate(response.data):
            try:
                cfg.parent = response.data[idx + 1]
            except IndexError:
                break
        self._pool_cfg = pool_cfg

        for sig in self._pool_cfg.abort_signals:
            signal.signal(sig, self._handle_abort)

        pool_metadata = response.sender_metadata

        if self.runpath is None:
            if pool_metadata.get("runpath") is None:
                raise RuntimeError("runpath was not set in pool metadata")
            self.runpath = pool_metadata["runpath"]
        self._setup_logfiles()

    def worker_loop(self):
        """
        Child process worker loop. Manages an underlying thread pool, pulls and
        sends back results to the main pool.
        """
        from testplan.runners.pools.communication import Message

        message = Message(**self.metadata)

        try:
            self._pre_loop_setup(message)
        except Exception:
            print("_pre_loop_setup failed")
            self._transport.send_and_receive(
                message.make(message.SetupFailed, data=traceback.format_exc()),
                expect=message.Ack,
            )
            return

        with self._child_pool():
            message = Message(**self.metadata)
            next_possible_request = 0
            next_heartbeat = 0
            request_delay = self._pool_cfg.active_loop_sleep

            while True:
                # TODO: SHALL CHECK CHILD POOL ALIVE HERE
                now = time.time()

                # Heartbeat exchange - a missing response means the parent
                # pool is gone and this child should exit.
                if self._pool_cfg.worker_heartbeat and now > next_heartbeat:
                    hb_resp = self._transport.send_and_receive(
                        message.make(message.Heartbeat, data=time.time())
                    )
                    if hb_resp is None:
                        self.logger.critical("Pool seems dead, child exits.")
                        self.exit_loop()
                        break
                    else:
                        self.logger.debug(
                            "Pool heartbeat response: %s at %s before %ss.",
                            hb_resp.cmd,
                            hb_resp.data,
                            time.time() - hb_resp.data,
                        )
                        if hb_resp.cmd == Message.DiscardPending:
                            self._pool.discard_pending_tasks(
                                report_reason="DiscardPending received"
                            )
                    next_heartbeat = now + self._pool_cfg.worker_heartbeat

                # Send back results
                if self._pool.results:
                    task_results = []
                    for uid in list(self._pool.results.keys()):
                        task_results.append(self._pool.results[uid])
                        self.logger.debug(
                            "Sending back result for %s",
                            self._pool.results[uid].task,
                        )
                        del self._pool.results[uid]
                    self._transport.send_and_receive(
                        message.make(message.TaskResults, data=task_results),
                        expect=message.Ack,
                    )

                # Request new tasks
                demand = (
                    self._pool.workers_requests()
                    - self._pool.unassigned.qsize()
                )

                if demand > 0 and time.time() > next_possible_request:
                    received = self._transport.send_and_receive(
                        message.make(message.TaskPullRequest, data=demand)
                    )

                    if received is None or received.cmd == Message.Stop:
                        self.logger.critical(
                            "Pool seems dead or stopping, child exits."
                        )
                        self.exit_loop()
                        break
                    elif received.cmd == Message.TaskSending:
                        next_possible_request = time.time()
                        request_delay = 0
                        for task in received.data:
                            self.logger.debug("Added %s to local pool", task)
                            self._pool.add(task, task.uid())
                        # Reset workers request counters
                        for worker in self._pool._workers:
                            worker.requesting = 0
                    elif received.cmd == Message.Ack:
                        # No tasks available: back off exponentially, capped
                        # at max_active_loop_sleep.
                        request_delay = min(
                            (request_delay + 0.2) * 1.5,
                            self._pool_cfg.max_active_loop_sleep,
                        )
                        next_possible_request = time.time() + request_delay

                time.sleep(self._pool_cfg.active_loop_sleep)
        self.logger.info("Local pool %s stopped.", self._pool)

    def exit_loop(self):
        """Abort the local pool when leaving the worker loop."""
        self._pool.abort()
class RemoteChildLoop(ChildLoop):
    """
    Child loop for remote workers.
    This involves exchange of metadata for additional functionality.
    """

    def __init__(self, *args, **kwargs):
        super(RemoteChildLoop, self).__init__(*args, **kwargs)
        # Populated in _pre_loop_setup() via a MetadataPull exchange; stays
        # None if setup fails before that point.
        self._setup_metadata = None

    def _pre_loop_setup(self, message):
        """
        Extend the base setup with a metadata exchange: apply environment
        variables pushed by the parent and run the optional setup script.

        :raises RuntimeError: if the setup script exits with non-zero code
        """
        super(RemoteChildLoop, self)._pre_loop_setup(message)

        response = self._send_and_expect(
            message, message.MetadataPull, [message.Metadata, message.Stop]
        )

        if response.cmd == message.Stop:
            print("Stop message received, child exits.")
            os._exit(0)

        self._setup_metadata = response.data

        if self._setup_metadata.env:
            for key, value in self._setup_metadata.env.items():
                os.environ[key] = value

        if self._setup_metadata.setup_script:
            if subprocess.call(
                self._setup_metadata.setup_script,
                stdout=sys.stdout,
                stderr=sys.stderr,
            ):
                raise RuntimeError("Setup script exited with non 0 code.")

    def exit_loop(self):
        """Clean up pushed files/directories if requested, then abort."""
        # _setup_metadata may still be None if setup failed before the
        # metadata exchange completed - guard so we do not mask the original
        # failure with an AttributeError.
        if self._setup_metadata is not None and self._setup_metadata.delete_pushed:
            for item in self._setup_metadata.push_dirs:
                self.logger.user_info("Removing directory: %s", item)
                shutil.rmtree(item, ignore_errors=True)
            for item in self._setup_metadata.push_files:
                self.logger.user_info("Removing file: %s", item)
                try:
                    os.remove(item)
                except OSError as exc:
                    # Best-effort cleanup, mirroring rmtree(ignore_errors=True)
                    # above - keep removing the remaining files.
                    self.logger.warning("Could not remove %s: %s", item, exc)

        super(RemoteChildLoop, self).exit_loop()
def child_logic(args):
    """
    Able to be imported child logic.

    Creates the appropriate transport, pool and loop objects for the given
    worker ``--type`` and runs the worker loop until completion.

    :param args: parsed command line arguments (see :func:`parse_cmdline`)
    """
    import psutil
    from testplan.runners.pools.base import Pool, Worker
    from testplan.runners.pools.process import ProcessPool, ProcessWorker
    from testplan.runners.pools.connection import ZMQClient

    # Import unconditionally: TESTPLAN_LOGGER is used further down even when
    # no --log-level is given (previously this raised NameError for level 0).
    from testplan.common.utils.logger import (
        TESTPLAN_LOGGER,
        STDOUT_HANDLER,
    )

    if args.log_level:
        TESTPLAN_LOGGER.setLevel(args.log_level)
        TESTPLAN_LOGGER.removeHandler(STDOUT_HANDLER)

    print(
        f"Starting child process worker on {socket.gethostname()},"
        f" {os.getpid()} with parent {psutil.Process(os.getpid()).ppid()}"
    )

    if args.runpath:
        print(f"Removing old runpath: {args.runpath}")
        shutil.rmtree(args.runpath, ignore_errors=True)

    class _NoRunpathMixin:
        """
        Pool mixin that creates no runpath directory.
        Will use the one already created by the parent process.
        """

        # To eliminate a not needed runpath layer.
        def make_runpath_dirs(self):
            self._runpath = self.cfg.runpath

    class NoRunpathPool(_NoRunpathMixin, Pool):
        """
        Pool that creates no runpath directory.
        Has only one worker.
        Will use the one already created by parent process.
        """

    class NoRunpathThreadPool(_NoRunpathMixin, Pool):
        """
        Pool that creates no runpath directory.
        Will use the one already created by parent process.
        Supports multiple thread workers.
        """

    class NoRunpathProcessPool(_NoRunpathMixin, ProcessPool):
        """
        Pool that creates no runpath directory.
        Will use the one already created by parent process.
        Supports multiple process workers.
        """

    transport = ZMQClient(address=args.address, recv_timeout=30)

    if args.type == "process_worker":
        loop = ChildLoop(
            args.index, transport, NoRunpathPool, 1, Worker, TESTPLAN_LOGGER
        )
        loop.worker_loop()

    elif args.type == "remote_worker":
        if args.remote_pool_type == "process":
            pool_type = NoRunpathProcessPool
            worker_type = ProcessWorker
        else:
            pool_type = NoRunpathThreadPool
            worker_type = Worker

        loop = RemoteChildLoop(
            args.index,
            transport,
            pool_type,
            args.remote_pool_size,
            worker_type,
            TESTPLAN_LOGGER,
            runpath=args.runpath,
        )
        loop.worker_loop()
def process_syspath_file(filename, working_dir=None):
    """
    Process the syspath file, which should contain one sys.path entry per line
    Since we might be on a remote host, we need to check the accessibility of
    those entries. And we should always be able to directly access modules in
    the working directory. The result is written back to the original file for
    bookkeeping.
    """
    with open(filename) as fobj:
        raw_entries = fobj.read().split("\n")

    # Keep only entries that are reachable on this host.
    accessible = [entry for entry in raw_entries if os.path.exists(entry)]

    # The working directory always comes first so its modules win lookups.
    if working_dir is not None:
        accessible = [working_dir] + accessible

    with open(filename, "w") as fobj:
        fobj.write("\n".join(accessible))

    return accessible
if __name__ == "__main__":
    # Entry point: start an external child process worker.
    ARGS = parse_cmdline()
    if ARGS.wd:
        os.chdir(ARGS.wd)
        os.environ["PWD"] = ARGS.wd

    if ARGS.sys_path_file:
        sys.path = process_syspath_file(ARGS.sys_path_file, ARGS.wd)

    # upfront import to speed-up execution
    import testplan
    import psutil
    from testplan.runners.pools.communication import Message
    from testplan.runners.pools.base import Pool, Worker
    from testplan.runners.pools.process import ProcessPool, ProcessWorker
    from testplan.runners.pools.connection import ZMQClient
    from testplan.common.utils.logger import (
        LOGFILE_FORMAT,
        TESTPLAN_LOGGER,
        STDOUT_HANDLER,
    )

    resource_monitor_client = None
    if ARGS.resource_monitor_server:
        from testplan.monitor.resource import ResourceMonitorClient

        resource_monitor_client = ResourceMonitorClient(
            ARGS.resource_monitor_server
        )
        resource_monitor_client.start()

    try:
        child_logic(ARGS)
        print("child.py exiting")
    finally:
        # Stop the resource monitor even if child_logic raises, so the
        # remote monitoring connection is not left dangling.
        if resource_monitor_client:
            resource_monitor_client.stop()

    os._exit(0)