Source code for testplan.runners.pools.base

"""Worker pool executor base classes."""
import datetime
import numbers
import os
import pprint
import queue
import threading
import time
import traceback
from typing import Dict, Generator, List, Optional, Tuple, Type, Union

from schema import And, Or

from testplan.common import entity
from testplan.common.config import ConfigOption
from testplan.common.utils import selector, strings
from testplan.common.utils.thread import interruptible_join
from testplan.common.utils.timing import wait_until_predicate
from testplan.common.report import Status, ReportCategories
from testplan.report.testing.base import TestGroupReport
from testplan.runners.base import Executor, ExecutorConfig
from testplan.testing.base import Test, TestResult

from .communication import Message
from .connection import QueueClient, QueueServer
from .tasks import Task, TaskResult


class TaskQueue:
    """
    A priority queue that returns items in ascending priority order
    (small -> large). Items with the same priority are returned in the
    order they were added.
    """

    def __init__(self) -> None:
        self.q = queue.PriorityQueue()
        self.count = 0

    def put(self, priority: int, item: str) -> None:
        self.q.put((priority, self.count, item))
        self.count += 1

    def get(self) -> Tuple:
        entry = self.q.get_nowait()
        return entry[0], entry[2]

    def __getattr__(self, name: str):
        # Delegate any other attribute access to the underlying PriorityQueue.
        return self.q.__getattribute__(name)
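
# Illustrative sketch (not part of the original module): TaskQueue pops items
# in ascending priority order and, because the insertion counter is used as a
# tie-breaker, preserves FIFO order for items sharing the same priority.
#
#   >>> tq = TaskQueue()
#   >>> tq.put(2, "low-priority-task")
#   >>> tq.put(1, "first-high-priority-task")
#   >>> tq.put(1, "second-high-priority-task")
#   >>> tq.get()
#   (1, 'first-high-priority-task')
#   >>> tq.get()
#   (1, 'second-high-priority-task')
#   >>> tq.get()
#   (2, 'low-priority-task')
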
class WorkerConfig(entity.ResourceConfig):
    """
    Configuration object for
    :py:class:`~testplan.runners.pools.base.Worker` resource entity.
    """

    @classmethod
    def get_options(cls):
        """
        Schema for options validation and assignment of default values.
        """
        return {
            "index": Or(int, str),
            ConfigOption("transport", default=QueueClient): object,
            ConfigOption("restart_count", default=3): int,
        }
class WorkerBase(entity.Resource):
    """
    Worker resource that pulls tasks from the transport provided, executes
    them and sends back task results.

    :param index: Worker index id.
    :type index: ``int`` or ``str``
    :param transport: Transport class for pool/worker communication.
    :type transport: :py:class:`~testplan.runners.pools.connection.Client`
    :param restart_count: How many times a worker in the pool can be
        restarted.
    :type restart_count: ``int``

    Also inherits all :py:class:`~testplan.common.entity.base.Resource`
    options.
    """

    CONFIG = WorkerConfig

    def __init__(self, **options) -> None:
        super().__init__(**options)
        self._metadata = None
        self._transport = self.cfg.transport()
        self.last_heartbeat = None
        self.assigned = set()
        self.requesting = 0
        self.restart_count = self.cfg.restart_count
        self._discard_running = threading.Event()

    @property
    def host(self) -> str:
        return "localhost"

    def start(self):
        super(WorkerBase, self).start()

    def stop(self):
        super(WorkerBase, self).stop()

    @property
    def transport(self) -> QueueClient:
        """Pool/Worker communication transport."""
        return self._transport

    @transport.setter
    def transport(self, transport: QueueClient) -> None:
        self._transport = transport

    @property
    def metadata(self) -> Dict:
        """Worker metadata information."""
        if not self._metadata:
            self._metadata = {
                "thread": threading.current_thread(),
                "index": self.cfg.index,
            }
        return self._metadata

    @property
    def outfile(self) -> str:
        """Stdout file."""
        return os.path.join(self.parent.runpath, f"{self.uid()}_startup")
    def uid(self) -> Union[int, str]:
        """Worker unique index."""
        return self.cfg.index

    def respond(self, msg: Message) -> None:
        """
        Method that the pool uses to respond with a message to the worker.

        :param msg: Response message.
        """
        self._transport.respond(msg)

    def rebase_attachment(self, result: TestResult) -> None:
        """Rebase attachment paths from remote to local."""
        pass

    def rebase_task_path(self, task: Task) -> None:
        """Rebase the task path from local to remote."""
        pass

    def discard_running_tasks(self):
        self._discard_running.set()

    def __str__(self):
        return f"{self.__class__.__name__}[{self.cfg.index}]"
class Worker(WorkerBase):
    """
    Worker that runs a thread and pulls tasks from the transport.
    """

    _STOP_TIMEOUT = 10

    def __init__(self, **options) -> None:
        super().__init__(**options)
        self._handler = None
        self._curr_runnable_lock = threading.Lock()
        self._curr_runnable = None

    @property
    def handler(self):
        return self._handler

    def starting(self) -> None:
        """Starts the daemonic worker loop."""
        self.make_runpath_dirs()
        self._handler = threading.Thread(
            target=self._loop, args=(self._transport,)
        )
        self._handler.daemon = True
        self._handler.start()

    def stopping(self) -> None:
        """Stops the worker."""
        if self._handler:
            interruptible_join(self._handler, self._STOP_TIMEOUT)
        self._handler = None

    def aborting(self) -> None:
        """Aborting logic, will not wait for running tasks."""
        self._transport.disconnect()
        # NOTE: set to None to skip the graceful wait in the stop logic;
        # NOTE: the handler thread will be killed when the process exits
        self._handler = None
    def _wait_started(self, timeout: int = None) -> None:
        """Ready to communicate with pool."""
        self.last_heartbeat = time.time()
        super(Worker, self)._wait_started(timeout=timeout)

    @property
    def is_alive(self) -> bool:
        """Poll the loop handler thread to check it is running as expected."""
        return self._handler.is_alive()

    def _loop(self, transport: QueueClient) -> None:
        message = Message(**self.metadata)

        while self.active and self.status.tag not in (
            self.status.STOPPING,
            self.status.STOPPED,
        ):
            received = transport.send_and_receive(
                message.make(message.TaskPullRequest, data=1)
            )

            if received is None or received.cmd == Message.Stop:
                break
            elif received.cmd == Message.TaskSending:
                results = []
                for item in received.data:
                    if self._discard_running.is_set():
                        break
                    task_result = self.execute(item)
                    if not self._discard_running.is_set():
                        results.append(task_result)
                        if self.cfg.skip_strategy.should_skip_rest_tests(
                            task_result.result.report.status
                        ):
                            break

                received_ = transport.send_and_receive(
                    message.make(message.TaskResults, data=results),
                    expect=(message.Ack, message.Stop),
                )
                if received_.cmd == Message.Stop:
                    break
            elif received.cmd == Message.Ack:
                pass

            time.sleep(self.cfg.active_loop_sleep)
    def execute(self, task: Task) -> TaskResult:
        """
        Executes a task and returns the associated task result.

        :param task: Task that worker pulled for execution.
        :return: Task result.
        """
        try:
            with self._curr_runnable_lock:
                runnable: Test = task.materialize()
                if isinstance(runnable, entity.Runnable):
                    if not runnable.parent:
                        runnable.parent = self
                    if not runnable.cfg.parent:
                        runnable.cfg.parent = self.cfg
                self._curr_runnable = runnable
            result: TestResult = runnable.run()
            with self._curr_runnable_lock:
                self._curr_runnable = None
        except BaseException:
            task_result = TaskResult(
                task=task,
                result=None,
                status=False,
                reason=traceback.format_exc(),
            )
        else:
            task_result = TaskResult(task=task, result=result, status=True)

        return task_result

    def discard_running_tasks(self):
        super().discard_running_tasks()
        with self._curr_runnable_lock:
            if self._curr_runnable is not None:
                self._curr_runnable.abort()
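
# Informal message-exchange sketch (inferred from ``Worker._loop`` above and
# the ``Pool`` request handlers below; not an exhaustive state machine):
#
#   Worker                               Pool
#   ------                               ----
#   TaskPullRequest(data=1)    ------>   _handle_taskpull_request
#                              <------   TaskSending([task, ...]) or Ack/Stop
#   execute(task) for each received task
#   TaskResults([result, ...]) ------>   _handle_taskresults
#                              <------   Ack or Stop
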
class PoolConfig(ExecutorConfig):
    """
    Configuration object for
    :py:class:`~testplan.runners.pools.base.Pool` executor resource entity.
    """

    @classmethod
    def get_options(cls):
        """
        Schema for options validation and assignment of default values.
        """
        return {
            "name": str,
            ConfigOption("size", default=4): Or(
                And(int, lambda x: x > 0), lambda x: x == "auto"
            ),
            ConfigOption("worker_type", default=Worker): lambda x: issubclass(
                x, Worker
            ),
            ConfigOption("worker_heartbeat", default=None): Or(
                int, float, None
            ),
            ConfigOption("heartbeats_miss_limit", default=3): int,
            ConfigOption("restart_count", default=3): int,
            ConfigOption("max_active_loop_sleep", default=5): numbers.Number,
            ConfigOption("allow_task_rerun", default=True): bool,
        }
class Pool(Executor):
    """
    Pool task executor object that initializes workers and dispatches tasks.

    :param name: Pool name.
    :param size: Pool worker size. If set to ``"auto"``, the smart-scheduling
        feature will calculate the size. Default: 4
    :param worker_type: Type of worker to be initialized.
    :param worker_heartbeat: Worker heartbeat period.
    :param heartbeats_miss_limit: Maximum number of missed heartbeats allowed.
    :param restart_count: How many times a worker in the pool can be
        restarted.
    :param max_active_loop_sleep: Maximum value for delay logic in active
        sleep.
    :param allow_task_rerun: Whether tasks are allowed to rerun when executing
        in this pool.

    Also inherits all :py:class:`~testplan.runners.base.Executor` options.
    """

    CONFIG = PoolConfig
    CONN_MANAGER = QueueServer

    def __init__(
        self,
        name: str,
        size: Union[int, str] = 4,
        worker_type: Type = Worker,
        worker_heartbeat: Optional[Union[int, float]] = None,
        heartbeats_miss_limit: int = 3,
        restart_count: int = 3,
        max_active_loop_sleep: Union[int, float] = 5,
        allow_task_rerun: bool = True,
        **options,
    ) -> None:
        options.update(self.filter_locals(locals()))
        super(Pool, self).__init__(**options)
        self.unassigned = TaskQueue()  # unassigned tasks
        self.is_auto_size = size == "auto"
        self._executed_tests = []
        self._task_retries_cnt = {}  # uid: times_reassigned_without_result
        self._task_retries_limit = 2
        self._workers = entity.Environment(parent=self)
        self._conn = self.CONN_MANAGER()
        self._conn.parent = self
        self._pool_lock = threading.Lock()
        self._metadata = None
        # Will be set to False when the Pool is starting.
        self._exit_loop = True
        self._start_monitor_thread = True
        # Methods for handling different Message types. These are expected to
        # take the worker, request and response objects as the only required
        # positional args.
        self._request_handlers = {
            Message.ConfigRequest: self._handle_cfg_request,
            Message.TaskPullRequest: self._handle_taskpull_request,
            Message.TaskResults: self._handle_taskresults,
            # process & remote only
            Message.Heartbeat: self._handle_heartbeat,
            Message.SetupFailed: self._handle_setupfailed,
        }
        # For the skip-remaining feature: like LocalRunner, ``_loop`` &
        # ``discard_pending_tasks`` will be triggered in different threads.
        # NOTE: pool operations should all take little time, so narrowing
        # NOTE: down the critical section would be rather meaningless
        self._discard_pending_lock = threading.RLock()
        self._discard_pending = False
    def uid(self) -> str:
        """Pool name."""
        return self.cfg.name

    @property
    def size(self) -> Union[int, str]:
        """Pool size."""
        return self.cfg.size

    @size.setter
    def size(self, value: int) -> None:
        self.cfg.size = value

    def add(self, task: Task, uid: str) -> None:
        """
        Add a task for execution.

        :param task: Task to be scheduled to workers.
        :param uid: Task uid.
        """
        # FIXME: signature mismatch with ``Executor.add``
        if not isinstance(task, Task):
            raise ValueError(f"Task was expected, got {type(task)} instead.")
        super(Pool, self).add(task, uid)
        self.unassigned.put(task.priority, uid)
        self._task_retries_cnt[uid] = 0
    def _can_assign_task(self, task: Task) -> bool:
        """
        Check whether this pool is able to execute the task.

        :param task: Task to be scheduled to the pool.
        :return: True if the task can be assigned to the pool, otherwise
            False.
        """
        # TODO: always returns True, what is the point?
        return True

    def _can_assign_task_to_worker(self, task: Task, worker: Worker) -> bool:
        """
        When a worker requests a task, verify that the worker is suitable
        to execute the task.

        :param task: Task to be scheduled to the worker.
        :param worker: A worker created by the pool executor.
        :return: True if the task can be assigned to the worker, otherwise
            False.
        """
        # TODO: always returns True, what is the point?
        return True

    def _loop(self) -> None:
        """
        Main executor work loop - runs in a separate thread when the Pool is
        started.
        """
        if self._start_monitor_thread:
            self.logger.debug("Starting worker monitor thread.")
            self._worker_monitor = threading.Thread(
                target=self._workers_monitoring
            )
            self._worker_monitor.daemon = True
            self._worker_monitor.start()

        while self.active and not self._exit_loop:
            msg = self._conn.accept()
            if msg:
                try:
                    self.handle_request(msg)
                except Exception:
                    self.logger.error(traceback.format_exc())

            time.sleep(self.cfg.active_loop_sleep)
    def handle_request(self, request: Message) -> None:
        """
        Handles a worker request, i.e. TaskPullRequest, TaskResults,
        Heartbeat etc.

        :param request: Worker request.
        """
        sender_index = request.sender_metadata["index"]
        worker: Worker = self._workers[sender_index]

        self.logger.debug(
            "Pool %s received message from worker %s - %s, %s",
            self.cfg.name,
            worker,
            request.cmd,
            request.data,
        )

        if not worker.active:
            self.logger.warning(
                "Message from inactive worker %s - %s, %s",
                worker,
                request.cmd,
                request.data,
            )

        response = Message(**self._metadata)

        if not self.active or self.status == self.STATUS.STOPPING:
            worker.respond(response.make(Message.Stop))
        elif request.cmd in self._request_handlers:
            try:
                with self._discard_pending_lock:
                    self._request_handlers[request.cmd](
                        worker, request, response
                    )
            except Exception:
                self.logger.error(traceback.format_exc())
                self.logger.debug(
                    "Not able to handle request from worker, sending Stop cmd"
                )
                worker.respond(response.make(Message.Stop))
        else:
            self.logger.error(
                "Unknown request: %s %s %s %s",
                request,
                dir(request),
                request.cmd,
                request.data,
            )
            worker.respond(response.make(Message.Ack))
    def _handle_cfg_request(
        self, worker: Worker, _: Message, response: Message
    ) -> None:
        """Handle a ConfigRequest from a worker."""
        options = []
        cfg = self.cfg
        while cfg:
            options.append(cfg.denormalize())
            cfg = cfg.parent
        worker.respond(response.make(Message.ConfigSending, data=options))

    def _handle_taskpull_request(
        self, worker: Worker, request: Message, response: Message
    ) -> None:
        """Handle a TaskPullRequest from a worker."""
        tasks = []

        if self.status == self.status.STARTED:
            for _ in range(request.data):
                try:
                    priority, uid = self.unassigned.get()
                except queue.Empty:
                    break

                task = self._input[uid]
                worker.rebase_task_path(task)

                if self._can_assign_task(task):
                    if self._task_retries_cnt[uid] > self._task_retries_limit:
                        self._discard_task(
                            uid,
                            f"{self._input[uid]} already reached max retries limit:"
                            f" {self._task_retries_limit}",
                        )
                        continue
                    else:
                        if self._can_assign_task_to_worker(task, worker):
                            self.logger.user_info(
                                "Scheduling %s to %s %s",
                                task,
                                worker,
                                "(rerun {})".format(task.reassign_cnt)
                                if task.reassign_cnt > 0
                                else "",
                            )
                            worker.assigned.add(uid)
                            tasks.append(task)
                            task.executors.setdefault(self.cfg.name, set())
                            task.executors[self.cfg.name].add(worker.uid())
                            self.record_execution(uid)
                        else:
                            self.logger.user_info(
                                "Cannot schedule %s to %s", task, worker
                            )
                            self.unassigned.put(task.priority, uid)
                            self._task_retries_cnt[uid] += 1
                else:
                    # Later may create a default local pool as failover option
                    self._discard_task(
                        uid,
                        f"{self._input[uid]} cannot be executed in {self}",
                    )

            if tasks:
                worker.respond(response.make(Message.TaskSending, data=tasks))
                worker.requesting = request.data - len(tasks)
                return

        worker.requesting = request.data
        worker.respond(response.make(Message.Ack))

    def _handle_taskresults(
        self, worker: Worker, request: Message, response: Message
    ) -> None:
        """Handle a TaskResults message from a worker."""
        worker.respond(response.make(Message.Ack))
        if self._discard_pending:
            # real clean-up is done in ``discard_pending_tasks``
            return

        def task_should_rerun():
            if not self.cfg.allow_task_rerun:
                return False
            if self.cfg.skip_strategy:
                return False
            if not task_result.task:
                return False
            if task_result.task.rerun == 0:
                return False

            result = task_result.result
            if (
                task_result.status
                and result
                and result.run
                and result.report.passed
            ):
                return False

            if task_result.task.reassign_cnt >= task_result.task.rerun:
                self.logger.user_info(
                    "Will not rerun %(input)s again as it already "
                    "reached max rerun limit %(reruns)d",
                    {
                        "input": self._input[uid],
                        "reruns": task_result.task.rerun,
                    },
                )
                return False

            return True

        agg_report_status = Status.NONE
        for task_result in request.data:
            uid = task_result.task.uid()
            worker.assigned.remove(uid)
            self.logger.user_info(
                "De-assign %s from %s", task_result.task, worker
            )

            if isinstance(task_result.result, TestResult):
                report: TestGroupReport = task_result.result.report
                report.host = worker.host
                agg_report_status = Status.precedent(
                    [agg_report_status, report.status]
                )
                worker.rebase_attachment(task_result.result)

            if task_should_rerun():
                self.logger.user_info(
                    "Will rerun %(task)s for max %(rerun)d more times",
                    {
                        "task": task_result.task,
                        "rerun": task_result.task.rerun
                        - task_result.task.reassign_cnt,
                    },
                )
                self.unassigned.put(task_result.task.priority, uid)
                self._task_retries_cnt[uid] = 0
                self._input[uid].reassign_cnt += 1
                # Will rerun task, but still need to retain the result
                self._append_temporary_task_result(task_result)
                continue

            self._print_test_result(task_result)
            self._results[uid] = task_result
            self.ongoing.remove(uid)

        if self.cfg.skip_strategy.should_skip_rest_tests(agg_report_status):
            self.bubble_up_discard_tasks(
                selector.Not(selector.Eq(self.uid())),
                report_reason="per skip strategy",
            )
            self.discard_pending_tasks(report_reason="per skip strategy")

    def _handle_heartbeat(
        self, worker: Worker, request: Message, response: Message
    ) -> None:
        """Handle a Heartbeat message received from a worker."""
        worker.last_heartbeat = time.time()
        self.logger.debug(
            "Received heartbeat from %s at %s after %ss.",
            worker,
            request.data,
            time.time() - request.data,
        )

        if self._discard_pending:
            worker.respond(
                response.make(
                    Message.DiscardPending, data=worker.last_heartbeat
                )
            )
        else:
            worker.respond(
                response.make(Message.Ack, data=worker.last_heartbeat)
            )

    def _handle_setupfailed(
        self, worker: Worker, request: Message, response: Message
    ) -> None:
        """Handle a SetupFailed message received from a worker."""
        self.logger.user_info(
            "Worker %s setup failed:%s%s", worker, os.linesep, request.data
        )
        worker.respond(response.make(Message.Ack))
        self._decommission_worker(worker, "Aborting {}, setup failed.")

    def _decommission_worker(self, worker: Worker, message: str) -> None:
        """
        Decommission a worker by moving all assigned tasks back to the pool.
        """
        self.logger.warning(message.format(worker))
        if os.path.exists(worker.outfile):
            self.logger.user_info("\tlogfile: %s", worker.outfile)
        while worker.assigned:
            uid = worker.assigned.pop()
            task = self._input[uid]
            self.logger.user_info(
                "Re-collect %s from %s to %s.", task, worker, self
            )
            self.unassigned.put(task.priority, uid)
            self._task_retries_cnt[uid] += 1

    def _workers_monitoring(self) -> None:
        """
        Worker fault tolerance logic. Check is based on:
        1) handler status
        2) heartbeat if available
        """
        previous_status = {
            "active": [],
            "inactive": [],
            "initializing": [],
            "abort": [],
        }
        loop_interval = self.cfg.worker_heartbeat or 5  # seconds

        break_outer_loop = False
        while self.active:
            hosts_status = {
                "active": [],
                "inactive": [],
                "initializing": [],
                "abort": [],
            }

            for worker in self._workers:
                status, reason = self._query_worker_status(worker)
                if status == "inactive":
                    with self._pool_lock:
                        if (
                            self.active
                            and self.status != self.status.STOPPING
                            and self.status != self.status.STOPPED
                        ):
                            if self._handle_inactive(worker, reason):
                                status = "active"
                        else:
                            self.logger.user_info(
                                "%s is aborting/stopping, exit monitor.", self
                            )
                            break_outer_loop = True
                            break

                hosts_status[status].append(worker)

            if break_outer_loop:
                break

            if hosts_status != previous_status:
                self.logger.info(
                    "Hosts status update at %s", datetime.datetime.now()
                )
                self.logger.info(pprint.pformat(hosts_status))
                previous_status = hosts_status

            if (
                not hosts_status["active"]
                and not hosts_status["initializing"]
                and not hosts_status["inactive"]
                and hosts_status["abort"]
            ):
                # all workers aborting / aborted
                if not self._exit_loop:
                    self.logger.critical(
                        "All workers are aborting / aborted, abort %s.", self
                    )
                    self.abort()  # TODO: abort pool in a monitor thread ?
                    break

            try:
                # For early finish of worker monitoring thread.
                wait_until_predicate(
                    lambda: not self.is_alive,
                    timeout=loop_interval,
                    interval=0.05,
                )
            except RuntimeError:
                self.logger.user_info("%s is not alive, exit monitor.", self)
                break

    def _query_worker_status(
        self, worker: Worker
    ) -> Tuple[str, Optional[str]]:
        """
        Query the current status of a worker. If heartbeat monitoring is
        enabled, check that the last heartbeat time is within the threshold.

        :param worker: Pool worker to query.
        :return: Worker status string - one of 'initializing', 'inactive' or
            'active', and an optional reason string.
        """
        if not worker.active:
            return "abort", f"Worker {worker} aborting or aborted"

        if worker.status in (worker.status.STOPPING, worker.status.STOPPED):
            return "inactive", f"Worker {worker} stopping or stopped"

        if (
            worker.status == worker.status.NONE
            or worker.status == worker.status.STARTING
        ):
            return "initializing", None

        # else: worker must be in state STARTED
        if worker.status != worker.status.STARTED:
            raise RuntimeError(
                f"Worker in unexpected state {worker.status.tag}"
            )

        if not worker.is_alive:  # handler based monitoring
            return (
                "inactive",
                f"Decommission {worker}, handler no longer alive",
            )

        # If no heartbeat is configured, we treat the worker as "active"
        # since it is in state STARTED and its handler is alive.
        if not self.cfg.worker_heartbeat:
            return "active", None

        # else: do heartbeat based monitoring
        lag = time.time() - worker.last_heartbeat
        if lag > self.cfg.worker_heartbeat * self.cfg.heartbeats_miss_limit:
            return (
                "inactive",
                f"Has not been receiving heartbeat from {worker} for {lag} sec",
            )

        return "active", None

    def _handle_inactive(self, worker: Worker, reason: str) -> bool:
        """
        Handle an inactive worker.

        :param worker: Worker object.
        :param reason: Why the worker is considered inactive.
        :return: True if the worker restarted, else False.
        """
        if worker.status != worker.status.STARTED:
            return False

        self._decommission_worker(worker, reason)

        if worker.restart_count:
            worker.restart_count -= 1
            try:
                worker.restart()
                self.logger.info("Worker %s has restarted", worker)
                return True
            except Exception as exc:
                self.logger.critical(
                    "Worker %s failed to restart: %s", worker, exc
                )

        self.logger.warning("Worker %s is inactive and will abort", worker)
        worker.abort()
        return False

    def _discard_task(self, uid, reason: str) -> None:
        self.logger.critical(
            "Discard task %s of %s - %s", self._input[uid], self, reason
        )
        self._results[uid] = TaskResult(
            task=self._input[uid],
            status=False,
            reason=f"Task discarded by {self} - {reason}",
        )
        self.ongoing.remove(uid)
    def discard_pending_tasks(
        self, report_status: Status = Status.NONE, report_reason: str = ""
    ):
        with self._discard_pending_lock:
            self._discard_pending = True

            for w in self._workers:
                w: WorkerBase
                # do real discard
                w.discard_running_tasks()

            self.logger.warning("Discard pending tasks of %s.", self)
            while self.ongoing:
                uid = self.ongoing[0]
                task = self._input[uid]
                if report_status:
                    reason = (
                        f"{task} discarded"
                        + (" " + report_reason if report_reason else "")
                        + "."
                    )
                    result = TestResult()
                    result.report = TestGroupReport(
                        name=str(task), category=ReportCategories.ERROR
                    )
                    result_lines = [
                        "{}: {}".format(attr, getattr(task, attr, None) or "")
                        for attr in task.serializable_attrs
                    ]
                    result.report.logger.error(os.linesep.join(result_lines))
                    result.report.logger.error(reason)
                    result.report.status_override = report_status

                    self._results[uid] = TaskResult(
                        task=self._input[uid],
                        status=False,
                        reason=reason,
                        result=result,
                    )
                if report_reason:
                    self.logger.warning(
                        "Discarding %s %s.", task, report_reason
                    )
                self.ongoing.pop(0)

            self.unassigned = TaskQueue()
    def _append_temporary_task_result(self, task_result: TaskResult) -> None:
        """If a task should rerun, append the task result already fetched."""
        test_report = task_result.result.report
        uid = task_result.task.uid()
        if uid not in self._task_retries_cnt:
            return

        postfix = f" => Run {task_result.task.reassign_cnt}"
        test_report.name = f"{test_report.name}{postfix}"
        test_report.uid = f"{test_report.uid}{postfix}"
        test_report.category = ReportCategories.TASK_RERUN
        test_report.status_override = Status.XFAIL
        new_uuid = strings.uuid4()
        self._results[new_uuid] = task_result
        self.parent._tests[new_uuid] = self.cfg.name
        self.record_execution(new_uuid)

    def _print_test_result(self, task_result: TaskResult) -> None:
        if (not isinstance(task_result.result, entity.RunnableResult)) or (
            not hasattr(task_result.result, "report")
        ):
            return

        # Currently prints report top level result and not details.
        name = task_result.result.report.name
        self.logger.log_test_status(name, task_result.result.report.status)

    def _add_workers(self) -> None:
        """Initialise worker instances."""
        for idx in (str(i) for i in range(self.cfg.size)):
            worker = self.cfg.worker_type(
                index=idx,
                restart_count=self.cfg.restart_count,
                active_loop_sleep=0.01,
            )
            worker.parent = self
            worker.cfg.parent = self.cfg
            self._workers.add(worker, uid=idx)

            self.logger.debug(
                "Added worker %(index)s (outfile = %(outfile)s)",
                {"index": idx, "outfile": worker.outfile},
            )

    def _start_workers(self) -> None:
        """Start all workers of the pool."""
        for worker in self._workers:
            self._conn.register(worker)
        self._workers.start()

    def _reset_workers(self) -> None:
        """
        Reset all workers in case the pool restarts but still uses the
        existing workers. A worker in STOPPED status can make the monitor
        think it is dead.
        """
        for worker in self._workers:
            worker.status.reset()
    def starting(self) -> None:
        """Starting the pool and workers."""
        self.make_runpath_dirs()
        if self.runpath is None:
            raise RuntimeError("runpath was not set correctly")
        self._metadata = {"runpath": self.runpath}

        self._conn.start()
        self._exit_loop = False
        if self._workers:
            self._reset_workers()

        super(Pool, self).starting()  # start the loop & monitor

        if not self._workers:
            self._add_workers()
        self._start_workers()

        if self._workers.start_exceptions:
            for msg in self._workers.start_exceptions.values():
                self.logger.error(msg)
            self.abort()
            raise RuntimeError(f"All workers of {self} failed to start")

    def workers_requests(self) -> int:
        """Count how many tasks workers are requesting."""
        return sum(worker.requesting for worker in self._workers)

    def _stop_workers(self):
        self._workers.stop()

    def stopping(self) -> None:
        """Stop connections and workers."""
        with self._pool_lock:
            self._stop_workers()
            for worker in self._workers:
                worker.transport.disconnect()

        self._exit_loop = True
        super(Pool, self).stopping()  # stop the loop (monitor will stop later)

        self._conn.stop()
    def abort_dependencies(self) -> Generator:
        # Empty generator: the pool has no dependencies to abort.
        return
        yield

    def aborting(self) -> None:
        """Aborting logic."""
        for worker in self._workers:
            worker.abort()

        self._exit_loop = True
        super(Pool, self).stopping()  # stop the loop and the monitor

        self._conn.abort()

        self.discard_pending_tasks(
            report_status=Status.ERROR, report_reason=f"due to {self} aborted"
        )

    def record_execution(self, uid) -> None:
        self._executed_tests.append(uid)
    def get_current_status_for_debug(self) -> List[str]:
        """
        Get information about tasks and workers in ``Pool`` for debugging.

        :return: ``Tasks`` and ``Workers`` information.
        """
        msgs = []
        if self.added_items:
            msgs.append(f"{self.class_name} {self.cfg.name} added tasks:")
            for task in self.added_items:
                msgs.append(f"\t{task}")
        else:
            msgs.append(f"No added tasks in {self.class_name}")

        if self.ongoing:
            msgs.append(f"{self.class_name} {self.cfg.name} pending tasks:")
            for task in self.ongoing:
                msgs.append(f"\t{task}")
        else:
            msgs.append(f"No pending tasks in {self.class_name}")

        if self._workers:
            msgs.append(
                f"Workers in {self.class_name} {self.cfg.name} with status"
                " and waiting assigned tasks:"
            )
            for worker in self._workers:
                status, reason = self._query_worker_status(worker)
                msgs.append(f"\t{worker.uid()}")
                msgs.append(f"\t\tStatus: {status}, Reason: {reason}")
                if worker.assigned:
                    msgs.append(
                        f"\t\tWaiting for completion of tasks:"
                        f" {worker.assigned}"
                    )
                else:
                    msgs.append("\t\tNo tasks to complete.")
        else:
            msgs.append(f"No workers in {self.class_name} {self.cfg.name}.")

        return msgs
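
# Typical usage sketch (module and target names below are hypothetical, and
# the plan-level entry points are assumed from the public Testplan API rather
# than anything defined in this file): a Pool is added to a plan as an
# executor resource and Tasks are scheduled to it by the pool's name.
#
#   from testplan import test_plan
#   from testplan.runners.pools.base import Pool
#   from testplan.runners.pools.tasks import Task
#
#   @test_plan(name="PoolExample")
#   def main(plan):
#       # size="auto" would enable smart-scheduling instead of a fixed count
#       plan.add_resource(Pool(name="MyThreadPool", size=4))
#       plan.schedule(
#           Task(target="make_multitest", module="tasks"),  # hypothetical
#           resource="MyThreadPool",
#       )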