# Source code for testplan.runners.pools.remote

"""Remote worker pool module."""

import copy
import os
import signal
import socket
from multiprocessing import cpu_count
from multiprocessing.pool import ThreadPool
from typing import Any, Callable, Dict, List, Optional, Tuple, Type, Union

from schema import Or

from testplan.common.config import ConfigOption
from testplan.common.remote.remote_resource import (
    RemoteResource,
    RemoteResourceConfig,
    UnboundRemoteResourceConfig,
)
from testplan.common.remote.remote_runtime import RuntimeBuilder
from testplan.common.utils.logger import TESTPLAN_LOGGER
from testplan.common.utils.observability import tracing
from testplan.common.utils.path import fix_home_prefix, rebase_path
from testplan.common.utils.process import kill_process
from testplan.common.utils.remote import copy_cmd, ssh_cmd
from testplan.common.utils.timing import get_sleeper, wait

from testplan.testing.base import TestResult

from .base import Pool, PoolConfig
from .communication import Message
from .connection import ZMQServer
from .process import ProcessWorker, ProcessWorkerConfig
from .tasks import Task


class UnboundRemoteWorkerConfig(
    ProcessWorkerConfig, UnboundRemoteResourceConfig
):
    """
    Configuration object for
    :py:class:`~testplan.runners.pools.remote.RemoteWorker` resource entity.

    Combines the process-worker options with the (unbound) remote-resource
    options and adds the ``workers`` / ``pool_type`` keys describing the
    child pool that is started on the remote host.
    """

    # A RemoteWorker is built from the full option set handed to RemotePool,
    # so keys this schema does not know about must be tolerated.
    ignore_extra_keys = True

    @classmethod
    def get_options(cls) -> Dict[Any, Any]:
        """
        Schema for options validation and assignment of default values.

        Neither key declares a default here.
        """
        schema: Dict[Any, Any] = dict(workers=int, pool_type=str)
        return schema
class RemoteWorkerConfig(UnboundRemoteWorkerConfig, RemoteResourceConfig):
    """
    Bound configuration for
    :py:class:`~testplan.runners.pools.remote.RemoteWorker`.

    Declares no options of its own; it only mixes
    ``UnboundRemoteWorkerConfig`` with ``RemoteResourceConfig``.
    """
class RemoteWorker(ProcessWorker, RemoteResource):
    """
    Remote worker resource that pulls tasks from the transport provided,
    executes them in a local pool of workers and sends back task results.

    :param pool_type: Child pool type that remote workers will use, can be
        ``thread`` or ``process``. When ``workers`` is 1 it is always forced
        to ``thread``, whatever value was passed.
    :param workers: Number of thread/process workers of the child pool.
        Must be supplied (``RemotePool`` passes the per-host count taken
        from its ``hosts`` mapping).

    Also inherits all
    :py:class:`~testplan.runners.pools.process.ProcessWorkerConfig` and
    :py:class:`~testplan.common.remote.remote_resource.RemoteResource`
    options.
    """

    CONFIG = RemoteWorkerConfig

    def __init__(self, **options: Any) -> None:
        # A single-worker child pool is always run with threads.
        if options["workers"] == 1:
            options["pool_type"] = "thread"
        super().__init__(**options)

    @property
    def host(self) -> str:
        """Remote host this worker runs on (the ``remote_host`` option)."""
        return self.cfg.remote_host  # type: ignore[no-any-return]

    def _set_child_script(self) -> None:
        """Specify the remote worker executable file."""
        # XXX: we shouldn't always need full path of child.py
        # XXX: child_via_mod
        self._child_paths.local = fix_home_prefix(self._child_path())
        self._child_paths.remote = rebase_path(
            self._child_paths.local,
            self._testplan_import_path.local,  # type: ignore[arg-type]
            self._testplan_import_path.remote,  # type: ignore[arg-type]
        )

    def _proc_cmd_impl(self) -> List[str]:
        """
        Build the argument list that runs the child script on the remote host.

        :return: Command tokens (not yet wrapped in ssh).
        :raises RuntimeError: if the worker has no parent pool.
        """
        cmd = [
            self.remote_python_bin,
            "-uB",
            self._child_paths.remote,
            "--index",
            str(self.cfg.index),
            "--address",
            self.transport.address,  # type: ignore[attr-defined]
            "--type",
            "remote_worker",
            "--log-level",
            str(TESTPLAN_LOGGER.getEffectiveLevel()),
            "--wd",
            self._working_dirs.remote,
            "--runpath",
            self._remote_resource_runpath,
            "--remote-pool-type",
            self.cfg.pool_type,
            "--remote-pool-size",
            str(self.cfg.workers),
            "--sys-path-file",
            self._remote_syspath_file,
        ]
        if self.otel_traces:
            # Look the traceparent up once so the value we test is exactly
            # the value we append (a second lookup could differ / be None).
            traceparent = tracing._get_traceparent()
            if traceparent:
                cmd.extend(["--otel-traceparent", traceparent])
        if self.otel_logs:
            cmd.append("--otel-logs")
        if self.parent is None:
            raise RuntimeError("self.parent must not be None")
        if self.parent.resource_monitor_address:  # type: ignore[attr-defined]
            cmd.extend(
                [
                    "--resource-monitor-server",
                    self.parent.resource_monitor_address,  # type: ignore[attr-defined]
                ]
            )
        return cmd

    def _proc_cmd(self) -> str:  # type: ignore[override]
        """Command to start child process (the child command wrapped in ssh)."""
        cmd = self._proc_cmd_impl()
        return self.cfg.ssh_cmd(self.ssh_cfg, " ".join(cmd))  # type: ignore[no-any-return]

    def _write_syspath(self) -> None:  # type: ignore[override]
        """
        Write our current sys.path to a file and transfer it to the remote
        host.
        """
        super(RemoteWorker, self)._write_syspath(
            sys_path=self._remote_sys_path()
        )
        self._remote_syspath_file = os.path.join(
            self._remote_plan_runpath,
            f"sys_path_{os.path.basename(self._syspath_file)}",
        )
        self._transfer_data(
            source=self._syspath_file,
            target=self._remote_syspath_file,
            remote_target=True,
        )
        self.logger.debug(
            "Transferred sys.path to remote host at: %s",
            self._remote_syspath_file,
        )

    def pre_start(self) -> None:
        """
        Create runpath directories, prepare the remote host (timed as
        "prepare remote"), locate the child script and ship sys.path.
        """
        self.make_runpath_dirs()
        with self.timer.record("prepare remote"):
            self._prepare_remote()
        self._set_child_script()
        self._write_syspath()

    def pre_stop(self) -> None:
        """Run ``_fetch_results`` (timed as "fetch results") before stopping."""
        with self.timer.record("fetch results"):
            self._fetch_results()

    def stopping(self) -> None:
        """
        Stop the ssh process.

        Wait up to ``stop_timeout`` for child.py to exit on its own after its
        cleanup; if it has not exited by then, force-kill the SSH process.
        """
        if not self._handler:
            return

        # First, wait for the SSH process to exit naturally.
        # This allows child.py to complete its cleanup (stopping child
        # workers, etc.)
        self.logger.debug(
            "Waiting for remote worker %s to exit naturally (timeout: %ss)",
            self.cfg.index,
            self.cfg.stop_timeout,
        )
        sleeper = get_sleeper(
            interval=(0.1, 0.5),
            timeout=self.cfg.stop_timeout,
        )
        while next(sleeper):
            retcode = self._handler.poll()
            if retcode is not None:
                self.logger.debug(
                    "Remote worker %s exited naturally with code %d",
                    self.cfg.index,
                    retcode,
                )
                return

        # Timeout reached - child.py didn't exit gracefully.
        # Fall back to force-killing the SSH process.
        self.logger.warning(
            "Remote worker %s did not exit within %ss, force-killing SSH process",
            self.cfg.index,
            self.cfg.stop_timeout,
        )
        kill_process(self._handler, timeout=1)

    def post_stop(self) -> None:
        """Run remote clean-up via ``_clean_remote``."""
        self._clean_remote()

    def _rebase_assertion(self, result: Any) -> None:
        """
        Recursively rewrite ``source_path`` values from the remote plan
        runpath to the local plan runpath.

        Dicts carrying ``source_path`` are rewritten in place; any other
        object is descended into through its ``entries`` attribute, if any.
        """
        if isinstance(result, dict) and "source_path" in result:
            result["source_path"] = rebase_path(
                result["source_path"],
                self._remote_plan_runpath,
                self._get_plan().runpath,
            )
        else:
            entries = getattr(result, "entries", [])
            for entry in entries:
                self._rebase_assertion(entry)

    def rebase_attachment(self, result: TestResult) -> None:
        """Rebase the path of attachment from remote to local"""
        for attachment in result.report.attachments:  # type: ignore[attr-defined]
            attachment.source_path = rebase_path(
                attachment.source_path,
                self._remote_plan_runpath,
                self._get_plan().runpath,
            )
        self._rebase_assertion(result.report)

    def rebase_task_path(self, task: Task) -> None:
        """Rebase the path of task from local to remote"""
        task.rebase_path(
            self._workspace_paths.local,  # type: ignore[arg-type]
            self._workspace_paths.remote,  # type: ignore[arg-type]
        )
class RemotePoolConfig(PoolConfig):
    """
    Configuration object for
    :py:class:`~testplan.runners.pools.remote.RemotePool` executor resource
    entity.
    """

    # Options destined for RemoteWorker are handed to RemotePool as well,
    # so keys unknown to this schema are accepted.
    ignore_extra_keys = True
    # Resolved once, when this class body executes.
    default_hostname = socket.gethostbyname(socket.gethostname())

    @classmethod
    def get_options(cls) -> Dict[Any, Any]:
        """
        Schema for options validation and assignment of default values.
        """
        schema: Dict[Any, Any] = {"hosts": dict}
        # (option name, default value, validator)
        with_defaults = [
            ("abort_signals", [signal.SIGINT, signal.SIGTERM], [int]),
            ("worker_type", RemoteWorker, object),
            ("pool_type", "process", str),
            ("host", cls.default_hostname, str),
            ("port", 0, int),
            ("worker_heartbeat", 30, Or(int, float, None)),
        ]
        for option_name, default_value, validator in with_defaults:
            schema[ConfigOption(option_name, default=default_value)] = validator
        return schema
class RemotePool(Pool):
    """
    Pool task executor object that initializes remote workers and dispatches
    tasks.

    :param name: Pool name.
    :param hosts: Map of host(ip): number of their local thread/process
        workers, e.g. {'hostname1': 2, '10.147.XX.XX': 4}
    :param abort_signals: Signals to trigger abort logic. Default: INT, TERM.
    :param worker_type: Type of worker to be initialized.
    :param pool_type: Child pool type that remote workers will use, can be
        ``thread`` or ``process``. Defaults to ``process``; a host with a
        single worker always uses ``thread``.
    :param host: Host that pool binds and listens for requests. Defaults to
        local hostname.
    :param port: Port that pool binds. Default: 0 (random)
    :param worker_heartbeat: Worker heartbeat period.
    :param ssh_port: The ssh port number of remote host, default is 22.
    :param ssh_cmd: callable that prefixes a command with ssh binary and
        options
    :param copy_cmd: callable that returns the cmdline to do copy on remote
        host
    :param workspace: Current project workspace to be transferred, default is
        pwd.
    :param workspace_exclude: Patterns to exclude files when pushing
        workspace.
    :param remote_runpath: Root runpath on remote host, default is same as
        local (Linux->Linux) or /var/tmp/$USER/testplan/$plan_name
        (Windows->Linux).
    :param remote_workspace: The path of the workspace on remote host,
        default is fetched_workspace under remote_runpath
    :param clean_remote: Delete root runpath on remote at exit.
    :param push: Files and directories to push to the remote.
    :type push: ``list`` that contains ``str`` or ``tuple``:
        - ``str``: Name of the file or directory
        - ``tuple``: A (src, dst) pair
    :param push_exclude: Patterns to exclude files on push stage.
    :param delete_pushed: Delete pushed files on remote at exit.
    :param fetch_runpath: The flag of fetch remote resource's runpath,
        default to True.
    :param fetch_runpath_exclude: Exclude files matching PATTERN.
    :param pull: Files and directories to be pulled from the remote at the
        end.
    :param pull_exclude: Patterns to exclude files on pull stage.
    :param env: Environment variables to be propagated.
    :param setup_script: Script to be executed on remote as very first thing.
    :param paramiko_config: Paramiko SSH client extra configuration.
    :param remote_runtime_builder: RuntimeBuilder instance to prepare remote
        python env. Default is ``SourceTransferBuilder()``.

    Also inherits all :py:class:`~testplan.runners.pools.base.Pool` options.
    """

    CONFIG = RemotePoolConfig
    CONN_MANAGER = ZMQServer  # type: ignore[assignment]
    QUEUE_WAIT_INTERVAL = 3
    # NOTE(review): not referenced in this class; the thread pool size is
    # min(number of hosts, cpu_count()). Confirm whether it is still needed.
    MAX_THREAD_POOL_SIZE = 5

    def __init__(
        self,
        name: str,
        hosts: Dict[str, int],
        abort_signals: Optional[List[int]] = None,
        worker_type: Type = RemoteWorker,
        pool_type: str = "process",
        host: str = CONFIG.default_hostname,
        port: int = 0,
        worker_heartbeat: float = 30,
        ssh_port: int = 22,
        ssh_cmd: Callable = ssh_cmd,
        copy_cmd: Callable = copy_cmd,
        workspace: Optional[str] = None,
        workspace_exclude: Optional[List[str]] = None,
        remote_runpath: Optional[str] = None,
        remote_workspace: Optional[str] = None,
        clean_remote: bool = False,
        push: Optional[List[Union[str, Tuple[str, str]]]] = None,
        push_exclude: Optional[List[str]] = None,
        delete_pushed: bool = False,
        fetch_runpath: bool = True,
        fetch_runpath_exclude: Optional[List[str]] = None,
        pull: Optional[List[str]] = None,
        pull_exclude: Optional[List[str]] = None,
        env: Optional[Dict[str, str]] = None,
        setup_script: Optional[List[str]] = None,
        paramiko_config: Optional[dict] = None,
        remote_runtime_builder: Optional[RuntimeBuilder] = None,
        **options: Any,
    ) -> None:
        # NOTE: no new locals may be introduced before filter_locals(locals())
        # below, since every local at that point is forwarded as an option.
        self.pool: Optional[ThreadPool] = None
        options.update(self.filter_locals(locals()))
        super(RemotePool, self).__init__(**options)
        self._options = options  # pass to remote worker later
        self._request_handlers[Message.MetadataPull] = (
            self._worker_setup_metadata  # type: ignore[assignment]
        )

        # One entry per host; each becomes one RemoteWorker in _add_workers.
        self._instances: Dict[str, Dict[str, Any]] = {}
        for host, number_of_workers in self.cfg.hosts.items():
            self._instances[host] = {
                "host": host,
                "number_of_workers": number_of_workers,
            }

    @property
    def resource_monitor_address(self) -> Optional[str]:
        """
        Address of the parent's resource monitor server, or None when there
        is no parent or no server.
        """
        parent = self.parent
        if parent is not None and parent.resource_monitor_server:  # type: ignore[union-attr]
            return parent.resource_monitor_server.address  # type: ignore[union-attr, no-any-return]
        return None

    @staticmethod
    def _worker_setup_metadata(
        worker: RemoteWorker, _: Message, response: Message
    ) -> None:
        """Answer a MetadataPull request with the worker's setup metadata."""
        worker.respond(
            response.make(Message.Metadata, data=worker.setup_metadata)
        )

    def _add_workers(self) -> None:
        """Create one worker per host, each with its own runtime builder copy."""
        rtb = self._options.get("remote_runtime_builder")
        for instance in self._instances.values():
            worker = self.cfg.worker_type(
                index=instance["host"],
                remote_host=instance["host"],
                workers=instance["number_of_workers"],
                **{
                    **self._options,
                    # Each worker gets its own deep copy of the builder.
                    "remote_runtime_builder": copy.deepcopy(rtb),
                },
            )
            self.logger.debug("Created %s", worker)
            worker.parent = self
            worker.cfg.parent = self.cfg
            self._workers.add(worker, uid=instance["host"])

    def _start_workers(self) -> None:
        """Start all workers of the pool"""
        for worker in self._workers:
            self._conn.register(worker)  # type: ignore[arg-type]
        if self.pool:
            self._workers.start_in_pool(
                self.pool,
                self.cfg.status_wait_timeout,
            )
        else:
            self._workers.start()

    def _stop_workers(self) -> None:
        """Stop all workers, through the thread pool when one exists."""
        if self.pool:
            self._workers.stop_in_pool(
                self.pool,
                self.cfg.status_wait_timeout,
            )
        else:
            self._workers.stop()

    def _start_thread_pool(self) -> None:
        """
        Create the thread pool used to start/stop workers concurrently.

        A pool is only created for two or more hosts; its size is
        ``min(number of hosts, cpu_count())``. Failing to create it is not
        fatal: ``self.pool`` stays None and workers are started and stopped
        sequentially instead.
        """
        size = len(self._instances)
        if size < 2:
            return
        try:
            self.pool = ThreadPool(min(size, cpu_count()))
        except AttributeError:
            self.logger.warning(
                "Please upgrade to the suggested python interpreter."
            )
        except Exception:
            # Keep the best-effort fallback but do not hide the failure
            # (e.g. cpu_count() may raise NotImplementedError).
            self.logger.warning(
                "Failed to create thread pool for %s, workers will be "
                "started and stopped sequentially.",
                self.cfg.name,
                exc_info=True,
            )

    def _early_stop_worker(self, worker: RemoteWorker) -> bool:  # type: ignore[override]
        """
        Schedule an asynchronous stop of ``worker`` if it has no assigned
        tasks and ``RESERVE_WORKER_NUM + unassigned tasks`` is still below the
        number of active workers not already being stopped.

        Only possible when a thread pool exists.

        :return: True if a stop was scheduled, False otherwise.
        """
        if len(worker.assigned) != 0:
            return False

        worker_uid: str = worker.uid()  # type: ignore[assignment]
        with self._pool_lock:
            if self.pool and (worker_uid not in self._stopping_queue):
                alive_worker_ids = {
                    active.uid() for active in self.worker_status["active"]
                }
                if self.RESERVE_WORKER_NUM + self.unassigned.size() < len(
                    alive_worker_ids - set(self._stopping_queue)
                ):
                    self.logger.user_info(
                        "Early stop worker %s", self._workers[worker_uid]
                    )
                    self._stopping_queue.append(worker_uid)
                    self.pool.apply_async(
                        self._workers.sync_stop_resource,
                        (self._workers[worker_uid],),
                    )
                    return True
        return False

    def starting(self) -> None:
        """Create the thread pool (if applicable), then start the pool."""
        self._start_thread_pool()
        super(RemotePool, self).starting()

    def stopping(self) -> None:
        """
        Wait for workers still STARTING to settle, stop the pool, then
        terminate and drop the thread pool.
        """
        for worker in self._workers:
            if worker.status == worker.STATUS.STARTING:
                try:
                    wait(
                        lambda: (
                            worker.status
                            in (worker.STATUS.STARTED, worker.STATUS.STOPPED)
                        ),
                        worker.cfg.status_wait_timeout,
                    )
                except Exception:
                    self.logger.error(
                        "Timeout waiting for worker %s to quit starting "
                        "while pool %s is stopping",
                        worker,
                        self.cfg.name,
                    )

        super(RemotePool, self).stopping()
        if self.pool:
            self.pool.terminate()
            self.pool = None

    def aborting(self) -> None:
        """Aborting logic."""
        super(RemotePool, self).aborting()
        if self.pool:
            self.pool.terminate()
            self.pool = None

    def get_current_status_for_debug(self) -> List[str]:
        """
        Gets ``Hosts`` and ``Workers`` information for debugging.

        :return: Status information of Hosts and Workers.
        """
        msgs = [f"Hosts and number of workers in {self.class_name}:"]
        for host, number_of_workers in self.cfg.hosts.items():
            msgs.append(
                f"\t Host: {host}, Number of workers: {number_of_workers}"
            )
        msgs.extend(super().get_current_status_for_debug())
        return msgs