Source code for testplan.report.testing.base

"""
Report classes that will store the test results.

Assuming we have a Testplan setup like this:
.. code-block:: python
  Testplan MyPlan
    Multitest A
      Suite A-1
        TestCase test_method_a_1_x
        TestCase test_method_a_1_y
        TestCase (parametrized, with 3 scenarios) test_method_a_1_z
      Suite A-2
        Testcase test_method_a_2_x
    Multitest B
      Suite B-1
        Testcase test_method_b_1_x
    GTest C
We will have a report tree like:
.. code-block:: python
  TestReport(name='MyPlan')
    TestGroupReport(name='A', category='Multitest')
      TestGroupReport(name='A-1', category='TestSuite')
        TestCaseReport(name='test_method_a_1_x')
        TestCaseReport(name='test_method_a_1_y')
        TestGroupReport(name='test_method_a_1_z', category='parametrization')
          TestCaseReport(name='test_method_a_1_z_1')
          TestCaseReport(name='test_method_a_1_z_2')
          TestCaseReport(name='test_method_a_1_z_3')
      TestGroupReport(name='A-2', category='TestSuite')
        TestCaseReport(name='test_method_a_2_x')
    TestGroupReport(name='B', category='MultiTest')
      TestGroupReport(name='B-1', category='TestSuite')
        TestCaseReport(name='test_method_b_1_x')
    TestGroupReport(name='C', category='GTest')
      TestCaseReport(name='<first test of Gtest>') -> can only be retrieved
                                                      after GTest is run
      TestCaseReport(name='<second test of Gtest>') -> can only be retrieved
                                                       after GTest is run
    ...
"""
import copy
import getpass
import hashlib
import itertools
import os
import platform
import sys
from collections import Counter
from typing import Dict, Optional
from typing_extensions import Self

from testplan.common.report import (
    Status,
    RuntimeStatus,
    ReportCategories,
    ExceptionLogger,
    Report,
    BaseReportGroup,
)
from testplan.testing import tagging


[docs]class TestReport(BaseReportGroup): """ Report for a Testplan test run, sits at the root of the report tree. Only contains TestGroupReports as children. """ def __init__( self, name, meta=None, attachments=None, information=None, timeout=None, label=None, **kwargs, ): self._tags_index = None self.meta = meta or {} self.label = label self.information = information or [] self.resource_meta_path: Optional[str] = None try: user = getpass.getuser() except (ImportError, OSError): # if the USERNAME env variable is unset on Windows, this fails # with ImportError user = "unknown" self.information.extend( [ ("user", user), ("command_line_string", " ".join(sys.argv)), ("python_version", platform.python_version()), ] ) if self.label: self.information.append(("label", label)) # Report attachments: Dict[dst: str, src: str]. # Maps from destination path (relative from attachments root dir) # to the full source path (absolute or relative from cwd). self.attachments = attachments or {} self.timeout = timeout self.category = ReportCategories.TESTPLAN super(TestReport, self).__init__(name=name, **kwargs) @property def tags_index(self): """ Root report only has tag indexes, which is only useful when we run searches against multiple test reports. (e.g Give me all test runs from all projects that have these tags) """ from testplan.testing.tagging import merge_tag_dicts if self._tags_index is None: self._tags_index = merge_tag_dicts( *[child.tags_index for child in self] ) return self._tags_index
[docs] def propagate_tag_indices(self): """ TestReport does not have native tag data, so it just triggers children's tag updates. """ for child in self: child.propagate_tag_indices() # reset tags index, so it gets repopulated on the next call self._tags_index = None
[docs] def bubble_up_attachments(self): """ Attachments are saved at various levels of the report: * Fix spec file attached to multitests. * When implemented result.attach will attach files to assertions. This iterates through the report entries and bubbles up all the attachments to the top level. This top level dictionary of attachments will be used by Exporters to export attachments as well as the report. """ for child in self: if getattr(child, "fix_spec_path", None): self._bubble_up_fix_spec(child) for attachment in child.attachments: self.attachments[attachment.dst_path] = attachment.source_path
def _bubble_up_fix_spec(self, child): """Bubble up a "fix_spec_path" from a child report.""" real_path = child.fix_spec_path hash_dir = hashlib.md5(real_path.encode("utf-8")).hexdigest() hash_path = os.path.join( hash_dir, os.path.basename(child.fix_spec_path) ) child.fix_spec_path = hash_path self.attachments[hash_path] = real_path def _get_comparison_attrs(self): return super(TestReport, self)._get_comparison_attrs() + [ "tags_index", "meta", ]
[docs] def serialize(self): """ Shortcut for serializing test report data to nested python dictionaries. """ from .schemas import TestReportSchema return TestReportSchema().dump(self)
[docs] @classmethod def deserialize(cls, data): """ Shortcut for instantiating ``TestReport`` object (and its children) from nested python dictionaries. """ from .schemas import TestReportSchema return TestReportSchema().load(data)
[docs] def shallow_serialize(self): """Shortcut for shallow-serializing test report data.""" from .schemas import ShallowTestReportSchema return ShallowTestReportSchema().dump(self)
[docs] @classmethod def shallow_deserialize(cls, data, old_report): """ Shortcut for deserializing a ``TestReport`` object from its shallow serialized representation. """ from .schemas import ShallowTestReportSchema deserialized = ShallowTestReportSchema().load(data) deserialized.entries = old_report.entries deserialized._index = old_report._index return deserialized
[docs] def filter(self, *functions, **kwargs) -> Self: """ Tag indices are updated after filter operations. """ result = super(TestReport, self).filter(*functions, **kwargs) # We'd like to call tag propagation before returning the root node, # so we rely on absence of implicit `__copy` arg to decide if we should # trigger tag index propagation or not. If we don't do this check # then tag propagation will be called for each filter call on # sub-nodes which is going to be a redundant operation. if kwargs.get("__copy", True): result.propagate_tag_indices() return result
[docs] def inherit(self, deceased: Self) -> Self: self.timer = deceased.timer self.status_override = deceased.status_override self.status_reason = deceased.status_reason self.attachments = deceased.attachments self.logs = deceased.logs for uid in set(self.entry_uids) & set(deceased.entry_uids): self_ = self[uid] deceased_ = deceased[uid] if isinstance(self_, TestGroupReport) and isinstance( deceased_, TestGroupReport ): self_.inherit(deceased_) return self
[docs]class TestGroupReport(BaseReportGroup): """ A middle-level container report, can contain both TestGroupReports and TestCaseReports. """ def __init__( self, name, category=ReportCategories.TESTGROUP, tags=None, part=None, fix_spec_path=None, env_status=None, strict_order=False, **kwargs, ): super(TestGroupReport, self).__init__(name=name, **kwargs) # This will be used for distinguishing test type (Multitest, GTest # etc). Expected to be one of the ReportCategories enum, otherwise # the report node will not be correctly rendered in the UI. self.category = category self.tags = tagging.validate_tag_value(tags) if tags else {} self.tags_index = copy.deepcopy(self.tags) # A test can be split into many parts and the report of each part # can be hold back for merging (if necessary) self.part = part # i.e. (m, n), while 0 <= m < n and n > 1 self.part_report_lookup = {} self.fix_spec_path = fix_spec_path if self.entries: self.propagate_tag_indices() # Expected to be one of ResourceStatus, or None. self.env_status = env_status # Can be True For group report in category "testsuite" self.strict_order = strict_order self.covered_lines: Optional[dict] = None def __str__(self): return ( f'{self.__class__.__name__}(name="{self.name}", category="{self.category}",' f' id="{self.uid}"), tags={self.tags or None})' ) def __repr__(self): return ( f'{self.__class__.__name__}(name="{self.name}", category="{self.category}",' f' id="{self.uid}", entries={repr(self.entries)}, tags={self.tags or None})' ) def _get_comparison_attrs(self): return super(TestGroupReport, self)._get_comparison_attrs() + [ "category", "tags", "tags_index", ]
[docs] def append(self, item): """Update tag indices if item or self has tag data.""" super(TestGroupReport, self).append(item) if self.tags_index or item.tags_index: self.propagate_tag_indices()
[docs] def serialize(self): """ Shortcut for serializing TestGroupReport data to nested python dictionaries. """ from .schemas import TestGroupReportSchema return TestGroupReportSchema().dump(self)
[docs] @classmethod def deserialize(cls, data): """ Shortcut for instantiating ``TestGroupReport`` object (and its children) from nested python dictionaries. """ from .schemas import TestGroupReportSchema return TestGroupReportSchema().load(data)
[docs] def shallow_serialize(self): """Shortcut for shallow-serializing test report data.""" from .schemas import ShallowTestGroupReportSchema return ShallowTestGroupReportSchema().dump(self)
[docs] @classmethod def shallow_deserialize(cls, data, old_report): """ Shortcut for deserializing a ``TestGroupReport`` object from its shallow serialized representation. """ from .schemas import ShallowTestGroupReportSchema deserialized = ShallowTestGroupReportSchema().load(data) deserialized.entries = old_report.entries deserialized._index = old_report._index return deserialized
def _collect_tag_indices(self): """ Recursively collect tag indices from children (and their children etc) """ tag_dicts = [self.tags] for child in self: if isinstance(child, TestGroupReport): tag_dicts.append(child._collect_tag_indices()) elif isinstance(child, TestCaseReport): tag_dicts.append(child.tags) return tagging.merge_tag_dicts(*tag_dicts)
[docs] def propagate_tag_indices(self, parent_tags=None): """ Distribute native tag data onto `tags_index` attributes on the nodes of the test report. This distribution happens 2 ways. """ tags_index = tagging.merge_tag_dicts(self.tags, parent_tags or {}) for child in self: if isinstance(child, TestGroupReport): child.propagate_tag_indices(parent_tags=tags_index) elif isinstance(child, TestCaseReport): child.tags_index = tagging.merge_tag_dicts( child.tags, tags_index ) self.tags_index = tagging.merge_tag_dicts( tags_index, self._collect_tag_indices() )
[docs] def merge(self, report, strict=True): """Propagate tag indices after merge operations.""" super(TestGroupReport, self).merge(report, strict=strict) self.propagate_tag_indices()
@property def attachments(self): """Return all attachments from child reports.""" return itertools.chain.from_iterable( child.attachments for child in self ) @property def hash(self): """ Generate a hash of this report object, including its entries. This hash is used to detect when changes are made under particular nodes in the report tree. Since all report entries are mutable, this hash should NOT be used to index the report entry in a set or dict - we have avoided using the magic __hash__ method for this reason. Always use the UID for indexing purposes. :return: a hash of all entries in this report group. :rtype: ``int`` """ return hash( ( self.uid, self.status, self.runtime_status, self.env_status, tuple(entry.hash for entry in self.entries), tuple(entry["uid"] for entry in self.logs), ) )
[docs] def filter(self, *functions, **kwargs) -> Self: """ Tag indices are updated after filter operations. """ result = super(TestGroupReport, self).filter(*functions, **kwargs) # We'd like to call tag propagation before returning the root node, # so we rely on absence of implicit `__copy` arg to decide if we should # trigger tag index propagation or not. If we don't do this check # then tag propagation will be called for each filter call on # sub-nodes which is going to be a redundant operation. if kwargs.get("__copy", True): result.propagate_tag_indices() return result
[docs] def inherit(self, deceased: Self) -> Self: self.timer = deceased.timer self.status_override = deceased.status_override self.status_reason = deceased.status_reason self.env_status = deceased.env_status self.logs = deceased.logs for uid in set(self.entry_uids) & set(deceased.entry_uids): self_ = self[uid] deceased_ = deceased[uid] if isinstance(self_, TestGroupReport) and isinstance( deceased_, TestGroupReport ): self_.inherit(deceased_) elif isinstance(self_, TestCaseReport) and isinstance( deceased_, TestCaseReport ): self_.inherit(deceased_) return self
[docs]class TestCaseReport(Report): """ Leaf of the report tree, contains serialized assertion / log entries. """ exception_logger = ExceptionLogger def __init__( self, name, tags=None, category=ReportCategories.TESTCASE, **kwargs, ): super(TestCaseReport, self).__init__(name=name, **kwargs) self.tags = tagging.validate_tag_value(tags) if tags else {} self.tags_index = copy.deepcopy(self.tags) self.attachments = [] self.category = category self.covered_lines: Optional[dict] = None def _get_comparison_attrs(self): return super(TestCaseReport, self)._get_comparison_attrs() + [ "status_override", "timer", "tags", "tags_index", ] @property def passed(self) -> bool: """Shortcut for getting if report status should be considered passed.""" return self.status.normalised() == Status.PASSED @property def failed(self) -> bool: """ Shortcut for checking if report status should be considered failed. """ return self.status <= Status.FAILED @property def unstable(self) -> bool: """ Shortcut for checking if report status should be considered unstable. """ return self.status.normalised() == Status.UNSTABLE @property def unknown(self) -> bool: """ Shortcut for checking if report status is unknown. """ return self.status.normalised() == Status.UNKNOWN @property def status(self) -> Status: """ Entries in this context correspond to serialized (raw) assertions / custom logs in dictionary form. Assertion dicts will have a `passed` key which will be set to `False` for failed assertions. """ if self.status_override: return self.status_override if self.entries: return self._assertions_status() return self._status @status.setter def status(self, new_status): self._status = new_status @property def runtime_status(self): """ Used for interactive mode, the runtime status of a testcase may be one of ``RuntimeStatus``. """ return self._runtime_status @runtime_status.setter def runtime_status(self, new_status): """ Set the runtime status. As a special case, when a testcase is re-run we clear out the assertion entries from any previous run. """ self._runtime_status = new_status if self.entries and new_status in ( RuntimeStatus.RUNNING, RuntimeStatus.RESETTING, ): self.entries = [] self._status = Status.UNKNOWN if new_status == RuntimeStatus.FINISHED: self._status = Status.PASSED # passed if case report has no entry # NOTE: this is only for compatibility with the API for filtering.
[docs] def set_runtime_status_filtered( self, new_status: str, entries: Dict, ) -> None: """ Alternative setter for the runtime status of an entry, here it is equivalent to simply setting the runtime status. :param new_status: new runtime status to be set :param entries: tree-like structure of entries names, unused, but needed for current API compatibility """ self.runtime_status = new_status
def _assertions_status(self): # entries already serialized here for entry in self: if entry.get("passed") is False: return Status.FAILED return Status.PASSED
[docs] def merge(self, report, strict=True): """ TestCaseReport merge overwrites everything in place, as assertions of a test case won't be split among different runners. For some special test cases, choose the one whose status is of higher precedence. """ self._check_report(report) if ( self.category == ReportCategories.SYNTHESIZED and self.status.precede(report.status) ): return self.status_override = Status.precedent( [self.status_override, report.status_override] ) self.runtime_status = report.runtime_status self.logs = report.logs self.entries = report.entries self.timer = report.timer self.status_reason = report.status_reason
[docs] def flattened_entries(self, depth): """Need to take assertion groups into account.""" def flatten_dicts(dicts, _depth): """Recursively flatten serialized entry list.""" result = [] for d in dicts: result.append((_depth, d)) if d["type"] == "Group" or d["type"] == "Summary": result.extend(flatten_dicts(d["entries"], _depth + 1)) return result return flatten_dicts(self.entries, depth)
[docs] def serialize(self): """ Shortcut for serializing test report data to nested python dictionaries. """ from .schemas import TestCaseReportSchema return TestCaseReportSchema().dump(self)
[docs] @classmethod def deserialize(cls, data): """ Shortcut for instantiating ``TestCaseReport`` object from nested python dictionaries. """ from .schemas import TestCaseReportSchema return TestCaseReportSchema().load(data)
@property def hash(self): """ Generate a hash of this report object, including its entries. This hash is used to detect when changes are made under particular nodes in the report tree. Since all report entries are mutable, this hash should NOT be used to index the report entry in a set or dict - we have avoided using the magic __hash__ method for this reason. Always use the UID for indexing purposes. :return: a hash of all entries in this report group. :rtype: ``int`` """ return hash( ( self.uid, self.status, self.runtime_status, tuple(id(entry) for entry in self.entries), tuple(entry["uid"] for entry in self.logs), ) )
[docs] def xfail(self, strict): """ Override report status for test that is marked xfail by user :param strict: whether consider XPASS as failure """ if self.failed: self.status_override = Status.XFAIL elif self.passed: if strict: self.status_override = Status.XPASS_STRICT else: self.status_override = Status.XPASS
@property def counter(self): """ Return counts for current status. """ counter = Counter( { Status.PASSED.to_json_compatible(): 0, Status.FAILED.to_json_compatible(): 0, "total": 0, } ) counter.update({self.status.to_json_compatible(): 1, "total": 1}) return counter
[docs] def pass_if_empty(self): """Mark as PASSED if this testcase contains no entries.""" if not self.entries: self._status = Status.PASSED
[docs] def inherit(self, deceased: Self) -> Self: self.timer = deceased.timer self.runtime_status = deceased.runtime_status self.status = deceased.status self.status_override = deceased.status_override self.status_reason = deceased.status_reason self.attachments = deceased.attachments self.logs = deceased.logs self.entries = deceased.entries return self