Source code for testplan.testing.multitest.driver.app

"""Generic application driver."""

import datetime
import os
import platform
import shutil
import signal
import socket
import subprocess
import time
import uuid
import warnings
from typing import Dict, List, Optional, Union

try:
    from typing import Literal
except ImportError:
    from typing_extensions import Literal

import psutil
from schema import Or, Use

from testplan.common.config import ConfigOption
from testplan.common.entity import ActionResult
from testplan.common.utils.context import (
    ContextValue,
    expand,
    is_context,
    render,
)
from testplan.common.utils.documentation_helper import emphasized
from testplan.common.utils.match import LogMatcher
from testplan.common.utils.path import StdFiles, archive, makedirs
from testplan.common.utils.process import (
    kill_process,
    subprocess_popen,
    kill_proc_and_child_procs,
)
from testplan.common.utils.timing import TimeoutException, TimeoutExceptionInfo

from testplan.testing.multitest.driver.base import Driver, DriverConfig
from testplan.testing.multitest.driver.connection import (
    SubprocessFileConnectionExtractor,
    SubprocessPortConnectionExtractor,
)

IS_WIN = platform.system() == "Windows"


[docs] class OrphanedProcessException(Exception): """ Exception raised when there are orphaned processes after stopping the driver. """ def __init__(self, driver, procs): self.driver = driver self.procs = procs super().__init__(f"Orphaned processes detected: {procs} for {driver}")
[docs] class AppConfig(DriverConfig): """ Configuration object for :py:class:`~testplan.testing.multitest.driver.app.App` resource. """
[docs] @classmethod def get_options(cls): """ Schema for options validation and assignment of default values. """ return { "binary": str, ConfigOption("pre_args", default=None): Or(None, list), ConfigOption("args", default=None): Or(None, list), ConfigOption("shell", default=False): bool, ConfigOption("env", default=None): Or(None, dict), ConfigOption("binary_strategy", default="link"): lambda s: s in ("copy", "link", "noop"), ConfigOption("logname", default=None): Or(None, str), ConfigOption("app_dir_name", default=None): Or(None, str), ConfigOption("working_dir", default=None): Or(None, str), ConfigOption("expected_retcode", default=None): int, ConfigOption("stop_signal", default=None): Or( None, signal.Signals ), ConfigOption("stop_timeout", default=5): Use(float), ConfigOption("binary_log", default=False): bool, }
ArgsType = Union[List[Union[str, ContextValue]], str, ContextValue]
[docs] class App(Driver): """ Binary application driver. {emphasized_members_docs} :param name: Driver name. Also uid. :param binary: Path to the application binary. :param pre_args: Arguments to be prepended to binary command. An argument can be a :py:class:`~testplan.common.utils.context.ContextValue` and will be expanded on runtime. :param args: Arguments to be appended to binary command. An argument can be a :py:class:`~testplan.common.utils.context.ContextValue` and will be expanded on runtime. :param shell: Invoke shell for command execution. :param env: Environmental variables to be made available to child process; context value (when referring to other driver) and jinja2 template (when referring to self) will be resolved. :param binary_strategy: Whether to copy / link binary to runpath. :param logname: Base name of driver logfile under `app_path`, in which Testplan will look for `log_regexps` as driver start-up condition. Default to "stdout" (to match the output stream of binary). :param app_dir_name: Application directory name. :param working_dir: Application working directory. Default: runpath :param expected_retcode: the expected return code of the subprocess. Default value is None meaning it won't be checked. Set it to 0 to ennsure the driver is always gracefully shut down. :param stop_signal: Shutdown signal to be sent to subprocess. :param stop_timeout: Timeout for graceful shutdown (in seconds). :param binary_log: if `True` the log_matcher will handle the logfile as binary, and need to use binary regexps. Default value is `False`. Also inherits all :py:class:`~testplan.testing.multitest.driver.base.Driver` options. """ CONFIG = AppConfig EXTRACTORS = [ SubprocessFileConnectionExtractor(), SubprocessPortConnectionExtractor(), ] def __init__( self, name: str, binary: str, pre_args: ArgsType = None, args: ArgsType = None, shell: bool = False, env: Dict[str, str] = None, binary_strategy: Literal["copy", "link", "noop"] = "link", logname: str = None, app_dir_name: str = None, working_dir: str = None, expected_retcode: int = None, stop_signal: Optional[signal.Signals] = None, stop_timeout: float = 5, binary_log: bool = False, **options, ) -> None: options.update(self.filter_locals(locals())) # ``sigint_timeout`` is deprecated if "sigint_timeout" in options: options["stop_timeout"] = options.pop("sigint_timeout") warnings.warn( "``sigint_timeout`` argument is deprecated, " "please use ``stop_timeout`` instead.", DeprecationWarning, ) super(App, self).__init__(**options) self.proc = None self.std = None self._binary = None self._binpath: str = None self._etcpath: str = None self._retcode = None self._log_matcher = None self._resolved_bin = None self._env = None self._alive_child_procs = [] # for orphaned procs elimination @emphasized @property def pid(self) -> Optional[int]: """ Return pid of the child process if available, ``None`` otherwise. """ if self.proc: return self.proc.pid else: return None @property def retcode(self) -> Optional[int]: """ Return return code of the app process or ``None``. """ if self._retcode is None: if self.proc: self._retcode = self.proc.poll() return self._retcode @property def alive_child_procs(self) -> List[psutil.Process]: _, self._alive_child_procs = psutil.wait_procs( self._alive_child_procs, timeout=0 ) return self._alive_child_procs @emphasized @property def cmd(self) -> str: """Command that starts the application.""" args = self.cfg.args or [] pre_args = self.cfg.pre_args or [] cmd = [] cmd.extend(pre_args) cmd.append(self.binary) cmd.extend(args) cmd = [ expand(arg, self.context, str) if is_context(arg) else arg for arg in cmd ] return cmd @emphasized @property def env(self) -> Optional[Dict[str, str]]: """Environment variables.""" if self._env: return self._env if isinstance(self.cfg.env, dict): ctx = self.context_input(exclude=["env"]) self._env = { key: expand(val, self.context, str) if is_context(val) # allowing None val for child class use case else (render(val, ctx) if val is not None else None) for key, val in self.cfg.env.items() } return self._env @emphasized @property def logname(self) -> str: """Configured logname.""" return self.cfg.logname @emphasized @property def logpath(self) -> str: """Path for log regex matching.""" return ( os.path.join(self.app_path, self.logname) if self.logname else self.outpath ) @emphasized @property def outpath(self) -> str: """Path for stdout file regex matching.""" return self.std.out_path @emphasized @property def errpath(self) -> str: """Path for stderr file regex matching.""" return self.std.err_path @emphasized @property def app_path(self) -> str: """Application directory path.""" if self.cfg.app_dir_name: return os.path.join(self.runpath, self.cfg.app_dir_name) return self.runpath @emphasized @property def binpath(self) -> str: """'bin' directory under runpath.""" return self._binpath @emphasized @property def binary(self) -> str: """The actual binary to execute, might be copied/linked to runpath""" if self._binary: return self._binary if os.path.isfile(self.resolved_bin): if self.cfg.path_cleanup is True: name = os.path.basename(self.cfg.binary) else: name = "{}-{}".format( os.path.basename(self.cfg.binary), uuid.uuid4() ) target = os.path.join(self.binpath, name) if self.cfg.binary_strategy == "copy": shutil.copyfile(self.resolved_bin, target) shutil.copymode(self.resolved_bin, target) self._binary = target elif self.cfg.binary_strategy == "link" and not IS_WIN: os.symlink(os.path.abspath(self.resolved_bin), target) self._binary = target # else binary_strategy is noop then we don't do anything else: self._binary = self.resolved_bin else: self._binary = self.resolved_bin return self._binary @emphasized @property def etcpath(self) -> str: """'etc' directory under runpath.""" return self._etcpath @property def log_matcher(self) -> LogMatcher: """ Create if not exist and return the LogMatcher object that reads the log / stdout of the driver. :return: LogMatcher instance """ if not self._log_matcher: self._log_matcher = LogMatcher(self.logpath, self.cfg.binary_log) return self._log_matcher @property def resolved_bin(self) -> str: """Resolved binary path from self.cfg.binary""" if not self._resolved_bin: self._resolved_bin = self._prepare_binary() return self._resolved_bin def _prepare_binary(self) -> str: """prepare binary path, override for more sophisticated binary discover""" return self.cfg.binary @property def hostname(self) -> str: """ :return: hostname where the ETSApp is running """ return socket.gethostname()
[docs] def starting(self) -> None: """Starts the application binary.""" super(App, self).starting() cmd = " ".join(self.cmd) if self.cfg.shell else self.cmd cwd = self.cfg.working_dir or self.runpath try: self.logger.info( "%(driver)s driver command: %(cmd)s\n" "\tRunpath: %(runpath)s\n" "\tOut file: %(out)s\n" "\tErr file: %(err)s\n", { "driver": self, "cmd": cmd, "runpath": self.runpath, "out": self.std.out_path, "err": self.std.err_path, }, ) self._retcode = None self.proc = subprocess_popen( cmd, shell=self.cfg.shell, stdin=subprocess.PIPE, stdout=self.std.out, stderr=self.std.err, cwd=cwd, env=self.env, ) except Exception: self.logger.error( "Error while %s driver executed command: %s", self, cmd if self.cfg.shell else " ".join(cmd), ) self.aborting() raise
[docs] def started_check(self) -> ActionResult: """ Predicate indicating whether a binary in a subprocess has started. Tests whether the return code is zero if the underlying binary has finished execution, otherwise tests if user-specified pattern exists in driver logs. """ proc_result = self.proc.poll() extract_values_result = self.extract_values() if proc_result is not None and not extract_values_result: raise RuntimeError( f"{self} has unexpectedly stopped with: {proc_result}" ) return extract_values_result
@property def stop_timeout(self) -> float: return self.cfg.stop_timeout
[docs] def stopping(self) -> None: """Stops the application binary process.""" if self.proc is not None and self.retcode is None: # take a snapshot of all child procs # so that we can check if they all terminate later self._alive_child_procs = psutil.Process(self.pid).children( recursive=True ) if self.cfg.stop_signal is None: self.proc.terminate() else: self.proc.send_signal(self.cfg.stop_signal) super().stopping()
[docs] def stopped_check(self) -> ActionResult: if self.proc is None: return True if self.retcode is None: return False if self.alive_child_procs: raise OrphanedProcessException(self, self._alive_child_procs) return True
[docs] def stopped_check_with_watch(self, watch) -> ActionResult: def log_fn(exc): self.logger.warning("While killing driver %s: %s", self, exc) if time.time() >= watch.start_time + watch.total_wait: # not raising a timeout here self.logger.warning( "%s still alive after %d seconds, killing", self, self.stop_timeout, ) kill_proc_and_child_procs( self.proc, self.alive_child_procs, log_fn ) if watch.should_check(): try: return self.stopped_check() except OrphanedProcessException as exc: self.logger.warning(exc) kill_proc_and_child_procs( self.proc, self.alive_child_procs, log_fn ) return True return False
def _wait_stopped(self, timeout: Optional[float] = None) -> None: # for App driver and its inheritance, always force stop def log_fn(exc): self.logger.warning("While killing driver %s: %s", self, exc) try: super()._wait_stopped(timeout) except TimeoutException: self.logger.warning( "%s still alive after %d seconds, killing", self, self.stop_timeout, ) kill_proc_and_child_procs( self.proc, self.alive_child_procs, log_fn ) self._mark_stopped() except OrphanedProcessException as exc: self.logger.warning(exc) kill_proc_and_child_procs( self.proc, self.alive_child_procs, log_fn ) self._mark_stopped() # other exceptions gets propagated
[docs] def post_stop(self): rc = self.retcode self.proc = None if self.std: self.std.close() # reset env, binary etc. as they need re-eval in case of restart self._env = None self._binary = None self._log_matcher = None if (self.cfg.expected_retcode is not None) and ( self.cfg.expected_retcode != rc ): err_msg = ( f"App driver error: {self}," f" expected return code is {self.cfg.expected_retcode}," f" but actual return code is {rc}" ) raise RuntimeError(err_msg)
[docs] def make_runpath_dirs(self) -> None: """ Create mandatory directories and install files from given templates using the drivers context before starting the application binary. """ super(App, self).make_runpath_dirs() bin_dir = os.path.join(self.runpath, "bin") etc_dir = os.path.join(self.runpath, "etc") for directory in (bin_dir, etc_dir, self.app_path): makedirs(directory) self._binpath = bin_dir self._etcpath = etc_dir self.std = StdFiles(self.app_path)
def _install_target(self) -> str: return self.etcpath
[docs] def restart(self, clean: bool = True) -> None: """ Stop the driver, archive the app_dir or rename std/log, and then restart the driver. :param clean: if set to ``True``, perform a 'clean' restart where all persistence is deleted, else a normal restart. """ self.stop() if self.async_start: self.wait(self.status.STOPPED) if clean: self._move_app_path() else: self._move_std_and_logs() # we don't want to cleanup runpath during restart path_cleanup = self.cfg.path_cleanup self.cfg._options["path_cleanup"] = False self.start() if self.async_start: self.wait(self.status.STARTED) self.cfg._options["path_cleanup"] = path_cleanup
def _move_app_path(self) -> None: """ Move app_path directory to an archive location """ snapshot_path = self.app_path + datetime.datetime.now().strftime( "_%Y%m%d_%H%M%S" ) shutil.move(self.app_path, snapshot_path) os.makedirs(self.app_path) def _move_std_and_logs(self) -> None: """ Rename std and log files """ timestamp = datetime.datetime.now().strftime("_%Y%m%d_%H%M%S") for file in (self.outpath, self.errpath, self.logpath): if os.path.isfile(file): archive(file, timestamp)
[docs] def aborting(self) -> None: """Abort logic to force kill the child binary.""" if self.proc: self.logger.info( "Killing process id %s of %s", self.proc.pid, self ) kill_process(self.proc, self.stop_timeout, self.cfg.stop_signal) self.proc = None if self.std: self.std.close()