Transports

FIX

Required files:

Certificates howto

To generate the certificates needed for the TLS example, see the instructions in examples/Transports/FIX/certs/readme.txt.

test_plan.py

#!/usr/bin/env python
"""
This example demonstrates FIX communication via FixServer and FixClient drivers.

NOTE: The FixServer driver implementation requires select.poll(), which is not
available on all platforms. Typically it is available on POSIX systems but
not on Windows. This example will not run correctly on platforms where
select.poll() is not available.
"""

import sys

from testplan import test_plan
from testplan.report.testing.styles import Style, StyleEnum

import over_one_session
import over_two_sessions


OUTPUT_STYLE = Style(StyleEnum.ASSERTION_DETAIL, StyleEnum.ASSERTION_DETAIL)


# Hard-coding `pdf_path`, 'stdout_style' and 'pdf_style' so that the
# downloadable example gives meaningful and presentable output.
# NOTE: this programmatic arguments passing approach will cause Testplan
# to ignore any command line arguments related to that functionality.
@test_plan(
    name="FIXCommunication",
    pdf_path="report.pdf",
    stdout_style=OUTPUT_STYLE,
    pdf_style=OUTPUT_STYLE,
)
def main(plan):
    """
    Register the FIX example MultiTests on the plan.

    The one-session MultiTest is added first, then the two-session one.

    :param plan: Plan object that the MultiTests are added to.
    :return: Testplan result object.
    :rtype:  ``testplan.base.TestplanResult``
    """
    for example in (over_one_session, over_two_sessions):
        plan.add(example.get_multitest())


if __name__ == "__main__":
    # Run the plan, report its exit code and hand that code to the shell.
    outcome = main()
    print(f"Exiting code: {outcome.exit_code}")
    sys.exit(outcome.exit_code)

over_one_session.py

"""Tests FIX communication between a server and a client."""

import os
from pathlib import Path
import sys

# pyfixmsg is located through the PYFIXMSG_PATH environment variable, whose
# value is appended to ``sys.path`` before the import is attempted.
try:
    sys.path.append(os.environ["PYFIXMSG_PATH"])
    import pyfixmsg
except (KeyError, ImportError) as err:
    # Chain explicitly so the traceback presents the missing variable or the
    # failed import as the direct cause of this error.
    raise RuntimeError(
        "Download pyfixmsg library from "
        "https://github.com/morganstanley/pyfixmsg "
        "and set PYFIXMSG_PATH env var to the local path."
    ) from err

# The FIX specification file is located through FIX_SPEC_FILE.
try:
    SPEC_FILE = os.environ["FIX_SPEC_FILE"]
except KeyError as err:
    raise RuntimeError(
        "No spec file set. You should download "
        "https://github.com/quickfix/quickfix/blob/master/spec/FIX42.xml "
        "file and set FIX_SPEC_FILE to the local path."
    ) from err

from pyfixmsg.fixmessage import FixMessage
from pyfixmsg.codecs.stringfix import Codec
from pyfixmsg.reference import FixSpec

from testplan.common.utils.context import context
from testplan.testing.multitest import MultiTest, testsuite, testcase
from testplan.testing.multitest.driver.fix import FixServer, FixClient
from testplan.common.utils.sockets.tls import SimpleTLSConfig

CODEC = Codec(spec=FixSpec(SPEC_FILE))


def fixmsg(source):
    """
    Build a ``FixMessage`` from ``source`` and attach the module codec to it.

    Pinning ``CODEC`` on the message forces our given spec, so the codec does
    not have to be passed to the serialisation and parsing methods.
    The codec defaults to a reasonable parser but without repeating groups.
    An alternative is to use the ``to_wire`` and ``from_wire`` methods to
    serialise and parse messages, passing the codec explicitly.
    """
    message = FixMessage(source)
    message.codec = CODEC
    return message


@testsuite
class FIXTestsuite:
    """FIX messaging over a single session between a server and a client."""

    @testcase
    def send_and_receive_msg(self, env, result):
        """
        Basic FIX messaging between a FixServer and a FixClient.
        """
        # First we create a FIX message containing a single tag: 35=D
        msg = fixmsg({35: "D"})

        # We use the client to send that message over to the server.
        # The message is enriched with the expected session tags (49, 56 etc).
        env.client.send(msg)

        # We create a FIX message to describe what we expect the server to
        # receive. We expect the default FIX version FIX.4.2, the same value
        # for tag 35 as given, D, and the correct senderCompID and targetCompID.
        exp_msg = fixmsg(
            {
                8: "FIX.4.2",
                35: "D",
                49: env.client.sender,
                56: env.client.target,
            }
        )

        # The server receives the message sent by the client.
        received = env.server.receive()

        # We assert that the received message matches the message we sent.
        # We restrict the comparison to tags 8, 35, 49 and 56, since we want to
        # ignore the other message-level tags such as 9 and 10 that are
        # automatically added by the connectors.
        result.fix.match(
            exp_msg,
            received,
            description="Message sent by client match.",
            include_tags=[8, 35, 49, 56],
        )

        # Now, we create a response message from the server, confirming receipt
        # of order (message type 8)
        msg = fixmsg({35: "8"})

        # We use the server to send the response to the client.
        env.server.send(msg)

        # We can also create a heartbeat message (message type 0)
        heartbeat = fixmsg({35: "0"})
        # We use the server to send the heartbeat to the client.
        env.server.send(heartbeat)

        # We create FIX messages to describe what we expect the client to
        # receive. The default FIX version FIX.4.2 is expected, together with
        # the right senderCompID and targetCompID (swapped relative to the
        # client's own, since these messages originate from the server).
        exp_msg = fixmsg(
            {
                8: "FIX.4.2",
                35: "8",
                49: env.client.target,
                56: env.client.sender,
            }
        )

        exp_heartbeat = fixmsg(
            {
                8: "FIX.4.2",
                35: "0",
                49: env.client.target,
                56: env.client.sender,
            }
        )

        # The client receives both messages, in the order the server sent them.
        received = env.client.receive()
        received_heartbeat = env.client.receive()

        # We expect messages that match the ones we sent. We restrict the
        # comparison to tags 8, 35, 49 and 56, since we want to ignore the
        # other message-level tags such as 9 and 10 that are automatically
        # added by the connectors.
        result.fix.match(
            exp_msg,
            received,
            description="Message sent by server match.",
            include_tags=[8, 35, 49, 56],
        )
        # The heartbeat gets its own description so that the two server-side
        # assertions can be told apart in the report.
        result.fix.match(
            exp_heartbeat,
            received_heartbeat,
            description="Heartbeat sent by server match.",
            include_tags=[8, 35, 49, 56],
        )


def get_multitest():
    """
    Build the ``OverOneSession`` MultiTest that is added to the plan.

    The environment holds one FIX server and one FIX client. The client's
    host and port are context expressions that refer to the ``server`` driver,
    so they are retrieved from the server after it is started.
    """
    suites = [FIXTestsuite()]
    fix_server = FixServer(name="server", msgclass=FixMessage, codec=CODEC)
    fix_client = FixClient(
        name="client",
        host=context("server", "{{host}}"),
        port=context("server", "{{port}}"),
        sender="TW",
        target="ISLD",
        msgclass=FixMessage,
        codec=CODEC,
        logoff_at_stop=False,
    )
    return MultiTest(
        name="OverOneSession",
        suites=suites,
        environment=[fix_server, fix_client],
    )

over_two_sessions.py

"""Tests FIX communication between a server and multiple clients."""

import os
import sys

# pyfixmsg is located through the PYFIXMSG_PATH environment variable, whose
# value is appended to ``sys.path`` before the import is attempted.
try:
    sys.path.append(os.environ["PYFIXMSG_PATH"])
    import pyfixmsg
except (KeyError, ImportError) as err:
    # Chain explicitly so the traceback presents the missing variable or the
    # failed import as the direct cause of this error.
    raise RuntimeError(
        "Download pyfixmsg library from "
        "https://github.com/morganstanley/pyfixmsg "
        "and set PYFIXMSG_PATH env var to the local path."
    ) from err

# The FIX specification file is located through FIX_SPEC_FILE.
try:
    SPEC_FILE = os.environ["FIX_SPEC_FILE"]
except KeyError as err:
    raise RuntimeError(
        "No spec file set. You should download "
        "https://github.com/quickfix/quickfix/blob/master/spec/FIX42.xml "
        "file and set FIX_SPEC_FILE to the local path."
    ) from err

from pyfixmsg.fixmessage import FixMessage
from pyfixmsg.codecs.stringfix import Codec
from pyfixmsg.reference import FixSpec

from testplan.common.utils.context import context
from testplan.testing.multitest import MultiTest, testsuite, testcase
from testplan.testing.multitest.driver.fix import FixServer, FixClient

CODEC = Codec(spec=FixSpec(SPEC_FILE))


def fixmsg(source):
    """
    Build a ``FixMessage`` from ``source`` and attach the module codec to it.

    Pinning ``CODEC`` on the message forces our given spec, so the codec does
    not have to be passed to the serialisation and parsing methods.
    The codec defaults to a reasonable parser but without repeating groups.
    An alternative is to use the ``to_wire`` and ``from_wire`` methods to
    serialise and parse messages, passing the codec explicitly.
    """
    message = FixMessage(source)
    message.codec = CODEC
    return message


@testsuite
class FIXTestsuite:
    @testcase
    def send_and_receive_msgs(self, env, result):
        """
        Basic FIX messaging with several FixClients connected to one FixServer.
        """
        # ---- Client 1 -> server ----
        # An order (35=D) carrying a comment in tag 58. The client enriches
        # it with the expected session tags (49, 56 etc) when sending.
        order_one = fixmsg({35: "D", 58: "first client"})
        env.client1.send(order_one)

        # What the server should see: default FIX version FIX.4.2, the same
        # tag 35 and comment, and the first client's senderCompID and
        # targetCompID.
        expected_order_one = fixmsg(
            {
                8: "FIX.4.2",
                35: "D",
                49: env.client1.sender,
                56: env.client1.target,
                58: "first client",
            }
        )

        # The server holds several connections, so the one to read from is
        # selected with a (senderCompID, targetCompID) pair.
        connection_one = (env.client1.target, env.client1.sender)
        server_got_one = env.server.receive(connection_one)

        # Only tags 8, 35, 49, 56 and 58 are compared; other message-level
        # tags such as 9 and 10 are added automatically by the connectors.
        result.fix.match(
            expected_order_one,
            server_got_one,
            description="Message sent by client 1 match.",
            include_tags=[8, 35, 49, 56, 58],
        )

        # ---- Client 2 -> server ----
        # Same kind of message with a different comment, sent by the second
        # client.
        order_two = fixmsg({35: "D", 58: "second client"})
        env.client2.send(order_two)

        # Nearly identical expectation; the senderCompID and targetCompID now
        # identify the second connection.
        expected_order_two = fixmsg(
            {
                8: "FIX.4.2",
                35: "D",
                49: env.client2.sender,
                56: env.client2.target,
                58: "second client",
            }
        )

        # This time the server reads from the second connection.
        connection_two = (env.client2.target, env.client2.sender)
        server_got_two = env.server.receive(connection_two)

        result.fix.match(
            expected_order_two,
            server_got_two,
            description="Message sent by client 2 match.",
            include_tags=[8, 35, 49, 56, 58],
        )

        # ---- Server -> both clients ----
        # A response confirming receipt of order (message type 8), sent to
        # each client in turn over its own connection.
        confirmation = fixmsg({35: "8"})
        env.server.send(confirmation, connection_one)
        env.server.send(confirmation, connection_two)

        # Both clients should get FIX.4.2 with 35=8; the senderCompID and
        # targetCompID differ per client.
        expected_reply_one = fixmsg(
            {
                8: "FIX.4.2",
                35: "8",
                49: env.client1.target,
                56: env.client1.sender,
            }
        )
        expected_reply_two = fixmsg(
            {
                8: "FIX.4.2",
                35: "8",
                49: env.client2.target,
                56: env.client2.sender,
            }
        )

        # Each client receives its copy of the response.
        client_one_got = env.client1.receive()
        client_two_got = env.client2.receive()

        # We expect the messages to match the message we sent. Only tags 8,
        # 35, 49 and 56 are compared; other message-level tags such as 9 and
        # 10 are added automatically by the connectors.
        result.fix.match(
            expected_reply_one,
            client_one_got,
            description="Msg sent by server to client 1 match.",
            include_tags=[8, 35, 49, 56],
        )
        result.fix.match(
            expected_reply_two,
            client_two_got,
            description="Msg sent by server to client 2 match.",
            include_tags=[8, 35, 49, 56],
        )


def get_multitest():
    """
    Build the ``OverTwoSessions`` MultiTest that is added to the plan.

    The environment holds one FIX server and two FIX clients. Each client's
    host and port are context expressions that refer to the ``server`` driver,
    so they are retrieved from the server after it is started.

       ------------- client1
       |
    server
       |
       ------------- client2

    """
    suites = [FIXTestsuite()]
    fix_server = FixServer(name="server", msgclass=FixMessage, codec=CODEC)

    # The two clients differ only in driver name and senderCompID.
    client_specs = (("client1", "TW"), ("client2", "TW2"))
    fix_clients = [
        FixClient(
            name=client_name,
            host=context("server", "{{host}}"),
            port=context("server", "{{port}}"),
            sender=sender_comp_id,
            target="ISLD",
            msgclass=FixMessage,
            codec=CODEC,
        )
        for client_name, sender_comp_id in client_specs
    ]

    return MultiTest(
        name="OverTwoSessions",
        suites=suites,
        environment=[fix_server, *fix_clients],
    )

PDF report

Sample detailed PDF report.

../_images/fix_sessions_example.png

HTTP

Required files:

test_plan.py

#!/usr/bin/env python
"""
This example is to demonstrate HTTP communication test scenarios.
"""

import sys

from testplan import test_plan
from testplan.report.testing.styles import Style, StyleEnum

import http_basic
import http_custom_handler

OUTPUT_STYLE = Style(StyleEnum.ASSERTION_DETAIL, StyleEnum.ASSERTION_DETAIL)


# Hard-coding `pdf_path`, 'stdout_style' and 'pdf_style' so that the
# downloadable example gives meaningful and presentable output.
# NOTE: this programmatic arguments passing approach will cause Testplan
# to ignore any command line arguments related to that functionality.
@test_plan(
    name="HTTPConnections",
    pdf_path="report.pdf",
    stdout_style=OUTPUT_STYLE,
    pdf_style=OUTPUT_STYLE,
)
def main(plan):
    """
    Register the two HTTP example MultiTests on the plan.

    The basic MultiTest is added first, then the custom-handler one.

    :param plan: Plan object that the MultiTests are added to.
    :return: Testplan result object.
    :rtype:  ``testplan.base.TestplanResult``
    """
    examples = (
        (http_basic, "HTTPBasic"),
        (http_custom_handler, "HTTPCustomHandler"),
    )
    for module, multitest_name in examples:
        plan.add(module.get_multitest(multitest_name))


if __name__ == "__main__":
    # Run the plan, report its exit code and hand that code to the shell.
    outcome = main()
    print(f"Exiting code: {outcome.exit_code}")
    sys.exit(outcome.exit_code)

http_basic.py

"""Tests HTTP requests between a server and a client."""
import json

from testplan.common.utils.context import context

from testplan.testing.multitest import MultiTest, testsuite, testcase
from testplan.testing.multitest.driver.http import (
    HTTPServer,
    HTTPClient,
    HTTPResponse,
)


@testsuite
class HTTPTestsuite:
    """HTTP requests between a server and a client."""

    @testcase
    def request_and_response(self, env, result):
        """
        Client makes a GET request, server receives it and responds back.
        """
        # The client issues a GET against some section of the API. The
        # HTTPServer replies from its response queue no matter the HTTP
        # method (GET, POST etc.) or the section of the API requested.
        result.log("Client sends GET request")
        env.http_client.get(api="/random/text")

        # The request must be consumed here, otherwise it would still be
        # pending when the next testcase runs.
        server_request = env.http_server.receive()
        result.log(f"Server got GET request: {server_request}")

        # Payload the server will send back.
        payload = {"this": ["is", "a", "json", "object"]}

        # An HTTPResponse takes its headers as a dictionary and its content
        # as a list. Here the content type is declared as JSON and the
        # payload is dumped to a string so it can be sent.
        body = [json.dumps(payload)]
        reply = HTTPResponse(
            headers={"Content-type": "application/json"},
            content=body,
        )

        # Responding adds the reply to the HTTPServer's response queue; it is
        # sent immediately because the client has already made its request.
        result.log("Server receives request and sends response")
        env.http_server.respond(reply)

        # The client picks up the server's response.
        client_response = env.http_client.receive()

        # The JSON that came back must equal the JSON the server sent.
        result.dict.match(
            client_response.json(), payload, "JSON response from server"
        )

    @testcase
    def post_and_response(self, env, result):
        """
        Client posts JSON, server receives it and responds back.
        """
        # Payload used both for the request body and for the response.
        payload = {"this": ["is", "another", "json", "object"]}

        # The client POSTs the payload to some section of the API. The
        # HTTPServer replies from its response queue no matter the HTTP
        # method (GET, POST etc.) or the section of the API requested.
        result.log("Client sends POST request")
        env.http_client.post(
            api="/random/text",
            json=payload,
            headers={"Content-Type": "application/json"},
        )

        # The server receives the request.
        server_request = env.http_server.receive()
        result.log(f"Server got POST request: {server_request}")

        # The JSON that reached the server must equal the JSON the client
        # posted.
        result.dict.match(
            server_request.json, payload, "JSON sent to the server"
        )

        # An HTTPResponse takes its headers as a dictionary and its content
        # as a list. Here the content type is declared as JSON and the
        # payload is dumped to a string so it can be sent.
        body = [json.dumps(payload)]
        reply = HTTPResponse(
            headers={"Content-type": "application/json"},
            content=body,
        )

        # Responding adds the reply to the HTTPServer's response queue; it is
        # sent immediately because the client has already made its request.
        result.log("Server receives request and sends response")
        env.http_server.respond(reply)

        # The client picks up the server's response.
        client_response = env.http_client.receive()

        # The JSON that came back must equal the JSON the server sent.
        result.dict.match(
            client_response.json(), payload, "JSON response from server"
        )


def get_multitest(name):
    """
    Build the MultiTest called ``name`` that is added to the plan.

    The environment holds an HTTP server and an HTTP client. The client's
    host and port are context expressions that refer to the ``http_server``
    driver, so they are retrieved from the server after it is started.
    """
    suites = [HTTPTestsuite()]
    server = HTTPServer(name="http_server")
    client = HTTPClient(
        name="http_client",
        host=context("http_server", "{{host}}"),
        port=context("http_server", "{{port}}"),
    )
    return MultiTest(name=name, suites=suites, environment=[server, client])

http_custom_handler.py

"""Tests using a custom HTTP response handler."""
from testplan.common.utils.context import context

from testplan.testing.multitest import MultiTest, testsuite, testcase
from testplan.testing.multitest.driver.http import HTTPServer, HTTPClient

from custom_http_request_handler import CustomHTTPRequestHandler


@testsuite
class HTTPTestsuite:
    """Sending requests to an HTTPServer with a custom HTTP request handler."""

    @testcase
    def general_request(self, env, result):
        """
        Client makes a general request, server responds with text file contents.
        """
        # The HTTPClient sends a GET request to some section of the API.
        result.log("Client sends GET request")
        env.http_client.get(api="/random/text")

        # The HTTPServer will use the CustomHTTPRequestHandler to automatically
        # send the contents of the test.txt file back to every request. The
        # HTTPClient receives the HTTPServer's response.
        response = env.http_client.receive()

        # We are verifying the contents of the test.txt file is what the server
        # has sent back. The file is read inside a ``with`` block so that the
        # handle is closed deterministically.
        with open("test.txt") as text_fobj:
            text = text_fobj.read()
        result.equal(
            response.text, text, "HTTPServer sends file contents in response."
        )

    @testcase
    def post_request(self, env, result):
        """
        Client makes a POST request, server responds with custom POST response.
        """
        result.log("Client sends POST request")
        env.http_client.post(api="/random/text")

        # The HTTPClient receives the HTTPServer's response.
        response = env.http_client.receive()

        # The body must be the fixed text the test expects for POST requests.
        result.equal(
            response.text,
            "POST response.",
            "HTTPServer sends custom POST response.",
        )


def get_multitest(name):
    """
    Build the MultiTest called ``name`` that is added to the plan.

    The environment holds an HTTP server using the custom request handler and
    an HTTP client. The client's host and port are context expressions that
    refer to the ``http_server`` driver, so they are retrieved from the server
    after it is started.
    """
    # ``handler_attributes`` is a dictionary handed to the HTTPServer; its
    # entries are accessible in the custom HTTP request handler
    # (see custom_http_request_handler.py).
    handler_attributes = {"text_file": "test.txt"}

    suites = [HTTPTestsuite()]
    server = HTTPServer(
        name="http_server",
        request_handler=CustomHTTPRequestHandler,
        handler_attributes=handler_attributes,
    )
    client = HTTPClient(
        name="http_client",
        host=context("http_server", "{{host}}"),
        port=context("http_server", "{{port}}"),
    )
    return MultiTest(name=name, suites=suites, environment=[server, client])

custom_http_request_handler.py

"""Custom HTTP request handler."""
from testplan.testing.multitest.driver.http import (
    HTTPRequestHandler,
    HTTPResponse,
)


class CustomHTTPRequestHandler(HTTPRequestHandler):
    """Define a custom request handler."""

    def get_response(self, request):
        """
        Override the get_response method to determine how the server will
        respond to all requests. You must return an HTTPResponse object as the
        _parse_request method expects this.

        :param request: The incoming request (not used by this handler).
        :return: Response whose content is the text of the file named by the
            ``text_file`` handler attribute.
        """
        text_file = self.server.handler_attributes["text_file"]
        # Named ``text_fobj`` rather than ``input`` so that the builtin is not
        # shadowed.
        with open(text_file) as text_fobj:
            text = text_fobj.read()
        response = HTTPResponse(content=[text])
        self.server.log_callback(
            "Creating custom response from {}".format(text_file)
        )
        return response

    def do_POST(self):
        """
        Override individual request methods (e.g. do_POST) to determine how the
        server will respond to individual requests. You will have to create the
        response, then call _send_header and _send_content.
        """
        response = HTTPResponse(content=["POST response."])
        self._send_header(response.status_code, response.headers)
        self._send_content(response.content)

test.txt

This is a test file.
It has multiple lines.

Kafka

Required files:

test_plan.py

#!/usr/bin/env python
"""
Demonstrates Kafka driver usage from within the testcases.
"""

import os
import sys
import uuid

from testplan import test_plan
from testplan.testing.multitest import MultiTest

try:
    from confluent_kafka import Producer, Consumer
except ImportError:
    print("Cannot import confluent_kafka!")
    sys.exit()

from testplan.testing.multitest.driver.zookeeper import (
    ZookeeperStandalone,
    ZK_SERVER,
)
from testplan.testing.multitest.driver.kafka import (
    KafkaStandalone,
    KAFKA_START,
)
from testplan.testing.multitest import testsuite, testcase
from testplan.report.testing.styles import Style, StyleEnum


OUTPUT_STYLE = Style(StyleEnum.ASSERTION_DETAIL, StyleEnum.ASSERTION_DETAIL)


@testsuite
class KafkaTest:
    """Suite that contains testcases that perform kafka operation."""

    @testcase
    def test_send_receive(self, env, result):
        """
        Produce one message to a topic and check that a consumer reads it back.
        """
        producer = Producer(
            {
                "bootstrap.servers": "localhost:{}".format(env.kafka.port),
                "max.in.flight": 1,
            }
        )
        consumer = Consumer(
            {
                "bootstrap.servers": "localhost:{}".format(env.kafka.port),
                "group.id": uuid.uuid4(),
                "default.topic.config": {"auto.offset.reset": "smallest"},
                "enable.auto.commit": True,
            }
        )

        try:
            topic = "testplan"
            # A random payload, so the comparison cannot be satisfied by any
            # other message.
            message = str(uuid.uuid4()).encode("utf-8")
            producer.produce(topic=topic, value=message)
            producer.flush()
            consumer.subscribe([topic])

            # poll() may return None when nothing arrives within the timeout.
            # Report that as a failed comparison instead of raising
            # AttributeError on ``None.value()``.
            msg = consumer.poll(10)
            received = None if msg is None else msg.value()
            result.equal(message, received, "Test producer and consumer")
        finally:
            # Always close the consumer, even if the exchange above raised.
            consumer.close()


# Hard-coding `pdf_path`, 'stdout_style' and 'pdf_style' so that the
# downloadable example gives meaningful and presentable output.
# NOTE: this programmatic arguments passing approach will cause Testplan
# to ignore any command line arguments related to that functionality.
@test_plan(
    name="KafkaExample",
    stdout_style=OUTPUT_STYLE,
    pdf_style=OUTPUT_STYLE,
    pdf_path="report.pdf",
)
def main(plan):
    """
    Register the Kafka example MultiTest on the plan.

    The Zookeeper and Kafka configuration templates are looked up in the
    directory that contains this script.

    :param plan: Plan object that the MultiTest is added to.
    :return: Testplan result object.
    :rtype:  ``testplan.base.TestplanResult``
    """
    script_dir = os.path.dirname(os.path.abspath(__file__))
    zk_cfg = os.path.join(script_dir, "zoo_template.cfg")
    kafka_cfg = os.path.join(script_dir, "server_template.properties")

    suites = [KafkaTest()]
    zookeeper = ZookeeperStandalone(name="zk", cfg_template=zk_cfg)
    kafka = KafkaStandalone(name="kafka", cfg_template=kafka_cfg)

    multitest = MultiTest(
        name="KafkaTest",
        suites=suites,
        environment=[zookeeper, kafka],
    )
    plan.add(multitest)


if __name__ == "__main__":
    # The plan only runs when both the ZK_SERVER and KAFKA_START paths exist;
    # otherwise a notice is printed and nothing is executed.
    if not os.path.exists(ZK_SERVER) or not os.path.exists(KAFKA_START):
        print("Zookeeper doesn't exist in this server.")
    else:
        sys.exit(not main())

server_template.properties

# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# see kafka.server.KafkaConfig for additional details and defaults

############################# Server Basics #############################

# The id of the broker. This must be set to a unique integer for each broker.
broker.id=0

# Switch to enable topic deletion or not, default value is false
delete.topic.enable=true

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from 
# java.net.InetAddress.getCanonicalHostName() if not configured.
#   FORMAT:
#     listeners = listener_name://host_name:port
#   EXAMPLE:
#     listeners = PLAINTEXT://your.host.name:9092

listeners=PLAINTEXT://localhost:{{port}}

# Hostname and port the broker will advertise to producers and consumers. If not set, 
# it uses the value for "listeners" if configured.  Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

# The number of threads handling network requests
num.network.threads=3

# The number of threads doing disk I/O
num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server
socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server
socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)
socket.request.max.bytes=104857600


############################# Log Basics #############################

# A comma separated list of directories under which to store log files
#log.dirs=/tmp/jiaweil/kafka-vista-9098
log.dirs={{log_path}}

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=3

# The number of threads per data directory to be used for log recovery at startup and flushing at shutdown.
# This value is recommended to be increased for installations with data dirs located in RAID array.
num.recovery.threads.per.data.dir=1

############################# Log Flush Policy #############################

# Messages are immediately written to the filesystem but by default we only fsync() to sync
# the OS cache lazily. The following configurations control the flush of data to disk.
# There are a few important trade-offs here:
#    1. Durability: Unflushed data may be lost if you are not using replication.
#    2. Latency: Very large flush intervals may lead to latency spikes when the flush does occur as there will be a lot of data to flush.
#    3. Throughput: The flush is generally the most expensive operation, and a small flush interval may lead to excessive seeks.
# The settings below allow one to configure the flush policy to flush data after a period of time or
# every N messages (or both). This can be done globally and overridden on a per-topic basis.

# The number of messages to accept before forcing a flush of data to disk
#log.flush.interval.messages=10000

# The maximum amount of time a message can sit in a log before we force a flush
#log.flush.interval.ms=1000

############################# Log Retention Policy #############################

# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.

# The minimum age of a log file to be eligible for deletion due to age
log.retention.hours=168

# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes. Functions independently of log.retention.hours.
#log.retention.bytes=1073741824

# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma-separated list of host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect={{context['zk'].connection_str}}

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

offsets.topic.replication.factor=1

zoo_template.cfg

# limits the number of active connections from a host,
# specified by IP address, to a single ZooKeeper server.
maxClientCnxns=100

# The basic time unit in milliseconds used by ZooKeeper.
# It is used to do heartbeats and the minimum session timeout will be twice the tickTime.
tickTime=2000

# Timeouts ZooKeeper uses to limit the length of time the ZooKeeper
# servers in quorum have to connect to a leader.
initLimit=10

# Limits how far out of date a server can be from a leader.
syncLimit=5

# Enable admin server.
admin.enableServer=false

# The location to store the in-memory database snapshots and, unless specified otherwise,
# the transaction log of updates to the database.
dataDir={{zkdata_path}}

# The port to listen for client connections.
clientPort={{port}}

TCP

Required files:

test_plan.py

#!/usr/bin/env python
"""
This example is to demonstrate TCP communication test scenarios.
"""

import sys

from testplan import test_plan
from testplan.report.testing.styles import Style, StyleEnum

import tcp_one_connection
import tcp_multiple_connections

# Output style shared by the stdout and PDF reports (passed below as the
# `stdout_style` and `pdf_style` arguments of `@test_plan`).
# NOTE(review): the two positional arguments are presumably the display
# levels for passing and failing tests respectively - confirm against Style.
OUTPUT_STYLE = Style(StyleEnum.ASSERTION_DETAIL, StyleEnum.ASSERTION_DETAIL)


# Hard-coding `pdf_path`, 'stdout_style' and 'pdf_style' so that the
# downloadable example gives meaningful and presentable output.
# NOTE: this programmatic arguments passing approach will cause Testplan
# to ignore any command line arguments related to that functionality.
@test_plan(
    name="TCPConnections",
    pdf_path="report.pdf",
    stdout_style=OUTPUT_STYLE,
    pdf_style=OUTPUT_STYLE,
)
def main(plan):
    """
    Testplan decorated main function that registers the two TCP MultiTests.

    :param plan: Plan object to which the MultiTests are added.
    :return: Testplan result object.
    :rtype:  ``testplan.base.TestplanResult``
    """
    # Each example module exposes get_multitest(name); add them in order.
    for module, multitest_name in (
        (tcp_one_connection, "TCPOneConnection"),
        (tcp_multiple_connections, "TCPManyConnections"),
    ):
        plan.add(module.get_multitest(multitest_name))


if __name__ == "__main__":
    # Run the plan, report its exit code and hand it back to the shell.
    exit_code = main().exit_code
    print("Exiting code: {}".format(exit_code))
    sys.exit(exit_code)

tcp_one_connection.py

"""Tests TCP communication between a server and a client."""

from testplan.common.utils.context import context

from testplan.testing.multitest import MultiTest, testsuite, testcase
from testplan.testing.multitest.driver.tcp import TCPServer, TCPClient


@testsuite
class TCPTestsuite:
    """TCP tests for a single server and a single client."""

    def setup(self, env):
        """Will be executed before the testcase."""
        # The server accepts the client connection.
        env.server.accept_connection()

    @testcase
    def send_and_receive_msg(self, env, result):
        """
        Client sends a message, server receives it and responds back.
        """
        # Client -> server: the client sends its own configured name.
        outgoing = env.client.cfg.name
        result.log("Client is sending: {}".format(outgoing))
        sent_size = env.client.send_text(outgoing)
        at_server = env.server.receive_text(size=sent_size)
        result.equal(at_server, outgoing, "Server received")

        # Server -> client: the reply embeds the text the server received.
        reply = "Hello {}".format(at_server)
        result.log("Server is responding: {}".format(reply))
        reply_size = env.server.send_text(reply)
        at_client = env.client.receive_text(size=reply_size)
        result.equal(at_client, reply, "Client received")


def get_multitest(name):
    """
    Creates and returns a new MultiTest instance to be added to the plan.

    The environment holds one TCP server and one TCP client. The client's
    host and port are context expressions that refer to the "server" driver,
    so they are retrieved from the server after it has started.

    :param name: Name given to the MultiTest.
    :return: The configured MultiTest instance.
    """
    server = TCPServer(name="server")
    client = TCPClient(
        name="client",
        host=context("server", "{{host}}"),
        port=context("server", "{{port}}"),
    )
    return MultiTest(
        name=name,
        suites=[TCPTestsuite()],
        environment=[server, client],
    )

tcp_multiple_connections.py

"""
Tests TCP communication between a server and the multiple clients it accepts.
"""
from testplan.common.utils.context import context

from testplan.testing.multitest import MultiTest, testsuite, testcase
from testplan.testing.multitest.driver.tcp import TCPServer, TCPClient


@testsuite
class TCPTestsuite:
    """TCP tests for one server and 2 clients."""

    def __init__(self):
        # Maps a client driver name to the connection index returned by the
        # server's accept_connection() call for that client.
        self._conn_idx = {}

    def setup(self, env):
        """Will be executed before the testcase."""
        # Clients connect one at a time; after each connect the server
        # accepts and the returned connection index is stored for the client.
        for client_name in ("client1", "client2"):
            getattr(env, client_name).connect()
            self._conn_idx[client_name] = env.server.accept_connection()

    @testcase
    def send_and_receive_msg(self, env, result):
        """
        The TCP communication is the following:
            1. Client 1 sends a message.
            2. Client 2 sends a message.
            3. Server receives client 1 message.
            4. Server responds to client 1.
            5. Server receives client 2 message.
            6. Server responds to client 2.
        """
        first_idx = self._conn_idx["client1"]
        second_idx = self._conn_idx["client2"]

        # Steps 1-2: both clients send before the server reads anything.
        first_msg = env.client1.cfg.name
        result.log("Client1 is sending: {}".format(first_msg))
        first_size = env.client1.send_text(first_msg)

        second_msg = env.client2.cfg.name
        result.log("Client2 is sending: {}".format(second_msg))
        second_size = env.client2.send_text(second_msg)

        # Steps 3-4: serve client 1 through its own connection index.
        at_server = env.server.receive_text(
            size=first_size, conn_idx=first_idx
        )
        result.equal(at_server, first_msg, "Server received")

        reply = "Hello {}".format(at_server)
        result.log("Server is responding: {}".format(reply))
        reply_size = env.server.send_text(reply, conn_idx=first_idx)
        at_client = env.client1.receive_text(size=reply_size)
        result.equal(at_client, reply, "Client1 received")

        # Steps 5-6: same exchange for client 2.
        at_server = env.server.receive_text(
            size=second_size, conn_idx=second_idx
        )
        result.equal(at_server, second_msg, "Server received")

        reply = "Hello {}".format(at_server)
        result.log("Server is responding: {}".format(reply))
        reply_size = env.server.send_text(reply, conn_idx=second_idx)
        at_client = env.client2.receive_text(size=reply_size)
        result.equal(at_client, reply, "Client2 received")

    @testcase
    def reconnect_a_client(self, env, result):
        """
        Tests the ability to reconnect a client within the testcase.
        After reconnection, the server accepts the new connection
        and assigns a new connection index for this client.
        """
        old_idx = self._conn_idx["client1"]
        env.client1.reconnect()
        fresh_idx = env.server.accept_connection()
        self._conn_idx["client1"] = fresh_idx

        result.gt(fresh_idx, old_idx, "Client has new connection index")
        text = "Connection old index: {}, new index: {}".format(
            old_idx, fresh_idx
        )
        sent_size = env.client1.send_text(text)
        # No conn_idx is given: the default connection to receive from is
        # the most recent one.
        at_server = env.server.receive_text(size=sent_size)
        result.log(at_server)


def get_multitest(name):
    """
    Creates and returns a new MultiTest instance to be added to the plan.

    The environment holds one TCP server and 2 TCP clients. Each client's
    host and port are context expressions that refer to the "server" driver,
    so they are retrieved from the server after it has started.

    :param name: Name given to the MultiTest.
    :return: The configured MultiTest instance.
    """
    # Both clients share the same configuration apart from their name. They
    # are created with connect_at_start=False; the connect() calls are made
    # from the testsuite's setup method.
    clients = [
        TCPClient(
            name=client_name,
            host=context("server", "{{host}}"),
            port=context("server", "{{port}}"),
            connect_at_start=False,
        )
        for client_name in ("client1", "client2")
    ]
    return MultiTest(
        name=name,
        suites=[TCPTestsuite()],
        environment=[TCPServer(name="server"), *clients],
    )

ZMQ

Required files:

test_plan.py

#!/usr/bin/env python
"""
Testplan to run all ZMQ examples.
"""

import sys

from testplan import test_plan
from testplan.report.testing.styles import Style, StyleEnum
import zmq_pair_connection
import zmq_publish_subscribe_connection

# Output style shared by the stdout and PDF reports (passed below as the
# `stdout_style` and `pdf_style` arguments of `@test_plan`).
# NOTE(review): the two positional arguments are presumably the display
# levels for passing and failing tests respectively - confirm against Style.
OUTPUT_STYLE = Style(StyleEnum.ASSERTION_DETAIL, StyleEnum.ASSERTION_DETAIL)


@test_plan(
    name="ZMQConnections",
    pdf_path="report.pdf",
    stdout_style=OUTPUT_STYLE,
    pdf_style=OUTPUT_STYLE,
)
def main(plan):
    """
    Testplan decorated main function that adds the ZMQ PAIR and the ZMQ
    PUB/SUB MultiTests to the plan.

    :param plan: Plan object to which the MultiTests are added.
    """
    # Each example module exposes get_multitest(name); add them in order.
    for module, multitest_name in (
        (zmq_pair_connection, "ZMQPAIRConnection"),
        (zmq_publish_subscribe_connection, "ZMQPUBSUBConnection"),
    ):
        plan.add(module.get_multitest(multitest_name))


if __name__ == "__main__":
    # Run the plan, report its exit code and hand it back to the shell.
    exit_code = main().exit_code
    print("Exiting code: {}".format(exit_code))
    sys.exit(exit_code)

zmq_pair_connection.py

"""
Example of ZMQ Pair servers and clients.
"""

import zmq

from testplan.common.utils.context import context

from testplan.testing.multitest import MultiTest, testsuite, testcase
from testplan.testing.multitest.driver.zmq import ZMQServer, ZMQClient


@testsuite
class ZMQTestsuite:
    def setup(self, env):
        # Timeout handed to every send/receive call made by the testcase.
        self.timeout = 5

    @testcase
    def send_and_receive_msg(self, env, result):
        # Demonstrates a round trip between a ZMQ client and server: the
        # client sends, the server receives and answers, the client receives.
        #
        # Payloads are written as bytes literals (b"...") so that they are
        # sent as bytes in both python 2 and 3.
        timeout = self.timeout

        # Client -> server.
        request = b"Hello server"
        result.log("Client is sending: {}".format(request))
        env.client.send(data=request, timeout=timeout)

        at_server = env.server.receive(timeout=timeout)
        result.equal(at_server, request, "Server received")

        # Server -> client.
        reply = b"Hello client"
        result.log("Server is responding: {}".format(reply))
        env.server.send(data=reply, timeout=timeout)

        at_client = env.client.receive(timeout=timeout)
        result.equal(at_client, reply, "Client received")


def get_multitest(name):
    """
    Creates and returns the MultiTest for the ZMQ PAIR example.

    The environment holds one ZMQ server and one ZMQ client, both using the
    ZMQ PAIR message pattern. The client's host and port are context
    expressions that refer to the "server" driver.

    :param name: Name given to the MultiTest.
    :return: The configured MultiTest instance.
    """
    # NOTE(review): port=0 presumably lets the driver bind to any free port,
    # which the client then looks up through the context - confirm.
    server = ZMQServer(
        name="server",
        host="127.0.0.1",
        port=0,
        message_pattern=zmq.PAIR,
    )
    client = ZMQClient(
        name="client",
        hosts=[context("server", "{{host}}")],
        ports=[context("server", "{{port}}")],
        message_pattern=zmq.PAIR,
    )
    return MultiTest(
        name=name,
        suites=[ZMQTestsuite()],
        environment=[server, client],
    )

zmq_publish_subscribe_connection.py

"""
Example of ZMQ Publish servers and Subscribe clients.
"""

import time

import zmq

from testplan.testing.multitest import MultiTest, testsuite, testcase
from testplan.testing.multitest.driver.zmq import ZMQServer, ZMQClient

from testplan.common.utils.context import context


def after_start(env):
    # A subscribe client blocks all messages by default. The subscribe method
    # lets through messages whose prefix matches the topic_filter, so an
    # empty filter lets every message through.
    for subscriber in (env.client1, env.client2):
        subscriber.subscribe(topic_filter=b"")
    # The ZMQ Subscribe client takes a bit longer to actually connect, and
    # messages sent while it is not connected are dropped. There is currently
    # no way to verify the connection, so we add a small delay after start.
    time.sleep(1)


@testsuite
class ZMQTestsuite:
    def setup(self, env):
        # Timeout handed to every send/receive call made by the testcases.
        self.timeout = 5

    # The clients must be flushed after each test to remove any extra
    # messages. This matters on client 2 when many_publish_one_subscribe runs
    # before one_publish_many_subscribe: client 2 is subscribed to server 1
    # but does not read the message server 1 publishes in the former test.
    def post_testcase(self, name, env, result):
        for subscriber in (env.client1, env.client2):
            subscriber.flush()
        # Same reason as the delay in after_start: give the clients time to
        # reconnect before the next testcase.
        time.sleep(1)

    @testcase
    def many_publish_one_subscribe(self, env, result):
        # Many publish servers send a message each to one subscription client
        # as shown in the diagram below:
        #
        # Server1 ---msg1---+
        #                   |
        #                   +---msg1 & msg2---> Client1
        #                   |
        # Server2 ---msg2---+
        #
        timeout = self.timeout

        # Server 1 sends a unique message to client 1.
        first_msg = b"Hello 1"
        result.log("Server 1 is sending: {}".format(first_msg))
        env.server1.send(data=first_msg, timeout=timeout)

        # Server 2 sends a unique message to client 1.
        second_msg = b"Hello 2"
        result.log("Server 2 is sending: {}".format(second_msg))
        env.server2.send(data=second_msg, timeout=timeout)

        # Client 1 receives its first and then its second message. They are
        # collected into a set because the messages may arrive in a
        # different order than they were sent.
        arrivals = {
            env.client1.receive(timeout=timeout),
            env.client1.receive(timeout=timeout),
        }

        # Check the sent messages are the same as the received messages.
        result.equal(arrivals, {first_msg, second_msg}, "Client 1 received")

    @testcase
    def one_publish_many_subscribe(self, env, result):
        # One publish server sends messages to many subscription clients as
        # shown in the diagram below:
        #
        #                  +---msg---> Client1
        #                  |
        # Server1 ---msg---+
        #                  |
        #                  +---msg---> Client2
        #
        timeout = self.timeout

        # Server 1 sends a unique message to the clients it is connected to
        # (clients 1 & 2).
        published = b"Hello 3"
        result.log("Server 1 is sending: {}".format(published))
        env.server1.send(data=published, timeout=timeout)

        # Client 1 receives the message from server 1.
        at_client1 = env.client1.receive(timeout=timeout)
        result.equal(at_client1, published, "Client 1 received")

        # Client 2 receives the message from server 1.
        at_client2 = env.client2.receive(timeout=timeout)
        result.equal(at_client2, published, "Client 2 received")


def get_multitest(name):
    """
    Creates and returns the MultiTest for the ZMQ PUB/SUB example.

    The environment contains two ZMQServers (ZMQ PUB) and two ZMQClients
    (ZMQ SUB) connected as in the diagrams below. This allows messages to be
    sent from one publish server to many subscription clients and from many
    publish servers to one subscription client::

                      +------> Client1
                      |
        Server1 ------+
                      |
                      +------> Client2

        Server2 -------------> Client1

    :param name: Name given to the MultiTest.
    :return: The configured MultiTest instance.
    """
    # Both servers' message patterns are defined as ZMQ PUB.
    publishers = [
        ZMQServer(
            name=server_name,
            host="127.0.0.1",
            port=0,
            message_pattern=zmq.PUB,
        )
        for server_name in ("server1", "server2")
    ]

    # Both clients' message patterns are defined as ZMQ SUB. Client 1
    # connects to both servers, client 2 to server 1 only.
    connections = (
        ("client1", ("server1", "server2")),
        ("client2", ("server1",)),
    )
    subscribers = [
        ZMQClient(
            name=client_name,
            hosts=[context(server, "{{host}}") for server in servers],
            ports=[context(server, "{{port}}") for server in servers],
            message_pattern=zmq.SUB,
        )
        for client_name, servers in connections
    ]

    return MultiTest(
        name=name,
        suites=[ZMQTestsuite()],
        environment=publishers + subscribers,
        after_start=after_start,
    )