Execution Pools

Thread pool

Required files:

test_plan.py

#!/usr/bin/env python
"""
This example is to demonstrate parallel test execution in a thread pool.
"""

import sys

from testplan import test_plan
from testplan import Task
from testplan.parser import TestplanParser
from testplan.runners.pools import ThreadPool
from testplan.report.testing.styles import Style, StyleEnum


OUTPUT_STYLE = Style(StyleEnum.ASSERTION_DETAIL, StyleEnum.ASSERTION_DETAIL)


class CustomParser(TestplanParser):
    """Inheriting base parser."""

    def add_arguments(self, parser):
        """Defining custom arguments for this Testplan."""
        parser.add_argument(
            "--tasks-num",
            action="store",
            type=int,
            default=8,
            help="Number of tests to be scheduled.",
        )
        parser.add_argument(
            "--pool-size",
            action="store",
            type=int,
            default=4,
            help="How many thread workers assigned to pool.",
        )


# Using a custom parser to support `--tasks-num` and `--pool-size` command
# line arguments so that users can experiment with thread pool test execution.

# Hard-coding `pdf_path`, 'stdout_style' and 'pdf_style' so that the
# downloadable example gives meaningful and presentable output.
# NOTE: this programmatic arguments passing approach will cause Testplan
# to ignore any command line arguments related to that functionality.
@test_plan(
    name="ThreadPoolExecution",
    parser=CustomParser,
    pdf_path="report.pdf",
    stdout_style=OUTPUT_STYLE,
    pdf_style=OUTPUT_STYLE,
)
def main(plan):
    """
    Testplan decorated main function to add and execute MultiTests.

    :return: Testplan result object.
    :rtype:  ``testplan.base.TestplanResult``
    """
    # Add a thread pool test execution resource to the plan of given size.
    pool = ThreadPool(name="MyPool", size=plan.args.pool_size)
    plan.add_resource(pool)

    # Add a given number of similar tests to the thread pool
    # to be executed in parallel.
    for idx in range(plan.args.tasks_num):
        task = Task(
            target="make_multitest", module="tasks", kwargs={"index": idx}
        )
        plan.schedule(task, resource="MyPool")


if __name__ == "__main__":
    res = main()
    print("Exiting code: {}".format(res.exit_code))
    sys.exit(res.exit_code)

tasks.py

"""TCP connections tests to be executed in parallel in a thread pool."""

import threading
import time

from testplan.testing.multitest import MultiTest, testsuite, testcase

from testplan.common.utils.context import context
from testplan.testing.multitest.driver.tcp import TCPServer, TCPClient


def after_start(env):
    """
    Called right after MultiTest starts.
    """
    # Server accepts connection request made by client.
    env.server.accept_connection()


@testsuite
class TCPTestsuite(object):
    """TCP communication tests."""

    def __init__(self):
        self._thread_id = threading.current_thread().name

    @testcase
    def send_and_receive_msg(self, env, result):
        """
        Server client communication with a sleep in the middle that
        represents processing time by the server before respond.
        """
        # Client sends a message.
        msg = env.client.cfg.name
        result.log(
            "Client on thread {} is sending: {}".format(self._thread_id, msg)
        )
        bytes_sent = env.client.send_text(msg)
        received = env.server.receive_text(size=bytes_sent)
        result.equal(received, msg, "Server received")

        start_time = time.time()
        # Sleeping here to represent a time consuming processing
        # of the message received by the server before replying back.
        time.sleep(1)
        result.log(
            "Server was processing message for {}s".format(
                round(time.time() - start_time, 1)
            )
        )
        response = "Hello {}".format(received)

        result.log(
            "Server on thread {} is responding: {}".format(
                self._thread_id, response
            )
        )
        # Server sends the reply.
        bytes_sent = env.server.send_text(response)
        received = env.client.receive_text(size=bytes_sent)
        result.equal(received, response, "Client received")


def make_multitest(index=0):
    """
    Creates a new MultiTest that runs TCP connection tests.
    This will be created inside a thread worker.
    """
    print(
        "Creating a MultiTest on {}.".format(threading.current_thread().name)
    )
    test = MultiTest(
        name="TCPMultiTest_{}".format(index),
        suites=[TCPTestsuite()],
        environment=[
            TCPServer(name="server"),
            TCPClient(
                name="client",
                host=context("server", "{{host}}"),
                port=context("server", "{{port}}"),
            ),
        ],
        after_start=after_start,
    )
    return test

Process pool

Required files:

test_plan.py

#!/usr/bin/env python
"""
This example is to demonstrate parallel test execution in a process pool.
"""

import sys

from testplan import test_plan
from testplan import Task
from testplan.runners.pools import ProcessPool

from testplan.parser import TestplanParser
from testplan.report.testing.styles import Style, StyleEnum

OUTPUT_STYLE = Style(StyleEnum.ASSERTION_DETAIL, StyleEnum.ASSERTION_DETAIL)


class CustomParser(TestplanParser):
    """Inheriting base parser."""

    def add_arguments(self, parser):
        """Defining custom arguments for this Testplan."""
        parser.add_argument("--tasks-num", action="store", type=int, default=8)
        parser.add_argument("--pool-size", action="store", type=int, default=4)


# Using a custom parser to support `--tasks-num` and `--pool-size` command
# line arguments so that users can experiment with process pool test execution.

# Hard-coding `pdf_path`, 'stdout_style' and 'pdf_style' so that the
# downloadable example gives meaningful and presentable output.
# NOTE: this programmatic arguments passing approach will cause Testplan
# to ignore any command line arguments related to that functionality.
@test_plan(
    name="ProcessPoolExecution",
    parser=CustomParser,
    pdf_path="report.pdf",
    stdout_style=OUTPUT_STYLE,
    pdf_style=OUTPUT_STYLE,
)
def main(plan):
    """
    Testplan decorated main function to add and execute MultiTests.

    :return: Testplan result object.
    :rtype:  ``testplan.base.TestplanResult``
    """
    # Add a process pool test execution resource to the plan of given size.
    pool = ProcessPool(name="MyPool", size=plan.args.pool_size)
    plan.add_resource(pool)

    # Add a given number of similar tests to the process pool
    # to be executed in parallel.
    for idx in range(plan.args.tasks_num):
        # All Task arguments need to be serializable.
        task = Task(
            target="make_multitest", module="tasks", kwargs={"index": idx}
        )
        plan.schedule(task, resource="MyPool")


if __name__ == "__main__":
    res = main()
    print("Exiting code: {}".format(res.exit_code))
    sys.exit(res.exit_code)

tasks.py

"""TCP connections tests to be executed in parallel in a process pool."""

import os
import time

from testplan.testing.multitest import MultiTest, testsuite, testcase

from testplan.common.utils.context import context
from testplan.testing.multitest.driver.tcp import TCPServer, TCPClient


def after_start(env):
    """
    Called right after MultiTest starts.
    """
    # Server accepts connection request made by client.
    env.server.accept_connection()


@testsuite
class TCPTestsuite(object):
    """TCP communication tests."""

    def __init__(self):
        self._process_id = os.getpid()

    @testcase
    def send_and_receive_msg(self, env, result):
        """
        Server client communication with a sleep in the middle that
        represents processing time by the server before respond.
        """
        # Client sends a message.
        msg = env.client.cfg.name
        result.log(
            "Client with process id {} is sending: {}".format(
                self._process_id, msg
            )
        )
        bytes_sent = env.client.send_text(msg)
        received = env.server.receive_text(size=bytes_sent)
        result.equal(received, msg, "Server received")

        start_time = time.time()
        # Sleeping here to represent a time consuming processing
        # of the message received by the server before replying back.
        time.sleep(1)
        result.log(
            "Server was processing message for {}s".format(
                round(time.time() - start_time, 1)
            )
        )
        response = "Hello {}".format(received)

        result.log(
            "Server with process id {} is responding: {}".format(
                self._process_id, response
            )
        )
        # Server sends the reply.
        bytes_sent = env.server.send_text(response)
        received = env.client.receive_text(size=bytes_sent)
        result.equal(received, response, "Client received")


def make_multitest(index=0):
    """
    Creates a new MultiTest that runs TCP connection tests.
    This will be created inside a process worker.
    """
    print("Creating a MultiTest on process id {}.".format(os.getpid()))
    test = MultiTest(
        name="TCPMultiTest_{}".format(index),
        suites=[TCPTestsuite()],
        environment=[
            TCPServer(name="server"),
            TCPClient(
                name="client",
                host=context("server", "{{host}}"),
                port=context("server", "{{port}}"),
            ),
        ],
        after_start=after_start,
    )
    return test

Remote pool

Required files:

test_plan.py

#!/usr/bin/env python
# This plan contains tests that demonstrate failures as well.
"""
Parallel test execution in a remote pool.
"""

import os
import sys
import socket
import getpass
import shutil
import tempfile

from testplan import test_plan
from testplan import Task
from testplan.runners.pools import RemotePool

from testplan.common.utils.path import module_abspath, pwd

from testplan.parser import TestplanParser
from testplan.report.testing.styles import Style, StyleEnum

OUTPUT_STYLE = Style(StyleEnum.ASSERTION_DETAIL, StyleEnum.ASSERTION_DETAIL)
TEMP_DIR = None


class CustomParser(TestplanParser):
    """Inheriting base parser."""

    def add_arguments(self, parser):
        """Defining custom arguments for this Testplan."""
        parser.add_argument("--tasks-num", action="store", type=int, default=8)
        parser.add_argument("--pool-size", action="store", type=int, default=4)


# Function that creates a file with some content
# to demonstrate custom file transferring.
def make_file(filename, dirname, content):
    path = os.path.join(dirname, filename)
    with open(path, "w") as fobj:
        fobj.write(content)
    return path


# Using a custom parser to support `--tasks-num` and `--pool-size` command
# line arguments so that users can experiment with remote pool test execution.

# Hard-coding `pdf_path`, 'stdout_style' and 'pdf_style' so that the
# downloadable example gives meaningful and presentable output.
# NOTE: this programmatic arguments passing approach will cause Testplan
# to ignore any command line arguments related to that functionality.
@test_plan(
    name="RemotePoolExecution",
    parser=CustomParser,
    pdf_path=os.path.join(pwd(), "report.pdf"),
    stdout_style=OUTPUT_STYLE,
    pdf_style=OUTPUT_STYLE,
)
def main(plan):
    """
    Testplan decorated main function to add and execute MultiTests.

    :return: Testplan result object.
    :rtype:  ``testplan.base.TestplanResult``
    """

    workspace = os.path.dirname(__file__)

    # Create two temporary files locally. For demonstration, just write the
    # filename as the content of each.
    assert TEMP_DIR is not None
    for filename in ("file1", "file2"):
        make_file(filename, TEMP_DIR, content=filename)

    # Explicitly specify the full paths to both the local source files just
    # created and the destination filepaths on the remote host.
    push_files = [
        (os.path.join(TEMP_DIR, "file1"), "/tmp/remote_example/file1"),
        (os.path.join(TEMP_DIR, "file2"), "/tmp/remote_example/file2"),
    ]

    # Check if the remote host has been specified in the environment. Remote
    # hosts can only be Linux systems. If none is specified when running on a
    # Linux system we can default to using the localhost as our "remote"
    # worker. Whichever remote host is used must be configured to accept SSH
    # connections from the localhost.
    remote_host = os.environ.get("TESTPLAN_REMOTE_HOST")
    if not remote_host:
        if os.name == "posix":
            remote_host = socket.gethostname()
        else:
            raise RuntimeError(
                "You must specify a remote host via the TESTPLAN_REMOTE_HOST "
                "environment var on non-Linux systems."
            )

    # Add a remote pool test execution resource to the plan of given size.
    pool = RemotePool(
        name="MyPool",
        # Create 3 workers on the same remote host.
        hosts={remote_host: 3},
        # Allow the remote port to be overridden by the
        # environment. Default to 0, which will make testplan use
        # the default SSH port for connections.
        port=int(os.environ.get("TESTPLAN_REMOTE_PORT", 0)),
        setup_script=["/bin/bash", "setup_script.ksh"],
        env={"LOCAL_USER": getpass.getuser(), "LOCAL_WORKSPACE": workspace},
        workspace_exclude=[".git/", ".cache/", "doc/", "test/"],
        # We push local files to the remote worker using the
        # explicit source and destination locations defined above.
        push=push_files,
        workspace=workspace,
    )

    plan.add_resource(pool)

    # Add a given number of similar tests to the remote pool
    # to be executed in parallel.
    for idx in range(plan.args.tasks_num):
        # All Task arguments need to be serializable.
        task = Task(
            target="make_multitest",
            module="tasks",
            # We specify the full paths to files as they will be found
            # on the remote host.
            kwargs={
                "index": idx,
                "files": [
                    "/tmp/remote_example/file1",
                    "/tmp/remote_example/file2",
                ],
            },
        )
        plan.schedule(task, resource="MyPool")


if __name__ == "__main__":
    # Create a new temporary directory for this test plan.
    TEMP_DIR = tempfile.mkdtemp()

    # Run the test plan.
    res = main()

    # Clean up all the temporary files used by this test plan.
    shutil.rmtree(TEMP_DIR)

    print("Exiting code: {}".format(res.exit_code))
    sys.exit(res.exit_code)

tasks.py

"""TCP connections tests to be executed in parallel in a remote pool."""

import os
import time

from testplan.testing.multitest import MultiTest, testsuite, testcase

from testplan.common.utils.context import context
from testplan.testing.multitest.driver.tcp import TCPServer, TCPClient


def after_start(env):
    """
    Called right after MultiTest starts.
    """
    # Server accepts connection request made by client.
    env.server.accept_connection()


@testsuite
class TCPTestsuite(object):
    """TCP communication tests."""

    def __init__(self, files):
        self._process_id = os.getpid()
        self._files = files

    def setup(self, env, result):
        result.log("LOCAL_USER: {}".format(os.environ["LOCAL_USER"]))

        for _file in self._files:
            with open(_file) as fobj:
                result.log("Source file contents: {}".format(fobj.read()))

    @testcase
    def send_and_receive_msg(self, env, result):
        """
        Server client communication with a sleep in the middle that
        represents processing time by the server before respond.
        """
        # Client sends a message.
        msg = env.client.cfg.name
        result.log(
            "Client with process id {} is sending: {}".format(
                self._process_id, msg
            )
        )
        bytes_sent = env.client.send_text(msg)
        received = env.server.receive_text(size=bytes_sent)
        result.equal(received, msg, "Server received")

        start_time = time.time()
        # Sleeping here to represent a time consuming processing
        # of the message received by the server before replying back.
        time.sleep(1)
        result.log(
            "Server was processing message for {}s".format(
                round(time.time() - start_time, 1)
            )
        )
        response = "Hello {}".format(received)

        result.log(
            "Server with process id {} is responding: {}".format(
                self._process_id, response
            )
        )
        # Server sends the reply.
        bytes_sent = env.server.send_text(response)
        received = env.client.receive_text(size=bytes_sent)
        result.equal(received, response, "Client received")


def make_multitest(index=0, files=None):
    """
    Creates a new MultiTest that runs TCP connection tests.
    This will be created inside a remote worker.
    """
    print("Creating a MultiTest on process id {}.".format(os.getpid()))
    test = MultiTest(
        name="TCPMultiTest_{}".format(index),
        suites=[TCPTestsuite(files)],
        environment=[
            TCPServer(name="server"),
            TCPClient(
                name="client",
                host=context("server", "{{host}}"),
                port=context("server", "{{port}}"),
            ),
        ],
        after_start=after_start,
    )
    return test

setup_script.ksh

#!/bin/bash
echo 'Executing setup commands.'

echo 'Environment:'
env

echo 'User:'
echo $USER

echo 'Hostname:'
hostname

# Make a soft link in remote host to make the local workspace
# absolute path available for hardcoded entries.
if [ ! -d $TESTPLAN_LOCAL_WORKSPACE ];
then
    mkdir -p `dirname $TESTPLAN_LOCAL_WORKSPACE`
    ln -s $TESTPLAN_REMOTE_WORKSPACE $TESTPLAN_LOCAL_WORKSPACE
else
    echo 'Local workspace is visible from the remote!'
fi

echo 'Finished setup commands.'
exit 0

Task Rerun

Required files:

test_plan.py

#!/usr/bin/env python
"""
This example is to demonstrate task level rerun feature in a pool.
"""

import os
import sys
import uuid
import getpass
import tempfile

from testplan import test_plan
from testplan import Task
from testplan.runners.pools import ThreadPool

from testplan.parser import TestplanParser
from testplan.report.testing.styles import Style, StyleEnum

OUTPUT_STYLE = Style(StyleEnum.ASSERTION_DETAIL, StyleEnum.ASSERTION_DETAIL)


class CustomParser(TestplanParser):
    """Inheriting base parser."""

    def add_arguments(self, parser):
        """Defining custom arguments for this Testplan."""
        parser.add_argument("--pool-size", action="store", type=int, default=4)


# Using a custom parser to support `--tasks-num` and `--pool-size` command
# line arguments so that users can experiment with process pool test execution.

# Hard-coding `pdf_path`, 'stdout_style' and 'pdf_style' so that the
# downloadable example gives meaningful and presentable output.
# NOTE: this programmatic arguments passing approach will cause Testplan
# to ignore any command line arguments related to that functionality.
@test_plan(
    name="PoolExecutionAndTaskRerun",
    parser=CustomParser,
    pdf_path="report.pdf",
    stdout_style=OUTPUT_STYLE,
    pdf_style=OUTPUT_STYLE,
)
def main(plan):
    """
    Testplan decorated main function to add and execute MultiTests.

    :return: Testplan result object.
    :rtype:  ``testplan.base.TestplanResult``
    """
    # Add a thread pool test execution resource to the plan of given size.
    # Can also use a process pool instead.
    pool = ThreadPool(name="MyPool", size=plan.args.pool_size)
    plan.add_resource(pool)

    # Add a task with `rerun` argument to the thread pool
    tmp_file = os.path.join(
        tempfile.gettempdir(), getpass.getuser(), "{}.tmp".format(uuid.uuid4())
    )
    task = Task(
        target="make_multitest", module="tasks", args=(tmp_file,), rerun=2
    )
    plan.schedule(task, resource="MyPool")


if __name__ == "__main__":
    res = main()
    print("Exiting code: {}".format(res.exit_code))
    sys.exit(res.exit_code)

tasks.py

"""An unstable tests to be executed until pass."""

import os

from testplan.testing.multitest import MultiTest, testsuite, testcase
from testplan.common.utils.path import makedirs


@testsuite
class Unstablesuite(object):
    """
    A test suite which has an unstable testcase.
    The multitest containing this suite has to re-run twice
    (3 times in total) in order to get passed.
    """

    def __init__(self, tmp_file):
        self._iteration = None
        self._max_rerun = 2
        self._tmp_file = tmp_file

    def setup(self, env, result):
        """
        Create a tmp text file which record how many times the suite
        has been executed, should remove it after the last rerun.
        """
        makedirs(os.path.dirname(self._tmp_file))
        if not os.path.exists(self._tmp_file):
            self._iteration = 0
        else:
            with open(self._tmp_file, "r") as fp:
                self._iteration = int(fp.read())

        if self._iteration == self._max_rerun:
            os.remove(self._tmp_file)
        else:
            with open(self._tmp_file, "w") as fp:
                fp.write(str(self._iteration + 1))

        result.log("Suite setup in iteration {}".format(self._iteration))

    @testcase
    def unstable_testcase(self, env, result):
        """
        An unstable testcase which can only pass at 3rd run (2nd rerun).
        """
        if self._iteration == 2:
            result.log("Test passes")
        else:
            result.fail("Test fails")


def make_multitest(tmp_file):
    """
    Creates a new MultiTest that runs unstable tests.
    """
    return MultiTest(
        name="UnstableMultiTest",
        suites=[Unstablesuite(tmp_file=tmp_file)],
        environment=[],
    )

MultiTest parts scheduling

Required files:

test_plan.py

#!/usr/bin/env python
"""
This example is to demonstrate parallel test execution with parts in a pool.
"""

import sys

from testplan import test_plan
from testplan import Task
from testplan.parser import TestplanParser
from testplan.runners.pools import ThreadPool
from testplan.report.testing.styles import Style, StyleEnum

OUTPUT_STYLE = Style(StyleEnum.ASSERTION_DETAIL, StyleEnum.ASSERTION_DETAIL)


class CustomParser(TestplanParser):
    """Inheriting base parser."""

    def add_arguments(self, parser):
        """Defining custom arguments for this Testplan."""
        parser.add_argument(
            "--parts-num",
            action="store",
            type=int,
            default=3,
            help="Number of parts to be split.",
        )
        parser.add_argument(
            "--pool-size",
            action="store",
            type=int,
            default=3,
            help="How many thread workers assigned to pool.",
        )


# Using a custom parser to support `--tasks-num` and `--pool-size` command
# line arguments so that users can experiment with thread pool test execution.

# Hard-coding `pdf_path`, 'stdout_style' and 'pdf_style' so that the
# downloadable example gives meaningful and presentable output.
# NOTE: this programmatic arguments passing approach will cause Testplan
# to ignore any command line arguments related to that functionality.
@test_plan(
    name="MultiTestPartsExecution",
    parser=CustomParser,
    pdf_path="report.pdf",
    stdout_style=OUTPUT_STYLE,
    pdf_style=OUTPUT_STYLE,
    merge_scheduled_parts=False,
)
def main(plan):
    """
    Testplan decorated main function to add and execute MultiTests.

    :return: Testplan result object.
    :rtype:  ``testplan.base.TestplanResult``
    """
    # Add a thread pool test execution resource to the plan of given size.
    # Also you can use process pool or remote pool instead.
    pool = ThreadPool(name="MyPool", size=plan.args.pool_size)
    plan.add_resource(pool)

    # Add a given number of similar tests to the thread pool
    # to be executed in parallel.
    for idx in range(plan.args.parts_num):
        task = Task(
            target="make_multitest",
            module="tasks",
            kwargs={"part_tuple": (idx, plan.args.parts_num)},
        )
        plan.schedule(task, resource="MyPool")


if __name__ == "__main__":
    res = main()
    print("Exiting code: {}".format(res.exit_code))
    sys.exit(res.exit_code)

tasks.py

"""Tests to be executed in multi-parts and task will be schedule in a pool."""

from testplan.testing.multitest import MultiTest, testsuite, testcase


@testsuite
class Suite1(object):
    """A test suite with several normal testcases."""

    @testcase
    def test_equal(self, env, result):
        result.equal("foo", "foo", description="Equality example")

    @testcase
    def test_not_equal(self, env, result):
        result.not_equal("foo", "bar", description="Inequality example")

    @testcase
    def test_less(self, env, result):
        result.less(2, 12, description="Less comparison example")

    @testcase
    def test_greater(self, env, result):
        result.greater(10, 5, description="Greater comparison example")

    @testcase
    def test_approximate_equal(self, env, result):
        result.isclose(
            100,
            101,
            rel_tol=0.01,
            abs_tol=0.0,
            description="Approximate equality example",
        )


@testsuite
class Suite2(object):
    """A test suite with parameterized testcases."""

    @testcase(parameters=tuple(range(6)))
    def test_bool(self, env, result, val):
        if val > 0:
            result.true(val, description="Check if value is true")
        else:
            result.false(val, description="Check if value is false")


def make_multitest(part_tuple=None):
    test = MultiTest(
        name="MultitestParts", suites=[Suite1(), Suite2()], part=part_tuple
    )
    return test