Source code for testplan.testing.multitest.entries.base

"""
Base classes go here.
"""
import os
import re
import datetime
import operator
import pprint
import shutil
import hashlib
import pathlib
import plotly.io

from testplan.common.utils.convert import nested_groups
from testplan.common.utils.timing import utcnow
from testplan.common.utils.table import TableEntry
from testplan.common.utils.reporting import fmt
from testplan.common.utils.convert import flatten_formatted_object
from testplan.common.utils.path import hash_file, traverse_dir, makedirs
from testplan import defaults


__all__ = ["BaseEntry", "Group", "Summary", "Log"]


# Will be used for default conversion like: NotEqual -> Not Equal
ENTRY_NAME_PATTERN = re.compile(r"([A-Z])")

DEFAULT_CATEGORY = "DEFAULT"
DEFAULT_FLAG = "DEFAULT"


def readable_name(class_name):
    """NotEqual -> Not Equal"""
    return ENTRY_NAME_PATTERN.sub(" \\1", class_name).strip()


[docs]class BaseEntry: """Base class for all entries, stores common context like time etc.""" meta_type = "entry" def __init__(self, description, category=None, flag=None): self.utc_time = utcnow() self.machine_time = datetime.datetime.now() self.description = description self.category = category or DEFAULT_CATEGORY self.flag = flag or DEFAULT_FLAG # Will be set explicitly via containers self.line_no = None self.file_path = None def __str__(self): return repr(self) def __bool__(self): return True @property def name(self): """MyClass -> My Class""" return readable_name(self.__class__.__name__)
[docs] def serialize(self): """Shortcut method for serialization via schemas""" from .schemas.base import registry return registry.serialize(self)
[docs]class Group: # we treat Groups as assertions so we can render them with pass/fail context meta_type = "assertion" def __init__(self, entries, description=None): self.utc_time = utcnow() self.description = description self.entries = entries def __bool__(self): return self.passed def __repr__(self): return "{}(entries={}, description='{}')".format( self.__class__.__name__, self.entries, self.description ) @property def passed(self): """ Empty groups are truthy AKA does not contain anything that is failing. """ return (not self.entries) or all([bool(e) for e in self.entries])
[docs]class Summary(Group): """ A meta assertion that stores a subset of given entries. Groups assertion data into a nested structure by category, assertion type and pass/fail status. If any of the entries is a Group, then its entries are expanded and the Group object is discarded. """ def __init__( self, entries, description=None, num_passing=defaults.SUMMARY_NUM_PASSING, num_failing=defaults.SUMMARY_NUM_FAILING, key_combs_limit=defaults.SUMMARY_KEY_COMB_LIMIT, ): self.num_passing = num_passing self.num_failing = num_failing self.key_combs_limit = key_combs_limit super(Summary, self).__init__( entries=self._summarize( entries, num_passing=num_passing, num_failing=num_failing, key_combs_limit=key_combs_limit, ), description=description, ) def _flatten(self, entries): """ Recursively traverse entries and expand entries of groups. """ def _flatten(items): result = [] for item in items: if isinstance(item, Group) and not isinstance(item, Summary): result.extend(_flatten(item.entries)) else: result.append(item) return result return _flatten(entries) def _summarize(self, entries, num_passing, num_failing, key_combs_limit): # Circular imports from .assertions import Assertion from .summarization import registry # Get rid of Groups (but leave summaries) entries = self._flatten(entries) summaries = [e for e in entries if isinstance(e, Summary)] # Create nested data of depth 3 # Group by category, class name and pass/fail status groups = nested_groups( iterable=(e for e in entries if isinstance(e, Assertion)), key_funcs=[ operator.attrgetter("category"), lambda obj: obj.__class__.__name__, operator.truth, ], ) result = [] limits = dict( num_passing=num_passing, num_failing=num_failing, key_combs_limit=key_combs_limit, ) for category, category_grouping in groups: cat_group = Group( entries=[], description="Category: {}".format(category) ) for class_name, assertion_grouping in category_grouping: asr_group = Group( entries=[], description="Assertion type: {}".format( readable_name(class_name) ), ) for pass_status, assertion_entries in assertion_grouping: # Apply custom grouping, otherwise just trim the # list of entries via default summarization func. summarizer = registry[class_name] summary_group = summarizer( category=category, class_name=class_name, passed=pass_status, entries=assertion_entries, limits=limits, ) if len(summary_group.entries): asr_group.entries.append(summary_group) cat_group.entries.append(asr_group) result.append(cat_group) return summaries + result
[docs]class Log(BaseEntry): """Log a str to the report.""" def __init__(self, message, description=None, flag=None): if isinstance(message, str): self.message = message elif isinstance(message, bytes): self.message = message.decode() else: self.message = pprint.pformat(message) if not description: description = next((l for l in self.message.split("\n") if l), "") if len(description) > 80: description = description[0:80] + "..." super(Log, self).__init__(description=description, flag=flag)
class TableLog(BaseEntry): """Log a table to the report.""" def __init__(self, table, display_index=False, description=None): as_list = TableEntry(table).as_list_of_list() self.columns, self.table = as_list[0], as_list[1:] self.indices = range(len(self.table)) self.display_index = display_index super(TableLog, self).__init__(description=description) class DictLog(BaseEntry): """Log a dict object to the report.""" def __init__(self, dictionary, description=None): formatted_obj = fmt(dictionary) if len(formatted_obj) != 2 or formatted_obj[0] != 2: raise TypeError("Require a formatted object of mapping type") self.flattened_dict = flatten_formatted_object(formatted_obj) super(DictLog, self).__init__(description=description) class FixLog(DictLog): """Log a fix message to the report.""" def __init__(self, msg, description=None): if not msg or not isinstance(msg, dict): raise TypeError("Invalid format of fix message") super(FixLog, self).__init__(msg, description=description) class Graph(BaseEntry): """Create a graph for the report.""" def __init__( self, graph_type, graph_data, description=None, series_options=None, graph_options=None, ): """ NOTE: When adding functionality to Graph, VALID_GRAPH_TYPES, VALID_CHART_TYPES, VALID_GRAPH_OPTIONS and VALID_SERIES_OPTIONS must be kept updated """ self.VALID_GRAPH_TYPES = [ "Line", "Scatter", "Bar", "Whisker", "Contour", "Hexbin", ] self.VALID_CHART_TYPES = ["Pie"] self.VALID_GRAPH_OPTIONS = ["xAxisTitle", "yAxisTitle", "legend"] self.VALID_SERIES_OPTIONS = ["colour"] self.graph_type = graph_type self.graph_data = graph_data if series_options is not None: self.assert_valid_series_options(series_options, graph_data) self.series_options = series_options if graph_options is not None: self.assert_valid_graph_options(graph_options) self.graph_options = graph_options self.type = "Graph" if graph_type in self.VALID_CHART_TYPES: self.discrete_chart = True elif graph_type in self.VALID_GRAPH_TYPES: self.discrete_chart = False else: raise ValueError( "Graph of type {!r} cannot " "be rendered".format(graph_type) ) super(Graph, self).__init__(description=description) def assert_valid_graph_options(self, graph_options): for option in graph_options: if option not in self.VALID_GRAPH_OPTIONS: raise ValueError( "Graph option {!r} " "is not valid".format(option) ) def assert_valid_series_options(self, series_options, graph_data): for series_name in series_options: if series_name not in graph_data: raise ValueError( "Series {!r} cannot be found in " "graph data, cannot " "apply series options".format(series_name) ) for series_option in series_options[series_name]: if series_option not in self.VALID_SERIES_OPTIONS: raise ValueError( "Series Option: {!r} is not " "valid (found in series " "{!r})".format(series_option, series_name) ) class Attachment(BaseEntry): """Entry representing a file attached to the report.""" def __init__( self, filepath, description=None, dst_path=None, scratch_path=None ): self.source_path = filepath self.orig_filename = os.path.basename(filepath) self.filesize = os.path.getsize(filepath) if dst_path is None: basename, ext = os.path.splitext(self.orig_filename) hash = hash_file(self.source_path) # To avoid file name collision self.dst_path = f"{basename}-{hash}-{self.filesize}{ext}" else: self.dst_path = dst_path if scratch_path: # will best effort make a copy of the file try: copy_of_file = os.path.join(scratch_path, self.dst_path) shutil.copyfile(filepath, copy_of_file) except Exception: pass else: self.source_path = copy_of_file super(Attachment, self).__init__(description=description) class MatPlot(Attachment): """Display a MatPlotLib graph in the report.""" def __init__( self, pyplot, image_file_path, width, height, description=None ): if width: pyplot.gcf().set_figwidth(float(width)) if height: pyplot.gcf().set_figheight(float(height)) pyplot.savefig( image_file_path, dpi=96, pad_inches=0, transparent=False ) pyplot.close() super(MatPlot, self).__init__( filepath=image_file_path, description=description ) class Plotly(Attachment): def __init__(self, fig, data_file_path, style=None, description=None): fig_json = plotly.io.to_json(fig) pathlib.Path(data_file_path).resolve().parent.mkdir( parents=True, exist_ok=True ) with open(data_file_path, "w") as f: f.write(fig_json) super(Plotly, self).__init__( filepath=data_file_path, description=description ) self.style = style class Directory(BaseEntry): """ Entry representing a bunch of files under a directory which will be attached to the report. """ def __init__( self, dirpath, description=None, ignore=None, only=None, recursive=False, dst_path=None, scratch_path=None, ): self.source_path = dirpath self.ignore = ignore self.only = only self.recursive = recursive self.dst_path = ( dst_path or hashlib.md5(self.source_path.encode("utf-8")).hexdigest() ) self.file_list = traverse_dir( self.source_path, topdown=True, ignore=ignore, only=only, recursive=recursive, include_subdir=False, ) if scratch_path: # will best effort make a copy of the file for fpath in self.file_list: src_path = os.path.join(self.source_path, fpath) dst_path = os.path.join(scratch_path, self.dst_path, fpath) makedirs(os.path.dirname(dst_path)) try: shutil.copyfile(src_path, dst_path) except Exception: break else: self.source_path = os.path.join(scratch_path, self.dst_path) super(Directory, self).__init__(description=description) class CodeLog(BaseEntry): """Save source code to the report.""" def __init__(self, code, language="python", description=None): if isinstance(code, str): self.code = code elif isinstance(code, bytes): self.code = code.decode() else: raise TypeError("Code must be a string") self.language = language super(CodeLog, self).__init__(description=description) class Markdown(BaseEntry): """Save markdown to the report.""" def __init__(self, message, description=None, escape=True): if isinstance(message, str): self.message = message elif isinstance(message, bytes): self.message = message.decode() else: raise ValueError("Message must be a string") self.escape = escape super(Markdown, self).__init__(description=description)