Source code for testplan.testing.py_test

"""PyTest test runner."""
import collections
import inspect
import os
import re
import traceback
from typing import Dict, Generator

import pytest
from schema import Or

from testplan.common.config import ConfigOption
from testplan.common.utils import validation
from testplan.report import (
    ReportCategories,
    RuntimeStatus,
    Status,
    TestCaseReport,
    TestGroupReport,
)
from testplan.testing import base as testing
from testplan.testing.multitest.entries import assertions
from testplan.testing.multitest.entries import base as entries_base
from testplan.testing.multitest.entries.schemas.base import (
    registry as schema_registry,
)
from testplan.testing.multitest.entries.stdout.base import (
    registry as stdout_registry,
)
from testplan.testing.result import Result

# Regex for splitting a PyTest nodeid into suite name, case name and the
# optional case parameters. For example,
# "tests/test_x.py::TestA::test_b[1-2]" yields
#   suite_name  = "tests/test_x.py::TestA"
#   case_name   = "test_b"
#   case_params = "1-2"
# ``suite_name`` is greedy, so the split happens at the last "::" that still
# lets the remainder match; ``case_params`` is ``None`` when the nodeid has no
# trailing "[...]" part. re.DOTALL lets "." also match newline characters.
_CASE_REGEX = re.compile(
    r"^(?P<suite_name>.+)::"
    r"(?P<case_name>[^\[]+)(?:\[(?P<case_params>.+)\])?$",
    re.DOTALL,
)


class PyTestConfig(testing.TestConfig):
    """
    Configuration object for the
    :py:class:`~testplan.testing.py_test.PyTest` test runner.
    """

    @classmethod
    def get_options(cls):
        """
        Build the configuration schema specific to the PyTest runner.

        :return: mapping of option name (or ``ConfigOption``) to its validator
        :rtype: ``dict``
        """
        # "target" is the only mandatory entry: one path or a list of paths.
        schema = {"target": Or(str, [str])}

        # The remaining options are optional and carry their own defaults.
        schema[ConfigOption("select", default="")] = str
        schema[ConfigOption("extra_args", default=None)] = Or([str], None)
        schema[ConfigOption("result", default=Result)] = (
            validation.is_subclass(Result)
        )
        return schema
class PyTest(testing.Test):
    """
    PyTest plugin for Testplan. Allows tests written for PyTest to be run from
    Testplan, with the test results logged and included in the Testplan report.

    :param name: Test instance name, often used as uid of test entity.
    :type name: ``str``
    :param target: Target of PyTest configuration.
    :type target: ``str`` or ``list`` of ``str``
    :param description: Description of test instance.
    :type description: ``str``
    :param select: Selection of PyTest configuration, passed to pytest as the
        ``-k`` expression when non-empty.
    :type select: ``str``
    :param extra_args: Extra arguments passed to pytest.
    :type extra_args: ``NoneType`` or ``list`` of ``str``
    :param result: Result that contains assertion entries.
    :type result: :py:class:`~testplan.testing.result.Result`

    Also inherits all :py:class:`~testplan.testing.base.Test` options.
    """

    CONFIG = PyTestConfig

    def __init__(
        self,
        name,
        target,
        description=None,
        select="",
        extra_args=None,
        result=Result,
        **options
    ):
        # NOTE: ``locals()`` must be captured before any other local name is
        # bound, and the explicit two-argument ``super`` is kept on purpose so
        # that the captured mapping stays exactly as it was.
        options.update(self.filter_locals(locals()))
        super(PyTest, self).__init__(**options)

        # Initialise a separate plugin object to pass to PyTest. This avoids
        # namespace clashes with the PyTest object, since PyTest will scan for
        # methods that look like hooks in the plugin.
        quiet = not self._debug_logging_enabled
        self._pytest_plugin = _ReportPlugin(self, self.report, quiet)
        self._collect_plugin = _CollectPlugin(quiet)
        self._pytest_args = self._build_pytest_args()

        # Map from testsuite/testcase name to nodeid. Filled out after
        # tests are collected via dry_run().
        self._nodeids = None

    def add_main_batch_steps(self):
        """Specify the test steps: run the tests, then log the results."""
        self._add_step(self.run_tests)
        self._add_step(self.log_test_results, top_down=False)

    def setup(self):
        """Setup the PyTest plugin for the suite."""
        self._pytest_plugin.setup()

    def run_tests(self):
        """
        Run pytest and wait for it to terminate.

        A return code of 5 (no tests were run) marks the report as UNSTABLE,
        any other non-zero return code marks it as FAILED.
        """
        # Execute pytest with the dedicated report plugin for hook support
        with self.report.timer.record("run"):
            return_code = pytest.main(
                self._pytest_args, plugins=[self._pytest_plugin]
            )

            if return_code == 5:
                self.result.report.status_override = Status.UNSTABLE
                self.logger.info("No tests were run")
            elif return_code != 0:
                self.result.report.status_override = Status.FAILED
                self.logger.info(
                    "pytest exited with return code %d", return_code
                )

    def _collect_tests(self):
        """
        Collect test items but do not run any.

        :raises RuntimeError: if pytest exits with a code other than 0 or 5
        :return: the items gathered by the collection plugin
        """
        # NOTE: pytest.main might prepend the test rootdir to sys.path.
        # sys.path is deliberately NOT restored here, as restoring it has
        # other problems (helper packages).
        return_code = pytest.main(
            self._pytest_args + ["--collect-only"],
            plugins=[self._collect_plugin],
        )

        if return_code not in (0, 5):  # rc 5: no tests were run
            raise RuntimeError(
                "Collection failure, exit code = {}".format(return_code)
            )

        return self._collect_plugin.collected

    def get_test_context(self):
        """
        Inspect the test suites and cases by running PyTest with the
        --collect-only flag and passing in our collection plugin.

        Suites and testcases are returned in collection order; parametrized
        variants of one testcase are reported once under the case name.

        :return: List containing pairs of suite name and testcase names.
        :rtype: List[Tuple[str, List[str]]]
        """
        try:
            collected = self._collect_tests()
        except RuntimeError:
            self.result.report.status_override = Status.ERROR
            self.logger.exception("Failed to collect tests.")
            return []

        # Dicts are used as insertion-ordered sets: they de-duplicate the case
        # names of parametrized tests while keeping a deterministic order
        # (a plain ``set`` of strings iterates in a run-dependent order).
        suites = collections.defaultdict(dict)
        for item in collected:
            suite_name, case_name, _ = _case_parse(item.nodeid)
            suites[suite_name][case_name] = None

        return [
            (suite, list(testcases)) for suite, testcases in suites.items()
        ]

    def _dry_run_testsuites(self):
        """
        Collect the tests and add an empty testcase report for each of them,
        recording the nodeid of every testsuite and testcase on the way.
        """
        self._nodeids = {
            "testsuites": {},
            "testcases": collections.defaultdict(dict),
        }

        for item in self._collect_tests():
            _add_empty_testcase_report(item, self.result.report, self._nodeids)

    def run_testcases_iter(
        self,
        testsuite_pattern: str = "*",
        testcase_pattern: str = "*",
        shallow_report: Dict = None,
    ) -> Generator:
        """
        Run all testcases and yield testcase reports.

        :param testsuite_pattern: pattern to match for testsuite names
        :param testcase_pattern: pattern to match for testcase names
        :param shallow_report: shallow report entry (not used by this runner)
        :raises RuntimeError: if a group report below a suite is not a
            parametrization group
        :raises TypeError: if a suite contains an unexpected report type
        :return: generator yielding testcase reports and UIDs for merge step
        """
        if not self._nodeids:
            # Need to collect the tests so we know the nodeids for each
            # testsuite/case.
            self.dry_run()

        # A fresh report and plugin are used so that this run does not touch
        # the report owned by the batch-mode plugin.
        test_report = self._new_test_report()
        quiet = not self._debug_logging_enabled
        pytest_plugin = _ReportPlugin(self, test_report, quiet)
        pytest_plugin.setup()

        pytest_args, current_uids = self._build_iter_pytest_args(
            testsuite_pattern, testcase_pattern
        )
        # Will call `pytest.main` to run all testcases as a whole, accordingly,
        # runtime status of all these testcases will be set at the same time.
        yield {"runtime_status": RuntimeStatus.RUNNING}, current_uids

        self.logger.info("Running PyTest with args: %r", pytest_args)
        return_code = pytest.main(pytest_args, plugins=[pytest_plugin])
        self.logger.info("PyTest exit code: %d", return_code)

        for suite_report in test_report:
            for child_report in suite_report:
                if isinstance(child_report, TestCaseReport):
                    yield (
                        child_report,
                        [test_report.uid, suite_report.uid],
                    )
                elif isinstance(child_report, TestGroupReport):
                    if (
                        child_report.category
                        != ReportCategories.PARAMETRIZATION
                    ):
                        raise RuntimeError(
                            "Unexpected report category: {}".format(
                                child_report.category
                            )
                        )

                    for testcase_report in child_report:
                        yield (
                            testcase_report,
                            [
                                test_report.uid,
                                suite_report.uid,
                                child_report.uid,
                            ],
                        )
                else:
                    raise TypeError(
                        "Unexpected report type: {}".format(type(child_report))
                    )

    def _target_args(self):
        """Return the configured pytest target(s) as a new list."""
        if isinstance(self.cfg.target, str):
            return [self.cfg.target]
        return self.cfg.target[:]

    def _build_iter_pytest_args(self, testsuite_pattern, testcase_pattern):
        """
        Build the PyTest args for running a particular set of testsuites and
        testcases as specified.

        Note that the ``select`` option is not applied here.

        :param testsuite_pattern: "*" or the name of a collected testsuite
        :param testcase_pattern: "*" or the name of a collected testcase
        :raises RuntimeError: if the tests have not been collected yet
        :return: pair of (pytest args, UIDs of the report entry to be run)
        """
        if self._nodeids is None:
            raise RuntimeError("Need to call dry_run() first")

        if testsuite_pattern == "*" and testcase_pattern == "*":
            pytest_args = self._target_args()
            current_uids = [self.uid()]
        elif testcase_pattern == "*":
            pytest_args = [self._nodeids["testsuites"][testsuite_pattern]]
            current_uids = [self.uid(), testsuite_pattern]
        else:
            pytest_args = [
                self._nodeids["testcases"][testsuite_pattern][testcase_pattern]
            ]
            suite_name, case_name, case_params = _case_parse(pytest_args[0])
            if case_params:
                # A single parametrized case lives one level deeper, below
                # the parametrization group named after the case.
                current_uids = [
                    self.uid(),
                    suite_name,
                    case_name,
                    "{}[{}]".format(case_name, case_params),
                ]
            else:
                current_uids = [self.uid(), suite_name, case_name]

        if self.cfg.extra_args:
            pytest_args.extend(self.cfg.extra_args)

        return pytest_args, current_uids

    def _build_pytest_args(self):
        """
        :return: a list of the args to be passed to PyTest
        :rtype: List[str]
        """
        pytest_args = self._target_args()

        if self.cfg.select:
            pytest_args.extend(["-k", self.cfg.select])

        if self.cfg.extra_args:
            pytest_args.extend(self.cfg.extra_args)

        return pytest_args
class _ReportPlugin:
    """
    Plugin object passed to PyTest. Contains hooks used to update the Testplan
    report with the status of testcases.
    """

    def __init__(self, parent, report, quiet):
        self._parent = parent
        self._report = report
        self._quiet = quiet

        # Collection of suite reports - will be initialised by the setup()
        # method.
        self._suite_reports = None

        # The current working testcase report. It needs to be stored on this
        # object since it is set and read by different callback hooks.
        self._current_case_report = None

        # Result object which supports various assertions like in MultiTest.
        # Its entries will later be added to current testcase report.
        self._current_result_obj = None

    @pytest.fixture
    def result(self):
        """
        Return the result object for the current test case.

        :return: the result object for the current test case
        :rtype: ``Result``
        """
        return self._current_result_obj

    @pytest.fixture
    def env(self):
        """
        Return the testing environment.

        :return: the testing environment
        :rtype: ``Environment``
        """
        return self._parent.resources

    def setup(self):
        """Set up environment as required."""
        self._suite_reports = collections.defaultdict(collections.OrderedDict)

    def case_report(self, suite_name, case_name, case_params):
        """
        Return the case report for the specified suite and case name, creating
        it first if necessary.

        :param suite_name: the suite name to get the report for
        :type suite_name: ``str``
        :param case_name: the case name to get the report for
        :type case_name: ``str``
        :param case_params: the case parameters to get the report for
        :type case_params: ``str`` or ``NoneType``
        :return: the case report
        :rtype: :py:class:`testplan.report.testing.TestCaseReport`
        """
        suite_cases = self._suite_reports[suite_name]

        if case_params is None:
            report = suite_cases.get(case_name)
            if report is None:
                report = TestCaseReport(case_name, uid=case_name)
                suite_cases[case_name] = report
            return report

        group_report = suite_cases.get(case_name)
        if group_report is None:
            # create group report for parametrized testcases
            group_report = TestGroupReport(
                name=case_name,
                uid=case_name,
                category=ReportCategories.PARAMETRIZATION,
            )
            suite_cases[case_name] = group_report

        case_name = "{}[{}]".format(case_name, case_params)
        try:
            report = group_report.get_by_uid(case_name)
        except KeyError:
            # Unknown uid: create report of parametrized testcase. Only the
            # lookup failure is handled, so that unrelated errors (and
            # KeyboardInterrupt/SystemExit) are no longer swallowed.
            report = TestCaseReport(name=case_name, uid=case_name)
            group_report.append(report)
        return report

    def pytest_runtest_setup(self, item):
        """
        Hook called by pytest to set up a test.

        :param item: the test item to set up (see pytest documentation)
        """
        # Extract suite name, case name and parameters
        suite_name, case_name, case_params = _case_parse(item.nodeid)
        report = self.case_report(suite_name, case_name, case_params)

        try:
            func_doc = item.function.__doc__
        except AttributeError:
            func_doc = None

        if func_doc is not None:
            # inspect.getdoc() always separates lines with "\n", whatever the
            # platform, so split on "\n" rather than on os.linesep.
            report.description = os.linesep.join(
                " {}".format(line)
                for line in inspect.getdoc(item.function).split("\n")
            )

        self._current_case_report = report
        self._current_result_obj = self._parent.cfg.result(
            stdout_style=self._parent.stdout_style,
            _scratch=self._parent.scratch,
        )

    def pytest_runtest_teardown(self, item):
        """
        Hook called by pytest to tear down a test.

        :param item: the test item to tear down (see pytest documentation)
        """
        self._current_case_report = None
        self._current_result_obj = None

    def pytest_runtest_logreport(self, report):
        """
        Hook called by pytest to report on the result of a test.

        :param report: the test report for the item just tested (see pytest
                       documentation)
        :raises RuntimeError: if a "call" report arrives while no testcase
                              report is active
        """
        if report.when == "setup":
            if report.skipped:
                if self._current_case_report is None:
                    suite_name, case_name, case_params = _case_parse(
                        report.nodeid
                    )
                    testcase_report = self.case_report(
                        suite_name, case_name, case_params
                    )
                else:
                    testcase_report = self._current_case_report

                # Status set to be SKIPPED if testcase is marked skip or xfail
                # lower versioned PyTest does not support this feature
                testcase_report.status_override = Status.SKIPPED

        elif report.when == "call":
            if self._current_case_report is None:
                raise RuntimeError(
                    "Cannot store testcase results to report: no report "
                    "object was created."
                )

            if self._current_result_obj.entries:
                # Add the assertion entry to the case report
                for entry in self._current_result_obj.entries:
                    stdout_renderer = stdout_registry[entry]()
                    stdout_header = stdout_renderer.get_header(entry)
                    stdout_details = stdout_renderer.get_details(entry) or ""

                    # Add 'stdout_header' and 'stdout_details' attributes to
                    # serialized entries for standard output later
                    serialized_entry = schema_registry.serialize(entry)
                    serialized_entry.update(
                        stdout_header=stdout_header,
                        stdout_details=stdout_details,
                    )
                    self._current_case_report.append(serialized_entry)

                self._current_case_report.attachments.extend(
                    self._current_result_obj.attachments
                )

            if report.failed:
                self._current_case_report.status_override = Status.FAILED
            # XXX: report.skipped set to True when xfail, how to distinguish?
            # elif report.skipped:
            #     self._current_case_report.status_override = Status.SKIPPED
            else:
                self._current_case_report.pass_if_empty()
            self._current_case_report.runtime_status = RuntimeStatus.FINISHED

        elif report.when == "teardown":
            pass

    def pytest_exception_interact(self, node, call, report):
        """
        Hook called when an exception raised and it can be handled. This hook
        is only called if the exception is not an PyTest internal exception.

        :param node: PyTest Function or Module object
        :param call: PyTest CallInfo object
        :param report: PyTest TestReport or CollectReport object
        """
        if call.when in ("memocollect", "collect"):
            # Failed to collect tests: log to console and mark the report as
            # ERROR.
            self._report.logger.error(_format_excinfo(call.excinfo))
            self._report.status_override = Status.ERROR

        elif self._current_case_report is not None:
            # Log assertion errors or exceptions in testcase report
            trace = call.excinfo.traceback[-1]
            message = (
                getattr(call.excinfo.value, "message", None)
                or getattr(call.excinfo.value, "msg", None)
                or getattr(call.excinfo.value, "args", None)
                or ""
            )
            if isinstance(message, (tuple, list)):
                # Non-empty by construction: an empty ``args`` is falsy and
                # has already been replaced by "" above.
                message = message[0]

            header = (
                (
                    "Assertion - Fail"
                    if call.excinfo.typename == "AssertionError"
                    else "Exception raised"
                )
                if call.when == "call"
                else "{} - Fail".format(call.when)
            )
            details = (
                "File: {}\nLine: {}\n{}: {}".format(
                    str(trace.path),
                    trace.lineno + 1,
                    call.excinfo.typename,
                    message,
                )
                if call.excinfo.typename == "AssertionError"
                else (
                    report.longreprtext
                    if hasattr(report, "longreprtext")
                    else str(report.longrepr)
                )
            )

            assertion_obj = assertions.RawAssertion(
                description=header, content=details, passed=False
            )
            serialized_obj = schema_registry.serialize(assertion_obj)
            self._current_case_report.append(serialized_obj)
            self._current_case_report.status_override = Status.FAILED

            for capture, description in (
                ("caplog", "Captured Log"),
                ("capstdout", "Captured Stdout"),
                ("capstderr", "Captured Stderr"),
            ):
                # Default to None so that a report object without capture
                # attributes cannot raise from inside this hook.
                captured = getattr(report, capture, None)
                if captured:
                    assertion_obj = entries_base.Log(
                        captured, description=description
                    )
                    serialized_obj = schema_registry.serialize(assertion_obj)
                    self._current_case_report.append(serialized_obj)

        else:
            self._report.logger.error(
                "Exception occured outside of a testcase: during %s", call.when
            )
            self._report.logger.error(_format_excinfo(call.excinfo))

    @pytest.hookimpl(trylast=True)
    def pytest_configure(self, config):
        """
        Hook called by pytest upon startup. Disable output to terminal.

        :param config: pytest config object
        """
        if self._quiet:
            config.pluginmanager.unregister(name="terminalreporter")

    def pytest_unconfigure(self, config):
        """
        Hook called by pytest before exiting. Collate suite reports.

        :param config: pytest config object
        """
        # Collate suite reports
        for suite_name, cases in self._suite_reports.items():
            suite_report = TestGroupReport(
                name=suite_name,
                uid=suite_name,
                category=ReportCategories.TESTSUITE,
            )

            for case in cases.values():
                suite_report.append(case)

            self._report.append(suite_report)


class _CollectPlugin:
    """
    PyTest plugin used when collecting tests. Provides access to the collected
    test suites and testcases via the `collected` property.
    """

    def __init__(self, quiet):
        self._quiet = quiet
        # Stays None until pytest_collection_finish() has been called.
        self.collected = None

    @pytest.hookimpl(trylast=True)
    def pytest_configure(self, config):
        """
        Hook called by pytest upon startup. Disable output to terminal.

        :param config: pytest config object
        """
        if self._quiet:
            config.pluginmanager.unregister(name="terminalreporter")

    def pytest_collection_finish(self, session):
        """
        PyTest hook, called after collection is finished.

        :param session: pytest session object holding the collected items
        """
        self.collected = session.items


def _format_excinfo(excinfo):
    """
    Render the exception held by a pytest ExceptionInfo as one string.

    :param excinfo: object exposing ``type``, ``value`` and ``tb``
    :return: the formatted traceback
    :rtype: ``str``
    """
    return "".join(
        traceback.format_exception(excinfo.type, excinfo.value, excinfo.tb)
    )


def _case_parse(nodeid):
    """
    Parse a nodeid into a shortened suite name, case name, and case
    parameters.

    :param nodeid: the test nodeid
    :type nodeid: ``str``
    :raises ValueError: if nodeid is invalid
    :return: a tuple consisting of (suite name, case name, case parameters)
    :rtype: ``tuple``
    """
    suite_name, case_name, case_params = _split_nodeid(nodeid)
    return (_short_suite_name(suite_name), case_name, case_params)


def _split_nodeid(nodeid):
    """
    Split a nodeid into its full suite name, case name, and case parameters.

    :param nodeid: the test nodeid
    :type nodeid: ``str``
    :raises ValueError: if nodeid is invalid
    :return: a tuple consisting of (suite name, case name, case parameters);
             case parameters is ``None`` for a non-parametrized case
    :rtype: ``tuple``
    """
    # Collapse any "::()::" segment to a plain "::" separator before matching.
    match = _CASE_REGEX.match(nodeid.replace("::()::", "::"))

    if match is None:
        raise ValueError("Invalid nodeid")

    suite_name, case_name, case_params = match.groups()

    return suite_name, case_name, case_params


def _short_suite_name(suite_name):
    """
    Remove any leading path elements from the suite name, keeping everything
    after the last path separator. E.g. "tests/my_test.py" -> "my_test.py"

    Note that the ".py" extension and any "::Class" suffix are kept.
    ``os.path.basename`` splits on "/" on every platform.
    """
    return os.path.basename(suite_name)


def _add_empty_testcase_report(item, test_report, nodeids):
    """
    Add an empty testcase report to the test report.

    :param item: collected pytest item, only its ``nodeid`` is read
    :param test_report: report that receives the suite and case reports
    :param nodeids: mapping with "testsuites" and "testcases" entries which is
                    updated with the nodeid of each added suite and case
    """
    full_suite_name, case_name, case_params = _split_nodeid(item.nodeid)
    suite_name = _short_suite_name(full_suite_name)

    try:
        suite_report = test_report[suite_name]
    except KeyError:
        suite_report = TestGroupReport(
            name=suite_name,
            uid=suite_name,
            category=ReportCategories.TESTSUITE,
        )
        test_report.append(suite_report)
        nodeids["testsuites"][suite_name] = full_suite_name

    if case_params:
        try:
            param_report = suite_report[case_name]
        except KeyError:
            param_report = TestGroupReport(
                name=case_name,
                uid=case_name,
                category=ReportCategories.PARAMETRIZATION,
            )
            suite_report.append(param_report)
            # The group itself maps to the nodeid without parameters, which
            # selects every parametrized variant of the case.
            nodeids["testcases"][suite_name][case_name] = "::".join(
                (full_suite_name, case_name)
            )

        param_case_name = "{}[{}]".format(case_name, case_params)
        param_report.append(
            TestCaseReport(name=param_case_name, uid=param_case_name)
        )
        nodeids["testcases"][suite_name][param_case_name] = item.nodeid
    else:
        suite_report.append(TestCaseReport(name=case_name, uid=case_name))
        nodeids["testcases"][suite_name][case_name] = item.nodeid