Pools

Pools are Test executors that instantiate an array of workers which pull and execute tasks in parallel. Test instances are generally not serializable, so Tasks are scheduled to the pools instead.

[Figure: worker pool animation (worker_pool.gif) - workers pulling and executing tasks in parallel]

Pools are resources that can be added to a plan using the add_resource() method of the plan object.

@test_plan(name='PoolDemo')
def main(plan):
    ...
    pool = Pool(**options)
    plan.add_resource(pool)
    ...

Task / Result

Task

A Task is an object that holds the path to a target that can be materialized at runtime and executed by worker instances. For example, if a target function that creates a MultiTest is in file ./tasks.py:

# ./tasks.py

def make_multitest():
    test = MultiTest(name='TestName',
                     suites=[Testsuite1(), Testsuite2()],
                     environment=[Server(name='server'), ...])
    return test

the task that holds the information to materialize the MultiTest at runtime is the following:

# ./test_plan.py

import os

from testplan.runners.pools.tasks import Task

task = Task(target='make_multitest',
            module='tasks',
            path=os.path.dirname(os.path.abspath(__file__)))  # same dir
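
For illustration, a Task can also be materialized manually via its materialize() method (at runtime this is normally done by a worker); a minimal sketch:

test = task.materialize()  # imports tasks.py and calls make_multitest()
assert test.name == 'TestName'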

The target function can accept arguments:

# ./tasks.py

def make_multitest(index):
    test = MultiTest(name='Test_{}'.format(index),
                     ...)
    return test

and many Test instances can be created from the same target function:

# ./test_plan.py

for idx in range(10):
    task = Task(target='make_multitest',
                module='tasks',
                path=os.path.dirname(os.path.abspath(__file__)),
                args=(idx,))  # or kwargs={'index': idx}

With the rerun argument, Testplan can rerun the task up to a user-specified number of times until it passes:

# ./test_plan.py

task = Task(target='make_multitest',
            module='tasks',
            path=os.path.dirname(os.path.abspath(__file__)),
            rerun=3)  # default value 0 means no rerun

Task rerun can be disabled at the pool level with the allow_task_rerun parameter.

# ./test_plan.py

pool = ThreadPool(name="MyPool", allow_task_rerun=False)

A Task can be associated with a weight value that affects task scheduling - the larger the weight, the sooner the task will be assigned to a worker. The default weight is 0, and tasks with the same weight are scheduled in the order they are added.

# ./test_plan.py

task = Task(target='make_multitest',
            module='tasks',
            path=os.path.dirname(os.path.abspath(__file__)),
            weight=100)

TaskResult

A TaskResult is the object returned to the pool by the worker; it contains either the actual result or the error that prevented execution.
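
A minimal sketch of inspecting a TaskResult (the attribute names below are an assumption, not a guaranteed API):

# Hedged sketch: assumes TaskResult exposes status, result and reason.
if task_result.status:             # True if the task executed successfully
    report = task_result.result    # the executed test's result
else:
    print(task_result.reason)      # the error that prevented execution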

plan.schedule

plan.schedule is used to schedule a Task to a Pool; once the task is scheduled and the pool is started, it will be pulled and executed by a worker.

# add a pool to the plan
pool = Pool(name='PoolName', ...)
plan.add_resource(pool)

# schedule a task to the pool
task = Task(target='make_multitest', ...)
plan.schedule(task, resource='PoolName')

Basic pool types

The base pool object accepts some configuration options that may vary between pool implementations.

These are the current built-in pool types that can be added to a plan:

ThreadPool

In a thread pool, workers are started in separate threads and pull tasks from the pool using a transport layer that lives in the same memory space. The workers materialize the actual Tests, execute them and send results back to the main pool.

from testplan.runners.pools import ThreadPool
from testplan.runners.pools.tasks import Task

@test_plan(name='ThreadPoolPlan')
def main(plan):
    # Add a thread pool of 4 workers.
    pool = ThreadPool(name='MyPool', size=4)
    plan.add_resource(pool)

    # Schedule 10 tasks to the thread pool to execute them 4 in parallel.
    for idx in range(10):
        task = Task(target='make_multitest',
                    module='tasks')
        plan.schedule(task, resource='MyPool')

See a downloadable example of a thread pool.

ProcessPool

Similar to the thread pool, but the worker interpreters are started in separate processes and communicate with the pool via a ZMQ transport over a TCP connection on localhost. During this communication, Tasks and TaskResults are serialized and deserialized, so all their input arguments need to support serialization as well.

from testplan.runners.pools.process import ProcessPool
from testplan.runners.pools.tasks import Task

@test_plan(name='ProcessPoolPlan')
def main(plan):
    # A pool with 4 child process workers.
    pool = ProcessPool(name='MyPool', size=4)
    plan.add_resource(pool)

    # Schedule 10 tasks to the process pool to execute them 4 in parallel.
    for idx in range(10):
        # All Task arguments need to be serializable.
        task = Task(target='make_multitest',
                    module='tasks',
                    path='.')
        plan.schedule(task, resource='MyPool')

See a downloadable example of a process pool.

RemotePool

A remote pool uses SSH to start remote worker interpreters, which also communicate with the local pool via the ZMQ transport. During this process, the local workspace is transferred to the remote workers (if needed), and the workers start local ‘thread’ or ‘process’ pools based on their configuration.

from testplan.runners.pools.remote import RemotePool
from testplan.runners.pools.tasks import Task

@test_plan(name='RemotePoolPlan')
def main(plan):
    # A pool with 2 remote workers.
    # One with 2 local workers and the other with 1.
    pool = RemotePool(name='MyPool',
                      hosts={'hostname1': 2,
                             'hostname2': 1})
    plan.add_resource(pool)

    # Schedule 10 tasks to the remote pool to execute them 3 in parallel.
    for idx in range(10):
        # All Task arguments need to be serializable.
        task = Task(target='make_multitest',
                    module='tasks',
                    path='.')
        plan.schedule(task, resource='MyPool')

See a downloadable example of a remote pool.

Fault tolerance

Some mechanisms are in place to prevent Test failures caused by system failures; their behaviour is part of the pool configuration:

  1. Worker not responsive: Workers (excluding Thread workers) send heartbeat messages back to the pool; the frequency can be set using the worker_heartbeat option. If a worker fails to send a number of consecutive heartbeats (heartbeats_miss_limit option), all tasks assigned to that worker will be reassigned to the pool. See the sketch after this list.
  2. Task retry: If a worker dies while running a task, Testplan will restart the worker and retry the task (at most 2 times). Note that this retry behaviour is independent of the Task’s rerun setting.
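
Both options are part of the pool configuration; a minimal sketch (the timing values below are illustrative):

# Heartbeat every 5 seconds; reassign a worker's tasks after
# 3 consecutive missed heartbeats.
pool = ProcessPool(name='MyPool',
                   size=4,
                   worker_heartbeat=5,
                   heartbeats_miss_limit=3)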

MultiTest parts scheduling

A Task that returns a MultiTest can be scheduled in parts to one or more pools. Each MultiTest part will have its own environment and will run a subset of the testcases, determined by which part it is out of the total number of parts. Each MultiTest part produces its own report entry; these entries can be merged before being exported.

To split a MultiTest task into several parts, we can provide a tuple of 2 elements as a parameter: the first element indicates the zero-based index of the part, and the second is the total number of parts. For the tuple (M, N), make sure that N > 1 and 0 <= M < N, where M and N are both integers.

from testplan.runners.pools import ThreadPool
from testplan.runners.pools.tasks import Task

@test_plan(name='ThreadPoolPlan', merge_scheduled_parts=False)
def main(plan):
    # Add a thread pool of 3 workers.
    # Also you can use process pool or remote pool instead.
    pool = ThreadPool(name='MyPool', size=3)
    plan.add_resource(pool)

    # Schedule 10 tasks to the thread pool.
    # A parameter `part_tuple` is provided to indicate which part it is.
    for idx in range(10):
        task = Task(target='make_multitest',
                    module='tasks',
                    kwargs={'part_tuple': (idx, 10)})
        plan.schedule(task, resource='MyPool')

If you set merge_scheduled_parts=True, make sure that all parts of a MultiTest will be executed. For example, if a MultiTest is split into 3 parts, then 3 tasks containing MultiTest parts should be scheduled, with the parameter tuples (0, 3), (1, 3) and (2, 3) respectively. Also note that each part of a MultiTest can only be scheduled once, otherwise there will be an error when merging the reports.
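
As an illustration, scheduling all 3 parts of a MultiTest with merged reporting could look like the following sketch (assuming make_multitest accepts a part_tuple argument as above):

@test_plan(name='ThreadPoolPlan', merge_scheduled_parts=True)
def main(plan):
    pool = ThreadPool(name='MyPool', size=3)
    plan.add_resource(pool)

    # Schedule exactly one task per part: (0, 3), (1, 3) and (2, 3).
    for idx in range(3):
        task = Task(target='make_multitest',
                    module='tasks',
                    kwargs={'part_tuple': (idx, 3)})
        plan.schedule(task, resource='MyPool')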

See a downloadable example of MultiTest parts scheduling.

Task discovery

For some projects, users may find the task target definitions (e.g. the make_multitest function) and plan.schedule calls rather repetitive. To reduce boilerplate code, @task_target and plan.schedule_all were introduced to perform task discovery.

plan.schedule_all(
    path=".",
    name_pattern=r".*tasks\.py$",
    resource="MyPool",
)

In the code above, Testplan will look for @task_target decorated functions in modules that match name_pattern under the current working directory.

@task_target
def make_multitest():
    # A test target shall only return 1 runnable object
    test = MultiTest(name="MTest", suites=[Suite()])
    return test

Once found, a task object will be created from each target and scheduled to the pool. It is possible to create multiple task objects from one target by specifying parameters:

@task_target(
    parameters=(
        # positional args to be passed to target, as a tuple or list
        ("MTest1", None, [SimpleSuite1, SimpleSuite2]),
        # keyword args to be passed to target, as a dict
        dict(
            name="MTest2-1",
            part_tuple=(0, 2),
            suites=[ComplicatedSuite],
        ),
        dict(
            name="MTest2-2",
            part_tuple=(1, 2),
            suites=[ComplicatedSuite],
        ),
    ),
    # additional arguments of Task class
    rerun=1,
    weight=1,
)
def make_multitest(name, part_tuple=None, suites=None):
    # A test target shall only return 1 runnable object
    test = MultiTest(
        name=name, suites=[cls() for cls in suites], part=part_tuple
    )
    return test

The code above specifies a collection of parameters in @task_target; each entry will be used to create one task, so 3 tasks will be created from the target.

For a complete and downloadable example, see here.

Auto-Part and Smart Scheduling

This feature allows schedule_all() to optimize the testplan's overall execution time based on historical runtime data. It is enabled by providing runtime data like the following via the --runtime-data command line argument:

{
    "<Multitest>": {
        "execution_time": 199.99,
        "setup_time": 39.99,
    },
    ...
}

The optimization goal is to make the pool just large enough for all tests to finish as soon as possible. This is achieved by 3 techniques:

1. Auto-part: automatically slice MultiTests discovered from @task_target with the multitest_parts="auto" argument into an optimal number of parts, subject to auto_part_runtime_limit (defaults to 30 minutes).

@task_target(multitest_parts="auto")
def make_multitest():
    # A test target shall only return 1 runnable object
    test = MultiTest(name="MTest", suites=[Suite()])
    return test

2. Weight-based scheduling: each MultiTest (part) will be associated with a weight value that represents its historical runtime. MultiTest (parts) with larger weights will be scheduled with higher priority.

3. Auto-size: if the target pool is specified to have "auto" size, schedule_all() will calculate the right pool size so that all tests finish within plan_runtime_target (defaults to 30 minutes).

# Enable smart-schedule pool size
pool = ProcessPool(name="MyPool", size="auto")

# Add a process pool test execution resource to the plan of given size.
plan.add_resource(pool)

# Discover tasks and calculate the right size of the pool based on the weight (runtime) of the
# tasks so that runtime of all tasks meets the plan_runtime_target.
plan.schedule_all(
    path=".",
    name_pattern=r".*task\.py$",
    resource="MyPool",
)

To tune the smart scheduling behaviour, override the auto_part_runtime_limit and plan_runtime_target defaults in the @test_plan decorator.
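
For example, a minimal sketch (the values are illustrative and assumed to be in seconds):

# Allow up to 45 minutes per auto-sliced part and for the whole plan.
@test_plan(name='SmartSchedulePlan',
           auto_part_runtime_limit=45 * 60,
           plan_runtime_target=45 * 60)
def main(plan):
    ...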

For a complete and downloadable example, see here.