Pools¶
Pools are Test executors that instantiate a set of workers, which pull Tasks and execute them in parallel. Test instances are generally not serializable, so Tasks are scheduled to the pools instead.
Pools are resources that can be added to a plan using the add_resource() method of the plan object.
@test_plan(name='PoolDemo')
def main(plan):
    ...
    pool = Pool(**options)
    plan.add_resource(pool)
    ...
Task / Result¶
Task¶
A Task is an object that holds the path to a target that can be materialized
at runtime and executed by worker instances. For example, if a target function
that creates a MultiTest is in file ./tasks.py:
# ./tasks.py
def make_multitest():
    test = MultiTest(name='TestName',
                     suites=[Testsuite1(), Testsuite2()],
                     environment=[Server(name='server'), ...])
    return test
the task that holds the information to materialize the MultiTest at runtime is the following:
# ./test_plan.py
import os

task = Task(target='make_multitest',
            module='tasks',
            path=os.path.dirname(os.path.abspath(__file__)))  # same dir
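To make "materialized at runtime" more concrete, here is a minimal standalone sketch of what resolving a target by name roughly involves: import the module from the given path, look up the function and call it. It does not use testplan. The `materialize` helper and the `demo_tasks` module are hypothetical names introduced for illustration, and the real Task implementation may work differently.

```python
import importlib
import os
import sys
import tempfile


def materialize(target, module, path, args=(), kwargs=None):
    """Import `module` from `path` and call the function named `target`.

    Hypothetical helper for illustration only; not part of testplan.
    """
    sys.path.insert(0, path)
    try:
        importlib.invalidate_caches()
        mod = importlib.import_module(module)
    finally:
        sys.path.remove(path)
    return getattr(mod, target)(*args, **(kwargs or {}))


# Write a throwaway module so the example is self-contained.
demo_dir = tempfile.mkdtemp()
with open(os.path.join(demo_dir, "demo_tasks.py"), "w") as handle:
    handle.write("def make_thing(index=0):\n    return 'Test_{}'.format(index)\n")

materialized = materialize("make_thing", "demo_tasks", demo_dir, args=(3,))
```

Only strings and plain arguments cross the boundary here, which is why a description of the target can be shipped to a worker even when the object it creates cannot.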
The target function can accept arguments:
# ./tasks.py
def make_multitest(index):
    test = MultiTest(name='Test_{}'.format(index),
                     ...)
    return test
and many Test instances can be created from the same target function:
# ./test_plan.py
for idx in range(10):
    task = Task(target='make_multitest',
                module='tasks',
                path=os.path.dirname(os.path.abspath(__file__)),
                args=(idx,))  # or kwargs={'index': idx}
With the rerun argument, testplan can rerun the task up to a user-specified number of times until it passes:
# ./test_plan.py
task = Task(target='make_multitest',
            module='tasks',
            path=os.path.dirname(os.path.abspath(__file__)),
            rerun=3)  # default value 0 means no rerun
Task rerun can be disabled at the pool level with the allow_task_rerun parameter.
# ./test_plan.py
pool = ThreadPool(name="MyPool", allow_task_rerun=False)
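The rerun semantics can be sketched without testplan as a simple loop. This sketch assumes that rerun counts the extra attempts allowed after the first run, so rerun=3 permits at most four executions. `run_with_rerun` is a hypothetical helper, not a testplan function.

```python
def run_with_rerun(run_once, rerun=0):
    """Call `run_once` until it returns True, with at most `rerun` extra tries.

    Returns a (passed, attempts) tuple. Hypothetical helper for illustration.
    """
    attempts = 0
    passed = False
    while attempts <= rerun and not passed:
        attempts += 1
        passed = bool(run_once())
    return passed, attempts


# A fake task that fails twice and then passes.
outcomes = iter([False, False, True])
flaky_result = run_with_rerun(lambda: next(outcomes), rerun=3)

# With the default of 0, a failing task runs exactly once.
no_rerun_result = run_with_rerun(lambda: False, rerun=0)
```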
A Task can be associated with a weight value, which affects task scheduling: the larger the weight, the sooner the task is assigned to a worker. The default weight is 0, and tasks with the same weight are scheduled in the order they are added.
# ./test_plan.py
task = Task(target='make_multitest',
            module='tasks',
            path=os.path.dirname(os.path.abspath(__file__)),
            weight=100)
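The ordering rule described above (larger weight first, insertion order for ties) can be illustrated with a small priority queue. This is a sketch of the rule only; `WeightedQueue` is a hypothetical class and does not reflect how testplan stores its tasks internally.

```python
import heapq
import itertools


class WeightedQueue:
    """Pop the item with the largest weight first; ties keep insertion order."""

    def __init__(self):
        self._heap = []
        self._counter = itertools.count()

    def put(self, name, weight=0):
        # heapq is a min-heap, so negate the weight; the counter breaks ties.
        heapq.heappush(self._heap, (-weight, next(self._counter), name))

    def get(self):
        return heapq.heappop(self._heap)[2]

    def __len__(self):
        return len(self._heap)


weighted = WeightedQueue()
weighted.put("task_a")              # default weight 0
weighted.put("task_b", weight=100)
weighted.put("task_c")              # default weight 0, added after task_a
weighted.put("task_d", weight=50)
order = [weighted.get() for _ in range(len(weighted))]
```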
TaskResult¶
A TaskResult is the object that is returned to the pool by the worker. It contains either the actual result
or the error that prevented the execution.
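The "result or error" idea can be sketched as a worker-side wrapper that never lets an exception escape and instead reports it back. `SketchResult` and `execute` are hypothetical names used for illustration; the attributes of the real TaskResult class are not shown here and may differ.

```python
import traceback
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class SketchResult:
    """Hypothetical stand-in for a task result; not the testplan class."""

    status: bool
    result: Any = None
    reason: Optional[str] = None


def execute(func):
    """Run `func` and wrap either its return value or its error."""
    try:
        return SketchResult(status=True, result=func())
    except Exception:
        return SketchResult(status=False, reason=traceback.format_exc())


ok = execute(lambda: 42)
failed = execute(lambda: 1 / 0)
```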
plan.schedule¶
plan.schedule is used to schedule a Task to a Pool. Once the task is scheduled and the pool is started, a worker pulls and executes it.
# add a pool to the plan
pool = Pool(name='PoolName', ...)
plan.add_resource(pool)
# schedule a task to the pool
task = Task(target='make_multitest', ...)
plan.schedule(task, resource='PoolName')
Basic pool types¶
The base pool object accepts some configuration options that may vary based on the pool implementation.
These are the current built-in pool types that can be added to a plan:
ThreadPool¶
In a thread pool, workers are started in separate threads and pull tasks from the pool using a transport layer that lives in
the same memory space. The workers materialize the actual Tests, execute
them and send the results back to the main pool.
from testplan.runners.pools import ThreadPool

@test_plan(name='ThreadPoolPlan')
def main(plan):
    # Add a thread pool of 4 workers.
    pool = ThreadPool(name='MyPool', size=4)
    plan.add_resource(pool)
    # Schedule 10 tasks to the thread pool to execute them 4 in parallel.
    for idx in range(10):
        task = Task(target='make_multitest',
                    module='tasks')
        plan.schedule(task, resource='MyPool')
See a downloadable example of a thread pool.
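The pull model itself, where idle workers take the next pending item from a shared in-memory queue, can be sketched with the standard library alone. This is not testplan's implementation; `run_in_thread_pool` is a hypothetical helper and the "tasks" are plain callables.

```python
import queue
import threading


def run_in_thread_pool(callables, size):
    """Run `callables` on `size` worker threads that pull from a shared queue."""
    pending = queue.Queue()
    for index, func in enumerate(callables):
        pending.put((index, func))
    results = {}
    lock = threading.Lock()

    def worker():
        while True:
            try:
                index, func = pending.get_nowait()
            except queue.Empty:
                return  # nothing left to pull
            value = func()
            with lock:
                results[index] = value

    threads = [threading.Thread(target=worker) for _ in range(size)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    # Report results in scheduling order, whichever worker produced them.
    return [results[i] for i in range(len(callables))]


squares = run_in_thread_pool([lambda n=n: n * n for n in range(10)], size=4)
```

Because everything lives in one process, nothing needs to be serialized, which matches the "same memory space" remark above.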
ProcessPool¶
A process pool is similar to the thread pool, but the worker interpreters are
started in separate processes and communicate with the pool via
ZMQ transport over a TCP connection using localhost. During this communication, the Tasks
and TaskResults are serialized and de-serialized, so all their input
arguments need to support that as well.
from testplan.runners.pools.process import ProcessPool

@test_plan(name='ProcessPoolPlan')
def main(plan):
    # A pool with 4 child process workers.
    pool = ProcessPool(name='MyPool', size=4)
    plan.add_resource(pool)
    # Schedule 10 tasks to the process pool to execute them 4 in parallel.
    for idx in range(10):
        # All Task arguments need to be serializable.
        task = Task(target='make_multitest',
                    module='tasks',
                    path='.')
        plan.schedule(task, resource='MyPool')
See a downloadable example of a process pool.
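A quick way to get a feel for the serialization requirement is a round trip through Python's pickle module. This sketch assumes pickle as a stand-in; the serialization that testplan actually applies to Tasks is not specified here, and `is_serializable` is a hypothetical helper.

```python
import pickle


def is_serializable(value):
    """Return True if `value` survives a pickle round trip."""
    try:
        pickle.loads(pickle.dumps(value))
    except Exception:
        return False
    return True


plain_kwargs = {"index": 3, "name": "Test_3"}
with_lambda = {"callback": lambda: None}  # lambdas cannot be pickled

checks = (is_serializable(plain_kwargs), is_serializable(with_lambda))
```

Plain values such as numbers, strings, tuples and dicts pass, while objects like lambdas, open sockets or locks typically do not, so such objects are better created inside the target function.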
RemotePool¶
A remote pool uses ssh to start remote worker interpreters, which also
communicate with the local pool via the ZMQ transport.
During this process, the local workspace is transferred to the remote
workers (if needed), and the workers start local ‘thread’ or ‘process’
pools, based on their configuration.
from testplan.runners.pools.remote import RemotePool

@test_plan(name='RemotePoolPlan')
def main(plan):
    # A pool with 2 remote workers.
    # One with 2 local workers and the other with 1.
    pool = RemotePool(name='MyPool',
                      hosts={'hostname1': 2,
                             'hostname2': 1})
    plan.add_resource(pool)
    # Schedule 10 tasks to the remote pool to execute them 3 in parallel.
    for idx in range(10):
        # All Task arguments need to be serializable.
        task = Task(target='make_multitest',
                    module='tasks',
                    path='.')
        plan.schedule(task, resource='MyPool')
See a downloadable example of a remote pool.
Fault tolerance¶
Some mechanisms are enabled to prevent Tests from failing because of system
failures; their behaviour is part of the pool configuration:
- Worker not responsive: Workers (excluding Thread workers) send heartbeat messages back to the pool, and the frequency can be set using the worker_heartbeat option. If a worker fails to send a number of heartbeats (the heartbeats_miss_limit option), all tasks assigned to that worker are reassigned to the pool.
- Task retry: If a worker dies while running a task, testplan restarts the worker and retries the task (at most 2 times). Note that this retry behavior is unrelated to the Task’s rerun setting.
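The heartbeat rule can be sketched as a small check over timestamps. The two parameter names mirror the options mentioned above, but the arithmetic is an assumption made for illustration: a worker is treated as unresponsive once at least heartbeats_miss_limit full intervals have passed since its last heartbeat. `is_unresponsive` and `reassign` are hypothetical helpers.

```python
def is_unresponsive(last_heartbeat, now, worker_heartbeat, heartbeats_miss_limit):
    """True once at least `heartbeats_miss_limit` intervals passed silently."""
    missed = (now - last_heartbeat) // worker_heartbeat
    return missed >= heartbeats_miss_limit


def reassign(assigned, pending, worker):
    """Move every task held by `worker` back to the pending list."""
    pending.extend(assigned.pop(worker, []))


assigned = {"worker-1": ["task_a", "task_b"], "worker-2": ["task_c"]}
pending = ["task_d"]
last_seen = {"worker-1": 100, "worker-2": 114}  # seconds, made-up values

now = 115
for worker, seen in last_seen.items():
    if is_unresponsive(seen, now, worker_heartbeat=5, heartbeats_miss_limit=3):
        reassign(assigned, pending, worker)
```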
MultiTest parts scheduling¶
A Task that returns a MultiTest can be scheduled in parts in one or more pools. Each MultiTest part has its own environment and runs a subset of the testcases, determined by which part it is out of the total number of parts. Each MultiTest part produces its own report entry, and these entries can be merged before being exported.
To split a MultiTest task into several parts, provide a tuple of 2 elements as a parameter: the first element is the sequence number of the part, and the second is the total number of parts. For the tuple (M, N), make sure that N > 1 and 0 <= M < N, where M and N are both integers.
from testplan.runners.pools import ThreadPool

@test_plan(name='ThreadPoolPlan', merge_scheduled_parts=False)
def main(plan):
    # Add a thread pool of 3 workers.
    # Also you can use process pool or remote pool instead.
    pool = ThreadPool(name='MyPool', size=3)
    plan.add_resource(pool)
    # Schedule 10 tasks to the thread pool.
    # A parameter `part_tuple` is provided to indicate which part it is.
    for idx in range(10):
        task = Task(target='make_multitest',
                    module='tasks',
                    kwargs={'part_tuple': (idx, 10)})
        plan.schedule(task, resource='MyPool')
If you set merge_scheduled_parts=True, make sure that all parts of a MultiTest are executed. For example, if a MultiTest is split into 3 parts, then 3 tasks, each containing one MultiTest part, should be scheduled, with the parameter tuples (0, 3), (1, 3) and (2, 3). Also note that a MultiTest can only be scheduled once, otherwise there will be an error when merging the reports.
See a downloadable example of MultiTest parts scheduling.
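The (M, N) constraint and the idea of each part running a subset of testcases can be sketched as follows. The round-robin split used here is an assumption made for illustration, since the text above does not say how testcases are distributed across parts. `validate_part` and `select_part` are hypothetical helpers.

```python
def validate_part(part_tuple):
    """Check that (M, N) satisfies N > 1 and 0 <= M < N with integer values."""
    part, total = part_tuple
    if not (isinstance(part, int) and isinstance(total, int)):
        raise TypeError("part and total must both be integers")
    if total <= 1 or not 0 <= part < total:
        raise ValueError("expected N > 1 and 0 <= M < N, got {}".format(part_tuple))
    return part, total


def select_part(testcases, part_tuple):
    """Pick the testcases for one part (round-robin split, an assumption)."""
    part, total = validate_part(part_tuple)
    return [case for i, case in enumerate(testcases) if i % total == part]


cases = ["a", "b", "c", "d", "e", "f", "g"]
parts = [select_part(cases, (m, 3)) for m in range(3)]
```

Every testcase lands in exactly one part, which is why scheduling only some of the parts would leave testcases unexecuted when the reports are merged.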
Task discover¶
For some projects, users may find that the task target definitions (e.g. the make_multitest
function) and the plan.schedule calls become rather repetitive. To reduce boilerplate
code, @task_target and plan.schedule_all are provided
for task discovery.
plan.schedule_all(
    path=".",
    name_pattern=r".*tasks\.py$",
    resource="MyPool",
)
In the code above, testplan looks for @task_target decorated functions
in modules whose names match name_pattern
under the current working directory.
@task_target
def make_multitest():
    # A test target shall only return 1 runnable object
    test = MultiTest(name="MTest", suites=[Suite()])
    return test
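The file-matching half of discovery can be sketched with os.walk and a regular expression. Whether testplan matches the pattern against the file name or against a relative path is not stated above; this sketch assumes the relative path, and `find_modules` is a hypothetical helper.

```python
import os
import re
import tempfile


def find_modules(path, name_pattern):
    """Return sorted relative paths of files under `path` matching the pattern."""
    regex = re.compile(name_pattern)
    matches = []
    for root, _dirs, files in os.walk(path):
        for filename in files:
            rel = os.path.relpath(os.path.join(root, filename), path)
            rel = rel.replace(os.sep, "/")
            if regex.match(rel):
                matches.append(rel)
    return sorted(matches)


# Build a throwaway directory tree so the example is self-contained.
demo_root = tempfile.mkdtemp()
os.makedirs(os.path.join(demo_root, "sub"))
for name in ("tasks.py", "sub/more_tasks.py", "helpers.py", "tasks.pyc"):
    open(os.path.join(demo_root, *name.split("/")), "w").close()

found = find_modules(demo_root, r".*tasks\.py$")
```

The trailing `$` in the pattern is what keeps files such as tasks.pyc out of the result.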
Once a target is found, a Task object is created from it and scheduled to the pool. It is possible to create multiple Task objects from one target by specifying parameters:
@task_target(
    parameters=(
        # positional args to be passed to target, as a tuple or list
        ("MTest1", None, [SimpleSuite1, SimpleSuite2]),
        # keyword args to be passed to target, as a dict
        dict(
            name="MTest2-1",
            part_tuple=(0, 2),
            suites=[ComplicatedSuite],
        ),
        dict(
            name="MTest2-2",
            part_tuple=(1, 2),
            suites=[ComplicatedSuite],
        ),
    ),
    # additional arguments of Task class
    rerun=1,
    weight=1,
)
def make_multitest(name, part_tuple=None, suites=None):
    # A test target shall only return 1 runnable object
    test = MultiTest(
        name=name, suites=[cls() for cls in suites], part=part_tuple
    )
    return test
The code above specifies a collection of parameters in @task_target, and each entry is used to create one task, so 3 tasks are created from the target.
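The mapping from parameter entries to calls can be sketched as follows: a tuple or list becomes positional arguments, a dict becomes keyword arguments, and each entry yields one call. `expand_parameters` and `describe` are hypothetical names, and the sketch does not reproduce how @task_target itself is implemented.

```python
def expand_parameters(parameters):
    """Turn each entry into an (args, kwargs) pair, one pair per task."""
    expanded = []
    for entry in parameters:
        if isinstance(entry, dict):
            expanded.append(((), dict(entry)))
        elif isinstance(entry, (tuple, list)):
            expanded.append((tuple(entry), {}))
        else:
            raise TypeError("each entry must be a tuple, list or dict")
    return expanded


def describe(name, part_tuple=None, suites=None):
    """Stand-in target with the same signature as make_multitest above."""
    return name, part_tuple


calls = [
    describe(*args, **kwargs)
    for args, kwargs in expand_parameters((
        ("MTest1", None, []),
        dict(name="MTest2-1", part_tuple=(0, 2)),
        dict(name="MTest2-2", part_tuple=(1, 2)),
    ))
]
```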
For a complete and downloadable example, see here.
Auto-Part and Smart Scheduling¶
This feature allows schedule_all()
to optimize the overall testplan execution time based on historical runtime data. It is
enabled by providing runtime data like the following via the --runtime-data
command line argument:
{
    "<Multitest>": {
        "execution_time": 199.99,
        "setup_time": 39.99
    },
    ......
}
The optimization goal is to use a pool that is just large enough while allowing all tests to finish as soon as possible. This is achieved with 3 techniques:
1. Auto-part: MultiTests discovered from a @task_target with the multitest_parts="auto"
argument are automatically sliced into an
optimal number of parts, subject to auto_part_runtime_limit
(30 minutes by default).
@task_target(multitest_parts="auto")
def make_multitest():
    # A test target shall only return 1 runnable object
    test = MultiTest(name="MTest", suites=[Suite()])
    return test
2. Weight-based scheduling: each MultiTest (or part) is associated with a weight value that represents its historical runtime. A MultiTest (or part) with a larger weight is scheduled with higher priority.
3. Auto-size: if the target pool is specified to have "auto"
size, schedule_all()
calculates a suitable pool size
so that all tests finish within plan_runtime_target
(30 minutes by default).
# Enable smart-schedule pool size
pool = ProcessPool(name="MyPool", size="auto")
# Add a process pool test execution resource to the plan of given size.
plan.add_resource(pool)
# Discover tasks and calculate the right size of the pool based on the weight (runtime) of the
# tasks so that runtime of all tasks meets the plan_runtime_target.
plan.schedule_all(
    path=".",
    name_pattern=r".*task\.py$",
    resource="MyPool",
)
To tune the smart scheduling behavior, override the auto_part_runtime_limit
and plan_runtime_target
defaults
in the @test_plan
decorator.
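The arithmetic behind auto-part and auto-size can be sketched as below. The formulas are assumptions made for illustration, not testplan's actual algorithm: each part is assumed to repeat the full setup and receive an equal share of the execution time, and the pool size is a simple lower bound derived from the total weight. `auto_parts` and `auto_size` are hypothetical helpers, and the input numbers are made up.

```python
import math


def auto_parts(execution_time, setup_time, runtime_limit):
    """Smallest part count so that setup plus a share of execution fits the limit."""
    budget = runtime_limit - setup_time
    if budget <= 0:
        raise ValueError("setup alone exceeds the runtime limit")
    return max(1, math.ceil(execution_time / budget))


def auto_size(weights, runtime_target):
    """Lower bound on the workers needed to finish all weights within the target."""
    return max(1, math.ceil(sum(weights) / runtime_target))


LIMIT = 30 * 60  # 30 minutes in seconds, the default quoted above

parts = auto_parts(execution_time=5000, setup_time=300, runtime_limit=LIMIT)
part_runtime = 300 + 5000 / parts
size = auto_size([part_runtime] * parts, runtime_target=LIMIT)
```

Dividing the total weight by the target only gives a lower bound; a real scheduler also has to pack whole parts onto workers, which can require a larger pool.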
For a complete and downloadable example, see here.