Custom Application

FXConverter

Required files:

test_plan.py

#!/usr/bin/env python
"""
This example is to test the converter.py binary application with the following
functionality:

    1. Connects to an external service that provides FX exchange rates.
    2. Accepts requests from clients i.e: 1000 GBP to be converted to EUR
    3. Requests the currency rate from the FX rates service. i.e: 1.15
    4. Responds to the client with the converted amount: i.e: 1150

    (Requests)        GBP:EUR:1000                 GBP:EUR
    ------------------          -----------------          ------------------
    |                | -------> |  converter.py | -------> | Exchange rates |
    |     Client     |          |       -       |          |    Server      |
    |                | <------- |  Application  | <------- |     Mock       |
    ------------------          -----------------          ------------------
    (Responses)          1150                       1.15
"""

import os
import re
import sys

from driver import FXConverter
from suites import EdgeCases, ConversionTests, RestartEvent
from testplan.testing.multitest import MultiTest

from testplan import test_plan
from testplan.common.utils.context import context
from testplan.report.testing.styles import Style, StyleEnum
from testplan.testing.multitest.driver.tcp import TCPServer, TCPClient

OUTPUT_STYLE = Style(StyleEnum.ASSERTION_DETAIL, StyleEnum.ASSERTION_DETAIL)


def after_start(env):
    """
    Called right after MultiTest starts.
    """
    # Server accepts connection request by convert app.
    env.server.accept_connection()


def before_stop(env, result):
    """
    Called right before MultiTest stops.
    """
    # Clients sends stop command to the converted app.
    env.client.send(bytes("Stop".encode("utf-8")))


def converter_environment():
    """
    MultiTest environment that will be made available within the testcases.
    """
    # Server that will respond with FX exchange rates.
    server = TCPServer(name="server")

    # Converter application that accepts configuration template that
    # after install process it will contain the host/port information of
    # the 'server' to connect to.
    # It also reads from the output file the address that the converter
    # listens for incoming connections and makes host/port info available
    # through its context so that the client can connect as well.
    converter_name = "converter"
    config = "converter.cfg"
    regexps = [
        re.compile(r"Converter started."),
        re.compile(r".*Listener on: (?P<listen_address>.*)"),
    ]
    converter = FXConverter(
        name=converter_name,
        pre_args=[sys.executable],
        binary=os.path.join(
            os.path.dirname(os.path.abspath(__file__)), "converter.py"
        ),
        args=[config],
        install_files=[
            os.path.join(os.path.dirname(os.path.abspath(__file__)), config)
        ],
        log_regexps=regexps,
    )

    # Client that connects to the converted application using host/port
    # information that FXConverter driver made available through its context.
    client = TCPClient(
        name="client",
        host=context(converter_name, "{{host}}"),
        port=context(converter_name, "{{port}}"),
    )

    return [server, converter, client]


# Hard-coding `pdf_path`, 'stdout_style' and 'pdf_style' so that the
# downloadable example gives meaningful and presentable output.
# NOTE: this programmatic arguments passing approach will cause Testplan
# to ignore any command line arguments related to that functionality.
@test_plan(
    name="FXConverter",
    pdf_path="report.pdf",
    stdout_style=OUTPUT_STYLE,
    pdf_style=OUTPUT_STYLE,
)
def main(plan):
    """
    Testplan decorated main function to add and execute MultiTests.

    :return: Testplan result object.
    :rtype:  ``testplan.base.TestplanResult``
    """
    test = MultiTest(
        name="TestFXConverter",
        suites=[ConversionTests(), EdgeCases(), RestartEvent()],
        environment=converter_environment(),
        after_start=after_start,
        before_stop=before_stop,
    )
    plan.add(test)


if __name__ == "__main__":
    res = main()
    print("Exiting code: {}".format(res.exit_code))
    sys.exit(res.exit_code)

suites.py

"""FX conversion tests."""

import os

from testplan.testing.multitest import testsuite, testcase


def msg_to_bytes(msg, standard="utf-8"):
    """Encode text to bytes."""
    return bytes(msg.encode(standard))


def bytes_to_msg(seq, standard="utf-8"):
    """Decode bytes to text."""
    return seq.decode(standard)


def custom_docstring_func(_, kwargs):
    """
    Return original docstring (if available) and
    parametrization arguments in the format ``key: value``.
    """
    kwargs_strings = [
        "{}: {}".format(arg_name, arg_value)
        for arg_name, arg_value in kwargs.items()
    ]
    return os.linesep.join(kwargs_strings)


@testsuite
class ConversionTests(object):
    """Sample currency conversion operations."""

    def __init__(self):
        self._rates = {"EUR": {"GBP": "0.90000"}, "GBP": {"EUR": "1.10000"}}

    @testcase(
        parameters=(
            ("EUR:GBP:1000", "900"),
            ("GBP:EUR:1000", "1100"),
            ("EUR:GBP:1500", "1350"),
            ("GBP:EUR:1500", "1650"),
        ),
        docstring_func=custom_docstring_func,
    )
    def conversion_parameterized(self, env, result, request, expect):
        """
        Client sends a request to the currency converter app.
        The converter retrieves the conversion rate from the downstream
        and sends back the converted result to the client.
        """
        env.client.send(msg_to_bytes(request))

        # App requests rates from server
        received = bytes_to_msg(env.server.receive(size=7))
        result.equal(request[:7], received, "Downstream receives rate query.")
        source, target = received.split(":")
        rate = self._rates[source][target]
        result.log("Downstream sends rate: {}".format(rate))
        env.server.send(msg_to_bytes(rate))

        # Client receives response.
        result.equal(
            int(expect),
            int(bytes_to_msg(env.client.receive(size=1024))),
            "Client received converted value.",
        )


@testsuite
class EdgeCases(object):
    """Suite containing edge case scenarios."""

    @testcase
    def same_currency(self, env, result):
        """
        Client requests conversion to the same currency.
        No downstream is involved as no rate is needed.
        """
        request = "EUR:EUR:2000"
        expect = "2000"
        result.log("Client request: {}".format(request))
        env.client.send(msg_to_bytes(request))

        # Client receives response.
        result.equal(
            int(expect),
            int(bytes_to_msg(env.client.receive(size=1024))),
            "Client received converted value.",
        )

    @testcase
    def zero_amount(self, env, result):
        """
        Client requests conversion of 0 amount.
        No downstream is involved as no rate is needed.
        """
        request = "EUR:GBP:0"
        expect = "0"
        result.log("Client request: {}".format(request))
        env.client.send(msg_to_bytes(request))

        # Client receives response.
        result.equal(
            int(expect),
            int(bytes_to_msg(env.client.receive(size=1024))),
            "Client received converted value.",
        )

    @testcase(
        parameters=(
            "GBP.EUR.1000",
            "EUR::GBP:500",
            "GBP:EURO:1000",
            "GBP:EUR:ABC",
        ),
        docstring_func=custom_docstring_func,
    )
    def invalid_requests(self, env, result, request):
        """
        Client sends a request with incorrect format.
        Requests are matched by [A-Z]{3}:[A-Z]{3}:[0-9]+ regex.
        """
        env.client.send(msg_to_bytes(request))

        # Client receives response.
        result.contain(
            "Invalid request format",
            bytes_to_msg(env.client.receive(size=1024)),
            "Invalid request error received.",
        )


@testsuite
class RestartEvent(object):
    """Converter app restart and reconnect scenarios."""

    def _send_and_receive(self, env, result, request, rate, expect):
        """
        Client sends a request to the currency converter app.
        The converter retrieves the conversion rate from the downstream
        and sends back the converted result to the client.
        """
        result.log("Client sends request: {}".format(request))
        env.client.send(msg_to_bytes(request))

        # App requests rates from server
        received = bytes_to_msg(env.server.receive(size=7))
        result.equal(request[:7], received, "Downstream receives rate query.")
        result.log("Downstream sends rate: {}".format(rate))
        env.server.send(msg_to_bytes(rate))

        # Client receives response.
        result.equal(
            int(expect),
            int(bytes_to_msg(env.client.receive(size=1024))),
            "Client received converted value.",
        )

    def _restart_components(self, env, result):
        """
        Restart converter app.
        Accept new connection from rate sending server.
        Restart client driver to connect to new host:port.
        """
        result.log("Restarting converter app.")
        env.converter.restart()
        result.log(
            "App is now listening on {}:{}".format(
                env.converter.host, env.converter.port
            )
        )
        env.server.accept_connection()
        env.client.restart()

    @testcase
    def restart_app(self, env, result):
        """
        Restarting converter app and reconnect with rate
        server and client components before doing new requests.
        """
        result.log(
            "App is listening on {}:{}".format(
                env.converter.host, env.converter.port
            )
        )
        self._send_and_receive(env, result, "EUR:GBP:1000", "0.8500", "850")
        self._restart_components(env, result)
        self._send_and_receive(env, result, "EUR:GBP:1000", "0.8700", "870")
        self._restart_components(env, result)
        self._send_and_receive(env, result, "EUR:GBP:2000", "0.8300", "1660")

converter.cfg

[Listener]
Host: localhost
Port: 0

[Downstream]
Host: {{context['server'].host}}
Port: {{context['server'].port}}

converter.py

"""
Converter application that:

    1. Connects to an external service that provides FX exchange rates.
    2. Accepts requests from clients i.e: 1000 GBP to be converted to EUR
    3. Requests the currency rate from the FX rates service. i.e: 1.15
    4. Responds to the client with the converted amount: i.e: 1150
"""

import os
import re
import sys
import socket
import logging

from six.moves.configparser import ConfigParser


logging.basicConfig(stream=sys.stdout, format="%(message)s")


class FXConverter(object):
    """FXConverter class that accepts a config file."""

    def __init__(self, config_file):
        self._logger = logging.getLogger()
        self._logger.setLevel(logging.INFO)
        self._config = self.load_config(
            os.path.join(os.getcwd(), "etc", config_file)
        )
        self._server = None

    def load_config(self, filename):
        """
        Reads from the config file the host/port information of the downstream
        service and also the host/port to bind and listen for clients.
        """
        config = ConfigParser()
        config.read(filename)

        self._logger.info("Configuration read:")
        for section in ("Listener", "Downstream"):
            self._logger.info(section)
            self._logger.info("\tHost: {}".format(config.get(section, "Host")))
            self._logger.info("\tPort: {}".format(config.get(section, "Port")))
        return config

    def _server_init(self):
        self._server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._server.bind(
            (
                self._config.get("Listener", "Host"),
                int(self._config.get("Listener", "Port")),
            )
        )
        self._server.listen(10)
        return self._server.getsockname()

    def _validate_request(self, msg):
        if not re.match(r"[A-Z]{3}:[A-Z]{3}:[0-9]+", msg):
            raise ValueError("Invalid request format ({}).".format(msg))

    def _process_request(self, downstream_cli, msg):
        value = float(msg[8:])  # i.e 1000
        if value == 0:
            return "0"
        currencies = msg[:7]  # i.e EUR:GBP
        source, target = currencies.split(":")
        if source == target:
            rate = 1
        else:
            downstream_cli.sendall(bytes(currencies.encode("utf-8")))
            rate = float(downstream_cli.recv(1024).decode("utf-8"))
        result = str(int(rate * value))
        self._logger.info(
            "Request result for {} with rate {}: {}".format(
                msg[8:], rate, result
            )
        )
        return result

    def _loop(self, upstream_conn, downstream_cli, downstream_addr):
        while True:
            msg = str(upstream_conn.recv(1024).decode("utf-8"))
            self._logger.info("Client msg: {}".format(msg))
            if msg == "Stop":
                self._server.close()
                self._logger.info("Converter stopped.")
                break
            else:
                try:
                    self._validate_request(msg)
                except Exception as exc:
                    upstream_conn.sendall(bytes(str(exc).encode("utf-8")))
                    continue
                else:
                    self._logger.info(
                        "Propagating query {} to {}".format(
                            msg, downstream_addr
                        )
                    )

                result = self._process_request(downstream_cli, msg)
                upstream_conn.sendall(bytes(result.encode("utf-8")))

    def loop(self):
        """
        Starts the application.

            1. Connect to downstream server with FX exchange rates.
            2. Accepts client connection.
            3. Start the loop to handle client requests.
        """
        host, port = self._server_init()
        self._logger.info("Listener on: {}:{}".format(host, port))
        downstream_addr = (
            self._config.get("Downstream", "Host"),
            int(self._config.get("Downstream", "Port")),
        )
        self._logger.info(
            "Connected to downstream: {}:{}".format(
                downstream_addr[0], downstream_addr[1]
            )
        )
        downstream_cli = socket.create_connection(downstream_addr)
        self._logger.info("Converter started.")
        upstream_conn, client_address = self._server.accept()
        self._logger.info("Client connected: {}".format(client_address))
        self._loop(upstream_conn, downstream_cli, downstream_addr)


if __name__ == "__main__":
    sys.stderr.flush()
    _, config_file = sys.argv
    converter = FXConverter(config_file)
    converter.loop()

driver.py

"""Driver for converter.py application binary."""

import re

from testplan.testing.multitest.driver.app import App


class FXConverter(App):
    """
    Inherits the generic ``testplan.testing.multitest.driver.app.App`` driver
    and expose host/port values read from log extracts.
    """

    def __init__(self, **options):
        super(FXConverter, self).__init__(**options)
        self.host = None
        self.port = None

    def started_check(self, timeout=None):
        """
        Checks that binary started and input regex conditions matching, while
        also storing host/port information to be made available in its context
        so that client driver can connect to it.
        """
        super(FXConverter, self).started_check(timeout=timeout)
        self.host, self.port = re.split(":", self.extracts["listen_address"])

PDF report

Sample detailed PDF report.

../_images/fxconverter_example.png