Source code for testplan.runners.local

"""Basic local executor."""
import threading
import time
from typing import List

from testplan.common.utils import selector
from testplan.report import ReportCategories, Status, TestGroupReport
from testplan.runners.base import Executor
from testplan.runners.pools import tasks
from testplan.testing.base import Test, TestResult


[docs]class LocalRunner(Executor): """ Basic local execution that inherits :py:class:`Executor <testplan.runners.base.Executor>` and accepts all :py:class:`ExecutorConfig <testplan.runners.base.ExecutorConfig>` options. """ def __init__(self, uid="local_runner", **options) -> None: super(LocalRunner, self).__init__(**options) self._uid = uid # ``_loop`` & ``discard_pending_tasks`` are triggered in different # threads self._curr_runnable_lock = threading.Lock() self._curr_runnable = None
[docs] def execute(self, uid: str) -> TestResult: """Execute item implementation.""" # First retrieve the input from its UID. target = self._input[uid] if self._discard_pending: # to skip materialize, should be disposed immediately return TestResult() with self._curr_runnable_lock: # Inspect the input type. Tasks must be materialized before # they can be run. if isinstance(target, Test): runnable = target elif isinstance(target, tasks.Task): runnable = target.materialize() elif callable(target): runnable = target() else: raise TypeError( f"Cannot execute target of type {type(target)}" ) if not isinstance(runnable, Test): raise TypeError( f"Cannot execute target of type {type(runnable)}" ) if not runnable.parent: runnable.parent = self runnable.cfg.parent = self.cfg self._curr_runnable = runnable result = self._curr_runnable.run() with self._curr_runnable_lock: self._curr_runnable = None return result
def _loop(self) -> None: """Execution loop implementation for local runner.""" while self.active: if self.status == self.status.STARTING: self.status.change(self.status.STARTED) elif self.status == self.status.STARTED: try: next_uid = self.ongoing[0] except IndexError: pass else: try: result = self.execute(next_uid) except Exception as exc: result = TestResult() result.report = TestGroupReport( name=next_uid, category=ReportCategories.ERROR ) result.report.status_override = Status.ERROR result.report.logger.exception( "Exception for %s on %s execution: %s", next_uid, self, exc, ) finally: with self._curr_runnable_lock: if not self._discard_pending: # otherwise result from aborted test is used self._results[next_uid] = result self.ongoing.pop(0) if self.cfg.skip_strategy.should_skip_rest_tests( result.report.status ): self.bubble_up_discard_tasks( selector.Not(selector.Eq(self.uid())), report_reason="per skip strategy", ) self.discard_pending_tasks( report_reason="per skip strategy" ) elif self.status == self.status.STOPPING: self.status.change(self.status.STOPPED) return time.sleep(self.cfg.active_loop_sleep)
[docs] def starting(self) -> None: """Starting the local runner.""" if self.parent: self._runpath = self.parent.runpath super(LocalRunner, self).starting() # start the loop self.status.change(self.status.STARTED) # Start is async
[docs] def stopping(self) -> None: """Stopping the local runner.""" super(LocalRunner, self).stopping() # stop the loop self.status.change(self.status.STOPPED) # Stop is async
[docs] def aborting(self) -> None: """Aborting logic.""" self.discard_pending_tasks( report_status=Status.ERROR, report_reason=f"due to {self} aborted" )
[docs] def discard_pending_tasks( self, report_status: Status = Status.NONE, report_reason: str = "" ): with self._curr_runnable_lock: self._discard_pending = True if self._curr_runnable: self._curr_runnable.abort() self.logger.warning("Discard pending tasks of %s.", self) while self.ongoing: uid = self.ongoing.pop(0) if report_status: result = TestResult() result.report = TestGroupReport( name=uid, category=ReportCategories.ERROR, ) result.report.status_override = report_status result.report.logger.warning( "Test[%s] discarded%s.", uid, " " + report_reason if report_reason else "", ) self._results[uid] = result if report_reason: self.logger.warning( "Discarding Test[%s] %s.", uid, report_reason )
[docs] def get_current_status_for_debug(self) -> List[str]: """ Get current status of ``LocalRunner`` for debugging. :return: Status of ``LocalRunner``. :rtype: ``List[str]`` """ msgs = [f"{self.class_name} status: {self.status.tag}"] msgs.extend(super().get_current_status_for_debug()) return msgs