Source code for testplan.runners.pools.remote

"""Remote worker pool module."""

import os
import signal
import socket
from multiprocessing import cpu_count
from multiprocessing.pool import ThreadPool
from typing import Callable, Dict, List, Optional, Tuple, Type, Union

from schema import Or

from testplan.common.config import ConfigOption
from testplan.common.remote.remote_resource import (
    RemoteResource,
    RemoteResourceConfig,
    UnboundRemoteResourceConfig,
)
from testplan.common.utils.logger import TESTPLAN_LOGGER
from testplan.common.utils.path import fix_home_prefix, rebase_path
from testplan.common.utils.remote import copy_cmd, ssh_cmd
from testplan.common.utils.timing import get_sleeper, wait

from .base import Pool, PoolConfig
from .communication import Message
from .connection import ZMQServer
from .process import ProcessWorker, ProcessWorkerConfig


class UnboundRemoteWorkerConfig(
    ProcessWorkerConfig, UnboundRemoteResourceConfig
):
    """
    Configuration object for
    :py:class:`~testplan.runners.pools.remote.RemoteWorker` resource entity.
    """

    # A RemoteWorker is built from the parameters handed to its RemotePool,
    # so keys unknown to this schema must be tolerated.
    ignore_extra_keys = True

    @classmethod
    def get_options(cls):
        """
        Schema for options validation and assignment of default values.

        Both options are required: ``workers`` is the size of the child
        pool and ``pool_type`` its kind.
        """
        schema = {"workers": int}
        schema["pool_type"] = str
        return schema
class RemoteWorkerConfig(UnboundRemoteWorkerConfig, RemoteResourceConfig):
    """
    Bound configuration for a remote worker.

    Combines the remote worker options with those of the remote resource
    configuration; it adds nothing of its own.
    """
class RemoteWorker(ProcessWorker, RemoteResource):
    """
    Remote worker resource that pulls tasks from the transport provided,
    executes them in a local pool of workers and sends back task results.

    :param pool_type: Child pool type that remote workers will use.
        can be ``thread`` or ``process``, default to ``thread`` if
        ``workers`` is 1 and otherwise ``process``.
    :param workers: Number of thread/process workers of the child pool,
        default to 1.

    Also inherits all
    :py:class:`~testplan.runners.pools.process.ProcessWorkerConfig` and
    :py:class:`~testplan.common.remote.remote_resource.RemoteResource`
    options.
    """

    CONFIG = RemoteWorkerConfig

    def __init__(self, **options) -> None:
        # A single child worker gains nothing from a process pool, so a
        # thread pool is forced. ``.get`` is used so that a missing
        # ``workers`` option is reported by config validation in
        # ``super().__init__`` instead of surfacing here as a bare KeyError.
        if options.get("workers") == 1:
            options["pool_type"] = "thread"
        super().__init__(**options)

    @property
    def host(self) -> str:
        """Remote host this worker is configured to run on."""
        return self.cfg.remote_host

    def _set_child_script(self) -> None:
        """Specify the remote worker executable file."""
        self._child_paths.local = fix_home_prefix(self._child_path())
        # The child script lives inside the testplan import path, so its
        # remote location mirrors the local one under the remote import path.
        self._child_paths.remote = rebase_path(
            self._child_paths.local,
            self._testplan_import_path.local,
            self._testplan_import_path.remote,
        )

    def _proc_cmd_impl(self) -> List[str]:
        """
        Build the argument list that launches the child on the remote host.

        :return: Command tokens (not yet wrapped in ssh).
        """
        cmd = [
            self.python_binary,
            "-uB",
            self._child_paths.remote,
            "--index",
            str(self.cfg.index),
            "--address",
            self.transport.address,
            "--type",
            "remote_worker",
            "--log-level",
            str(TESTPLAN_LOGGER.getEffectiveLevel()),
            "--wd",
            self._working_dirs.remote,
            "--runpath",
            self._remote_resource_runpath,
            "--remote-pool-type",
            self.cfg.pool_type,
            "--remote-pool-size",
            str(self.cfg.workers),
            "--sys-path-file",
            self._remote_syspath_file,
        ]
        if self.parent.resource_monitor_address:
            cmd.extend(
                [
                    "--resource-monitor-server",
                    self.parent.resource_monitor_address,
                ]
            )
        return cmd

    def _proc_cmd(self) -> str:
        """Command to start child process."""
        cmd = self._proc_cmd_impl()
        # NOTE(review): tokens are space-joined without shell quoting, so
        # paths containing whitespace would be split on the remote side.
        return self.cfg.ssh_cmd(self.ssh_cfg, " ".join(cmd))

    def _write_syspath(self) -> None:
        """
        Write our current sys.path to a file and transfer it to the remote
        host.
        """
        super(RemoteWorker, self)._write_syspath(
            sys_path=self._remote_sys_path()
        )
        self._remote_syspath_file = os.path.join(
            self._remote_plan_runpath,
            f"sys_path_{os.path.basename(self._syspath_file)}",
        )
        self._transfer_data(
            source=self._syspath_file,
            target=self._remote_syspath_file,
            remote_target=True,
        )
        self.logger.debug(
            "Transferred sys.path to remote host at: %s",
            self._remote_syspath_file,
        )

    def pre_start(self) -> None:
        """
        Prepare the remote host before the child process is launched.

        Defines the runpath, prepares the remote (timed as
        "prepare remote"), resolves the child script and transfers sys.path.
        """
        self.define_runpath()
        with self.timer.record("prepare remote"):
            self._prepare_remote()
        self._set_child_script()
        self._write_syspath()

    def pre_stop(self) -> None:
        """Fetch results from the remote host before the worker stops."""
        with self.timer.record("fetch results"):
            self._fetch_results()

    def post_stop(self) -> None:
        """Run remote cleanup once the worker has stopped."""
        self._clean_remote()

    def _wait_stopped(self, timeout: float = None) -> None:
        """
        Poll until the worker reports STOPPED, then run :py:meth:`post_stop`.

        :param timeout: Maximum time to wait, passed to ``get_sleeper``.
        :raises RuntimeError: If the worker has not stopped when the sleeper
            is exhausted.
        """
        sleeper = get_sleeper(1, timeout)
        while next(sleeper):
            if self.status != self.status.STOPPED:
                self.logger.info("Waiting for workers to stop")
            else:
                self.post_stop()
                break
        else:
            # Reached only when the sleeper ran out without a ``break``.
            msg = f"Not able to stop worker {self} after {timeout}s"
            self.logger.error(msg)
            raise RuntimeError(msg)

    def _rebase_assertion(self, result) -> None:
        """
        Recursively rebase ``source_path`` of serialized entries from the
        remote plan runpath to the local plan runpath.

        :param result: A report object exposing ``entries``, or a serialized
            entry dict. Dicts carrying ``source_path`` are rewritten in place.
            Other dicts are descended into through their ``entries`` key
            (presumably serialized assertion groups — verify against the
            entry schemas).
        """
        if isinstance(result, dict):
            if "source_path" in result:
                result["source_path"] = rebase_path(
                    result["source_path"],
                    self._remote_plan_runpath,
                    self._get_plan().runpath,
                )
                return
            # ``getattr`` never finds an ``entries`` attribute on a dict, so
            # nested serialized entries must be read by key.
            entries = result.get("entries") or []
        else:
            entries = getattr(result, "entries", [])
        for entry in entries:
            self._rebase_assertion(entry)

    def rebase_attachment(self, result) -> None:
        """
        Rebase the path of attachment from remote to local.

        Rewrites each attachment's ``source_path`` in ``result.report`` and
        then any ``source_path`` found in the report's serialized entries.
        """
        for attachment in result.report.attachments:
            attachment.source_path = rebase_path(
                attachment.source_path,
                self._remote_plan_runpath,
                self._get_plan().runpath,
            )
        self._rebase_assertion(result.report)

    def rebase_task_path(self, task) -> None:
        """Rebase the path of task from local to remote"""
        task.rebase_path(
            self._workspace_paths.local, self._workspace_paths.remote
        )
class RemotePoolConfig(PoolConfig):
    """
    Configuration object for
    :py:class:`~testplan.runners.pools.remote.RemotePool` executor resource
    entity.
    """

    # Parameters meant for RemoteWorker are passed through RemotePool, so
    # keys unknown to this schema must not fail validation.
    ignore_extra_keys = True
    # IP address of the local machine, resolved once when the class body
    # is executed (i.e. at module import).
    default_hostname = socket.gethostbyname(socket.gethostname())

    @classmethod
    def get_options(cls):
        """
        Schema for options validation and assignment of default values.
        """
        default_abort_signals = [signal.SIGINT, signal.SIGTERM]
        heartbeat_schema = Or(int, float, None)
        return {
            "hosts": dict,
            ConfigOption(
                "abort_signals", default=default_abort_signals
            ): [int],
            ConfigOption("worker_type", default=RemoteWorker): object,
            ConfigOption("pool_type", default="process"): str,
            ConfigOption("host", default=cls.default_hostname): str,
            ConfigOption("port", default=0): int,
            ConfigOption("worker_heartbeat", default=30): heartbeat_schema,
        }
class RemotePool(Pool):
    """
    Pool task executor object that initializes remote workers and dispatches
    tasks.

    :param name: Pool name.
    :param hosts: Map of host(ip): number of their local thread/process
        workers. e.g. {'hostname1': 2, '10.147.XX.XX': 4}
    :param abort_signals: Signals to trigger abort logic. Default: INT, TERM.
    :param worker_type: Type of worker to be initialized.
    :param pool_type: Child pool type that remote workers will use.
        can be ``thread`` or ``process``, default to ``thread`` if
        ``workers`` is 1 and otherwise ``process``.
    :param host: Host that pool binds and listens for requests. Defaults to
        local hostname.
    :param port: Port that pool binds. Default: 0 (random)
    :param worker_heartbeat: Worker heartbeat period.
    :param ssh_port: The ssh port number of remote host, default is 22.
    :param ssh_cmd: Callable that prefixes a command with the ssh binary and
        options.
    :param copy_cmd: Callable that returns the cmdline to do copy on remote
        host.
    :param workspace: Current project workspace to be transferred, default
        is pwd.
    :param workspace_exclude: Patterns to exclude files when pushing
        workspace.
    :param remote_runpath: Root runpath on remote host, default is same as
        local (Linux->Linux) or /var/tmp/$USER/testplan/$plan_name
        (Window->Linux).
    :param testplan_path: Path to import testplan from on remote host,
        default is testplan_lib under remote_runpath
    :param remote_workspace: The path of the workspace on remote host,
        default is fetched_workspace under remote_runpath
    :param clean_remote: Delete root runpath on remote at exit.
    :param push: Files and directories to push to the remote.
    :type push: ``list`` that contains ``str`` or ``tuple``:
        - ``str``: Name of the file or directory
        - ``tuple``: A (src, dst) pair
    :param push_exclude: Patterns to exclude files on push stage.
    :param delete_pushed: Delete pushed files on remote at exit.
    :param fetch_runpath: The flag of fetch remote resource's runpath,
        default to True.
    :param fetch_runpath_exclude: Exclude files matching PATTERN.
    :param pull: Files and directories to be pulled from the remote at the
        end.
    :param pull_exclude: Patterns to exclude files on pull stage.
    :param env: Environment variables to be propagated.
    :param setup_script: Script to be executed on remote as very first thing.

    Also inherits all :py:class:`~testplan.runners.pools.base.Pool` options.
    """

    CONFIG = RemotePoolConfig
    CONN_MANAGER = ZMQServer
    QUEUE_WAIT_INTERVAL = 3
    MAX_THREAD_POOL_SIZE = 5

    def __init__(
        self,
        name: str,
        hosts: Dict[str, int],
        abort_signals: List[int] = None,
        worker_type: Type = RemoteWorker,
        pool_type: str = "process",
        host: str = CONFIG.default_hostname,
        port: int = 0,
        worker_heartbeat: float = 30,
        ssh_port: int = 22,
        ssh_cmd: Callable = ssh_cmd,
        copy_cmd: Callable = copy_cmd,
        workspace: str = None,
        workspace_exclude: List[str] = None,
        remote_runpath: str = None,
        testplan_path: str = None,
        remote_workspace: str = None,
        clean_remote: bool = False,
        push: List[Union[str, Tuple[str, str]]] = None,
        push_exclude: List[str] = None,
        delete_pushed: bool = False,
        fetch_runpath: bool = True,
        fetch_runpath_exclude: List[str] = None,
        pull: List[str] = None,
        pull_exclude: List[str] = None,
        env: Dict[str, str] = None,
        setup_script: List[str] = None,
        **options,
    ) -> None:
        self.pool = None
        # ``locals()`` is captured before any other local name is bound so
        # that only constructor arguments are collected into the options.
        options.update(self.filter_locals(locals()))
        super(RemotePool, self).__init__(**options)
        self._options = options  # pass to remote worker later
        self._request_handlers[Message.MetadataPull] = (
            self._worker_setup_metadata
        )
        # One worker description per configured host (a distinct loop name
        # is used so the ``host`` bind-address parameter is not shadowed).
        self._instances = {
            remote_host: {
                "host": remote_host,
                "number_of_workers": number_of_workers,
            }
            for remote_host, number_of_workers in self.cfg.hosts.items()
        }

    @property
    def resource_monitor_address(self) -> Optional[str]:
        """
        Address of the parent's resource monitor server, or ``None`` when
        the parent has no such server.
        """
        if self.parent.resource_monitor_server:
            return self.parent.resource_monitor_server.address
        return None

    @staticmethod
    def _worker_setup_metadata(
        worker: RemoteWorker, _: Message, response: Message
    ) -> None:
        """Answer a ``MetadataPull`` request with the worker's setup metadata."""
        worker.respond(
            response.make(Message.Metadata, data=worker.setup_metadata)
        )

    def _add_workers(self) -> None:
        """
        Create one remote worker per configured host and add it to the pool.

        Each worker is keyed by its host name and receives the options this
        pool was constructed with.
        """
        for instance in self._instances.values():
            worker = self.cfg.worker_type(
                index=instance["host"],
                remote_host=instance["host"],
                workers=instance["number_of_workers"],
                **self._options,
            )
            self.logger.debug("Created %s", worker)
            worker.parent = self
            worker.cfg.parent = self.cfg
            self._workers.add(worker, uid=instance["host"])

    def _start_workers(self) -> None:
        """Start all workers of the pool"""
        for worker in self._workers:
            self._conn.register(worker)
        if self.pool:
            self._workers.start_in_pool(self.pool)
        else:
            self._workers.start()

    def _stop_workers(self) -> None:
        """Stop all workers, through the thread pool when one exists."""
        if self.pool:
            self._workers.stop_in_pool(self.pool)
        else:
            self._workers.stop()

    def _start_thread_pool(self) -> None:
        """
        Create the thread pool used to start/stop workers concurrently.

        A pool is only created for more than two hosts. Failing to create it
        is not fatal: ``self.pool`` stays unset and workers are then started
        and stopped sequentially.
        """
        size = len(self._instances)
        if size <= 2:
            return
        try:
            self.pool = ThreadPool(min(size, cpu_count()))
        except AttributeError:
            self.logger.warning(
                "Please upgrade to the suggested python interpreter."
            )
        except Exception as exc:
            # Previously swallowed silently; surface it while keeping the
            # best-effort sequential fallback.
            self.logger.warning(
                "Failed to create thread pool for %s, workers will be "
                "started and stopped sequentially: %s",
                self.cfg.name,
                exc,
            )

    def _early_stop_worker(self, worker: RemoteWorker) -> bool:
        """
        Schedule an idle worker to stop early when it is no longer needed.

        A worker is stopped only if it has no assigned tasks, a thread pool
        exists, it is not already queued for stopping, and the remaining
        alive workers outnumber the reserve plus the unassigned tasks.

        :return: ``True`` if the worker was queued for stopping.
        """
        if len(worker.assigned) != 0:
            return False

        worker_uid = worker.uid()
        with self._pool_lock:
            if self.pool and (worker_uid not in self._stopping_queue):
                alive_worker_ids = {
                    active.uid() for active in self.worker_status["active"]
                }
                if self.RESERVE_WORKER_NUM + self.unassigned.size() < len(
                    alive_worker_ids - set(self._stopping_queue)
                ):
                    self.logger.user_info(
                        "Early stop worker %s", self._workers[worker_uid]
                    )
                    self._stopping_queue.append(worker_uid)
                    self.pool.apply_async(
                        self._workers.sync_stop_resource,
                        (self._workers[worker_uid],),
                    )
                    return True
        return False

    def starting(self) -> None:
        """Create the thread pool (if applicable), then start the pool."""
        self._start_thread_pool()
        super(RemotePool, self).starting()

    def stopping(self) -> None:
        """
        Wait for workers still starting, stop the pool, then terminate the
        thread pool.
        """
        for worker in self._workers:
            if worker.status == worker.status.STARTING:
                try:
                    wait(
                        lambda: worker.status
                        in (worker.STATUS.STARTED, worker.STATUS.STOPPED),
                        worker.cfg.status_wait_timeout,
                    )
                except Exception:
                    self.logger.error(
                        "Timeout waiting for worker %s to quit starting "
                        "while pool %s is stopping",
                        worker,
                        self.cfg.name,
                    )
        super(RemotePool, self).stopping()
        if self.pool:
            self.pool.terminate()
            self.pool = None

    def aborting(self) -> None:
        """Aborting logic."""
        super(RemotePool, self).aborting()
        if self.pool:
            self.pool.terminate()
            self.pool = None

    def get_current_status_for_debug(self) -> List[str]:
        """
        Gets ``Hosts`` and ``Workers`` information for debugging.

        :return: Status information of Hosts and Workers.
        """
        msgs = [f"Hosts and number of workers in {self.class_name}:"]
        for host, number_of_workers in self.cfg.hosts.items():
            msgs.append(
                f"\t Host: {host}, Number of workers: {number_of_workers}"
            )
        msgs.extend(super().get_current_status_for_debug())
        return msgs