Pools

Pools are Test executors that start a group of workers, which pull work from the pool and execute it in parallel. Test instances are generally not serializable, so Tasks (descriptions of how to create the Tests) are scheduled to the pools instead.


Pools are resources that can be added to a plan using the add_resource() method of the plan object.

from testplan import test_plan
from testplan.runners.pools.base import Pool

@test_plan(name='PoolDemo')
def main(plan):
    ...
    pool = Pool(**options)
    plan.add_resource(pool)
    ...

Task / Result

Task

A Task is an object that holds the path to a target that can be materialized at runtime and executed by worker instances. For example, suppose a target function that creates a MultiTest is in the file ./tasks.py:

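# ./tasks.py

from testplan.testing.multitest import MultiTest

# Testsuite1, Testsuite2 and Server are placeholders for your own
# test suites and driver.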

def make_multitest():
    test = MultiTest(name='TestName',
                     suites=[Testsuite1(), Testsuite2()],
                     environment=[Server(name='server'), ...])
    return test

Then the Task that holds the information needed to materialize the MultiTest at runtime is:

# ./test_plan.py

import os

from testplan.runners.pools.tasks import Task

task = Task(target='make_multitest',
            module='tasks',
            path=os.path.dirname(os.path.abspath(__file__)))  # same dir
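
To make "materialized at runtime" concrete, here is a minimal standard-library sketch of resolving a target from its module name and directory and calling it with arguments. The helper `materialize` is hypothetical and not part of Testplan; Testplan's own resolution logic may differ in detail.

```python
import importlib
import os
import sys
import tempfile


def materialize(target, module, path=None, args=(), kwargs=None):
    """Hypothetical helper: import `module` from `path` and call `target`."""
    if path is not None and path not in sys.path:
        # Make the directory containing the module importable
        # (note: this modifies sys.path for the rest of the process).
        sys.path.insert(0, path)
        importlib.invalidate_caches()
    mod = importlib.import_module(module)
    func = getattr(mod, target)
    return func(*args, **(kwargs or {}))


if __name__ == "__main__":
    # Write a throwaway module that stands in for ./tasks.py.
    tmpdir = tempfile.mkdtemp()
    with open(os.path.join(tmpdir, "demo_tasks.py"), "w") as f:
        f.write("def make_test(index):\n    return 'Test_{}'.format(index)\n")
    print(materialize("make_test", "demo_tasks", path=tmpdir, args=(3,)))  # Test_3
```

Only strings (target, module name, path) and plain arguments need to travel to the worker; the object itself is built where it runs.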

The target function can accept arguments:

# ./tasks.py

def make_multitest(index):
    test = MultiTest(name='Test_{}'.format(index),
                     ...)
    return test

and many Test instances can be created from the same target function:

# ./test_plan.py

for idx in range(10):
    task = Task(target='make_multitest',
                module='tasks',
                path=os.path.dirname(os.path.abspath(__file__)),
                args=(idx,))  # or kwargs={'index': idx}

With the rerun argument, Testplan reruns the task up to the user-specified number of times until it passes:

# ./test_plan.py

task = Task(target='make_multitest',
            module='tasks',
            path=os.path.dirname(os.path.abspath(__file__)),
            rerun=3)  # default value 0 means no rerun

A custom function can be used to decide whether the task needs to run again. The default implementation checks whether the task has been executed and its report status is PASS.

# ./test_plan.py

pool = ThreadPool(name="MyPool", should_rerun=custom_func)
# can also set the custom func later
pool.set_rerun_check(custom_func)
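
The rerun behaviour can be modelled as a loop that executes a task once and then repeats it while a check says it should run again and reruns remain. This is a simplified, standalone sketch: `run_with_rerun`, `default_should_rerun` and `FakeResult` are hypothetical names, and the check here takes only the result, which is an assumption rather than the signature Testplan passes to `should_rerun`.

```python
from dataclasses import dataclass


@dataclass
class FakeResult:
    """Hypothetical stand-in for a task result."""
    executed: bool
    status: str  # e.g. "PASS" or "FAIL"


def default_should_rerun(result):
    # Rerun unless the task was executed and its report passed.
    return not (result.executed and result.status == "PASS")


def run_with_rerun(execute, rerun=0, should_rerun=default_should_rerun):
    """Run `execute()` once, then up to `rerun` more times while needed."""
    result = execute()
    attempts = 1
    while rerun > 0 and should_rerun(result):
        rerun -= 1
        result = execute()
        attempts += 1
    return result, attempts


if __name__ == "__main__":
    outcomes = iter(["FAIL", "FAIL", "PASS"])
    result, attempts = run_with_rerun(
        lambda: FakeResult(executed=True, status=next(outcomes)), rerun=3)
    print(result.status, attempts)  # PASS 3
```

With rerun=0 (the default) the loop body never runs, which matches "no rerun".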

TaskResult

A TaskResult is the object that is returned to the pool by the worker and contains either the actual result, or the error that prevented the execution.
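
As a rough model of the idea, here is a result holder that carries either a value or the error that prevented execution. `SketchTaskResult` and `execute_task`, and the `status`/`reason` field names, are hypothetical; Testplan's actual TaskResult class may expose different attributes.

```python
import traceback
from dataclasses import dataclass
from typing import Any, Optional


@dataclass
class SketchTaskResult:
    """Hypothetical result holder: either `result` or `reason` is set."""
    result: Any = None
    status: bool = False
    reason: Optional[str] = None


def execute_task(func, *args, **kwargs):
    """Run `func` and wrap its outcome, capturing any exception as text."""
    try:
        return SketchTaskResult(result=func(*args, **kwargs), status=True)
    except Exception:
        return SketchTaskResult(reason=traceback.format_exc())


if __name__ == "__main__":
    ok = execute_task(lambda: 42)
    failed = execute_task(lambda: 1 / 0)
    print(ok.status, ok.result)  # True 42
    print(failed.status, "ZeroDivisionError" in failed.reason)  # False True
```

Capturing the traceback as text keeps the result serializable, which matters when it has to cross a process boundary.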

plan.schedule

plan.schedule schedules a Task to a Pool. Once the task is scheduled and the pool is started, a worker pulls and executes it.

# add a pool to the plan
pool = Pool(name='PoolName', ...)
plan.add_resource(pool)

# schedule a task to the pool
task = Task(target='make_multitest', ...)
plan.schedule(task, resource='PoolName')

Pool types

The base pool object accepts some configuration options that may vary depending on the pool implementation.

These are the current built-in pool types that can be added to a plan:

ThreadPool

In a thread pool, workers are started in separate threads and pull tasks from the pool using a transport layer that lives in the same memory space. The workers materialize the actual Tests, execute them, and send the results back to the main pool.

from testplan import test_plan
from testplan.runners.pools import ThreadPool
from testplan.runners.pools.tasks import Task

@test_plan(name='ThreadPoolPlan')
def main(plan):
    # Add a thread pool of 4 workers.
    pool = ThreadPool(name='MyPool', size=4)
    plan.add_resource(pool)

    # Schedule 10 tasks to the thread pool to execute them 4 in parallel.
    for idx in range(10):
        task = Task(target='make_multitest',
                    module='tasks')
        plan.schedule(task, resource='MyPool')

See a downloadable example of a thread pool.
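
To illustrate the pull model described above, here is a minimal standard-library sketch in which worker threads pull tasks from an in-memory queue (standing in for the same-memory transport), execute them, and push results back. `run_thread_pool` is a hypothetical name; this is not how Testplan's ThreadPool is implemented internally.

```python
import queue
import threading


def run_thread_pool(tasks, size=4):
    """Run the callables in `tasks` on `size` worker threads.

    Workers pull (index, task) pairs from a shared in-memory queue and push
    (index, value) pairs onto a result queue; results are returned in
    submission order. Exceptions are not handled in this sketch.
    """
    task_queue = queue.Queue()
    result_queue = queue.Queue()
    for index, task in enumerate(tasks):
        task_queue.put((index, task))

    def worker():
        while True:
            try:
                index, task = task_queue.get_nowait()
            except queue.Empty:
                return  # nothing left to pull, so the worker stops
            result_queue.put((index, task()))

    workers = [threading.Thread(target=worker) for _ in range(size)]
    for thread in workers:
        thread.start()
    for thread in workers:
        thread.join()

    # All workers have finished, so the result queue can be drained safely.
    results = [None] * len(tasks)
    while not result_queue.empty():
        index, value = result_queue.get()
        results[index] = value
    return results


if __name__ == "__main__":
    # 10 tasks, at most 4 running at the same time.
    tasks = [lambda i=i: "Test_{}".format(i) for i in range(10)]
    print(run_thread_pool(tasks, size=4))
```

Because workers pull rather than receive a fixed share, a fast worker simply takes more tasks.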

ProcessPool

Similar to the thread pool, but the worker interpreters are started in separate processes and communicate with the pool over a ZMQ transport using a TCP connection on localhost. During this communication, Tasks and TaskResults are serialized and deserialized, so all their input arguments must be serializable as well.

from testplan import test_plan
from testplan.runners.pools import ProcessPool
from testplan.runners.pools.tasks import Task

@test_plan(name='ProcessPoolPlan')
def main(plan):
    # A pool with 4 child process workers.
    pool = ProcessPool(name='MyPool', size=4)
    plan.add_resource(pool)

    # Schedule 10 tasks to the process pool to execute them 4 in parallel.
    for idx in range(10):
        # All Task arguments need to be serializable.
        task = Task(target='make_multitest',
                    module='tasks',
                    path='.')
        plan.schedule(task, resource='MyPool')

See a downloadable example of a process pool.
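
Since Task arguments must survive serialization in a process pool, one way to catch problems early is to attempt a round trip before scheduling. The sketch below uses the standard `pickle` module; whether Testplan uses `pickle` or another serializer internally is an assumption not stated here, and `check_serializable` is a hypothetical helper.

```python
import pickle
import threading


def check_serializable(**task_kwargs):
    """Return the names of arguments that cannot be pickled and unpickled."""
    bad = []
    for name, value in task_kwargs.items():
        try:
            pickle.loads(pickle.dumps(value))
        except Exception:
            bad.append(name)
    return bad


if __name__ == "__main__":
    print(check_serializable(
        target="make_multitest",
        module="tasks",
        args=(1,),
        kwargs={"index": 1},
        callback=lambda: None,   # lambdas cannot be pickled
        lock=threading.Lock(),   # neither can thread locks
    ))  # ['callback', 'lock']
```

Strings, numbers and plain containers pass; open resources and anonymous functions do not, which is why a Task refers to its target by name.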

RemotePool

A remote pool uses SSH to start remote worker interpreters, which also communicate with the local pool over the ZMQ transport. During this process, the local workspace is transferred to the remote workers (if needed), and each worker starts a local thread or process pool, depending on its configuration.

from testplan import test_plan
from testplan.runners.pools import RemotePool
from testplan.runners.pools.tasks import Task

@test_plan(name='RemotePoolPlan')
def main(plan):
    # A pool with 2 remote workers.
    # One with 2 local workers and the other with 1.
    pool = RemotePool(name='MyPool',
                      hosts={'hostname1': 2,
                             'hostname2': 1})
    plan.add_resource(pool)

    # Schedule 10 tasks to the remote pool to execute them 3 in parallel.
    for idx in range(10):
        # All Task arguments need to be serializable.
        task = Task(target='make_multitest',
                    module='tasks',
                    path='.')
        plan.schedule(task, resource='MyPool')

See a downloadable example of a remote pool.
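
The "3 in parallel" in the remote pool example follows from the hosts mapping: each value is the number of local workers on that host, so the total parallelism is the sum of the values. A small arithmetic sketch, assuming each local worker runs one task at a time (`parallel_slots` is a hypothetical name):

```python
import math


def parallel_slots(hosts):
    """Total number of tasks that can run at once across all hosts."""
    return sum(hosts.values())


if __name__ == "__main__":
    hosts = {"hostname1": 2, "hostname2": 1}
    slots = parallel_slots(hosts)
    print(slots)  # 3
    # If all 10 tasks took equally long, they would need this many rounds.
    print(math.ceil(10 / slots))  # 4
```

In practice task durations differ, so workers that finish early pull the next task rather than waiting for a round to end.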

Fault tolerance

Several mechanisms help prevent Tests from failing because of system failures, and their behaviour is part of the pool configuration:

  1. Worker not responsive: Workers (other than thread workers) send heartbeat messages back to the pool, and the frequency can be set using the worker_heartbeat option. If a worker fails to send a given number of heartbeats (heartbeats_miss_limit option), all tasks assigned to that worker are reassigned to the pool.

  2. Task reassign limit: The maximum number of times a task can be scheduled to a worker can be configured using the task_retries_limit option.

  3. Task reschedule: A user can set a custom callable that evaluates whether a task should be rescheduled (e.g. because it failed due to a very rare system failure). To decide, the callable accepts the pool object and the task_result, which contains the result report. The report may contain an error entry, such as out of memory, or other information based on which the user may decide that the task should be rescheduled instead of its result being used in the final plan report.

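    def custom_reschedule(pool, task_result):
        # task_result.result -> TestResult instance
        # task_result.result.report -> TestReport instance
        ...
        # Replace with a user-defined condition, for example one that looks
        # for an out-of-memory error entry in the report.
        should_reschedule = False
        return should_reschedule

    # Instantiate a pool with custom configuration options
    # (pool_name, pool_size and plan come from the surrounding test plan).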
    pool = ProcessPool(name=pool_name,
                       size=pool_size,
                       worker_heartbeat=2,
                       heartbeats_miss_limit=2)
    
    # Set custom reschedule callable logic.
    pool.set_reschedule_check(custom_reschedule)
    
    # Add the pool to the plan.
    pool_uid = plan.add_resource(pool)
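
The first two mechanisms can be modelled with simple bookkeeping: the pool records when each worker last sent a heartbeat, declares a worker unresponsive after heartbeats_miss_limit missed intervals, and returns its tasks to the queue unless a task has already been assigned task_retries_limit times. This is a simplified, single-threaded sketch; the class `HeartbeatMonitor` and its methods are hypothetical and do not mirror Testplan's internals.

```python
class HeartbeatMonitor:
    """Hypothetical bookkeeping for heartbeats and task reassignment.

    Timestamps are passed in explicitly to keep the sketch deterministic;
    a real pool would use something like time.monotonic().
    """

    def __init__(self, worker_heartbeat, heartbeats_miss_limit, task_retries_limit):
        self.worker_heartbeat = worker_heartbeat  # seconds between heartbeats
        self.heartbeats_miss_limit = heartbeats_miss_limit
        self.task_retries_limit = task_retries_limit
        self.last_seen = {}     # worker -> time of last heartbeat
        self.assigned = {}      # worker -> list of task ids
        self.assign_count = {}  # task id -> times assigned to a worker
        self.pending = []       # tasks waiting to be reassigned
        self.abandoned = []     # tasks that reached the retries limit

    def assign(self, worker, task, now):
        self.assigned.setdefault(worker, []).append(task)
        self.assign_count[task] = self.assign_count.get(task, 0) + 1
        self.last_seen.setdefault(worker, now)

    def heartbeat(self, worker, now):
        self.last_seen[worker] = now

    def check(self, now):
        """Return workers declared unresponsive and requeue their tasks."""
        deadline = self.worker_heartbeat * self.heartbeats_miss_limit
        dead = [w for w, t in self.last_seen.items() if now - t > deadline]
        for worker in dead:
            del self.last_seen[worker]
            for task in self.assigned.pop(worker, []):
                if self.assign_count[task] < self.task_retries_limit:
                    self.pending.append(task)
                else:
                    self.abandoned.append(task)
        return dead


if __name__ == "__main__":
    monitor = HeartbeatMonitor(worker_heartbeat=2, heartbeats_miss_limit=2,
                               task_retries_limit=2)
    monitor.assign("w1", "task-a", now=0)
    monitor.assign("w2", "task-b", now=0)
    monitor.heartbeat("w1", now=3)
    print(monitor.check(now=5))  # ['w2'] (silent for 5s > 2 * 2s)
    print(monitor.pending)       # ['task-b']
```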
    

MultiTest parts scheduling

A Task that returns a MultiTest can be scheduled in parts in one or more pools. Each MultiTest part has its own environment and runs a subset of the testcases, determined by which part it is out of the total number of parts. Each part therefore produces its own report entry, and these entries can be merged before the report is exported.

To split a MultiTest task into several parts, provide a tuple of 2 elements as a parameter: the first element is the sequence number of the part, and the second is the total number of parts. For a tuple (M, N), make sure that M and N are both integers with N > 1 and 0 <= M < N.
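
One simple way to turn a part tuple (M, N) into a subset of testcases is a round-robin split, where part M takes every N-th testcase starting at index M. The sketch below validates the tuple against the constraints above and applies that split; the round-robin scheme and the helper name `select_part` are assumptions for illustration, and MultiTest's actual splitting rule may differ.

```python
def select_part(testcases, part):
    """Return the testcases that belong to `part`, given as (M, N)."""
    m, n = part
    if not (isinstance(m, int) and isinstance(n, int)):
        raise TypeError("part must be a tuple of two integers")
    if not (n > 1 and 0 <= m < n):
        raise ValueError("part (M, N) requires N > 1 and 0 <= M < N")
    # Round-robin: indices m, m + n, m + 2n, ...
    return testcases[m::n]


if __name__ == "__main__":
    cases = ["case_{}".format(i) for i in range(7)]
    for m in range(3):
        print(m, select_part(cases, (m, 3)))
```

With this scheme every testcase lands in exactly one part, so running all N parts covers the whole MultiTest.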

from testplan import test_plan
from testplan.runners.pools import ThreadPool
from testplan.runners.pools.tasks import Task

@test_plan(name='ThreadPoolPlan', merge_scheduled_parts=False)
def main(plan):
    # Add a thread pool of 3 workers.
    # Also you can use process pool or remote pool instead.
    pool = ThreadPool(name='MyPool', size=3)
    plan.add_resource(pool)

    # Schedule 10 tasks to the thread pool.
    # A parameter `part_tuple` is provided to indicate which part it is.
    for idx in range(10):
        task = Task(target='make_multitest',
                    module='tasks',
                    kwargs={'part_tuple': (idx, 10)})
        plan.schedule(task, resource='MyPool')

If you set merge_scheduled_parts=True, make sure that all parts of a MultiTest are executed. For example, if a MultiTest is split into 3 parts, then 3 tasks containing the MultiTest parts should be scheduled, with the parameter tuples (0, 3), (1, 3) and (2, 3) respectively. Also note that each MultiTest part can only be scheduled once; otherwise an error occurs while merging the reports.
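
Before enabling merging, it can help to check that the part tuples scheduled for one MultiTest share the same total and cover every index from 0 to N-1 exactly once. A minimal sketch; `check_parts` is a hypothetical helper, not a Testplan function.

```python
from collections import Counter


def check_parts(part_tuples):
    """Return a list of problems found in the part tuples of one MultiTest."""
    if not part_tuples:
        return ["no parts scheduled"]
    totals = {n for _, n in part_tuples}
    if len(totals) > 1:
        return ["inconsistent total number of parts: {}".format(sorted(totals))]
    (total,) = totals
    counts = Counter(m for m, _ in part_tuples)
    problems = []
    for m in range(total):
        if counts[m] == 0:
            problems.append("part ({}, {}) is missing".format(m, total))
        elif counts[m] > 1:
            problems.append(
                "part ({}, {}) is scheduled {} times".format(m, total, counts[m]))
    for m in sorted(counts):
        if not 0 <= m < total:
            problems.append("part ({}, {}) is out of range".format(m, total))
    return problems


if __name__ == "__main__":
    print(check_parts([(0, 3), (1, 3), (2, 3)]))  # []
    print(check_parts([(0, 3), (2, 3), (2, 3)]))
    # ['part (1, 3) is missing', 'part (2, 3) is scheduled 2 times']
```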

See a downloadable example of MultiTest parts scheduling.