Source code for testplan.runnable.base

"""Tests runner module."""
import datetime
import inspect
import math
import os
import random
import re
import time
import uuid
import webbrowser
from collections import OrderedDict
from dataclasses import dataclass
from typing import (
    Any,
    Callable,
    Collection,
    Dict,
    List,
    MutableMapping,
    Optional,
    Pattern,
    Tuple,
    Union,
)

import pytz
from schema import And, Or, Use

from testplan import defaults
from testplan.common.config import ConfigOption
from testplan.common.entity import (
    Resource,
    Runnable,
    RunnableConfig,
    RunnableResult,
    RunnableStatus,
)
from testplan.common.exporters import BaseExporter, ExportContext, run_exporter
from testplan.common.remote.remote_service import RemoteService
from testplan.common.report import MergeError
from testplan.monitor.resource import (
    ResourceMonitorServer,
    ResourceMonitorClient,
)
from testplan.common.utils import logger, strings
from testplan.common.utils.package import import_tmp_module
from testplan.common.utils.path import default_runpath, makedirs, makeemptydirs
from testplan.common.utils.selector import Expr as SExpr
from testplan.common.utils.selector import apply_single
from testplan.environment import EnvironmentCreator, Environments
from testplan.exporters import testing as test_exporters
from testplan.exporters.testing.base import Exporter
from testplan.report import (
    ReportCategories,
    Status,
    TestGroupReport,
    TestReport,
)
from testplan.report.filter import ReportingFilter
from testplan.report.testing.styles import Style
from testplan.runnable.interactive import TestRunnerIHandler
from testplan.runners.base import Executor
from testplan.runners.pools.base import Pool
from testplan.runners.pools.tasks import Task, TaskResult
from testplan.runners.pools.tasks.base import (
    TaskTargetInformation,
    get_task_target_information,
    is_task_target,
)
from testplan.testing import common, filtering, listing, ordering, tagging
from testplan.testing.base import Test, TestResult
from testplan.testing.listing import Lister
from testplan.testing.multitest import MultiTest

TestTask = Union[Test, Task, Callable]


@dataclass
class TaskInformation:
    target: TestTask
    materialized_test: Test
    uid: str


def get_exporters(values):
    """
    Validation function for exporter declarations.

    :param values: Single or a list of exporter declaration(s).

    :return: List of initialized exporter objects.
    """

    def get_exporter(value):
        if isinstance(value, BaseExporter):
            return value
        elif isinstance(value, tuple):
            exporter_cls, params = value
            return exporter_cls(**params)
        raise TypeError("Invalid exporter value: {}".format(value))

    if values is None:
        return []
    elif isinstance(values, list):
        return [get_exporter(v) for v in values]
    return [get_exporter(values)]

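
# Illustrative sketch, not part of the original module: ``get_exporters``
# accepts a single declaration or a list of declarations, where each item is
# either an already constructed exporter or an ``(exporter_cls, params)``
# tuple. The exporter keyword arguments below are assumptions chosen for the
# example; the helper is never called by this module.
def _example_exporter_declarations():
    return get_exporters(
        [
            test_exporters.JSONExporter(json_path="report.json"),
            (test_exporters.PDFExporter, {"pdf_path": "report.pdf"}),
        ]
    )

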
def result_for_failed_task(original_result):
    """
    Create a new result entry for invalid result retrieved from a resource.
    """
    result = TestResult()
    result.report = TestGroupReport(
        name=str(original_result.task), category=ReportCategories.ERROR
    )
    attrs = [attr for attr in original_result.task.serializable_attrs]
    result_lines = [
        "{}: {}".format(attr, getattr(original_result.task, attr))
        if getattr(original_result.task, attr, None)
        else ""
        for attr in attrs
    ]
    result.report.logger.error(
        os.linesep.join([line for line in result_lines if line])
    )
    result.report.logger.error(original_result.reason)
    result.report.status_override = Status.ERROR
    return result

def validate_lines(d: dict) -> bool:
    for v in d.values():
        if not (
            isinstance(v, list) and all(map(lambda x: isinstance(x, int), v))
        ) and not (isinstance(v, str) and v.strip() == "*"):
            raise ValueError(
                f'Unexpected value "{v}" of type {type(v)} for lines, '
                'list of integer or string literal "*" expected.'
            )
    return True

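
# Illustrative sketch, not part of the original module: the ``tracing_tests``
# option maps identifiers to either a list of line numbers or the literal
# string "*" (meaning all lines), which is exactly what ``validate_lines``
# checks. The keys below are hypothetical; the helper is never called here.
def _example_tracing_tests_value():
    return validate_lines(
        {"tests/test_sample.py": [10, 11, 42], "tests/test_other.py": "*"}
    )

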
class TestRunnerConfig(RunnableConfig):
    """
    Configuration object for
    :py:class:`~testplan.runnable.TestRunner` runnable object.
    """

    ignore_extra_keys = True

    @classmethod
    def get_options(cls):
        return {
            "name": str,
            ConfigOption("description", default=None): Or(str, None),
            ConfigOption("logger_level", default=logger.USER_INFO): int,
            ConfigOption("file_log_level", default=logger.DEBUG): int,
            ConfigOption("runpath", default=default_runpath): Or(
                None, str, lambda x: callable(x)
            ),
            ConfigOption("path_cleanup", default=True): bool,
            ConfigOption("all_tasks_local", default=False): bool,
            ConfigOption(
                "shuffle", default=[]
            ): list,  # list of string choices
            ConfigOption(
                "shuffle_seed", default=float(random.randint(1, 9999))
            ): float,
            ConfigOption("exporters", default=None): Use(get_exporters),
            ConfigOption("stdout_style", default=defaults.STDOUT_STYLE): Style,
            ConfigOption("report_dir", default=defaults.REPORT_DIR): Or(
                str, None
            ),
            ConfigOption("xml_dir", default=None): Or(str, None),
            ConfigOption("pdf_path", default=None): Or(str, None),
            ConfigOption("json_path", default=None): Or(str, None),
            ConfigOption("http_url", default=None): Or(str, None),
            ConfigOption("pdf_style", default=defaults.PDF_STYLE): Style,
            ConfigOption("report_tags", default=[]): [
                Use(tagging.validate_tag_value)
            ],
            ConfigOption("report_tags_all", default=[]): [
                Use(tagging.validate_tag_value)
            ],
            ConfigOption("merge_scheduled_parts", default=False): bool,
            ConfigOption("browse", default=False): bool,
            ConfigOption("ui_port", default=None): Or(None, int),
            ConfigOption(
                "web_server_startup_timeout",
                default=defaults.WEB_SERVER_TIMEOUT,
            ): int,
            ConfigOption(
                "test_filter", default=filtering.Filter()
            ): filtering.BaseFilter,
            ConfigOption(
                "test_sorter", default=ordering.NoopSorter()
            ): ordering.BaseSorter,
            # Test lister is None by default, otherwise Testplan would
            # list tests, not run them
            ConfigOption("test_lister", default=None): Or(
                None, listing.BaseLister, listing.MetadataBasedLister
            ),
            ConfigOption("test_lister_output", default=None): Or(str, None),
            ConfigOption("verbose", default=False): bool,
            ConfigOption("debug", default=False): bool,
            ConfigOption("timeout", default=defaults.TESTPLAN_TIMEOUT): Or(
                None, And(int, lambda t: t >= 0)
            ),
            # active_loop_sleep impacts cpu usage in interactive mode
            ConfigOption("active_loop_sleep", default=0.05): float,
            ConfigOption(
                "interactive_handler", default=TestRunnerIHandler
            ): object,
            ConfigOption("extra_deps", default=[]): [
                Or(str, lambda x: inspect.ismodule(x))
            ],
            ConfigOption("label", default=None): Or(None, str),
            ConfigOption("tracing_tests", default=None): Or(
                And(dict, validate_lines),
                None,
            ),
            ConfigOption("tracing_tests_output", default="-"): str,
            ConfigOption("resource_monitor", default=False): bool,
            ConfigOption("reporting_filter", default=None): Or(
                And(str, Use(ReportingFilter.parse)), None
            ),
            ConfigOption("xfail_tests", default=None): Or(dict, None),
            ConfigOption("runtime_data", default={}): Or(dict, None),
            ConfigOption(
                "auto_part_runtime_limit",
                default=defaults.AUTO_PART_RUNTIME_LIMIT,
            ): Or(int, float),
            ConfigOption(
                "plan_runtime_target", default=defaults.PLAN_RUNTIME_TARGET
            ): Or(int, float),
            ConfigOption(
                "skip_strategy", default=common.SkipStrategy.noop()
            ): Use(common.SkipStrategy.from_option_or_none),
        }


class TestRunnerStatus(RunnableStatus):
    """
    Status of a
    :py:class:`TestRunner <testplan.runnable.TestRunner>` runnable object.
    """


class TestRunnerResult(RunnableResult):
    """
    Result object of a
    :py:class:`TestRunner <testplan.runnable.TestRunner>` runnable object.
    """

    def __init__(self):
        super(TestRunnerResult, self).__init__()
        self.test_results = OrderedDict()
        self.exporter_results = []
        self.report = None

    @property
    def success(self):
        """Run was successful."""
        return not self.report.failed and all(
            [
                exporter_result.success
                for exporter_result in self.exporter_results
            ]
        )


CACHED_TASK_INFO_ATTRIBUTE = "_cached_task_info"


def _cache_task_info(task_info: TaskInformation):
    task = task_info.target
    setattr(task, CACHED_TASK_INFO_ATTRIBUTE, task_info)
    return task


class TestRunner(Runnable):
    r"""
    Adds tests to test
    :py:class:`executor <testplan.runners.base.Executor>` resources
    and invokes report
    :py:class:`exporter <testplan.exporters.testing.base.Exporter>` objects
    to create the :py:class:`~testplan.runnable.TestRunnerResult`.

    :param name: Name of test runner.
    :type name: ``str``
    :param description: Description of test runner.
    :type description: ``str``
    :param logger_level: Logger level for stdout.
    :type logger_level: ``int``
    :param file_log_level: Logger level for file.
    :type file_log_level: ``int``
    :param runpath: Input runpath.
    :type runpath: ``str`` or ``callable``
    :param path_cleanup: Clean previous runpath entries.
    :type path_cleanup: ``bool``
    :param all_tasks_local: Schedule all tasks in local pool.
    :type all_tasks_local: ``bool``
    :param shuffle: Shuffle strategy.
    :type shuffle: ``list`` of ``str``
    :param shuffle_seed: Shuffle seed.
    :type shuffle_seed: ``float``
    :param exporters: Exporters for reports creation.
    :type exporters: ``list``
    :param stdout_style: Styling output options.
    :type stdout_style:
        :py:class:`Style <testplan.report.testing.styles.Style>`
    :param report_dir: Report directory.
    :type report_dir: ``str``
    :param xml_dir: XML output directory.
    :type xml_dir: ``str``
    :param pdf_path: PDF output path <PATH>/\*.pdf.
    :type pdf_path: ``str``
    :param json_path: JSON output path <PATH>/\*.json.
    :type json_path: ``str``
    :param pdf_style: PDF creation styling options.
    :type pdf_style: :py:class:`Style <testplan.report.testing.styles.Style>`
    :param http_url: Web url for posting test report.
    :type http_url: ``str``
    :param report_tags: Matches tests marked with any of the given tags.
    :type report_tags: ``list``
    :param report_tags_all: Match tests marked with all of the given tags.
    :type report_tags_all: ``list``
    :param merge_scheduled_parts: Merge report of scheduled MultiTest parts.
    :type merge_scheduled_parts: ``bool``
    :param browse: Open web browser to display the test report.
    :type browse: ``bool`` or ``NoneType``
    :param ui_port: Port of web server for displaying test report.
    :type ui_port: ``int`` or ``NoneType``
    :param web_server_startup_timeout: Timeout for starting web server.
    :type web_server_startup_timeout: ``int``
    :param test_filter: Tests filtering class.
    :type test_filter: Subclass of
        :py:class:`BaseFilter <testplan.testing.filtering.BaseFilter>`
    :param test_sorter: Tests sorting class.
    :type test_sorter: Subclass of
        :py:class:`BaseSorter <testplan.testing.ordering.BaseSorter>`
    :param test_lister: Tests listing class.
    :type test_lister: Subclass of
        :py:class:`BaseLister <testplan.testing.listing.BaseLister>`
    :param verbose: Enable or disable verbose mode.
    :type verbose: ``bool``
    :param debug: Enable or disable debug mode.
    :type debug: ``bool``
    :param timeout: Timeout value for test execution.
    :type timeout: ``NoneType`` or ``int`` (greater than 0).
    :param abort_wait_timeout: Timeout for test runner abort.
    :type abort_wait_timeout: ``int``
    :param interactive_handler: Handler for interactive mode execution.
    :type interactive_handler: Subclass of :py:class:
        `TestRunnerIHandler <testplan.runnable.interactive.TestRunnerIHandler>`
    :param extra_deps: Extra module dependencies for interactive reload, or
        paths of these modules.
    :type extra_deps: ``list`` of ``module`` or ``str``
    :param label: Label the test report with the given name, useful to
        categorize or classify similar reports.
    :type label: ``str`` or ``NoneType``
    :param runtime_data: Historical runtime data which will be used for
        MultiTest auto-part and weight-based Task smart-scheduling.
    :type runtime_data: ``dict``
    :param auto_part_runtime_limit: The runtime limitation for auto-part task.
    :type auto_part_runtime_limit: ``int`` or ``float``
    :param plan_runtime_target: The testplan total runtime limitation for
        smart schedule.
    :type plan_runtime_target: ``int`` or ``float``

    Also inherits all
    :py:class:`~testplan.common.entity.base.Runnable` options.
    """

    CONFIG = TestRunnerConfig
    STATUS = TestRunnerStatus
    RESULT = TestRunnerResult

    def __init__(self, **options):
        super(TestRunner, self).__init__(**options)
        # uid to resource, in definition order
        self._test_metadata = []
        self._tests: MutableMapping[str, str] = OrderedDict()
        self.result.report = TestReport(
            name=self.cfg.name,
            description=self.cfg.description,
            uid=self.cfg.name,
            timeout=self.cfg.timeout,
            label=self.cfg.label,
        )
        self._exporters = None
        self._web_server_thread = None
        self._file_log_handler = None
        self._configure_stdout_logger()
        # Before saving test report, recursively generate unique strings in
        # uuid4 format as report uid instead of original one. Skip this step
        # when executing unit/functional tests or running in interactive mode.
        self._reset_report_uid = not self._is_interactive_run()
        self.scheduled_modules = []  # For interactive reload
        self.remote_services = {}
        self.runid_filename = uuid.uuid4().hex
        self.define_runpath()
        self._runnable_uids = set()
        self._verified_targets = {}  # target object id -> runnable uid
        self.resource_monitor_server: Optional[ResourceMonitorServer] = None
        self.resource_monitor_server_file_path: Optional[str] = None
        self.resource_monitor_client: Optional[ResourceMonitorClient] = None

    def __str__(self):
        return f"Testplan[{self.uid()}]"

    @property
    def report(self) -> TestReport:
        """Tests report."""
        return self.result.report

    @property
    def exporters(self):
        """
        Return a list of
        :py:class:`report exporters <testplan.exporters.testing.base.Exporter>`.
        """
        if self._exporters is None:
            self._exporters = self.get_default_exporters()
            if self.cfg.exporters:
                self._exporters.extend(self.cfg.exporters)
            for exporter in self._exporters:
                if hasattr(exporter, "cfg"):
                    exporter.cfg.parent = self.cfg
                exporter.parent = self
        return self._exporters

    def get_test_metadata(self):
        return self._test_metadata

    def disable_reset_report_uid(self):
        """Do not generate unique strings in uuid4 format as report uid"""
        self._reset_report_uid = False

    def get_default_exporters(self):
        """
        Instantiate certain exporters if a related cmdline argument
        (e.g. --pdf) or programmatic argument (e.g. pdf_path) is passed
        but there are no exporter declarations.
        """
        exporters = []
        if self.cfg.pdf_path:
            exporters.append(test_exporters.PDFExporter())
        if self.cfg.report_tags or self.cfg.report_tags_all:
            exporters.append(test_exporters.TagFilteredPDFExporter())
        if self.cfg.json_path:
            exporters.append(test_exporters.JSONExporter())
        if self.cfg.xml_dir:
            exporters.append(test_exporters.XMLExporter())
        if self.cfg.http_url:
            exporters.append(test_exporters.HTTPExporter())
        if self.cfg.ui_port is not None:
            exporters.append(
                test_exporters.WebServerExporter(ui_port=self.cfg.ui_port)
            )
        if (
            not self._is_interactive_run()
            and self.cfg.tracing_tests is not None
        ):
            exporters.append(test_exporters.CoveredTestsExporter())
        return exporters

    def add_environment(
        self, env: EnvironmentCreator, resource: Optional[Environments] = None
    ):
        """
        Adds an environment to the target resource holder.

        :param env: Environment creator instance.
        :type env: Subclass of
            :py:class:`~testplan.environment.EnvironmentCreator`
        :param resource: Target environments holder resource.
        :type resource: Subclass of
            :py:class:`~testplan.environment.Environments`
        :return: Environment uid.
        :rtype: ``str``
        """
        resource = (
            self.resources[resource]
            if resource
            else self.resources.environments
        )
        target = env.create(parent=self)
        env_uid = env.uid()
        resource.add(target, env_uid)
        return env_uid

    def add_resource(
        self, resource: Resource, uid: Optional[str] = None
    ) -> str:
        """
        Adds a test :py:class:`executor <testplan.runners.base.Executor>`
        resource in the test runner environment.

        :param resource: Test executor to be added.
        :param uid: Optional input resource uid. We now force its equality
            with resource's own uid.
        :return: Resource uid assigned.
        """
        resource.parent = self
        resource.cfg.parent = self.cfg
        if uid and uid != resource.uid():
            raise ValueError(
                f"Unexpected uid value ``{uid}`` received, mismatched with "
                f"Resource uid ``{resource.uid()}``"
            )
        return self.resources.add(resource, uid=uid)

    def add_exporters(self, exporters: List[Exporter]):
        """
        Add a list of
        :py:class:`report exporters <testplan.exporters.testing.base.Exporter>`
        for outputting test report.

        :param exporters: Test exporters to be added.
        :type exporters: ``list`` of
            :py:class:`~testplan.exporters.testing.base.Exporter`
        """
        self.cfg.exporters.extend(get_exporters(exporters))

    def add_remote_service(self, remote_service: RemoteService):
        """
        Adds a remote service
        :py:class:`~testplan.common.remote.remote_service.RemoteService`
        object to test runner.

        :param remote_service: RemoteService object
        :type remote_service:
            :py:class:`~testplan.common.remote.remote_service.RemoteService`
        """
        name = remote_service.cfg.name
        if name in self.remote_services:
            raise ValueError(f"Remote Service [{name}] already exists")

        remote_service.parent = self
        remote_service.cfg.parent = self.cfg
        self.remote_services[name] = remote_service

        remote_service.start()

    def _stop_remote_services(self):
        for name, rmt_svc in self.remote_services.items():
            self.logger.info("Stopping Remote Server %s", name)
            rmt_svc.stop()

    def _clone_task_for_part(self, task_info, _task_arguments, part):
        _task_arguments["part"] = part
        self.logger.debug(
            "Task re-created with arguments: %s",
            _task_arguments,
        )
        # Unfortunately it is not easy to clone a MultiTest with only some
        # parameters changed. Ideally we would change just the part, but
        # MultiTests cannot share drivers, so a part cannot be recreated from
        # the original configuration - more than one MultiTest would then own
        # the same drivers. So here we recreate it from the task.
        target = Task(**_task_arguments)
        new_task = self._collect_task_info(target)
        return new_task

    def _get_tasks(
        self, _task_arguments, num_of_parts, runtime_data
    ) -> List[TaskInformation]:
        self.logger.debug(
            "Task created with arguments: %s",
            _task_arguments,
        )

        task = Task(**_task_arguments)
        task_info = self._collect_task_info(task)
        uid = task_info.uid

        tasks: List[TaskInformation] = []
        time_info = runtime_data.get(uid, None)

        if num_of_parts:
            if not isinstance(task_info.materialized_test, MultiTest):
                raise TypeError(
                    "multitest_parts specified in @task_target,"
                    " but the Runnable is not a MultiTest"
                )
            if num_of_parts == "auto":
                if not time_info:
                    self.logger.warning(
                        "%s parts is auto but cannot find it in runtime-data",
                        uid,
                    )
                    num_of_parts = 1
                else:
                    num_of_parts = math.ceil(
                        time_info["execution_time"]
                        / (
                            self.cfg.auto_part_runtime_limit
                            - time_info["setup_time"]
                        )
                    )
            if num_of_parts < 1:
                raise RuntimeError(
                    f"Calculated num_of_parts for {uid} is {num_of_parts},"
                    " check the input runtime_data and auto_part_runtime_limit"
                )
            if "weight" not in _task_arguments:
                _task_arguments["weight"] = (
                    math.ceil(
                        (time_info["execution_time"] / num_of_parts)
                        + time_info["setup_time"]
                    )
                    if time_info
                    else self.cfg.auto_part_runtime_limit
                )
            self.logger.user_info(
                "%s: parts=%d, weight=%d",
                uid,
                num_of_parts,
                _task_arguments["weight"],
            )

            if num_of_parts == 1:
                task_info.target.weight = _task_arguments["weight"]
                tasks.append(task_info)
            else:
                for i in range(num_of_parts):
                    part = (i, num_of_parts)
                    new_task = self._clone_task_for_part(
                        task_info, _task_arguments, part
                    )
                    tasks.append(new_task)
        else:
            if time_info and not task.weight:
                task_info.target.weight = math.ceil(
                    time_info["execution_time"] + time_info["setup_time"]
                )
                self.logger.user_info(
                    "%s: weight=%d", uid, task_info.target.weight
                )
            tasks.append(task_info)

        return tasks

    def discover(
        self,
        path: str = ".",
        name_pattern: Union[str, Pattern] = r".*\.py$",
    ) -> List[Task]:
        """
        Discover task targets under path, in the modules that match the name
        pattern, and return the created Task objects.

        :param path: the root path to start a recursive walk and discover,
            default is current directory.
        :param name_pattern: a regex pattern to match the file name.
        :return: A list of Task objects
        """
        self.logger.user_info(
            "Discovering task target with file name pattern '%s' under '%s'",
            name_pattern,
            path,
        )

        regex = re.compile(name_pattern)
        tasks: List[TaskInformation] = []
        runtime_data: dict = self.cfg.runtime_data or {}

        for root, dirs, files in os.walk(path or "."):
            for filename in files:
                if not regex.match(filename):
                    continue
                filepath = os.path.join(root, filename)
                module = filename.split(".")[0]
                with import_tmp_module(module, root) as mod:
                    for attr in dir(mod):
                        target = getattr(mod, attr)
                        if not is_task_target(target):
                            continue
                        self.logger.debug(
                            "Discovered task target %s::%s", filepath, attr
                        )
                        task_target_info = get_task_target_information(target)
                        task_arguments = dict(
                            target=attr,
                            module=module,
                            path=root,
                            **task_target_info.task_kwargs,
                        )
                        multitest_parts = (
                            None
                            if self._is_interactive_run()
                            else task_target_info.multitest_parts
                        )

                        if task_target_info.target_params:
                            for param in task_target_info.target_params:
                                if isinstance(param, dict):
                                    task_arguments["args"] = None
                                    task_arguments["kwargs"] = param
                                elif isinstance(param, (tuple, list)):
                                    task_arguments["args"] = param
                                    task_arguments["kwargs"] = None
                                else:
                                    raise TypeError(
                                        "task_target's parameters can only"
                                        " contain dict/tuple/list, but"
                                        f" received: {param}"
                                    )
                                task_arguments["part"] = None
                                tasks.extend(
                                    self._get_tasks(
                                        task_arguments,
                                        multitest_parts,
                                        runtime_data,
                                    )
                                )
                        else:
                            tasks.extend(
                                self._get_tasks(
                                    task_arguments,
                                    multitest_parts,
                                    runtime_data,
                                )
                            )

        return [_cache_task_info(task_info) for task_info in tasks]

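    # Illustrative sketch, not part of the original module: ``discover`` looks
    # for module-level objects for which ``is_task_target`` returns True,
    # typically callables decorated with ``@task_target`` in a matching file,
    # e.g. a hypothetical ``tasks.py``:
    #
    #     from testplan.runners.pools.tasks.base import task_target
    #
    #     @task_target
    #     def make_multitest():
    #         return MultiTest(name="Example", suites=[MySuite()])
    #
    # Each discovered callable is wrapped into a Task (target name, module and
    # path, plus any @task_target kwargs) and expanded via ``_get_tasks``.
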
    def calculate_pool_size(self) -> None:
        """
        Calculate the right size of the pool based on the weight (runtime) of
        the tasks, so that runtime of all tasks meets the plan_runtime_target.
        """
        for executor in self.resources:
            if isinstance(executor, Pool) and executor.is_auto_size:
                pool_size = self.calculate_pool_size_by_tasks(
                    list(executor.added_items.values())
                )
                self.logger.user_info(
                    f"Set pool size to {pool_size} for {executor.cfg.name}"
                )
                executor.size = pool_size

    def calculate_pool_size_by_tasks(self, tasks: Collection[Task]) -> int:
        """
        Calculate the right size of the pool based on the weight (runtime) of
        the tasks, so that runtime of all tasks meets the plan_runtime_target.
        """
        if len(tasks) == 0:
            return 1
        plan_runtime_target = self.cfg.plan_runtime_target
        _tasks = sorted(tasks, key=lambda task: task.weight, reverse=True)
        if _tasks[0].weight > plan_runtime_target:
            for task in _tasks:
                if task.weight > plan_runtime_target:
                    self.logger.warning(
                        "%s weight %d is greater than plan_runtime_target %d",
                        task,
                        task.weight,
                        self.cfg.plan_runtime_target,
                    )
            self.logger.warning(
                "Update plan_runtime_target to %d", _tasks[0].weight
            )
            plan_runtime_target = _tasks[0].weight

        containers = [0]
        for task in _tasks:
            if task.weight:
                if min(containers) + task.weight <= plan_runtime_target:
                    containers[
                        containers.index(min(containers))
                    ] += task.weight
                else:
                    containers.append(task.weight)
            else:
                containers.append(plan_runtime_target)
        return len(containers)

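    # Worked example (illustrative, weights are hypothetical): with
    # plan_runtime_target=3600 and task weights [2000, 1800, 1500, 400], the
    # greedy loop above packs the heaviest tasks first into "containers", one
    # per pool worker:
    #
    #     2000 -> containers = [2000]
    #     1800 -> 2000 + 1800 > 3600, open new container -> [2000, 1800]
    #     1500 -> 1800 + 1500 <= 3600, reuse lightest    -> [2000, 3300]
    #      400 -> 2000 +  400 <= 3600, reuse lightest    -> [2400, 3300]
    #
    # so calculate_pool_size_by_tasks returns len(containers) == 2.
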
    def schedule(
        self,
        task: Optional[Task] = None,
        resource: Optional[str] = None,
        **options,
    ) -> Optional[str]:
        """
        Schedules a serializable
        :py:class:`~testplan.runners.pools.tasks.base.Task` in a task runner
        :py:class:`~testplan.runners.pools.base.Pool` executor resource.

        :param task: Input task, if it is None, a new Task will be constructed
            using the options parameter.
        :type task: :py:class:`~testplan.runners.pools.tasks.base.Task`
        :param resource: Name of the target executor, which is usually a Pool,
            default value None indicates using local executor.
        :type resource: ``str`` or ``NoneType``
        :param options: Task input options.
        :type options: ``dict``
        :return uid: Assigned uid for task.
        :rtype: ``str`` or ``NoneType``
        """
        return self.add(task or Task(**options), resource=resource)

    def schedule_all(
        self,
        path: str = ".",
        name_pattern: Union[str, Pattern] = r".*\.py$",
        resource: Optional[str] = None,
    ):
        """
        Discover task targets under path, in the modules that match the name
        pattern, create task objects from them and schedule them to resource
        (usually a pool) for execution.

        :param path: the root path to start a recursive walk and discover,
            default is current directory.
        :type path: ``str``
        :param name_pattern: a regex pattern to match the file name.
        :type name_pattern: ``str``
        :param resource: Name of the target executor, which is usually a Pool,
            default value None indicates using local executor.
        :type resource: ``str`` or ``NoneType``
        """
        tasks = self.discover(path=path, name_pattern=name_pattern)
        for task in tasks:
            self.add(task, resource=resource)

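    # Usage sketch (illustrative; the pool name and file pattern are
    # hypothetical): assuming an executor named "pool" was registered via
    # ``add_resource``, discovery and scheduling can be combined as
    #
    #     plan.schedule_all(
    #         path=".", name_pattern=r".*_tasks\.py$", resource="pool"
    #     )
    #
    # which is equivalent to calling ``discover`` and then ``add`` for each
    # returned Task.
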
    def add(
        self,
        target: Union[Test, Task, Callable],
        resource: Optional[str] = None,
    ) -> Optional[str]:
        """
        Adds a :py:class:`runnable <testplan.common.entity.base.Runnable>`
        test entity, or a :py:class:`~testplan.runners.pools.tasks.base.Task`,
        or a callable that returns a test entity to a
        :py:class:`~testplan.runners.base.Executor` resource.

        :param target: Test target.
        :type target: :py:class:`~testplan.common.entity.base.Runnable` or
            :py:class:`~testplan.runners.pools.tasks.base.Task` or ``callable``
        :param resource: Name of the target executor, which is usually a Pool,
            default value None indicates using local executor.
        :type resource: ``str`` or ``NoneType``
        :return: Assigned uid for test.
        :rtype: ``str`` or ``NoneType``
        """

        # Get the real test entity and verify if it should be added
        task_info = self._collect_task_info(target)
        self._verify_task_info(task_info)
        uid = task_info.uid

        # let's see if it is filtered
        if not self._should_task_running(task_info):
            return None

        # "--list" option always means not executing tests
        lister: Lister = self.cfg.test_lister
        if lister is not None and not lister.metadata_based:
            self.cfg.test_lister.log_test_info(task_info.materialized_test)
            return None

        if resource is None or self._is_interactive_run():
            # use local runner for interactive
            resource = self.resources.first()
            # just enqueue the materialized test
            target = task_info.materialized_test
        else:
            target = task_info.target

        if self._is_interactive_run():
            self._register_task_for_interactive(task_info)

        self._register_task(
            resource, target, uid, task_info.materialized_test.get_metadata()
        )
        return uid

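    # Usage sketch (illustrative, names are hypothetical): ``add`` accepts any
    # of the three target flavours and routes them to an executor:
    #
    #     plan.add(MultiTest(name="Suite", suites=[MySuite()]))   # a Test
    #     plan.add(Task(target="make_multitest", module="tasks"),
    #              resource="pool")                               # a Task
    #     plan.add(lambda: MultiTest(name="Lazy", suites=[MySuite()]))
    #
    # With ``resource=None`` (or in interactive mode) the materialized test is
    # enqueued on the local runner; otherwise the Task itself is handed to the
    # named executor.
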
    def _is_interactive_run(self):
        return self.cfg.interactive_port is not None

    def _register_task(self, resource, target, uid, metadata):
        self._tests[uid] = resource
        self._test_metadata.append(metadata)
        self.resources[resource].add(target, uid)

    def _collect_task_info(self, target: TestTask) -> TaskInformation:
        if isinstance(target, Test):
            target_test = target
        elif isinstance(target, Task):
            # First check if there is a cached task info. This is an
            # optimization flow where the task info needs to be created at
            # discovery time, but the already defined API needs to pass a
            # Task, so we attach the task_info to the task itself and here we
            # detach it.
            if hasattr(target, CACHED_TASK_INFO_ATTRIBUTE):
                task_info = getattr(target, CACHED_TASK_INFO_ATTRIBUTE)
                setattr(target, CACHED_TASK_INFO_ATTRIBUTE, None)
                return task_info
            else:
                target_test = target.materialize()
        elif callable(target):
            target_test = target()
        else:
            raise TypeError(
                "Unrecognized test target of type {}".format(type(target))
            )

        # TODO: include executor in ancestor chain?
        if isinstance(target_test, Runnable):
            target_test.parent = self
            target_test.cfg.parent = self.cfg

        uid = target_test.uid()

        # Reset the task uid which will be used for test result transport in
        # a pool executor, it makes logging or debugging easier.
        # TODO: This mutates target, should we do a copy?
        if isinstance(target, Task):
            target._uid = uid

        return TaskInformation(target, target_test, uid)

    def _register_task_for_interactive(self, task_info: TaskInformation):
        target = task_info.target
        if isinstance(target, Task) and isinstance(target._target, str):
            self.scheduled_modules.append(
                (
                    target._module or target._target.rsplit(".", 1)[0],
                    os.path.abspath(target._path),
                )
            )

    def _verify_task_info(self, task_info: TaskInformation) -> None:
        uid = task_info.uid
        if uid in self._tests:
            raise ValueError(
                '{} with uid "{}" already added.'.format(self._tests[uid], uid)
            )

        if uid in self._runnable_uids:
            raise RuntimeError(
                f"Runnable with uid {uid} has already been verified"
            )
        else:
            # TODO: this should be part of the add
            self._runnable_uids.add(uid)

    def _should_task_running(self, task_info: TaskInformation) -> bool:
        should_run = True
        if type(self.cfg.test_filter) is not filtering.Filter:
            test = task_info.materialized_test
            should_run = test.should_run()
            self.logger.debug(
                "Should run %s? %s",
                test.name,
                "Yes" if should_run else "No",
            )
        return should_run

    def make_runpath_dirs(self):
        """
        Creates runpath related directories.
        """
        if self._runpath is None:
            raise RuntimeError(
                "{} runpath cannot be None".format(self.__class__.__name__)
            )

        self.logger.user_info(
            "Testplan has runpath: %s and pid %s", self._runpath, os.getpid()
        )

        self._scratch = os.path.join(self._runpath, "scratch")

        if self.cfg.path_cleanup is False:
            makedirs(self._runpath)
            makedirs(self._scratch)
        else:
            makeemptydirs(self._runpath)
            makeemptydirs(self._scratch)

        with open(
            os.path.join(self._runpath, self.runid_filename), "wb"
        ) as fp:
            pass

        if self.cfg.resource_monitor:
            self.resource_monitor_server_file_path = os.path.join(
                self.scratch, "resource_monitor"
            )
            makedirs(self.resource_monitor_server_file_path)

    def _start_resource_monitor(self):
        """Start resource monitor server and client"""
        if self.cfg.resource_monitor:
            self.resource_monitor_server = ResourceMonitorServer(
                self.resource_monitor_server_file_path,
                detailed=self.cfg.logger_level == logger.DEBUG,
            )
            self.resource_monitor_server.start()
            self.resource_monitor_client = ResourceMonitorClient(
                self.resource_monitor_server.address, is_local=True
            )
            self.resource_monitor_client.start()

    def _stop_resource_monitor(self):
        """Stop resource monitor server and client"""
        if self.resource_monitor_client:
            self.resource_monitor_client.stop()
            self.resource_monitor_client = None
        if self.resource_monitor_server:
            self.resource_monitor_server.stop()
            self.resource_monitor_server = None

    def add_pre_resource_steps(self):
        """Runnable steps to be executed before resources started."""
        self._add_step(self.timer.start, "run")
        super(TestRunner, self).add_pre_resource_steps()
        self._add_step(self.make_runpath_dirs)
        self._add_step(self._configure_file_logger)
        self._add_step(self.calculate_pool_size)
        self._add_step(self._start_resource_monitor)

    def add_main_batch_steps(self):
        """Runnable steps to be executed while resources are running."""
        self._add_step(self._wait_ongoing)

    def add_post_resource_steps(self):
        """Runnable steps to be executed after resources stopped."""
        self._add_step(self._stop_remote_services)
        self._add_step(self._create_result)
        self._add_step(self._log_test_status)
        self._add_step(self.timer.end, "run")  # needs to happen before export
        self._add_step(self._pre_exporters)
        self._add_step(self._invoke_exporters)
        self._add_step(self._post_exporters)
        self._add_step(self._close_file_logger)
        super(TestRunner, self).add_post_resource_steps()
        self._add_step(self._stop_resource_monitor)

    def _wait_ongoing(self):
        # TODO: if a pool fails to initialize we could reschedule the tasks.
        if self.resources.start_exceptions:
            for resource, exception in self.resources.start_exceptions.items():
                self.logger.critical(
                    "Aborting %s due to start exception", resource
                )
                resource.abort()

        _start_ts = time.time()
        while self.active:
            if self.cfg.timeout and time.time() - _start_ts > self.cfg.timeout:
                msg = (
                    f"Timeout: Aborting execution after"
                    f" {self.cfg.timeout} seconds"
                )
                self.result.report.logger.error(msg)
                self.logger.error(msg)
                # Abort resources e.g. pools
                for dep in self.abort_dependencies():
                    self._abort_entity(dep)
                break

            pending_work = False
            for resource in self.resources:
                # Check if any resource has pending work.
                # Maybe print periodically the pending work of resource.
                pending_work = resource.pending_work() or pending_work

                # Poll the resource's health - if it has unexpectedly died
                # then abort the entire test to avoid hanging.
                if not resource.is_alive:
                    self.result.report.status_override = Status.ERROR
                    self.logger.critical(
                        "Aborting %s - %s unexpectedly died", self, resource
                    )
                    self.abort()

            if pending_work is False:
                break

            time.sleep(self.cfg.active_loop_sleep)

    def _create_result(self):
        """Fetch task result from executors and create a full test result."""
        step_result = True
        test_results = self.result.test_results
        plan_report = self.result.report
        test_rep_lookup = {}

        for uid, resource in self._tests.items():
            if not isinstance(self.resources[resource], Executor):
                continue

            resource_result = self.resources[resource].results.get(uid)
            # Tasks may not have been executed (i.e. timeout), although the
            # thread will wait for a buffer period until the follow-up work
            # finishes. But for insurance we assume that some uids may still
            # be missing.
            if not resource_result:
                continue
            elif isinstance(resource_result, TaskResult):
                if resource_result.result is None:
                    test_results[uid] = result_for_failed_task(resource_result)
                else:
                    test_results[uid] = resource_result.result
            else:
                test_results[uid] = resource_result

            run, report = test_results[uid].run, test_results[uid].report

            if report.part:
                if (
                    report.category != ReportCategories.TASK_RERUN
                    and self.cfg.merge_scheduled_parts
                ):
                    # Save the report temporarily and merge it later
                    test_rep_lookup.setdefault(
                        report.definition_name, []
                    ).append((test_results[uid].run, report))
                    if report.definition_name not in plan_report.entry_uids:
                        # Create a placeholder for merging sibling reports
                        if isinstance(resource_result, TaskResult):
                            # `runnable` must be an instance of MultiTest since
                            # the corresponding report has `part` defined. Can
                            # get a full structured report by `dry_run` and the
                            # order of testsuites/testcases can be retained.
                            runnable = resource_result.task.materialize()
                            runnable.parent = self
                            runnable.cfg.parent = self.cfg
                            runnable.unset_part()
                            report = runnable.dry_run().report
                        else:
                            report = report.__class__(
                                report.definition_name,
                                category=report.category,
                            )
                    else:
                        # Wait until all sibling reports are collected
                        continue

            plan_report.append(report)
            step_result = step_result and run is True  # boolean or exception

        step_result = self._merge_reports(test_rep_lookup) and step_result

        # Reset UIDs of the test report and all of its children in UUID4 format
        if self._reset_report_uid:
            plan_report.reset_uid()

        return step_result

    def _merge_reports(
        self, test_report_lookup: Dict[str, List[Tuple[bool, Any]]]
    ):
        """
        Merge report of MultiTest parts into test runner report.
        Return True if all parts are found and can be successfully merged.

        Format of test_report_lookup:

            {
                'report_uid_1': [
                    (True, report_1_part_1), (True, report_1_part_2), ...
                ],
                'report_uid_2': [
                    (True, report_2_part_1), (False, report_2_part_2), ...
                ],
                ...
            }
        """
        merge_result = True

        for uid, result in test_report_lookup.items():
            placeholder_report = self.result.report.get_by_uid(uid)
            num_of_parts = 0
            part_indexes = set()
            merged = False

            with placeholder_report.logged_exceptions():
                for run, report in result:
                    if num_of_parts and num_of_parts != report.part[1]:
                        raise ValueError(
                            "Cannot merge parts for child report with"
                            " `uid`: {uid}, invalid parameter of part"
                            " provided.".format(uid=uid)
                        )
                    elif report.part[0] in part_indexes:
                        raise ValueError(
                            "Cannot merge parts for child report with"
                            " `uid`: {uid}, duplicate MultiTest parts"
                            " had been scheduled.".format(uid=uid)
                        )
                    else:
                        part_indexes.add(report.part[0])
                        num_of_parts = report.part[1]

                    if run:
                        if isinstance(run, Exception):
                            raise run
                        else:
                            placeholder_report.merge(report, strict=False)
                    else:
                        raise MergeError(
                            "Cannot merge parts for child report with"
                            " `uid`: {uid}, at least one part (index:{part})"
                            " didn't run.".format(uid=uid, part=report.part[0])
                        )
                else:
                    if len(part_indexes) < num_of_parts:
                        raise MergeError(
                            "Cannot merge parts for child report with"
                            " `uid`: {uid}, not all MultiTest parts"
                            " had been scheduled.".format(uid=uid)
                        )
                    merged = True

            # If merging sibling reports fails, clear the placeholder report
            # but keep error logs; sibling reports will be appended at the end.
            if not merged:
                placeholder_report.entries = []
                placeholder_report._index = {}
                placeholder_report.status_override = Status.ERROR
                for _, report in result:
                    report.name = (
                        common.TEST_PART_PATTERN_FORMAT_STRING.format(
                            report.name, report.part[0], report.part[1]
                        )
                    )
                    report.uid = strings.uuid4()  # considered as error report
                    self.result.report.append(report)

            merge_result = (
                merge_result and placeholder_report.status != Status.ERROR
            )

        return merge_result

    def uid(self):
        """Entity uid."""
        return self.cfg.name

    def _log_test_status(self):
        if not self.result.report.entries:
            self.logger.warning(
                "No tests were run - check your filter patterns."
            )
        else:
            self.logger.log_test_status(
                self.cfg.name, self.result.report.status
            )

    def _pre_exporters(self):
        # Apply report filter if one exists
        if self.cfg.reporting_filter is not None:
            self.result.report = self.cfg.reporting_filter(self.result.report)

        # Attach resource monitor data
        if self.resource_monitor_server:
            self.report.resource_meta_path = (
                self.resource_monitor_server.dump()
            )

    def _invoke_exporters(self) -> None:
        if self.result.report.is_empty():
            # skip empty report
            return

        if hasattr(self.result.report, "bubble_up_attachments"):
            self.result.report.bubble_up_attachments()

        export_context = ExportContext()
        for exporter in self.exporters:
            if isinstance(exporter, test_exporters.Exporter):
                run_exporter(
                    exporter=exporter,
                    source=self.result.report,
                    export_context=export_context,
                )
            else:
                raise NotImplementedError(
                    "Exporter logic not implemented for: {}".format(
                        type(exporter)
                    )
                )

        self.result.exporter_results = export_context.results

    def _post_exporters(self):
        # View report in web browser if "--browse" specified
        report_urls = []
        report_opened = False

        if self.result.report.is_empty():
            # skip empty report
            self.logger.warning("Empty report, nothing to be exported!")
            return

        for result in self.result.exporter_results:
            report_url = getattr(result.exporter, "report_url", None)
            if report_url:
                report_urls.append(report_url)
            web_server_thread = getattr(
                result.exporter, "web_server_thread", None
            )
            if web_server_thread:
                # Keep an eye on this thread from `WebServerExporter`
                # which will be stopped on Testplan abort
                self._web_server_thread = web_server_thread
                # Give priority to open report from local server
                if self.cfg.browse and not report_opened:
                    webbrowser.open(report_url)
                    report_opened = True
                # Stuck here waiting for web server to terminate
                web_server_thread.join()

        if self.cfg.browse and not report_opened:
            if len(report_urls) > 0:
                for report_url in report_urls:
                    webbrowser.open(report_url)
            else:
                self.logger.warning(
                    "No reports opened, could not find "
                    "an exported result to browse"
                )

    def discard_pending_tasks(
        self,
        exec_selector: SExpr,
        report_status: Status = Status.NONE,
        report_reason: str = "",
    ):
        for k, v in self.resources.items():
            if isinstance(v, Executor) and apply_single(exec_selector, k):
                v.discard_pending_tasks(report_status, report_reason)

    def abort_dependencies(self):
        """
        Yield all dependencies to be aborted before self abort.
        """
        if self._ihandler is not None:
            yield self._ihandler
        yield from super(TestRunner, self).abort_dependencies()

    def aborting(self):
        """Stop the web server if it is running."""
        if self._web_server_thread is not None:
            self._web_server_thread.stop()
        self._stop_resource_monitor()
        self._close_file_logger()

    def _configure_stdout_logger(self):
        """Configure the stdout logger by setting the required level."""
        logger.STDOUT_HANDLER.setLevel(self.cfg.logger_level)

    def _configure_file_logger(self):
        """
        Configure the file logger to the specified log levels. A log file
        will be created under the runpath (so runpath must be created before
        this method is called).
        """
        if self.runpath is None:
            raise RuntimeError(
                "Need to set up runpath before configuring logger"
            )

        if self.cfg.file_log_level is None:
            self.logger.debug("Not enabling file logging")
        else:
            self._file_log_handler = logger.configure_file_logger(
                self.cfg.file_log_level, self.runpath
            )

    def _close_file_logger(self):
        """
        Closes the file logger, releasing all file handles. This is necessary
        to avoid permissions errors on Windows.
        """
        if self._file_log_handler is not None:
            self._file_log_handler.flush()
            self._file_log_handler.close()
            logger.TESTPLAN_LOGGER.removeHandler(self._file_log_handler)
            self._file_log_handler = None

    def run(self):
        """
        Executes the defined steps and populates the result object.
        """
        if self.cfg.test_lister:
            self.result.run = True
            return self.result
        return super(TestRunner, self).run()