App Driver¶
Basic Usage¶
- Required files:
test_plan.py¶
#!/usr/bin/env python
"""
Example demonstrating usage of the App driver to start an arbitrary binary as
a subprocess - /bin/echo in this case - and check its stdout output and
return code.
"""
import sys, re
from testplan import test_plan
from testplan.testing.multitest import MultiTest, testsuite, testcase
from testplan.testing.multitest.driver.app import App
from testplan.common.utils.match import LogMatcher
@testsuite
class MyTestsuite:
    """
    A testsuite that inspects the output written by the ``echo`` driver.
    """

    @testcase
    def my_testcase(self, env, result):
        # Look for the word 'testplan' in the file the driver exposes as
        # ``outpath`` and record whether it was found.
        pattern = re.compile(r"testplan")
        stdout_matcher = LogMatcher(log_path=env.echo.outpath)
        found = stdout_matcher.match(pattern)
        result.true(found, description="testplan in stdout")
def after_stop_fn(env, result):
    """Assert that the ``echo`` driver finished with return code 0."""
    echo_retcode = env.echo.retcode
    result.equal(echo_retcode, 0, description="echo exit with 0")
@test_plan(name="App driver example")
def main(plan):
    """
    A simple example that demonstrates App driver usage: ``/bin/echo`` is
    started with the single argument 'testplan' and a regular expression for
    that text is registered against its standard output.
    """
    echo_app = App(
        "echo",
        binary="/bin/echo",
        args=["testplan"],
        # ``stdout_regexps`` is an argument inherited from the Driver class.
        stdout_regexps=[re.compile(r"testplan")],
    )
    plan.add(
        MultiTest(
            name="TestEcho",
            suites=[MyTestsuite()],
            environment=[echo_app],
            after_stop=after_stop_fn,
        )
    )
if __name__ == "__main__":
    # Run the plan and hand its exit code back to the calling process.
    plan_result = main()
    sys.exit(plan_result.exit_code)
Start Drivers Manually¶
- Required files:
test_plan.py¶
#!/usr/bin/env python
"""
Example demonstrating usage of App driver to manually start it and stop it.
"""
import sys, re
from testplan import test_plan
from testplan.testing.multitest import MultiTest, testsuite, testcase
from testplan.testing.multitest.driver.app import App
from testplan.common.utils.match import LogMatcher
@testsuite
class MyTestsuite:
    """
    A testsuite whose testcases start and stop the ``cat_app`` driver
    themselves, either through explicit ``start``/``stop`` calls or by using
    the driver as a context manager.
    """

    @testcase
    def manual_start(self, env, result):
        result.equal(env.cat_app.proc, None, description="App is not running.")

        env.cat_app.start()
        env.cat_app.wait(env.cat_app.status.STARTED)

        matcher = LogMatcher(log_path=env.cat_app.logpath)
        env.cat_app.proc.stdin.write(b"testplan\n")
        # Flush so the line reaches the child process even when the stdin
        # pipe is buffered; otherwise the match below could time out.
        env.cat_app.proc.stdin.flush()
        matched = matcher.match(re.compile(r"testplan"))
        result.true(matched, description="testplan in stdin")
        result.not_equal(env.cat_app.proc, None, description="App is running.")

        env.cat_app.stop()
        env.cat_app.wait(env.cat_app.status.STOPPED)

        result.equal(env.cat_app.proc, None, description="App is not running.")

    @testcase
    def manual_start_using_context_manager(self, env, result):
        result.equal(env.cat_app.proc, None, description="App is not running.")

        # The driver is used as a context manager here instead of calling
        # ``start``/``stop`` explicitly.
        with env.cat_app:
            matcher = LogMatcher(log_path=env.cat_app.logpath)
            env.cat_app.proc.stdin.write(b"testplan\n")
            # Same reasoning as in ``manual_start``: do not rely on the pipe
            # being unbuffered.
            env.cat_app.proc.stdin.flush()
            matched = matcher.match(re.compile(r"testplan"))
            result.true(matched, description="testplan in stdin")
            result.not_equal(
                env.cat_app.proc, None, description="App is running."
            )

        result.equal(env.cat_app.proc, None, description="App is not running.")
@test_plan(name="App driver example")
def main(plan):
    """
    A simple example that demonstrates manually starting and stopping the
    App driver.
    """
    # ``auto_start=False`` is passed so the testcases can start the driver
    # themselves.
    cat_app = App(
        name="cat_app",
        binary="/bin/cat",
        auto_start=False,
    )
    plan.add(
        MultiTest(
            name="TestCat",
            suites=[MyTestsuite()],
            environment=[cat_app],
        )
    )
if __name__ == "__main__":
    # Run the plan and hand its exit code back to the calling process.
    plan_result = main()
    sys.exit(plan_result.exit_code)
Custom Application - FXConverter¶
- Required files:
test_plan.py¶
#!/usr/bin/env python
"""
This example is to test the converter.py binary application with the following
functionality:

    1. Connects to an external service that provides FX exchange rates.
    2. Accepts requests from clients, e.g. 1000 GBP to be converted to EUR.
    3. Requests the currency rate from the FX rates service, e.g. 1.15.
    4. Responds to the client with the converted amount, e.g. 1150.

    (Requests)       GBP:EUR:1000                 GBP:EUR
    ------------------          -----------------          ------------------
    |                | -------> |  converter.py | -------> | Exchange rates |
    |     Client     |          |       -       |          |     Server     |
    |                | <------- |  Application  | <------- |      Mock      |
    ------------------          -----------------          ------------------
    (Responses)          1150                       1.15
"""
import os
import re
import sys
from driver import FXConverter
from suites import EdgeCases, ConversionTests, RestartEvent
from testplan.testing.multitest import MultiTest
from testplan import test_plan
from testplan.common.utils.context import context
from testplan.report.testing.styles import Style, StyleEnum
from testplan.testing.multitest.driver.tcp import TCPServer, TCPClient
# Report style shared by the stdout and PDF outputs configured on ``main``.
# NOTE(review): the two positional arguments presumably set the detail level
# for passing and failing entries respectively - confirm against ``Style``.
OUTPUT_STYLE = Style(StyleEnum.ASSERTION_DETAIL, StyleEnum.ASSERTION_DETAIL)
def after_start(env):
    """
    Hook passed to MultiTest as ``after_start``.
    """
    # The rates server accepts the connection request made by the
    # converter app.
    rates_server = env.server
    rates_server.accept_connection()
def before_stop(env, result):
    """
    Hook passed to MultiTest as ``before_stop``.
    """
    # The client sends the stop command to the converter app.
    stop_command = b"Stop"
    env.client.send(stop_command)
def converter_environment():
    """
    Build the MultiTest environment that will be made available within the
    testcases: the rates server, the converter application and the client,
    in that order.
    """
    here = os.path.dirname(os.path.abspath(__file__))
    converter_name = "converter"
    config = "converter.cfg"

    # Server that will respond with FX exchange rates.
    server = TCPServer(name="server")

    # The converter application accepts a configuration template which,
    # after the install process, contains the host/port information of the
    # 'server' to connect to.
    # The address the converter listens on for incoming connections is read
    # from its output through the ``listen_address`` group below, and the
    # host/port info is made available through the driver's context so that
    # the client can connect as well.
    converter = FXConverter(
        name=converter_name,
        pre_args=[sys.executable],
        binary=os.path.join(here, "converter.py"),
        args=[config],
        install_files=[os.path.join(here, config)],
        log_regexps=[
            re.compile(r"Converter started."),
            re.compile(r".*Listener on: (?P<listen_address>.*)"),
        ],
    )

    # Client that connects to the converter application using the host/port
    # information that the FXConverter driver made available through its
    # context.
    client = TCPClient(
        name="client",
        host=context(converter_name, "{{host}}"),
        port=context(converter_name, "{{port}}"),
    )

    return [server, converter, client]
# ``pdf_path``, ``stdout_style`` and ``pdf_style`` are hard-coded so that the
# downloadable example gives meaningful and presentable output.
# NOTE: passing these arguments programmatically will cause Testplan to
# ignore any command line arguments related to that functionality.
@test_plan(
    name="FXConverter",
    pdf_path="report.pdf",
    stdout_style=OUTPUT_STYLE,
    pdf_style=OUTPUT_STYLE,
)
def main(plan):
    """
    Testplan decorated main function to add and execute MultiTests.

    :return: Testplan result object.
    :rtype: ``testplan.base.TestplanResult``
    """
    plan.add(
        MultiTest(
            name="TestFXConverter",
            suites=[ConversionTests(), EdgeCases(), RestartEvent()],
            environment=converter_environment(),
            after_start=after_start,
            before_stop=before_stop,
        )
    )
if __name__ == "__main__":
    # Report the plan's exit code on stdout before exiting with it.
    exit_code = main().exit_code
    print("Exiting code: {}".format(exit_code))
    sys.exit(exit_code)
suites.py¶
"""FX conversion tests."""
import os
from testplan.testing.multitest import testsuite, testcase
def msg_to_bytes(msg, standard="utf-8"):
    """Return ``msg`` encoded to bytes using the ``standard`` codec."""
    encoded = msg.encode(standard)
    return encoded
def bytes_to_msg(seq, standard="utf-8"):
    """Return the text obtained by decoding ``seq`` with ``standard``."""
    text = seq.decode(standard)
    return text
def custom_docstring_func(_, kwargs):
    """
    Return the parametrization arguments, one per line, in the format
    ``key: value``.

    The first argument is accepted but not used.
    """
    return os.linesep.join(
        "{}: {}".format(name, value) for name, value in kwargs.items()
    )
@testsuite
class ConversionTests:
    """Sample currency conversion operations."""

    def __init__(self):
        # Rates sent back by the downstream server, keyed by source currency
        # and then by target currency.
        self._rates = {"EUR": {"GBP": "0.90000"}, "GBP": {"EUR": "1.10000"}}

    @testcase(
        parameters=(
            ("EUR:GBP:1000", "900"),
            ("GBP:EUR:1000", "1100"),
            ("EUR:GBP:1500", "1350"),
            ("GBP:EUR:1500", "1650"),
        ),
        docstring_func=custom_docstring_func,
    )
    def conversion_parameterized(self, env, result, request, expect):
        """
        Client sends a request to the currency converter app.
        The converter retrieves the conversion rate from the downstream
        and sends back the converted result to the client.
        """
        env.client.send(msg_to_bytes(request))

        # The app asks the server for the rate of the currency pair, which
        # is the first 7 characters of the request.
        rate_query = bytes_to_msg(env.server.receive(size=7))
        result.equal(
            request[:7], rate_query, "Downstream receives rate query."
        )

        source, target = rate_query.split(":")
        rate = self._rates[source][target]
        result.log("Downstream sends rate: {}".format(rate))
        env.server.send(msg_to_bytes(rate))

        # The client receives the response.
        expected = int(expect)
        converted = int(bytes_to_msg(env.client.receive(size=1024)))
        result.equal(expected, converted, "Client received converted value.")
@testsuite
class EdgeCases:
    """Suite containing edge case scenarios."""

    def _request_and_check(self, env, result, request, expect):
        """
        Log and send ``request`` from the client, then assert that the
        integer value of the reply equals the integer value of ``expect``.
        """
        result.log("Client request: {}".format(request))
        env.client.send(msg_to_bytes(request))

        # The client receives the response.
        expected = int(expect)
        converted = int(bytes_to_msg(env.client.receive(size=1024)))
        result.equal(expected, converted, "Client received converted value.")

    @testcase
    def same_currency(self, env, result):
        """
        Client requests conversion to the same currency.
        No downstream is involved as no rate is needed.
        """
        self._request_and_check(env, result, "EUR:EUR:2000", "2000")

    @testcase
    def zero_amount(self, env, result):
        """
        Client requests conversion of 0 amount.
        No downstream is involved as no rate is needed.
        """
        self._request_and_check(env, result, "EUR:GBP:0", "0")

    @testcase(
        parameters=(
            "GBP.EUR.1000",
            "EUR::GBP:500",
            "GBP:EURO:1000",
            "GBP:EUR:ABC",
        ),
        docstring_func=custom_docstring_func,
    )
    def invalid_requests(self, env, result, request):
        """
        Client sends a request with incorrect format.
        Requests are matched by [A-Z]{3}:[A-Z]{3}:[0-9]+ regex.
        """
        env.client.send(msg_to_bytes(request))

        # The client receives the error response.
        reply = bytes_to_msg(env.client.receive(size=1024))
        result.contain(
            "Invalid request format", reply, "Invalid request error received."
        )
@testsuite
class RestartEvent:
    """Converter app restart and reconnect scenarios."""

    def _send_and_receive(self, env, result, request, rate, expect):
        """
        Client sends a request to the currency converter app.
        The converter retrieves the conversion rate from the downstream
        and sends back the converted result to the client.
        """
        result.log("Client sends request: {}".format(request))
        env.client.send(msg_to_bytes(request))

        # The app asks the server for the rate of the currency pair, which
        # is the first 7 characters of the request.
        rate_query = bytes_to_msg(env.server.receive(size=7))
        result.equal(
            request[:7], rate_query, "Downstream receives rate query."
        )

        result.log("Downstream sends rate: {}".format(rate))
        env.server.send(msg_to_bytes(rate))

        # The client receives the response.
        expected = int(expect)
        converted = int(bytes_to_msg(env.client.receive(size=1024)))
        result.equal(expected, converted, "Client received converted value.")

    def _restart_components(self, env, result):
        """
        Restart converter app.
        Accept new connection from rate sending server.
        Restart client driver to connect to new host:port.
        """
        result.log("Restarting converter app.")
        env.converter.restart()

        listen_address = "{}:{}".format(env.converter.host, env.converter.port)
        result.log("App is now listening on " + listen_address)

        env.server.accept_connection()
        env.client.restart()

    @testcase
    def restart_app(self, env, result):
        """
        Restarting converter app and reconnect with rate
        server and client components before doing new requests.
        """
        listen_address = "{}:{}".format(env.converter.host, env.converter.port)
        result.log("App is listening on " + listen_address)

        # One request/response round trip per (request, rate, expected)
        # entry, with a restart of the components between consecutive
        # rounds.
        rounds = (
            ("EUR:GBP:1000", "0.8500", "850"),
            ("EUR:GBP:1000", "0.8700", "870"),
            ("EUR:GBP:2000", "0.8300", "1660"),
        )
        for index, (request, rate, expect) in enumerate(rounds):
            if index:
                self._restart_components(env, result)
            self._send_and_receive(env, result, request, rate, expect)
converter.cfg¶
[Listener]
Host: localhost
Port: 0
[Downstream]
Host: {{context['server'].host}}
Port: {{context['server'].port}}
converter.py¶
"""
Converter application that:
1. Connects to an external service that provides FX exchange rates.
2. Accepts requests from clients, e.g. 1000 GBP to be converted to EUR.
3. Requests the currency rate from the FX rates service, e.g. 1.15.
4. Responds to the client with the converted amount, e.g. 1150.
"""
import os
import re
import sys
import socket
import logging
from configparser import ConfigParser
logging.basicConfig(stream=sys.stdout, format="%(message)s")
class FXConverter:
    """
    FXConverter class that accepts a config file.

    The application listens for a single client connection, forwards the
    currency pair of each request to a downstream rates service and replies
    to the client with the converted amount.
    """

    def __init__(self, config_file):
        """
        :param config_file: Name of the config file; it is looked up in the
            ``etc`` directory under the current working directory.
        """
        self._logger = logging.getLogger()
        self._logger.setLevel(logging.INFO)
        self._config = self.load_config(
            os.path.join(os.getcwd(), "etc", config_file)
        )
        self._server = None

    def load_config(self, filename):
        """
        Reads from the config file the host/port information of the downstream
        service and also the host/port to bind and listen for clients.

        :param filename: Path of the config file to read.
        :return: The populated ``ConfigParser`` object.
        """
        config = ConfigParser()
        config.read(filename)
        self._logger.info("Configuration read:")
        for section in ("Listener", "Downstream"):
            self._logger.info(section)
            self._logger.info("\tHost: {}".format(config.get(section, "Host")))
            self._logger.info("\tPort: {}".format(config.get(section, "Port")))
        return config

    def _server_init(self):
        """
        Create the listening socket, bind it to the configured ``Listener``
        host/port and start listening.

        :return: The address the socket is bound to, as reported by
            ``getsockname()``.
        """
        self._server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self._server.bind(
            (
                self._config.get("Listener", "Host"),
                int(self._config.get("Listener", "Port")),
            )
        )
        self._server.listen(10)
        return self._server.getsockname()

    def _validate_request(self, msg):
        """
        Check that ``msg`` is a request that can be processed.

        The message must start with ``[A-Z]{3}:[A-Z]{3}:[0-9]+`` and the
        amount part (everything from index 8) must parse as a number, so
        that a malformed amount such as ``1000abc`` is reported to the
        client instead of raising later in ``_process_request``.

        :raises ValueError: If the message is not a valid request.
        """
        error_msg = "Invalid request format ({}).".format(msg)
        if not re.match(r"[A-Z]{3}:[A-Z]{3}:[0-9]+", msg):
            raise ValueError(error_msg)
        try:
            float(msg[8:])
        except ValueError:
            raise ValueError(error_msg) from None

    def _process_request(self, downstream_cli, msg):
        """
        Convert the amount of an already validated request.

        :param downstream_cli: Socket connected to the rates service.
        :param msg: Request in the form ``SRC:TGT:AMOUNT``.
        :return: The converted amount, truncated to an integer, as a string.
        """
        value = float(msg[8:])  # e.g. 1000
        if value == 0:
            # Nothing to convert, so the rates service is not queried.
            return "0"
        currencies = msg[:7]  # e.g. EUR:GBP
        source, target = currencies.split(":")
        if source == target:
            # Same currency on both sides: no rate lookup is needed.
            rate = 1
        else:
            downstream_cli.sendall(currencies.encode("utf-8"))
            rate = float(downstream_cli.recv(1024).decode("utf-8"))
        result = str(int(rate * value))
        self._logger.info(
            "Request result for {} with rate {}: {}".format(
                msg[8:], rate, result
            )
        )
        return result

    def _loop(self, upstream_conn, downstream_cli, downstream_addr):
        """
        Serve client requests until a ``Stop`` message is received or the
        client closes the connection.

        :param upstream_conn: Socket of the accepted client connection.
        :param downstream_cli: Socket connected to the rates service.
        :param downstream_addr: Address of the rates service (for logging).
        """
        while True:
            data = upstream_conn.recv(1024)
            if not data:
                # An empty read means the client closed the connection.
                # Stop here rather than replying to a socket that is gone.
                self._logger.info("Client disconnected.")
                break

            msg = data.decode("utf-8")
            self._logger.info("Client msg: {}".format(msg))
            if msg == "Stop":
                self._server.close()
                self._logger.info("Converter stopped.")
                break

            try:
                self._validate_request(msg)
            except ValueError as exc:
                # Report the problem to the client and keep serving.
                upstream_conn.sendall(str(exc).encode("utf-8"))
                continue

            self._logger.info(
                "Propagating query {} to {}".format(msg, downstream_addr)
            )
            result = self._process_request(downstream_cli, msg)
            upstream_conn.sendall(result.encode("utf-8"))

    def loop(self):
        """
        Starts the application.
        1. Connect to downstream server with FX exchange rates.
        2. Accepts client connection.
        3. Start the loop to handle client requests.

        All sockets are closed when this method returns or raises.
        """
        host, port = self._server_init()
        # Closing a socket twice is harmless, so it does not matter that
        # the "Stop" command also closes the listening socket.
        with self._server:
            self._logger.info("Listener on: {}:{}".format(host, port))
            downstream_addr = (
                self._config.get("Downstream", "Host"),
                int(self._config.get("Downstream", "Port")),
            )
            with socket.create_connection(downstream_addr) as downstream_cli:
                # Logged only once the connection has been established.
                self._logger.info(
                    "Connected to downstream: {}:{}".format(
                        downstream_addr[0], downstream_addr[1]
                    )
                )
                self._logger.info("Converter started.")
                upstream_conn, client_address = self._server.accept()
                with upstream_conn:
                    self._logger.info(
                        "Client connected: {}".format(client_address)
                    )
                    self._loop(upstream_conn, downstream_cli, downstream_addr)
if __name__ == "__main__":
    sys.stderr.flush()
    # Exactly one command line argument is expected: the config file name.
    _script, config_name = sys.argv
    FXConverter(config_name).loop()
driver.py¶
"""Driver for converter.py application binary."""
import re
from testplan.testing.multitest.driver.app import App
class FXConverter(App):
    """
    Inherits the generic ``testplan.testing.multitest.driver.app.App`` driver
    and exposes host/port values read from log extracts.
    """

    def __init__(self, **options):
        super().__init__(**options)
        # Both are filled in by ``post_start`` from the ``listen_address``
        # extract.
        self.host = None
        self.port = None

    def post_start(self):
        """
        Store host/port information to be made available in its context
        so that client driver can connect to it.

        Both values are kept as strings, exactly as they appear in the
        ``listen_address`` extract.
        """
        super().post_start()
        # Split on the last colon only, so a host part that itself contains
        # colons still unpacks into exactly two values.
        self.host, self.port = self.extracts["listen_address"].rsplit(":", 1)
PDF report¶
Sample detailed PDF report.