testplan.runners.pools package
Subpackages
Submodules
testplan.runners.pools.base module
Worker pool executor base classes.
- class testplan.runners.pools.base.Pool(name: str, size: int | str = 4, worker_type: ~typing.Type = <class 'testplan.runners.pools.base.Worker'>, worker_heartbeat: int | float | None = None, heartbeats_miss_limit: int = 3, restart_count: int = 3, max_active_loop_sleep: int | float = 5, allow_task_rerun: bool = True, **options)[source]
Bases:
Executor
Pool task executor object that initializes workers and dispatches tasks.
- Parameters:
name – Pool name.
size – Number of workers in the pool. If set to "auto", the smart-scheduling feature calculates the size. Default: 4
worker_type – Type of worker to be initialized.
worker_heartbeat – Worker heartbeat period.
heartbeats_miss_limit – Maximum times a heartbeat is missed.
restart_count – How many times a worker in pool can be restarted.
max_active_loop_sleep – Maximum value for delay logic in active sleep.
allow_task_rerun – Whether to allow tasks to be rerun when executing in this pool.
Also inherits all Executor options.
- CONFIG
alias of
PoolConfig
- CONN_MANAGER
alias of
QueueServer
- RESERVE_WORKER_NUM = 3
- add(task: Task, uid: str) None [source]
Add a task for execution.
- Parameters:
task – Task to be scheduled to workers
uid – Task uid
- get_current_status_for_debug() List[str] [source]
Get information about tasks and workers in Pool for debugging.
- Returns:
Tasks and Workers information.
- handle_request(request: Message) None [source]
Handles a worker request, e.g. TaskPullRequest, TaskResults or Heartbeat.
- Parameters:
request – Worker request
- property size: int | str
Pool size.
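The worker_heartbeat and heartbeats_miss_limit parameters suggest a liveness check along the following lines. This is a minimal sketch under an assumption that is not confirmed by this reference: a worker is treated as inactive once the time since its last heartbeat exceeds worker_heartbeat multiplied by heartbeats_miss_limit. The function name worker_is_inactive and its arguments are hypothetical and are not part of the testplan API.

```python
from typing import Optional


def worker_is_inactive(
    last_heartbeat: float,
    now: float,
    worker_heartbeat: Optional[float],
    heartbeats_miss_limit: int = 3,
) -> bool:
    """Hypothetical liveness check; not taken from the testplan source."""
    if worker_heartbeat is None:
        # Assumption: no heartbeat period means heartbeat monitoring is off.
        return False
    # Assumption: the worker is inactive once more than
    # `heartbeats_miss_limit` heartbeat periods have elapsed in silence.
    return (now - last_heartbeat) > worker_heartbeat * heartbeats_miss_limit


# With a 5 second period and a limit of 3, the threshold is 15 seconds.
still_alive = worker_is_inactive(last_heartbeat=0.0, now=10.0, worker_heartbeat=5)
timed_out = worker_is_inactive(last_heartbeat=0.0, now=16.0, worker_heartbeat=5)
unmonitored = worker_is_inactive(last_heartbeat=0.0, now=999.0, worker_heartbeat=None)
```

Under this reading, a pool could restart such a worker up to restart_count times before giving up on it.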
- class testplan.runners.pools.base.PoolConfig(**options)[source]
Bases:
ExecutorConfig
Configuration object for
Pool
executor resource entity.
- class testplan.runners.pools.base.TaskQueue[source]
Bases:
object
A priority queue that returns items in order of priority, smallest first. Items with the same priority are returned in the order they were added.
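The ordering described for TaskQueue can be illustrated with a small standard-library sketch. The class StablePriorityQueue and its put/get methods are hypothetical names chosen for illustration; the actual TaskQueue interface is not shown in this reference and may differ.

```python
import heapq
import itertools
from typing import Any, List, Tuple


class StablePriorityQueue:
    """Illustrative queue: smallest priority first, FIFO among equal priorities."""

    def __init__(self) -> None:
        self._heap: List[Tuple[int, int, Any]] = []
        # A monotonically increasing counter breaks ties, so items with
        # the same priority come out in insertion order.
        self._counter = itertools.count()

    def put(self, priority: int, item: Any) -> None:
        heapq.heappush(self._heap, (priority, next(self._counter), item))

    def get(self) -> Tuple[int, Any]:
        priority, _, item = heapq.heappop(self._heap)
        return priority, item

    def __len__(self) -> int:
        return len(self._heap)


queue = StablePriorityQueue()
queue.put(2, "task-c")
queue.put(1, "task-a")
queue.put(1, "task-b")

# Drain the queue: priority 1 items first, in the order they were added.
drained = [queue.get()[1] for _ in range(len(queue))]
```

The tie-breaking counter also means the queued items themselves never need to be comparable.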
- class testplan.runners.pools.base.Worker(**options)[source]
Bases:
WorkerBase
Worker that runs a thread and pulls tasks from the transport.
- execute(task: Task) TaskResult [source]
Executes a task and returns the associated task result.
- Parameters:
task – Task that worker pulled for execution.
- Returns:
Task result.
- property handler
- property is_alive: bool
Poll the loop handler thread to check it is running as expected.
- class testplan.runners.pools.base.WorkerBase(**options)[source]
Bases:
Resource
Worker resource that pulls tasks from the transport provided, executes them and sends back task results.
- Parameters:
index – Worker index id.
transport – Transport class for pool/worker communication.
restart_count – How many times a worker in pool can be restarted.
stop_timeout – Timeout for graceful shutdown (in seconds).
Also inherits all Resource options.
- CONFIG
alias of
WorkerConfig
- property host: str
- property metadata: Dict
Worker metadata information.
- property outfile: str
Stdout file.
- rebase_attachment(result: TestResult) None [source]
Rebases attachment paths from remote to local.
- respond(msg: Message) None [source]
Method that the pool uses to respond with a message to the worker.
- Parameters:
msg – Response message.
- start()[source]
Triggers the start logic of a Resource by executing the testplan.common.entity.base.Resource.starting method.
- stop()[source]
Triggers the stop logic of a Resource by executing the testplan.common.entity.base.Resource.stopping method.
- property transport: QueueClient
Pool/Worker communication transport.
- class testplan.runners.pools.base.WorkerConfig(**options)[source]
Bases:
ResourceConfig
Configuration object for
Worker
resource entity.
- testplan.runners.pools.base.get_test_status(task_result: TaskResult) Status [source]
testplan.runners.pools.child module
Child worker process module.
- class testplan.runners.pools.child.ChildLoop(index, transport, pool_type, pool_size, worker_type, logger, runpath=None)[source]
Bases:
object
Child process loop that can be started in a process and starts a local thread pool to execute the tasks received.
- property metadata
Metadata information.
- class testplan.runners.pools.child.RemoteChildLoop(*args, **kwargs)[source]
Bases:
ChildLoop
Child loop for remote workers. This involves the exchange of metadata for additional functionality.
- testplan.runners.pools.child.process_syspath_file(filename, working_dir=None)[source]
Process the syspath file, which should contain one sys.path entry per line. Since we might be on a remote host, we need to check the accessibility of those entries. We should also always be able to directly access modules in the working directory. The result is written back to the original file for bookkeeping.
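The behaviour described above can be sketched with the standard library. This is not the testplan implementation: the helper name filter_syspath_file, the use of os.access as the accessibility check, and placing the working directory first are all assumptions made for illustration.

```python
import os
import tempfile
from typing import List, Optional


def filter_syspath_file(filename: str, working_dir: Optional[str] = None) -> List[str]:
    """Hypothetical stand-in: keep accessible entries and the working directory."""
    with open(filename) as fobj:
        entries = [line.strip() for line in fobj if line.strip()]

    # Keep only the entries that are readable on this host.
    accessible = [path for path in entries if os.access(path, os.R_OK)]

    # Make sure modules in the working directory can be imported directly.
    working_dir = working_dir or os.getcwd()
    if working_dir not in accessible:
        accessible.insert(0, working_dir)

    # Write the result back to the original file for bookkeeping.
    with open(filename, "w") as fobj:
        fobj.write("\n".join(accessible))
    return accessible


with tempfile.TemporaryDirectory() as tmp:
    syspath_file = os.path.join(tmp, "syspath")
    missing = os.path.join(tmp, "does-not-exist")
    with open(syspath_file, "w") as fobj:
        fobj.write(tmp + "\n" + missing + "\n")
    result = filter_syspath_file(syspath_file, working_dir=tmp)
    with open(syspath_file) as fobj:
        written_back = fobj.read().splitlines()
```

Here the missing directory is dropped, and the file ends up listing only the entry that exists.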
testplan.runners.pools.communication module
Communication protocol for execution pools.
- class testplan.runners.pools.communication.Message(**sender_metadata)[source]
Bases:
object
Object to be used for pool-worker communication.
- Ack = 'Ack'
- ConfigRequest = 'ConfigRequest'
- ConfigSending = 'ConfigSending'
- DiscardPending = 'DiscardPending'
- Heartbeat = 'Heartbeat'
- InitRequest = 'InitRequest'
- KeepAlive = 'KeepAlive'
- Message = 'Message'
- Metadata = 'Metadata'
- MetadataPull = 'MetadataPull'
- SetupFailed = 'SetupFailed'
- Stop = 'Stop'
- TaskPullRequest = 'TaskPullRequest'
- TaskResults = 'TaskResults'
- TaskSending = 'TaskSending'
testplan.runners.pools.connection module
Connections module.
- class testplan.runners.pools.connection.Client[source]
Bases:
Loggable
Workers act as the Client in Pool/Worker communication. Abstract base class for workers to communicate with their pool.
- abstract send(message: Message) None [source]
Sends a message to server.
- Parameters:
message – Message to be sent.
- send_and_receive(message: Message, expect: None | Tuple | List | Message = None) Message | None [source]
Send and receive shortcut. Optionally asserts that the response is of the expected type. For example, for a TaskSending message, an Ack is expected.
- Parameters:
message – Message sent.
expect – Expected command of message received.
- Returns:
Message received.
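The send-then-receive pattern with an optional expectation check might look like the sketch below. SketchMessage, SketchClient and EchoAckClient are hypothetical stand-ins, and raising RuntimeError on a mismatch is an assumption; the real Client and Message classes may carry different attributes and fail differently.

```python
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, List, Optional, Tuple, Union


@dataclass
class SketchMessage:
    """Hypothetical message: a command name plus an optional payload."""
    cmd: str
    data: Any = None


class SketchClient(ABC):
    @abstractmethod
    def send(self, message: SketchMessage) -> None:
        """Send a message to the server."""

    @abstractmethod
    def receive(self) -> Optional[SketchMessage]:
        """Return the response to the last message sent."""

    def send_and_receive(
        self,
        message: SketchMessage,
        expect: Union[None, str, Tuple[str, ...], List[str]] = None,
    ) -> Optional[SketchMessage]:
        self.send(message)
        received = self.receive()
        if expect is not None and received is not None:
            # Accept either a single command name or a collection of them.
            expected = expect if isinstance(expect, (tuple, list)) else (expect,)
            if received.cmd not in expected:
                raise RuntimeError(
                    f"Expected one of {list(expected)}, got {received.cmd!r}"
                )
        return received


class EchoAckClient(SketchClient):
    """Toy transport that answers every message with an Ack."""

    def __init__(self) -> None:
        self._last: Optional[SketchMessage] = None

    def send(self, message: SketchMessage) -> None:
        self._last = message

    def receive(self) -> Optional[SketchMessage]:
        return SketchMessage(cmd="Ack", data=self._last.cmd if self._last else None)


client = EchoAckClient()
reply = client.send_and_receive(SketchMessage("TaskSending", ["task-1"]), expect="Ack")

# A response outside the expected set is rejected.
try:
    client.send_and_receive(SketchMessage("TaskSending"), expect=["Stop"])
    mismatch_detected = False
except RuntimeError:
    mismatch_detected = True
```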
- class testplan.runners.pools.connection.QueueClient(recv_sleep: float = 0.05)[source]
Bases:
Client
Queue based client implementation, for thread pool workers to communicate with their pool.
- connect(requests: Queue) None [source]
Connect to the request queue of the Pool.
- Parameters:
requests – Request queue of the pool that the worker should write to.
- receive() Message [source]
Worker receives the response to the message sent. This method blocks.
- Returns:
Response to the message sent.
- class testplan.runners.pools.connection.QueueServer[source]
Bases:
Server
Queue based server implementation, for thread pool to get requests from workers.
- accept() Message | None [source]
Accepts the next request in the request queue.
- Returns:
Message received from worker transport, or None.
- class testplan.runners.pools.connection.Server[source]
Bases:
Resource
Abstract base class for pools to communicate with their workers.
- abstract accept() Message | None [source]
Accepts a new message from worker. This method should not block - if no message is queued for receiving it should return None.
- Returns:
Message received from worker transport, or None.
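The non-blocking contract of accept() can be illustrated with a queue-backed sketch. SketchQueueServer and its requests attribute are hypothetical and only mirror the documented behaviour (return the next message, or None when nothing is queued); they are not the QueueServer implementation.

```python
import queue
from typing import Any, Optional


class SketchQueueServer:
    """Hypothetical server that workers write requests to."""

    def __init__(self) -> None:
        self.requests: "queue.Queue[Any]" = queue.Queue()

    def accept(self) -> Optional[Any]:
        # get_nowait() raises queue.Empty instead of blocking, which
        # maps directly onto the "return None" contract.
        try:
            return self.requests.get_nowait()
        except queue.Empty:
            return None


server = SketchQueueServer()
first = server.accept()            # nothing queued yet
server.requests.put("Heartbeat")   # a worker writes a request
second = server.accept()
third = server.accept()            # queue is empty again
```

Returning None rather than blocking lets the pool's main loop keep doing other work, such as monitoring workers, between requests.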
- class testplan.runners.pools.connection.ZMQClient(address: str, recv_sleep: float = 0.05, recv_timeout: float = 5)[source]
Bases:
Client
ZMQ based client implementation for process worker to communicate with its pool.
- Parameters:
address – Pool server address to connect to.
recv_sleep – Sleep duration in msg receive loop.
- class testplan.runners.pools.connection.ZMQClientProxy[source]
Bases:
object
Representative of a process worker’s transport in local worker object.
- class testplan.runners.pools.connection.ZMQServer[source]
Bases:
Server
ZMQ based server implementation, for process/remote/treadmill pools to get requests from workers.
- accept() Message | None [source]
Accepts a new message from worker. Doesn’t block if no message is queued for receiving.
- Returns:
Message received from worker transport, or None.
- property address
- property sock
testplan.runners.pools.process module
Process worker pool module.
- class testplan.runners.pools.process.ProcessPool(name: str, size: int | str = 4, host: str = '127.0.0.1', port: int = 0, abort_signals: ~typing.List[int] = None, worker_type: ~typing.Type = <class 'testplan.runners.pools.process.ProcessWorker'>, worker_heartbeat: int | float = 5, **options)[source]
Bases:
Pool
Pool task executor object that initializes process workers and dispatches tasks.
- Parameters:
name – Pool name.
size – Number of workers in the pool. If set to "auto", the smart-scheduling feature calculates the size. Default: 4
host – Host that pool binds and listens for requests.
port – Port that pool binds. Default: 0 (random)
abort_signals – Signals to trigger abort logic. Default: INT, TERM.
worker_type – Type of worker to be initialized.
worker_heartbeat – Worker heartbeat period.
Also inherits all Pool options.
- CONFIG
alias of
ProcessPoolConfig
- class testplan.runners.pools.process.ProcessPoolConfig(**options)[source]
Bases:
PoolConfig
Configuration object for
ProcessPool
executor resource entity.
- class testplan.runners.pools.process.ProcessWorker(**options)[source]
Bases:
Worker
Process worker resource that pulls tasks from the transport provided, executes them and sends back task results.
- Parameters:
transport – Transport class for pool/worker communication.
Also inherits all Worker options.
- CONFIG
alias of
ProcessWorkerConfig
- property is_alive: bool
Poll the loop handler thread to check it is running as expected.
- class testplan.runners.pools.process.ProcessWorkerConfig(**options)[source]
Bases:
WorkerConfig
Configuration object for
ProcessWorker
resource entity.
testplan.runners.pools.remote module
Remote worker pool module.
- class testplan.runners.pools.remote.RemotePool(name: str, hosts: ~typing.Dict[str, int], abort_signals: ~typing.List[int] = None, worker_type: ~typing.Type = <class 'testplan.runners.pools.remote.RemoteWorker'>, pool_type: str = 'process', host: str = '172.17.0.2', port: int = 0, worker_heartbeat: float = 30, ssh_port: int = 22, ssh_cmd: ~typing.Callable = <function ssh_cmd>, copy_cmd: ~typing.Callable = <function copy_cmd>, workspace: str = None, workspace_exclude: ~typing.List[str] = None, remote_runpath: str = None, testplan_path: str = None, remote_workspace: str = None, clean_remote: bool = False, push: ~typing.List[str | ~typing.Tuple[str, str]] = None, push_exclude: ~typing.List[str] = None, delete_pushed: bool = False, fetch_runpath: bool = True, fetch_runpath_exclude: ~typing.List[str] = None, pull: ~typing.List[str] = None, pull_exclude: ~typing.List[str] = None, env: ~typing.Dict[str, str] = None, setup_script: ~typing.List[str] = None, **options)[source]
Bases:
Pool
Pool task executor object that initializes remote workers and dispatches tasks.
- Parameters:
name – Pool name.
hosts – Map of host (name or IP) to the number of local thread/process workers on that host, e.g. {'hostname1': 2, '10.147.XX.XX': 4}
abort_signals – Signals to trigger abort logic. Default: INT, TERM.
worker_type – Type of worker to be initialized.
pool_type – Child pool type that remote workers will use. Can be thread or process; defaults to thread if workers is 1 and otherwise process.
host – Host that pool binds and listens for requests. Defaults to local hostname.
port – Port that pool binds. Default: 0 (random)
worker_heartbeat – Worker heartbeat period.
ssh_port – The ssh port number of remote host, default is 22.
ssh_cmd – Callable that prefixes a command with the ssh binary and options.
copy_cmd – Callable that returns the command line to copy files to or from the remote host.
workspace – Current project workspace to be transferred, default is pwd.
workspace_exclude – Patterns to exclude files when pushing workspace.
remote_runpath – Root runpath on remote host, default is same as local (Linux->Linux) or /var/tmp/$USER/testplan/$plan_name (Windows->Linux).
testplan_path – Path to import testplan from on remote host, default is testplan_lib under remote_runpath
remote_workspace – The path of the workspace on remote host, default is fetched_workspace under remote_runpath
clean_remote – Delete the root runpath on the remote at exit.
push – Files and directories to push to the remote. A list that contains str or tuple items: a str is the name of a file or directory, a tuple is a (src, dst) pair.
push_exclude – Patterns to exclude files on push stage.
delete_pushed – Delete pushed files on the remote at exit.
fetch_runpath – Whether to fetch the remote resource's runpath. Default: True
fetch_runpath_exclude – Exclude files matching PATTERN.
pull – Files and directories to be pulled from the remote at the end.
pull_exclude – Patterns to exclude files on pull stage.
env – Environment variables to be propagated.
setup_script – Script to be executed on the remote as the very first step.
Also inherits all Pool options.
- CONFIG
alias of
RemotePoolConfig
- MAX_THREAD_POOL_SIZE = 5
- QUEUE_WAIT_INTERVAL = 3
- get_current_status_for_debug() List[str] [source]
Gets Hosts and Workers information for debugging.
- Returns:
Status information of Hosts and Workers.
- property resource_monitor_address: str | None
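The shapes of the hosts and push arguments can be made concrete with a short sketch. The helper normalize_push is hypothetical and not part of testplan; in particular, mapping a plain str entry to the same path on the remote side is an assumption made purely for illustration, and the host names and paths are placeholders.

```python
from typing import Dict, List, Tuple, Union

PushEntry = Union[str, Tuple[str, str]]


def normalize_push(push: List[PushEntry]) -> List[Tuple[str, str]]:
    """Hypothetical helper: turn every push entry into a (src, dst) pair."""
    pairs: List[Tuple[str, str]] = []
    for entry in push:
        if isinstance(entry, str):
            # Assumption: a bare path keeps the same location on the remote.
            pairs.append((entry, entry))
        else:
            src, dst = entry
            pairs.append((src, dst))
    return pairs


# Placeholder host map: host name or IP -> number of local workers.
hosts: Dict[str, int] = {"hostname1": 2, "hostname2": 4}
total_workers = sum(hosts.values())

# Placeholder push list mixing both accepted entry shapes.
push: List[PushEntry] = [
    "/data/fixtures",
    ("/local/config.yaml", "/remote/config.yaml"),
]
pairs = normalize_push(push)
```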
- class testplan.runners.pools.remote.RemotePoolConfig(**options)[source]
Bases:
PoolConfig
Configuration object for RemotePool executor resource entity.
- default_hostname = '172.17.0.2'
- ignore_extra_keys = True
- class testplan.runners.pools.remote.RemoteWorker(**options)[source]
Bases:
ProcessWorker, RemoteResource
Remote worker resource that pulls tasks from the transport provided, executes them in a local pool of workers and sends back task results.
- Parameters:
pool_type – Child pool type that remote workers will use. Can be thread or process; defaults to thread if workers is 1 and otherwise process.
workers – Number of thread/process workers of the child pool. Default: 1
Also inherits all ProcessWorkerConfig and RemoteResource options.
- CONFIG
alias of
RemoteWorkerConfig
- property host: str
- class testplan.runners.pools.remote.UnboundRemoteWorkerConfig(**options)[source]
Bases:
ProcessWorkerConfig, UnboundRemoteResourceConfig
Configuration object for RemoteWorker resource entity.
- ignore_extra_keys = True
Module contents
Execution pools module.