"""Remote worker pool module."""
import copy
import os
import signal
import socket
from multiprocessing import cpu_count
from multiprocessing.pool import ThreadPool
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union
from schema import Or
from testplan.common.config import ConfigOption
from testplan.common.remote.remote_resource import (
RemoteResource,
RemoteResourceConfig,
UnboundRemoteResourceConfig,
)
from testplan.common.remote.remote_runtime import RuntimeBuilder
from testplan.common.utils.logger import TESTPLAN_LOGGER
from testplan.common.utils.observability import tracing
from testplan.common.utils.path import fix_home_prefix, rebase_path
from testplan.common.utils.process import kill_process
from testplan.common.utils.remote import copy_cmd, ssh_cmd
from testplan.common.utils.timing import get_sleeper, wait
from testplan.testing.base import TestResult
from .base import Pool, PoolConfig
from .communication import Message
from .connection import ZMQServer
from .process import ProcessWorker, ProcessWorkerConfig
from .tasks import Task
class UnboundRemoteWorkerConfig(
    ProcessWorkerConfig, UnboundRemoteResourceConfig
):
    """
    Configuration object for
    :py:class:`~testplan.runners.pools.remote.RemoteWorker` resource entity.
    """

    # RemoteWorker will receive params passed to RemotePool
    # thus allow extra keys
    ignore_extra_keys = True

    @classmethod
    def get_options(cls) -> Dict[Any, Any]:
        """
        Schema for options validation and assignment of default values.

        :return: Mapping that declares ``workers`` as ``int`` and
            ``pool_type`` as ``str``.
        """
        return {"workers": int, "pool_type": str}
class RemoteWorkerConfig(UnboundRemoteWorkerConfig, RemoteResourceConfig):
    """
    Configuration object for
    :py:class:`~testplan.runners.pools.remote.RemoteWorker` that combines
    ``UnboundRemoteWorkerConfig`` with ``RemoteResourceConfig``.

    It declares no options of its own.
    """
class RemoteWorker(ProcessWorker, RemoteResource):
    """
    Remote worker resource that pulls tasks from the transport provided,
    executes them in a local pool of workers and sends back task results.

    :param pool_type: Child pool type that remote workers will use,
        can be ``thread`` or ``process``. It is forced to ``thread`` when
        ``workers`` is 1.
    :param workers: Number of thread/process workers of the child pool.

    Also inherits all
    :py:class:`~testplan.runners.pools.process.ProcessWorkerConfig` and
    :py:class:`~testplan.common.remote.remote_resource.RemoteResource`
    options.
    """

    CONFIG = RemoteWorkerConfig

    def __init__(self, **options: Any) -> None:
        # A single child worker always runs in a thread pool, whatever
        # pool type the caller asked for.
        if options["workers"] == 1:
            options["pool_type"] = "thread"
        super().__init__(**options)

    @property
    def host(self) -> str:
        """Remote host this worker is configured to run on."""
        return self.cfg.remote_host  # type: ignore[no-any-return]

    def _set_child_script(self) -> None:
        """Specify the remote worker executable file."""
        # XXX: we shouldn't always need full path of child.py
        # XXX: child_via_mod
        self._child_paths.local = fix_home_prefix(self._child_path())
        # The remote location mirrors the local one, relative to the
        # testplan import path on each side.
        self._child_paths.remote = rebase_path(
            self._child_paths.local,
            self._testplan_import_path.local,  # type: ignore[arg-type]
            self._testplan_import_path.remote,  # type: ignore[arg-type]
        )

    def _proc_cmd_impl(self) -> List[str]:
        """
        Build the argument list that launches the child script on the
        remote host.

        :return: Command line as a list of strings (not yet wrapped in ssh).
        :raises RuntimeError: If the worker has no parent pool.
        """
        cmd = [
            self.remote_python_bin,
            "-uB",
            self._child_paths.remote,
            "--index",
            str(self.cfg.index),
            "--address",
            self.transport.address,  # type: ignore[attr-defined]
            "--type",
            "remote_worker",
            "--log-level",
            str(TESTPLAN_LOGGER.getEffectiveLevel()),
            "--wd",
            self._working_dirs.remote,
            "--runpath",
            self._remote_resource_runpath,
            "--remote-pool-type",
            self.cfg.pool_type,
            "--remote-pool-size",
            str(self.cfg.workers),
            "--sys-path-file",
            self._remote_syspath_file,
        ]

        if self.otel_traces:
            # Fetch the traceparent once so the value that was checked is
            # exactly the value that ends up on the command line.
            traceparent = tracing._get_traceparent()
            if traceparent:
                cmd.extend(["--otel-traceparent", traceparent])
        if self.otel_logs:
            cmd.append("--otel-logs")

        if self.parent is None:
            raise RuntimeError("self.parent must not be None")
        if self.parent.resource_monitor_address:  # type: ignore[attr-defined]
            cmd.extend(
                [
                    "--resource-monitor-server",
                    self.parent.resource_monitor_address,  # type: ignore[attr-defined]
                ]
            )
        return cmd

    def _proc_cmd(self) -> str:  # type: ignore[override]
        """Command to start child process."""
        cmd = self._proc_cmd_impl()
        # The configured ssh_cmd callable receives the remote command as a
        # single space-joined string.
        return self.cfg.ssh_cmd(self.ssh_cfg, " ".join(cmd))  # type: ignore[no-any-return]

    def _write_syspath(self) -> None:  # type: ignore[override]
        """
        Write our current sys.path to a file and transfer it to the remote
        host.
        """
        super()._write_syspath(sys_path=self._remote_sys_path())
        self._remote_syspath_file = os.path.join(
            self._remote_plan_runpath,
            f"sys_path_{os.path.basename(self._syspath_file)}",
        )
        self._transfer_data(
            source=self._syspath_file,
            target=self._remote_syspath_file,
            remote_target=True,
        )
        self.logger.debug(
            "Transferred sys.path to remote host at: %s",
            self._remote_syspath_file,
        )

    def pre_start(self) -> None:
        """
        Create runpath directories, prepare the remote host, then resolve
        the child script path and transfer the sys.path file.
        """
        self.make_runpath_dirs()
        with self.timer.record("prepare remote"):
            self._prepare_remote()
        self._set_child_script()
        self._write_syspath()

    def pre_stop(self) -> None:
        """Fetch results from the remote host before the worker stops."""
        with self.timer.record("fetch results"):
            self._fetch_results()

    def stopping(self) -> None:
        """
        Stop the ssh process.
        Wait for child.py to naturally exit after cleanup, with timeout fallback.
        """
        if not self._handler:
            return

        # First, wait for the SSH process to exit naturally
        # This allows child.py to complete its cleanup (stopping child workers, etc.)
        self.logger.debug(
            "Waiting for remote worker %s to exit naturally (timeout: %ss)",
            self.cfg.index,
            self.cfg.stop_timeout,
        )
        sleeper = get_sleeper(
            interval=(0.1, 0.5),
            timeout=self.cfg.stop_timeout,
        )
        while next(sleeper):
            retcode = self._handler.poll()
            if retcode is not None:
                self.logger.debug(
                    "Remote worker %s exited naturally with code %d",
                    self.cfg.index,
                    retcode,
                )
                return

        # Timeout reached - child.py didn't exit gracefully
        # Fall back to force-killing the SSH process
        self.logger.warning(
            "Remote worker %s did not exit within %ss, force-killing SSH process",
            self.cfg.index,
            self.cfg.stop_timeout,
        )
        kill_process(self._handler, timeout=1)

    def post_stop(self) -> None:
        """Run remote clean-up once the worker has stopped."""
        self._clean_remote()

    def _rebase_assertion(self, result: Any) -> None:
        """
        Recursively rebase ``source_path`` values from the remote plan
        runpath to the local plan runpath.

        :param result: Either a ``dict`` (rebased in place when it carries a
            ``source_path`` key) or an object whose ``entries`` attribute,
            if present, is walked recursively.
        """
        if isinstance(result, dict) and "source_path" in result:
            result["source_path"] = rebase_path(
                result["source_path"],
                self._remote_plan_runpath,
                self._get_plan().runpath,
            )
        else:
            # Anything without ``entries`` is treated as a leaf.
            for entry in getattr(result, "entries", []):
                self._rebase_assertion(entry)

    def rebase_attachment(self, result: TestResult) -> None:
        """Rebase the path of attachment from remote to local"""
        for attachment in result.report.attachments:  # type: ignore[attr-defined]
            attachment.source_path = rebase_path(
                attachment.source_path,
                self._remote_plan_runpath,
                self._get_plan().runpath,
            )
        self._rebase_assertion(result.report)

    def rebase_task_path(self, task: Task) -> None:
        """Rebase the path of task from local to remote"""
        task.rebase_path(
            self._workspace_paths.local,  # type: ignore[arg-type]
            self._workspace_paths.remote,  # type: ignore[arg-type]
        )
class RemotePoolConfig(PoolConfig):
    """
    Configuration object for
    :py:class:`~testplan.runners.pools.remote.RemotePool` executor
    resource entity.
    """

    # RemotePool is taking param that are to be passed to RemoteWorker
    # thus allow extra keys
    ignore_extra_keys = True

    # Resolved once, when this class body is executed; a hostname that
    # cannot be resolved makes this lookup raise at that point.
    default_hostname = socket.gethostbyname(socket.gethostname())

    @classmethod
    def get_options(cls) -> Dict[Any, Any]:
        """
        Schema for options validation and assignment of default values.

        :return: Mapping of option declarations to their validators.
        """
        return {
            "hosts": dict,
            ConfigOption(
                "abort_signals", default=[signal.SIGINT, signal.SIGTERM]
            ): [int],
            ConfigOption("worker_type", default=RemoteWorker): object,
            ConfigOption("pool_type", default="process"): str,
            ConfigOption("host", default=cls.default_hostname): str,
            ConfigOption("port", default=0): int,
            ConfigOption("worker_heartbeat", default=30): Or(int, float, None),
        }
class RemotePool(Pool):
    """
    Pool task executor object that initializes remote workers and dispatches
    tasks.

    :param name: Pool name.
    :param hosts: Map of host(ip): number of their local thread/process workers.
        i.e {'hostname1': 2, '10.147.XX.XX': 4}
    :param abort_signals: Signals to trigger abort logic. Default: INT, TERM.
    :param worker_type: Type of worker to be initialized.
    :param pool_type: Child pool type that remote workers will use.
        can be ``thread`` or ``process``, default to ``thread`` if
        ``workers`` is 1 and otherwise ``process``.
    :param host: Host that pool binds and listens for requests. Defaults to
        local hostname.
    :param port: Port that pool binds. Default: 0 (random)
    :param worker_heartbeat: Worker heartbeat period.
    :param ssh_port: The ssh port number of remote host, default is 22.
    :param ssh_cmd: callable that prefix a command with ssh binary and options
    :param copy_cmd: callable that returns the cmdline to do copy on remote host
    :param workspace: Current project workspace to be transferred, default is pwd.
    :param workspace_exclude: Patterns to exclude files when pushing workspace.
    :param remote_runpath: Root runpath on remote host, default is same as local (Linux->Linux)
        or /var/tmp/$USER/testplan/$plan_name (Windows->Linux).
    :param remote_workspace: The path of the workspace on remote host,
        default is fetched_workspace under remote_runpath
    :param clean_remote: Deleted root runpath on remote at exit.
    :param push: Files and directories to push to the remote.
    :type push: ``list`` that contains ``str`` or ``tuple``:
        - ``str``: Name of the file or directory
        - ``tuple``: A (src, dst) pair
    :param push_exclude: Patterns to exclude files on push stage.
    :param delete_pushed: Deleted pushed files on remote at exit.
    :param fetch_runpath: The flag of fetch remote resource's runpath, default to True.
    :param fetch_runpath_exclude: Exclude files matching PATTERN.
    :param pull: Files and directories to be pulled from the remote at the end.
    :param pull_exclude: Patterns to exclude files on pull stage.
    :param env: Environment variables to be propagated.
    :param setup_script: Script to be executed on remote as very first thing.
    :param paramiko_config: Paramiko SSH client extra configuration.
    :param remote_runtime_builder: RuntimeBuilder instance to prepare remote python env.
        Default is ``SourceTransferBuilder()``.

    Also inherits all :py:class:`~testplan.runners.pools.base.Pool` options.
    """

    CONFIG = RemotePoolConfig
    CONN_MANAGER = ZMQServer  # type: ignore[assignment]
    QUEUE_WAIT_INTERVAL = 3
    # NOTE(review): not referenced by any method of this class as shown;
    # the thread pool size is bounded by ``cpu_count()`` instead.
    MAX_THREAD_POOL_SIZE = 5

    def __init__(
        self,
        name: str,
        hosts: Dict[str, int],
        abort_signals: Optional[List[int]] = None,
        worker_type: Type = RemoteWorker,
        pool_type: str = "process",
        host: str = CONFIG.default_hostname,
        port: int = 0,
        worker_heartbeat: float = 30,
        ssh_port: int = 22,
        ssh_cmd: Callable = ssh_cmd,
        copy_cmd: Callable = copy_cmd,
        workspace: Optional[str] = None,
        workspace_exclude: Optional[List[str]] = None,
        remote_runpath: Optional[str] = None,
        remote_workspace: Optional[str] = None,
        clean_remote: bool = False,
        push: Optional[List[Union[str, Tuple[str, str]]]] = None,
        push_exclude: Optional[List[str]] = None,
        delete_pushed: bool = False,
        fetch_runpath: bool = True,
        fetch_runpath_exclude: Optional[List[str]] = None,
        pull: Optional[List[str]] = None,
        pull_exclude: Optional[List[str]] = None,
        env: Optional[Dict[str, str]] = None,
        setup_script: Optional[List[str]] = None,
        paramiko_config: Optional[dict] = None,
        remote_runtime_builder: Optional[RuntimeBuilder] = None,
        **options: Any,
    ) -> None:
        self.pool: Optional[ThreadPool] = None
        # ``locals()`` must be captured before any extra local name is bound
        # in this method. The explicit two-argument ``super`` below is kept
        # on purpose: zero-argument ``super()`` would add ``__class__`` to
        # the mapping returned by ``locals()``.
        options.update(self.filter_locals(locals()))
        super(RemotePool, self).__init__(**options)
        self._options = options  # pass to remote worker later
        self._request_handlers[Message.MetadataPull] = (
            self._worker_setup_metadata  # type: ignore[assignment]
        )
        # One entry per configured host; each entry becomes one RemoteWorker.
        self._instances: Dict[str, Dict[str, Any]] = {
            hostname: {
                "host": hostname,
                "number_of_workers": number_of_workers,
            }
            for hostname, number_of_workers in self.cfg.hosts.items()
        }

    @property
    def resource_monitor_address(self) -> Optional[str]:
        """
        Address of the parent's resource monitor server, or ``None`` when
        the parent has no such server.
        """
        if self.parent.resource_monitor_server:  # type: ignore[union-attr]
            return self.parent.resource_monitor_server.address  # type: ignore[union-attr, no-any-return]
        return None

    @staticmethod
    def _worker_setup_metadata(
        worker: RemoteWorker, _: Message, response: Message
    ) -> None:
        """
        Handler for ``Message.MetadataPull``: reply to the worker with its
        own ``setup_metadata``.
        """
        worker.respond(
            response.make(Message.Metadata, data=worker.setup_metadata)
        )

    def _add_workers(self) -> None:
        """Create one worker per configured host and add it to the pool."""
        rtb = self._options.get("remote_runtime_builder")
        for instance in self._instances.values():
            worker = self.cfg.worker_type(
                index=instance["host"],
                remote_host=instance["host"],
                workers=instance["number_of_workers"],
                **{
                    **self._options,
                    # Each worker gets its own copy of the runtime builder.
                    "remote_runtime_builder": copy.deepcopy(rtb),
                },
            )
            self.logger.debug("Created %s", worker)
            worker.parent = self
            worker.cfg.parent = self.cfg
            self._workers.add(worker, uid=instance["host"])

    def _start_workers(self) -> None:
        """Start all workers of the pool"""
        for worker in self._workers:
            self._conn.register(worker)  # type: ignore[arg-type]
        if self.pool:
            self._workers.start_in_pool(
                self.pool,
                self.cfg.status_wait_timeout,
            )
        else:
            self._workers.start()

    def _stop_workers(self) -> None:
        """Stop all workers, through the thread pool when one exists."""
        if self.pool:
            self._workers.stop_in_pool(
                self.pool,
                self.cfg.status_wait_timeout,
            )
        else:
            self._workers.stop()

    def _start_thread_pool(self) -> None:
        """
        Create the helper thread pool when there are at least two hosts.

        Creation is best-effort: on failure a warning is logged and
        ``self.pool`` stays ``None``, so workers are started and stopped
        without a thread pool.
        """
        size = len(self._instances)
        if size < 2:
            return
        try:
            self.pool = ThreadPool(min(size, cpu_count()))
        except AttributeError:
            self.logger.warning(
                "Please upgrade to the suggested python interpreter."
            )
        except Exception as exc:
            # Previously swallowed silently; keep the fallback but leave a
            # trace of why no thread pool is available.
            self.logger.warning(
                "Failed to create thread pool (%s), "
                "workers will be handled without it.",
                exc,
            )

    def _terminate_thread_pool(self) -> None:
        """Terminate the helper thread pool, if any, and drop the reference."""
        if self.pool:
            self.pool.terminate()
            self.pool = None

    def _early_stop_worker(self, worker: RemoteWorker) -> bool:  # type: ignore[override]
        """
        Schedule an idle worker to be stopped early when enough other
        workers remain alive for the unassigned tasks.

        :param worker: Candidate worker.
        :return: ``True`` if a stop was scheduled on the thread pool,
            ``False`` otherwise.
        """
        if len(worker.assigned) != 0:
            return False

        worker_uid: str = worker.uid()  # type: ignore[assignment]
        with self._pool_lock:
            if self.pool and (worker_uid not in self._stopping_queue):
                alive_worker_ids = {
                    active.uid() for active in self.worker_status["active"]
                }
                # Only stop when the workers that would remain outnumber the
                # reserve plus the tasks still waiting to be assigned.
                if self.RESERVE_WORKER_NUM + self.unassigned.size() < len(
                    alive_worker_ids - set(self._stopping_queue)
                ):
                    self.logger.user_info(
                        "Early stop worker %s", self._workers[worker_uid]
                    )
                    self._stopping_queue.append(worker_uid)
                    self.pool.apply_async(
                        self._workers.sync_stop_resource,
                        (self._workers[worker_uid],),
                    )
                    return True
        return False

    def starting(self) -> None:
        """Create the helper thread pool, then run the base starting logic."""
        self._start_thread_pool()
        super().starting()

    def stopping(self) -> None:
        """
        Let workers that are still starting settle, run the base stopping
        logic, then terminate the helper thread pool.
        """
        for worker in self._workers:
            if worker.status == worker.status.STARTING:  # type: ignore[attr-defined]
                try:
                    # The lambda is evaluated inside ``wait`` during this
                    # iteration, so it always sees the current ``worker``.
                    wait(
                        lambda: (
                            worker.status
                            in (worker.STATUS.STARTED, worker.STATUS.STOPPED)
                        ),
                        worker.cfg.status_wait_timeout,
                    )
                except Exception:
                    self.logger.error(
                        "Timeout waiting for worker %s to quit starting "
                        "while pool %s is stopping",
                        worker,
                        self.cfg.name,
                    )
        super().stopping()
        self._terminate_thread_pool()

    def aborting(self) -> None:
        """Aborting logic."""
        super().aborting()
        self._terminate_thread_pool()

    def get_current_status_for_debug(self) -> List[str]:
        """
        Gets ``Hosts`` and ``Workers`` information for debugging.

        :return: Status information of Hosts and Workers.
        """
        msgs = [f"Hosts and number of workers in {self.class_name}:"]
        msgs.extend(
            f"\t Host: {host}, Number of workers: {number_of_workers}"
            for host, number_of_workers in self.cfg.hosts.items()
        )
        msgs.extend(super().get_current_status_for_debug())
        return msgs