testplan.runners.pools package
Subpackages
Submodules
testplan.runners.pools.base module
Worker pool executor base classes.
- class testplan.runners.pools.base.Pool(name: str, size: int | str = 4, worker_type: ~typing.Type = <class 'testplan.runners.pools.base.Worker'>, worker_heartbeat: int | float | None = None, heartbeats_miss_limit: int = 3, restart_count: int = 3, max_active_loop_sleep: int | float = 5, allow_task_rerun: bool = True, **options)[source]
Bases:
ExecutorPool task executor object that initializes workers and dispatches tasks.
- Parameters:
name – Pool name.
size – Pool workers size. If you set size=”auto”, smart-scheduling feature will calculate the size. Default: 4
worker_type – Type of worker to be initialized.
worker_heartbeat – Worker heartbeat period.
heartbeats_miss_limit – Maximum times a heartbeat is missed.
restart_count – How many times a worker in pool can be restarted.
max_active_loop_sleep – Maximum value for delay logic in active sleep.
allow_task_rerun – Whether allow task to rerun when executing in this pool
Also inherits all
Executoroptions.- CONFIG
alias of
PoolConfig
- CONN_MANAGER
alias of
QueueServer
- RESERVE_WORKER_NUM = 3
- add(task: Task, uid: str) None[source]
Add a task for execution.
- Parameters:
task – Task to be scheduled to workers
uid – Task uid
- get_current_status_for_debug() List[str][source]
Get information about tasks and workers in
Poolfor debugging.- Returns:
TasksandWorkersinformation.
- handle_request(request: Message) None[source]
Handles a worker request. I.e TaskPull, TaskResults, Heartbeat etc.
- Parameters:
request – Worker request
- property size: int | str
Pool size.
- class testplan.runners.pools.base.PoolConfig(**options)[source]
Bases:
ExecutorConfigConfiguration object for
Poolexecutor resource entity.
- class testplan.runners.pools.base.TaskQueue[source]
Bases:
objectA priority queue that returns items in the order of priority small -> large. items with the same priority will be returned in the order they are added.
- class testplan.runners.pools.base.Worker(**options)[source]
Bases:
WorkerBaseWorker that runs a thread and pull tasks from transport
- execute(task: Task) TaskResult[source]
Executes a task and return the associated task result.
- Parameters:
task – Task that worker pulled for execution.
- Returns:
Task result.
- property handler
- property is_alive: bool
Poll the loop handler thread to check it is running as expected.
- class testplan.runners.pools.base.WorkerBase(**options)[source]
Bases:
ResourceWorker resource that pulls tasks from the transport provided, executes them and sends back task results.
- Parameters:
index – Worker index id.
transport – Transport class for pool/worker communication.
restart_count – How many times a worker in pool can be restarted.
stop_timeout – Timeout for graceful shutdown (in seconds).
Also inherits all
Resourceoptions.- CONFIG
alias of
WorkerConfig
- property host: str
- property metadata: Dict
Worker metadata information.
- property outfile: str
Stdout file.
- rebase_attachment(result: TestResult) None[source]
Rebase the path of attachment from remote to local
- respond(msg: Message) None[source]
Method that the pool uses to respond with a message to the worker.
- Parameters:
msg – Response message.
- start()[source]
Triggers the start logic of a Resource by executing :py:meth: Resource.starting <testplan.common.entity.base.Resource.starting> method.
- stop()[source]
Triggers the stop logic of a Resource by executing :py:meth: Resource.stopping <testplan.common.entity.base.Resource.stopping> method.
- property transport: QueueClient
Pool/Worker communication transport.
- class testplan.runners.pools.base.WorkerConfig(**options)[source]
Bases:
ResourceConfigConfiguration object for
Workerresource entity.
- testplan.runners.pools.base.get_test_status(task_result: TaskResult) Status[source]
testplan.runners.pools.child module
Child worker process module.
- class testplan.runners.pools.child.ChildLoop(index, transport, pool_type, pool_size, worker_type, logger, runpath=None)[source]
Bases:
objectChild process loop that can be started in a process and starts a local thread pool to execute the tasks received.
- property metadata
Metadata information.
- class testplan.runners.pools.child.RemoteChildLoop(*args, **kwargs)[source]
Bases:
ChildLoopChild loop for remote workers. This involved exchange of metadata for additional functionality.
- testplan.runners.pools.child.process_syspath_file(filename, working_dir=None)[source]
Process the syspath file, which should contain one sys.path entry per line Since we might be on a remote host, we need to check the accessibility of those entries. And we should always be able to directly access modules in the working directory. The result is written back to the original file for bookkeeping.
testplan.runners.pools.communication module
Communication protocol for execution pools.
- class testplan.runners.pools.communication.Message(**sender_metadata)[source]
Bases:
objectObject to be used for pool-worker communication.
- Ack = 'Ack'
- ConfigRequest = 'ConfigRequest'
- ConfigSending = 'ConfigSending'
- DiscardPending = 'DiscardPending'
- Heartbeat = 'Heartbeat'
- InitRequest = 'InitRequest'
- KeepAlive = 'KeepAlive'
- Message = 'Message'
- Metadata = 'Metadata'
- MetadataPull = 'MetadataPull'
- SetupFailed = 'SetupFailed'
- Stop = 'Stop'
- TaskPullRequest = 'TaskPullRequest'
- TaskResults = 'TaskResults'
- TaskSending = 'TaskSending'
testplan.runners.pools.connection module
Connections module.
- class testplan.runners.pools.connection.Client[source]
Bases:
LoggableWorkers are Client in Pool/Worker communication. Abstract base class for workers to communicate with its pool.
- abstractmethod send(message: Message) None[source]
Sends a message to server.
- Parameters:
message – Message to be sent.
- send_and_receive(message: Message, expect: None | Tuple | List | Message = None) Message | None[source]
Send and receive shortcut. Optionally assert that the response is of the type expected. I.e For a TaskSending message, an Ack is expected.
- Parameters:
message – Message sent.
expect – Expected command of message received.
- Returns:
Message received.
- class testplan.runners.pools.connection.QueueClient(recv_sleep: float = 0.05)[source]
Bases:
ClientQueue based client implementation, for thread pool workers to communicate with its pool.
- connect(requests: Queue) None[source]
Connect to the request queue of Pool :param requests: request queue of pool that worker should write to. :type requests: Queue
- receive() Message[source]
Worker receives response to the message sent, this method blocks.
- Returns:
Response to the message sent.
- class testplan.runners.pools.connection.QueueServer[source]
Bases:
ServerQueue based server implementation, for thread pool to get requests from workers.
- accept() Message | None[source]
Accepts the next request in the request queue.
- Returns:
Message received from worker transport, or None.
- class testplan.runners.pools.connection.Server[source]
Bases:
ResourceAbstract base class for pools to communicate to its workers.
- abstractmethod accept() Message | None[source]
Accepts a new message from worker. This method should not block - if no message is queued for receiving it should return None.
- Returns:
Message received from worker transport, or None.
- class testplan.runners.pools.connection.ZMQClient(address: str, recv_sleep: float = 0.05, recv_timeout: float = 5)[source]
Bases:
ClientZMQ based client implementation for process worker to communicate with its pool.
- Parameters:
address – Pool server address to connect to.
recv_sleep – Sleep duration in msg receive loop.
- class testplan.runners.pools.connection.ZMQClientProxy[source]
Bases:
objectRepresentative of a process worker’s transport in local worker object.
- class testplan.runners.pools.connection.ZMQServer[source]
Bases:
ServerZMQ based server implementation, for process/remote/treadmill pool to get request from workers.
- accept() Message | None[source]
Accepts a new message from worker. Doesn’t block if no message is queued for receiving.
- Returns:
Message received from worker transport, or None.
- property address
- property sock
testplan.runners.pools.process module
Process worker pool module.
- class testplan.runners.pools.process.ProcessPool(name: str, size: int | str = 4, host: str = '127.0.0.1', port: int = 0, abort_signals: ~typing.List[int] = None, worker_type: ~typing.Type = <class 'testplan.runners.pools.process.ProcessWorker'>, worker_heartbeat: int | float = 5, **options)[source]
Bases:
PoolPool task executor object that initializes process workers and dispatches tasks.
- Parameters:
name – Pool name.
size – Pool workers size. If you set size=”auto”, smart-scheduling feature will calculate the size. Default: 4
host – Host that pool binds and listens for requests.
port – Port that pool binds. Default: 0 (random)
abort_signals – Signals to trigger abort logic. Default: INT, TERM.
worker_type – Type of worker to be initialized.
worker_heartbeat – Worker heartbeat period.
Also inherits all
Pooloptions.- CONFIG
alias of
ProcessPoolConfig
- class testplan.runners.pools.process.ProcessPoolConfig(**options)[source]
Bases:
PoolConfigConfiguration object for
ProcessPoolexecutor resource entity.
- class testplan.runners.pools.process.ProcessWorker(**options)[source]
Bases:
WorkerProcess worker resource that pulls tasks from the transport provided, executes them and sends back task results.
- Parameters:
transport – Transport class for pool/worker communication.
Also inherits all
Workeroptions.- CONFIG
alias of
ProcessWorkerConfig
- property is_alive: bool
Poll the loop handler thread to check it is running as expected.
- class testplan.runners.pools.process.ProcessWorkerConfig(**options)[source]
Bases:
WorkerConfigConfiguration object for
ProcessWorkerresource entity.
testplan.runners.pools.remote module
Remote worker pool module.
- class testplan.runners.pools.remote.RemotePool(name: str, hosts: ~typing.Dict[str, int], abort_signals: ~typing.List[int] = None, worker_type: ~typing.Type = <class 'testplan.runners.pools.remote.RemoteWorker'>, pool_type: str = 'process', host: str = '172.17.0.2', port: int = 0, worker_heartbeat: float = 30, ssh_port: int = 22, ssh_cmd: ~typing.Callable = <function ssh_cmd>, copy_cmd: ~typing.Callable = <function copy_cmd>, workspace: str = None, workspace_exclude: ~typing.List[str] = None, remote_runpath: str = None, testplan_path: str = None, remote_workspace: str = None, clean_remote: bool = False, push: ~typing.List[str | ~typing.Tuple[str, str]] = None, push_exclude: ~typing.List[str] = None, delete_pushed: bool = False, fetch_runpath: bool = True, fetch_runpath_exclude: ~typing.List[str] = None, pull: ~typing.List[str] = None, pull_exclude: ~typing.List[str] = None, env: ~typing.Dict[str, str] = None, setup_script: ~typing.List[str] = None, **options)[source]
Bases:
PoolPool task executor object that initializes remote workers and dispatches tasks.
- Parameters:
name – Pool name.
hosts – Map of host(ip): number of their local thread/process workers. i.e {‘hostname1’: 2, ‘10.147.XX.XX’: 4}
abort_signals – Signals to trigger abort logic. Default: INT, TERM.
worker_type – Type of worker to be initialized.
pool_type – Child pool type that remote workers will use.
can be
threadorprocess, default tothreadifworkersis 1 and otherwiseprocess. :param host: Host that pool binds and listens for requests. Defaults tolocal hostname.
- Parameters:
port – Port that pool binds. Default: 0 (random)
worker_heartbeat – Worker heartbeat period.
ssh_port – The ssh port number of remote host, default is 22.
ssh_cmd – callable that prefix a command with ssh binary and options
copy_cmd – callable that returns the cmdline to do copy on remote host
workspace – Current project workspace to be transferred, default is pwd.
workspace_exclude – Patterns to exclude files when pushing workspace.
remote_runpath – Root runpath on remote host, default is same as local (Linux->Linux) or /var/tmp/$USER/testplan/$plan_name (Window->Linux).
testplan_path – Path to import testplan from on remote host, default is testplan_lib under remote_runpath
remote_workspace – The path of the workspace on remote host, default is fetched_workspace under remote_runpath
clean_remote – Deleted root runpath on remote at exit.
push (
listthat containsstrortuple: -str: Name of the file or directory -tuple: A (src, dst) pair) – Files and directories to push to the remote.push_exclude – Patterns to exclude files on push stage.
delete_pushed – Deleted pushed files on remote at exit.
fetch_runpath – The flag of fetch remote resource’s runpath, default to True.
fetch_runpath_exclude – Exclude files matching PATTERN.
pull – Files and directories to be pulled from the remote at the end.
pull_exclude – Patterns to exclude files on pull stage..
env – Environment variables to be propagated.
setup_script – Script to be executed on remote as very first thing.
Also inherits all
Pooloptions.- CONFIG
alias of
RemotePoolConfig
- MAX_THREAD_POOL_SIZE = 5
- QUEUE_WAIT_INTERVAL = 3
- get_current_status_for_debug() List[str][source]
Gets
HostsandWorkersinfromation for debugging.- Returns:
Status information of Hosts and Workers.
- property resource_monitor_address: str | None
- class testplan.runners.pools.remote.RemotePoolConfig(**options)[source]
Bases:
PoolConfigConfiguration object for
RemotePoolexecutor resource entity.- default_hostname = '172.17.0.2'
- ignore_extra_keys = True
- class testplan.runners.pools.remote.RemoteWorker(**options)[source]
Bases:
ProcessWorker,RemoteResourceRemote worker resource that pulls tasks from the transport provided, executes them in a local pool of workers and sends back task results.
- Parameters:
pool_type – Child pool type that remote workers will use.
can be
threadorprocess, default tothreadifworkersis 1 and otherwiseprocess. :param workers: Number of thread/process workers of the child pool, default to 1.Also inherits all
ProcessWorkerConfigandRemoteResourceoptions.- CONFIG
alias of
RemoteWorkerConfig
- property host: str
- class testplan.runners.pools.remote.UnboundRemoteWorkerConfig(**options)[source]
Bases:
ProcessWorkerConfig,UnboundRemoteResourceConfigConfiguration object for
RemoteWorkerresource entity.- ignore_extra_keys = True
Module contents
Execution pools module.