Source code for testplan.runners.base

"""Executor base classes."""

import threading
from collections import OrderedDict
from typing import Generator, List

from testplan.common.entity import Resource, ResourceConfig
from testplan.common.utils.selector import Expr as SExpr
from testplan.common.utils.thread import interruptible_join
from testplan.common.report import Status


[docs]class ExecutorConfig(ResourceConfig): """ Configuration object for :py:class:`Executor <testplan.runners.base.Executor>` resource. Inherits all :py:class:`~testplan.common.entity.base.ResourceConfig` options. """
[docs]class Executor(Resource): """ Receives items, executes them and create results. Subclasses must implement the ``Executor._loop`` logic. """ CONFIG = ExecutorConfig _STOP_TIMEOUT = 10 def __init__(self, **options) -> None: super(Executor, self).__init__(**options) self._loop_handler = None self._input = OrderedDict() self._results = OrderedDict() self.ongoing = [] self._discard_pending = False @property def class_name(self) -> str: """Returns the class name.""" return self.__class__.__name__ @property def results(self) -> OrderedDict: """Items results.""" return self._results @property def added_items(self) -> OrderedDict: """Returns added items.""" return self._input
[docs] def added_item(self, uid: str) -> object: """Returns the added item.""" return self._input[uid]
# TODO: based on aborting logic it is not clear why any object is # a good item even if abort_entity swallows the AttributeError on missing # abort. Perhaps a bit more clarity is needed here?
[docs] def add(self, item: object, uid: str) -> None: """ Adds an item for execution. :param item: To be executed and create a result. :param uid: Unique id. """ if self.active: self._input[uid] = item # `NoRunpathPool` adds item after calling `_prepopulate_runnables` # so the following step is still needed if uid not in self.ongoing: self.ongoing.append(uid)
[docs] def get(self, uid: str) -> object: """Get item result by uid.""" return self._results[uid]
def _loop(self) -> None: raise NotImplementedError() def _prepopulate_runnables(self) -> None: # _discard_pending can be set any time if self._discard_pending: return # If we are to apply test_sorter, it would be here # but it's not easy to implement a reasonable behavior # as _input could be a mixture of runnable/task/callable self.ongoing = list(self._input.keys())
[docs] def starting(self) -> None: """Starts the execution loop.""" self._prepopulate_runnables() self._loop_handler = threading.Thread(target=self._loop) self._loop_handler.daemon = True self._loop_handler.start()
[docs] def stopping(self) -> None: """Stop the executor.""" if self._loop_handler: interruptible_join(self._loop_handler, timeout=self._STOP_TIMEOUT)
[docs] def abort_dependencies(self) -> Generator: """Abort items running before aborting self.""" for uid in self.ongoing: yield self._input[uid]
@property def is_alive(self) -> bool: """Poll the loop handler thread to check it is running as expected.""" if self._loop_handler: return self._loop_handler.is_alive() else: return False
[docs] def pending_work(self) -> bool: """Resource has pending work.""" return len(self.ongoing) > 0
[docs] def discard_pending_tasks( self, report_status: Status = Status.NONE, report_reason: str = "" ): # NOTE: should discard currently running task as well # NOTE: currently Task class is defined under sub-package pool, which # NOTE: doesn't reflect the fact that LocalRunner is also able to # NOTE: consume them # NOTE: src to be re-composed, types (TaskResult and TestResult) to be # NOTE: uniformed, before similar logic could be promoted to their # NOTE: common ancestor - Executor raise NotImplementedError()
[docs] def bubble_up_discard_tasks( self, exec_selector: SExpr, report_status: Status = Status.NONE, report_reason: str = "", ): # used by "skip-remaining" feature # should only be triggered when executors living under TestRunner from testplan.runnable.base import TestRunner if self.parent is not None and isinstance(self.parent, TestRunner): self.parent.discard_pending_tasks( exec_selector, report_status=report_status, report_reason=report_reason, )
[docs] def get_current_status_for_debug(self) -> List[str]: """ Gets information about items in ``Executor`` for debugging. Subclasses can override this method and implement a well suited method to get items current status. :return: Status of items in ``Executor``. """ msgs = [] if self.added_items: msgs.append(f"{self.class_name} {self.cfg.name} added items:") for item in self.added_items: msgs.append(f"\t{item}") else: msgs.append(f"No added items in {self.class_name}") if self.ongoing: msgs.append(f"{self.class_name} {self.cfg.name} pending items:") for item in self.ongoing: msgs.append(f"\t{item}") else: msgs.append(f"No pending items in {self.class_name}") return msgs