testplan.runners.pools package

Submodules

testplan.runners.pools.base module

Worker pool executor base classes.

class testplan.runners.pools.base.Pool(name: str, size: Union[int, str] = 4, worker_type: Type[CT_co] = <class 'testplan.runners.pools.base.Worker'>, worker_heartbeat: Union[int, float, None] = None, heartbeats_miss_limit: int = 3, restart_count: int = 3, max_active_loop_sleep: Union[int, float] = 5, allow_task_rerun: bool = True, **options)[source]

Bases: testplan.runners.base.Executor

Pool task executor object that initializes workers and dispatches tasks.

Parameters:
  • name – Pool name.
  • size – Number of workers in the pool. If size="auto", the smart-scheduling feature calculates the size. Default: 4
  • worker_type – Type of worker to be initialized.
  • worker_heartbeat – Worker heartbeat period.
  • heartbeats_miss_limit – Maximum times a heartbeat is missed.
  • restart_count – How many times a worker in pool can be restarted.
  • max_active_loop_sleep – Maximum value for delay logic in active sleep.
  • allow_task_rerun – Whether to allow a task to be rerun when executing in this pool.

Also inherits all Executor options.
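
The worker_heartbeat and heartbeats_miss_limit options suggest a liveness check along the following lines. This is a minimal sketch, assuming a worker is treated as inactive once more than heartbeats_miss_limit heartbeat periods pass without a heartbeat. The function name and the exact comparison are hypothetical and are not taken from the Pool implementation.

```python
def worker_looks_inactive(
    last_heartbeat: float,
    now: float,
    worker_heartbeat: float,
    heartbeats_miss_limit: int = 3,
) -> bool:
    """Hypothetical helper: True once too many heartbeat periods have elapsed.

    Assumption: the allowed silence is worker_heartbeat * heartbeats_miss_limit.
    """
    allowed_silence = worker_heartbeat * heartbeats_miss_limit
    return (now - last_heartbeat) > allowed_silence


# With a 5 second period and a limit of 3, 15 seconds of silence is tolerated.
still_ok = worker_looks_inactive(last_heartbeat=0.0, now=15.0, worker_heartbeat=5.0)
too_late = worker_looks_inactive(last_heartbeat=0.0, now=16.0, worker_heartbeat=5.0)
```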

CONFIG

alias of PoolConfig

CONN_MANAGER

alias of testplan.runners.pools.connection.QueueServer

abort_dependencies() → Generator[T_co, T_contra, V_co][source]

Abort items running before aborting self.

aborting() → None[source]

Aborting logic.

add(task: testplan.runners.pools.tasks.base.Task, uid: str) → None[source]

Add a task for execution.

Parameters:
  • task – Task to be scheduled to workers
  • uid – Task uid
discard_pending_tasks(report_status: testplan.report.testing.base.Status = <Status.NONE: 59>, report_reason: str = '')[source]
get_current_status_for_debug() → List[str][source]

Get information about tasks and workers in Pool for debugging.

Returns:Tasks and Workers information.
handle_request(request: testplan.runners.pools.communication.Message) → None[source]

Handles a worker request, e.g. TaskPullRequest, TaskResults or Heartbeat.

Parameters:request – Worker request
record_execution(uid) → None[source]
size

Pool size.

starting() → None[source]

Starting the pool and workers.

stopping() → None[source]

Stop connections and workers.

uid() → str[source]

Pool name.

workers_requests() → int[source]

Count how many tasks workers are requesting.

class testplan.runners.pools.base.PoolConfig(**options)[source]

Bases: testplan.runners.base.ExecutorConfig

Configuration object for Pool executor resource entity.

classmethod get_options()[source]

Schema for options validation and assignment of default values.

class testplan.runners.pools.base.TaskQueue[source]

Bases: object

A priority queue that returns items in ascending order of priority (small -> large). Items with the same priority are returned in the order they were added.

get() → Tuple[source]
put(priority: int, item: str) → None[source]
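
The ordering described for TaskQueue (ascending priority, insertion order for ties) can be illustrated with the standard library. This is a sketch only: StablePriorityQueue is a hypothetical stand-in built on heapq, and the shape of the tuple returned by get() is an assumption, not the documented TaskQueue behaviour.

```python
import heapq
import itertools
from typing import List, Tuple


class StablePriorityQueue:
    """Illustrative stand-in, not testplan's TaskQueue implementation."""

    def __init__(self) -> None:
        self._heap: List[Tuple[int, int, str]] = []
        # Monotonic counter breaks ties so equal priorities keep insertion order.
        self._counter = itertools.count()

    def put(self, priority: int, item: str) -> None:
        heapq.heappush(self._heap, (priority, next(self._counter), item))

    def get(self) -> Tuple[int, str]:
        # Assumed return shape: (priority, item).
        priority, _, item = heapq.heappop(self._heap)
        return priority, item


task_queue = StablePriorityQueue()
task_queue.put(2, "task-c")
task_queue.put(1, "task-a")
task_queue.put(1, "task-b")
order = [task_queue.get() for _ in range(3)]
```

The counter is what makes the queue stable: without it, two entries with the same priority would be compared by item value rather than by arrival order.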
class testplan.runners.pools.base.Worker(**options)[source]

Bases: testplan.runners.pools.base.WorkerBase

Worker that runs a thread and pulls tasks from the transport.

aborting() → None[source]

Aborting logic; does not wait for running tasks.

discard_running_tasks()[source]
execute(task: testplan.runners.pools.tasks.base.Task) → testplan.runners.pools.tasks.base.TaskResult[source]

Executes a task and returns the associated task result.

Parameters:task – Task that worker pulled for execution.
Returns:Task result.
handler
is_alive

Poll the loop handler thread to check it is running as expected.

starting() → None[source]

Starts the daemonic worker loop.

stopping() → None[source]

Stops the worker.

class testplan.runners.pools.base.WorkerBase(**options)[source]

Bases: testplan.common.entity.base.Resource

Worker resource that pulls tasks from the transport provided, executes them and sends back task results.

Parameters:
  • index (int or str) – Worker index id.
  • transport (Client) – Transport class for pool/worker communication.
  • restart_count (int) – How many times a worker in pool can be restarted.

Also inherits all Resource options.

CONFIG

alias of WorkerConfig

discard_running_tasks()[source]
host
metadata

Worker metadata information.

outfile

Stdout file.

rebase_attachment(result: testplan.testing.base.TestResult) → None[source]

Rebase the path of attachment from remote to local

rebase_task_path(task: testplan.runners.pools.tasks.base.Task) → None[source]

Rebase the path of task from local to remote
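
Rebasing a path between a local and a remote root can be pictured as swapping one directory prefix for another. The sketch below assumes POSIX-style paths on both sides. The helper rebase_path and the example directories are hypothetical and do not reflect how WorkerBase or RemoteWorker implement rebase_task_path.

```python
import posixpath


def rebase_path(path: str, old_root: str, new_root: str) -> str:
    """Hypothetical helper: move `path` from under old_root to under new_root."""
    relative = posixpath.relpath(path, old_root)
    if relative == ".." or relative.startswith("../"):
        # Path is not under old_root, leave it untouched.
        return path
    return posixpath.normpath(posixpath.join(new_root, relative))


remote_path = rebase_path(
    "/home/user/workspace/tests/test_basic.py",
    old_root="/home/user/workspace",
    new_root="/var/tmp/user/fetched_workspace",
)
unrelated = rebase_path("/etc/hosts", "/home/user/workspace", "/var/tmp/user")
```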

respond(msg: testplan.runners.pools.communication.Message) → None[source]

Method that the pool uses to respond with a message to the worker.

Parameters:msg – Response message.
start()[source]

Triggers the start logic of a Resource by executing the Resource.starting (testplan.common.entity.base.Resource.starting) method.

stop()[source]

Triggers the stop logic of a Resource by executing the Resource.stopping (testplan.common.entity.base.Resource.stopping) method.

transport

Pool/Worker communication transport.

uid() → Union[int, str][source]

Worker unique index.

class testplan.runners.pools.base.WorkerConfig(**options)[source]

Bases: testplan.common.entity.base.ResourceConfig

Configuration object for Worker resource entity.

classmethod get_options()[source]

Schema for options validation and assignment of default values.

testplan.runners.pools.child module

Child worker process module.

class testplan.runners.pools.child.ChildLoop(index, transport, pool_type, pool_size, worker_type, logger, runpath=None)[source]

Bases: object

Child process loop that can be started in a process and starts a local thread pool to execute the tasks received.

exit_loop()[source]
metadata

Metadata information.

worker_loop()[source]

Child process worker loop. Manages an underlying thread pool, pulls and sends back results to the main pool.

class testplan.runners.pools.child.RemoteChildLoop(*args, **kwargs)[source]

Bases: testplan.runners.pools.child.ChildLoop

Child loop for remote workers. This involves an exchange of metadata for additional functionality.

exit_loop()[source]
testplan.runners.pools.child.child_logic(args)[source]

Importable child logic.

testplan.runners.pools.child.parse_cmdline()[source]

Child worker command line parsing

testplan.runners.pools.child.process_syspath_file(filename, working_dir=None)[source]

Process the syspath file, which should contain one sys.path entry per line. Since we might be on a remote host, we need to check the accessibility of those entries. We should also always be able to directly access modules in the working directory. The result is written back to the original file for bookkeeping.
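
The steps described above (read one entry per line, keep the accessible ones, make sure the working directory is present, write the result back) can be sketched as follows. The function filter_syspath_file is a hypothetical illustration. The accessibility test (os.path.exists) and placing the working directory first are assumptions, not the behaviour of process_syspath_file itself.

```python
import os
import tempfile
from typing import List, Optional


def filter_syspath_file(filename: str, working_dir: Optional[str] = None) -> List[str]:
    """Hypothetical sketch: keep only reachable sys.path entries in the file."""
    with open(filename) as handle:
        entries = [line.strip() for line in handle if line.strip()]

    # Assumption: an entry is "accessible" if the path exists on this host.
    accessible = [entry for entry in entries if os.path.exists(entry)]

    # Assumption: the working directory is added first if it is missing.
    if working_dir is not None and working_dir not in accessible:
        accessible.insert(0, working_dir)

    # Write the filtered list back to the same file for bookkeeping.
    with open(filename, "w") as handle:
        handle.write("\n".join(accessible))
    return accessible


with tempfile.TemporaryDirectory() as tmp:
    syspath_file = os.path.join(tmp, "syspath.txt")
    missing_entry = os.path.join(tmp, "missing")
    with open(syspath_file, "w") as handle:
        handle.write(tmp + "\n" + missing_entry + "\n")
    kept = filter_syspath_file(syspath_file, working_dir=os.getcwd())
    with open(syspath_file) as handle:
        written = handle.read().splitlines()
```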

testplan.runners.pools.communication module

Communication protocol for execution pools.

class testplan.runners.pools.communication.Message(**sender_metadata)[source]

Bases: object

Object to be used for pool-worker communication.

Ack = 'Ack'
ConfigRequest = 'ConfigRequest'
ConfigSending = 'ConfigSending'
DiscardPending = 'DiscardPending'
Heartbeat = 'Heartbeat'
InitRequest = 'InitRequest'
KeepAlive = 'KeepAlive'
Message = 'Message'
Metadata = 'Metadata'
MetadataPull = 'MetadataPull'
SetupFailed = 'SetupFailed'
Stop = 'Stop'
TaskPullRequest = 'TaskPullRequest'
TaskResults = 'TaskResults'
TaskSending = 'TaskSending'
make(cmd: str, data: object = None) → testplan.runners.pools.communication.Message[source]

Create a new message for communication.

Parameters:
  • cmd – Command representing message purpose.
  • data – Data of message object.
Returns:

self
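
Because make() returns the message itself, construction and population can be chained in one expression. The sketch below mirrors only the documented signature. SketchMessage is a hypothetical stand-in, and the attribute names cmd, data and sender_metadata are assumptions about how such an object might store its state.

```python
from typing import Any, Optional


class SketchMessage:
    """Hypothetical stand-in for Message; mirrors only the documented shape."""

    Ack = "Ack"
    TaskSending = "TaskSending"

    def __init__(self, **sender_metadata: Any) -> None:
        self.sender_metadata = sender_metadata
        self.cmd: Optional[str] = None
        self.data: Any = None

    def make(self, cmd: str, data: Any = None) -> "SketchMessage":
        # Store the command and payload, then return self to allow chaining.
        self.cmd = cmd
        self.data = data
        return self


base = SketchMessage(index=0)
msg = base.make(SketchMessage.TaskSending, data=["task-1"])
```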

testplan.runners.pools.connection module

Connections module.

class testplan.runners.pools.connection.Client[source]

Bases: testplan.common.utils.logger.Loggable

Workers act as clients in pool/worker communication. Abstract base class for workers to communicate with their pool.

connect(server) → None[source]

Connect client to server

disconnect() → None[source]

Disconnect client from server

receive() → Optional[testplan.runners.pools.communication.Message][source]

Receives response to the message sent

send(message: testplan.runners.pools.communication.Message) → None[source]

Sends a message to server.

Parameters:message – Message to be sent.
send_and_receive(message: testplan.runners.pools.communication.Message, expect: Union[None, Tuple, List[T], testplan.runners.pools.communication.Message] = None) → Optional[testplan.runners.pools.communication.Message][source]

Send and receive shortcut. Optionally assert that the response is of the expected type, e.g. for a TaskSending message, an Ack is expected.

Parameters:
  • message – Message sent.
  • expect – Expected command of message received.
Returns:

Message received.
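
The optional expect check can be pictured as a membership test on the command of the received message. This is a sketch under assumptions: commands are compared as plain strings, and the helper matches_expected is hypothetical rather than part of the Client API.

```python
from typing import List, Optional, Tuple, Union

Expected = Union[None, str, Tuple[str, ...], List[str]]


def matches_expected(received_cmd: Optional[str], expect: Expected = None) -> bool:
    """Hypothetical helper: True if the received command satisfies `expect`."""
    if expect is None:
        # Nothing expected, any response (including none) is accepted.
        return True
    if isinstance(expect, (tuple, list)):
        return received_cmd in expect
    return received_cmd == expect


ack_ok = matches_expected("Ack", expect="Ack")
either_ok = matches_expected("Stop", expect=("Ack", "Stop"))
mismatch = matches_expected("Heartbeat", expect="Ack")
```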

class testplan.runners.pools.connection.QueueClient(recv_sleep: float = 0.05)[source]

Bases: testplan.runners.pools.connection.Client

Queue based client implementation, for thread pool workers to communicate with their pool.

connect(requests: queue.Queue) → None[source]

Connect to the request queue of Pool.

Parameters:requests (Queue) – Request queue of the pool that the worker should write to.

disconnect() → None[source]

Disconnect worker from pool

receive() → testplan.runners.pools.communication.Message[source]

Worker receives response to the message sent, this method blocks.

Returns:Response to the message sent.
respond(message: testplan.runners.pools.communication.Message) → None[source]

Used by Pool to respond to worker request.

Parameters:message – Respond message.
send(message: testplan.runners.pools.communication.Message) → None[source]

Worker sends a message

Parameters:message – Message to be sent.
class testplan.runners.pools.connection.QueueServer[source]

Bases: testplan.runners.pools.connection.Server

Queue based server implementation, for thread pool to get requests from workers.

accept() → Optional[testplan.runners.pools.communication.Message][source]

Accepts the next request in the request queue.

Returns:Message received from worker transport, or None.
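
A non-blocking accept over a request queue, returning None when nothing is waiting, can be sketched with queue.Queue from the standard library. The helper accept_nowait is hypothetical and uses strings in place of Message objects. It is not the QueueServer implementation.

```python
import queue
from typing import Optional


def accept_nowait(requests: "queue.Queue[str]") -> Optional[str]:
    """Hypothetical helper: return the next request, or None if the queue is empty."""
    try:
        return requests.get_nowait()
    except queue.Empty:
        return None


request_queue: "queue.Queue[str]" = queue.Queue()
first = accept_nowait(request_queue)   # nothing queued yet
request_queue.put("TaskPullRequest")
second = accept_nowait(request_queue)  # the queued request
```

Returning None instead of blocking lets the caller keep doing other work in the same loop, such as checking worker status, between polls.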
register(worker) → None[source]

Register a new worker. Workers should be registered after the connection manager is started and will be automatically unregistered when it is stopped.

starting() → None[source]

Server starting logic.

class testplan.runners.pools.connection.Server[source]

Bases: testplan.common.entity.base.Resource

Abstract base class for pools to communicate with their workers.

aborting() → None[source]

Abort policy - no abort actions are required in the base class.

accept() → Optional[testplan.runners.pools.communication.Message][source]

Accepts a new message from worker. This method should not block - if no message is queued for receiving it should return None.

Returns:Message received from worker transport, or None.
register(worker: Worker) → None[source]

Register a new worker. Workers should be registered after the connection manager is started and will be automatically unregistered when it is stopped.

starting() → None[source]

Server starting logic.

stopping() → None[source]

Server stopping logic.

class testplan.runners.pools.connection.ZMQClient(address: str, recv_sleep: float = 0.05, recv_timeout: float = 5)[source]

Bases: testplan.runners.pools.connection.Client

ZMQ based client implementation for process worker to communicate with its pool.

Parameters:
  • address – Pool server address to connect to.
  • recv_sleep – Sleep duration in msg receive loop.
connect() → None[source]

Connect to a ZMQ Server

disconnect() → None[source]

Disconnect from Server

receive() → Optional[testplan.runners.pools.communication.Message][source]

Worker tries to receive the response to the message sent until timeout.

Returns:Response to the message sent.
send(message: testplan.runners.pools.communication.Message) → None[source]

Worker sends a message.

Parameters:message – Message to be sent.
class testplan.runners.pools.connection.ZMQClientProxy[source]

Bases: object

Representative of a process worker’s transport in the local worker object.

connect(server) → None[source]
disconnect() → None[source]
respond(message: testplan.runners.pools.communication.Message) → None[source]

Used by Pool to respond to worker request.

Parameters:message – Respond message.
class testplan.runners.pools.connection.ZMQServer[source]

Bases: testplan.runners.pools.connection.Server

ZMQ based server implementation, for process/remote/treadmill pools to get requests from workers.

aborting() → None[source]

Terminate the ZMQ context and socket when aborting.

accept() → Optional[testplan.runners.pools.communication.Message][source]

Accepts a new message from worker. Doesn’t block if no message is queued for receiving.

Returns:Message received from worker transport, or None.
address
register(worker) → None[source]

Register a new worker.

sock
starting()[source]

Create a ZMQ context and socket to handle TCP communication.

stopping() → None[source]

Terminate the ZMQ context and socket when stopping. We require that all workers are stopped before stopping the connection manager, so that we can safely remove references to connection sockets from the worker.

testplan.runners.pools.process module

Process worker pool module.

class testplan.runners.pools.process.ProcessPool(name: str, size: Union[int, str] = 4, host: str = '127.0.0.1', port: int = 0, abort_signals: List[int] = None, worker_type: Type[CT_co] = <class 'testplan.runners.pools.process.ProcessWorker'>, worker_heartbeat: Union[int, float] = 5, **options)[source]

Bases: testplan.runners.pools.base.Pool

Pool task executor object that initializes process workers and dispatches tasks.

Parameters:
  • name – Pool name.
  • size – Number of workers in the pool. If size="auto", the smart-scheduling feature calculates the size. Default: 4
  • host – Host that pool binds and listens for requests.
  • port – Port that pool binds. Default: 0 (random)
  • abort_signals – Signals to trigger abort logic. Default: INT, TERM.
  • worker_type – Type of worker to be initialized.
  • worker_heartbeat – Worker heartbeat period.

Also inherits all Pool options.

CONFIG

alias of ProcessPoolConfig

CONN_MANAGER

alias of testplan.runners.pools.connection.ZMQServer

add(task: testplan.runners.pools.tasks.base.Task, uid: str) → None[source]

Before adding Tasks to a ProcessPool, check that the Task target does not come from __main__.
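
A check of this kind can be approximated by inspecting the __module__ attribute of a callable target. The sketch below is an illustration only: target_is_from_main is a hypothetical helper, and the assumption is that targets defined in __main__ are rejected because a child process cannot import them by module name.

```python
import json
from typing import Callable


def target_is_from_main(target: Callable) -> bool:
    """Hypothetical helper: True if the callable was defined in __main__."""
    return getattr(target, "__module__", None) == "__main__"


def local_target() -> None:
    """Placeholder task target used only for this illustration."""


# Force the module name so the example behaves the same however it is run.
local_target.__module__ = "__main__"

rejected = target_is_from_main(local_target)
accepted = not target_is_from_main(json.dumps)
```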

class testplan.runners.pools.process.ProcessPoolConfig(**options)[source]

Bases: testplan.runners.pools.base.PoolConfig

Configuration object for ProcessPool executor resource entity.

classmethod get_options()[source]

Schema for options validation and assignment of default values.

class testplan.runners.pools.process.ProcessWorker(sigint_timeout: int = 5, **options)[source]

Bases: testplan.runners.pools.base.Worker

Process worker resource that pulls tasks from the transport provided, executes them and sends back task results.

Parameters:
  • transport – Transport class for pool/worker communication.
  • sigint_timeout – number of seconds to wait between SIGINT and SIGKILL

Also inherits all Worker options.

CONFIG

alias of ProcessWorkerConfig

aborting() → None[source]

Process worker abort logic.

discard_running_tasks()[source]
is_alive

Poll the loop handler thread to check it is running as expected.

pre_start() → None[source]

Steps to be executed right before resource starts.

starting() → None[source]

Start a child process worker.

stopping() → None[source]

Stop child process worker.

class testplan.runners.pools.process.ProcessWorkerConfig(**options)[source]

Bases: testplan.runners.pools.base.WorkerConfig

Configuration object for ProcessWorker resource entity.

classmethod get_options()[source]

Schema for options validation and assignment of default values.

testplan.runners.pools.remote module

Remote worker pool module.

class testplan.runners.pools.remote.RemotePool(name: str, hosts: Dict[str, int], abort_signals: List[int] = None, worker_type: Type[CT_co] = <class 'testplan.runners.pools.remote.RemoteWorker'>, pool_type: str = 'process', host: str = '172.17.0.2', port: int = 0, worker_heartbeat: float = 30, ssh_port: int = 22, ssh_cmd: Callable = <function ssh_cmd>, copy_cmd: Callable = <function copy_cmd>, workspace: str = None, workspace_exclude: List[str] = None, remote_runpath: str = None, testplan_path: str = None, remote_workspace: str = None, clean_remote: bool = False, push: List[Union[str, Tuple[str, str]]] = None, push_exclude: List[str] = None, delete_pushed: bool = False, fetch_runpath: bool = True, fetch_runpath_exclude: List[str] = None, pull: List[str] = None, pull_exclude: List[str] = None, env: Dict[str, str] = None, setup_script: List[str] = None, **options)[source]

Bases: testplan.runners.pools.base.Pool

Pool task executor object that initializes remote workers and dispatches tasks.

Parameters:
  • name – Pool name.
  • hosts – Map of host (IP) to the number of its local thread/process workers, e.g. {'hostname1': 2, '10.147.XX.XX': 4}
  • abort_signals – Signals to trigger abort logic. Default: INT, TERM.
  • worker_type – Type of worker to be initialized.
  • pool_type – Child pool type that remote workers will use. Can be thread or process; defaults to thread if workers is 1, otherwise process.
  • host – Host that pool binds and listens for requests. Defaults to local hostname.
  • port – Port that pool binds. Default: 0 (random)
  • worker_heartbeat – Worker heartbeat period.
  • ssh_port – The ssh port number of remote host, default is 22.
  • ssh_cmd – Callable that prefixes a command with the ssh binary and options.
  • copy_cmd – Callable that returns the command line used to copy files to the remote host.
  • workspace – Current project workspace to be transferred, default is pwd.
  • workspace_exclude – Patterns to exclude files when pushing workspace.
  • remote_runpath – Root runpath on remote host, default is same as local (Linux->Linux) or /var/tmp/$USER/testplan/$plan_name (Windows->Linux).
  • testplan_path – Path to import testplan from on remote host, default is testplan_lib under remote_runpath
  • remote_workspace – The path of the workspace on remote host, default is fetched_workspace under remote_runpath
  • clean_remote – Delete root runpath on remote at exit.
  • push (list of str or tuple; a str is the name of a file or directory, a tuple is a (src, dst) pair) – Files and directories to push to the remote.
  • push_exclude – Patterns to exclude files on push stage.
  • delete_pushed – Delete pushed files on remote at exit.
  • fetch_runpath – Whether to fetch the remote resource’s runpath. Default: True.
  • fetch_runpath_exclude – Exclude files matching PATTERN.
  • pull – Files and directories to be pulled from the remote at the end.
  • pull_exclude – Patterns to exclude files on pull stage.
  • env – Environment variables to be propagated.
  • setup_script – Script to be executed on remote as the very first thing.

Also inherits all Pool options.
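
The push option accepts a mix of plain names and (src, dst) pairs. One way to handle such a list is to normalize every entry to a pair first. The sketch below is hypothetical: normalize_push is not part of RemotePool, the example paths are invented, and mapping a plain string to the same path on the remote is an assumption.

```python
from typing import List, Tuple, Union

PushEntry = Union[str, Tuple[str, str]]


def normalize_push(entries: List[PushEntry]) -> List[Tuple[str, str]]:
    """Hypothetical helper: turn every push entry into a (src, dst) pair."""
    pairs: List[Tuple[str, str]] = []
    for entry in entries:
        if isinstance(entry, str):
            # Assumption: a bare name keeps the same path on the remote host.
            pairs.append((entry, entry))
        else:
            src, dst = entry
            pairs.append((src, dst))
    return pairs


push_pairs = normalize_push(
    ["/opt/data/fixtures", ("/opt/data/config.yaml", "/tmp/config.yaml")]
)
```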

CONFIG

alias of RemotePoolConfig

CONN_MANAGER

alias of testplan.runners.pools.connection.ZMQServer

aborting() → None[source]

Aborting logic.

get_current_status_for_debug() → List[str][source]

Gets Hosts and Workers information for debugging.

Returns:Status information of Hosts and Workers.
resource_monitor_address
starting() → None[source]

Starting the pool and workers.

stopping() → None[source]

Stop connections and workers.

class testplan.runners.pools.remote.RemotePoolConfig(**options)[source]

Bases: testplan.runners.pools.base.PoolConfig

Configuration object for RemotePool executor resource entity.

default_hostname = '172.17.0.2'
classmethod get_options()[source]

Schema for options validation and assignment of default values.

ignore_extra_keys = True
class testplan.runners.pools.remote.RemoteWorker(**options)[source]

Bases: testplan.runners.pools.process.ProcessWorker, testplan.common.remote.remote_resource.RemoteResource

Remote worker resource that pulls tasks from the transport provided, executes them in a local pool of workers and sends back task results.

Parameters:
  • pool_type – Child pool type that remote workers will use. Can be thread or process; defaults to thread if workers is 1, otherwise process.
  • workers – Number of thread/process workers of the child pool. Default: 1.

Also inherits all ProcessWorkerConfig and RemoteResource options.

CONFIG

alias of RemoteWorkerConfig

host
post_stop() → None[source]

Steps to be executed right after resource is stopped.

pre_start() → None[source]

Steps to be executed right before resource starts.

pre_stop() → None[source]

Stop child process worker.

rebase_attachment(result) → None[source]

Rebase the path of attachment from remote to local

rebase_task_path(task) → None[source]

Rebase the path of task from local to remote

class testplan.runners.pools.remote.RemoteWorkerConfig(**options)[source]

Bases: testplan.runners.pools.remote.UnboundRemoteWorkerConfig, testplan.common.remote.remote_resource.RemoteResourceConfig

class testplan.runners.pools.remote.UnboundRemoteWorkerConfig(**options)[source]

Bases: testplan.runners.pools.process.ProcessWorkerConfig, testplan.common.remote.remote_resource.UnboundRemoteResourceConfig

Configuration object for RemoteWorker resource entity.

classmethod get_options()[source]

Schema for options validation and assignment of default values.

ignore_extra_keys = True

Module contents

Execution pools module.