ZMQ¶
- Required files:
test_plan.py¶
#!/usr/bin/env python
"""
Testplan to run all ZMQ examples.
"""
import sys
from testplan import test_plan
from testplan.report.testing.styles import Style, StyleEnum
import zmq_pair_connection
import zmq_publish_subscribe_connection
OUTPUT_STYLE = Style(StyleEnum.ASSERTION_DETAIL, StyleEnum.ASSERTION_DETAIL)
@test_plan(
name="ZMQConnections",
pdf_path="report.pdf",
stdout_style=OUTPUT_STYLE,
pdf_style=OUTPUT_STYLE,
)
def main(plan):
plan.add(zmq_pair_connection.get_multitest("ZMQPAIRConnection"))
plan.add(
zmq_publish_subscribe_connection.get_multitest("ZMQPUBSUBConnection")
)
if __name__ == "__main__":
res = main()
print("Exiting code: {}".format(res.exit_code))
sys.exit(res.exit_code)
zmq_pair_connection.py¶
"""
Example of ZMQ Pair servers and clients.
"""
import zmq
from testplan.common.utils.context import context
from testplan.testing.multitest import MultiTest, testsuite, testcase
from testplan.testing.multitest.driver.zmq import ZMQServer, ZMQClient
@testsuite
class ZMQTestsuite:
def setup(self, env):
self.timeout = 5
@testcase
def send_and_receive_msg(self, env, result):
# This test demonstrates sending and receiving messages in
# both directions between a ZMQ client and server.
#
# The client sends a message to the server. Messages must have a b
# before them to ensure they are sent as bytes in both python 2 and 3.
msg = b"Hello server"
result.log("Client is sending: {}".format(msg))
env.client.send(data=msg, timeout=self.timeout)
# The server receives this message.
received = env.server.receive(timeout=self.timeout)
result.equal(received, msg, "Server received")
# The server sends a response to the client.
resp = b"Hello client"
result.log("Server is responding: {}".format(resp))
env.server.send(data=resp, timeout=self.timeout)
# The client receives this response.
received = env.client.receive(timeout=self.timeout)
result.equal(received, resp, "Client received")
def get_multitest(name):
test = MultiTest(
name=name,
suites=[ZMQTestsuite()],
environment=[
# The server message pattern is defined as ZMQ PAIR.
ZMQServer(
name="server",
host="127.0.0.1",
port=0,
message_pattern=zmq.PAIR,
),
# The client message pattern is defined as ZMQ PAIR.
ZMQClient(
name="client",
hosts=[context("server", "{{host}}")],
ports=[context("server", "{{port}}")],
message_pattern=zmq.PAIR,
),
],
)
return test
zmq_publish_subscribe_connection.py¶
"""
Example of ZMQ Publish servers and Subscribe clients.
"""
import time
import zmq
from testplan.testing.multitest import MultiTest, testsuite, testcase
from testplan.testing.multitest.driver.zmq import ZMQServer, ZMQClient
from testplan.common.utils.context import context
def after_start(env):
# The subscribe client blocks all messages by default, the subscribe method
# allows messages with a prefix that matches the topic_filter. Therefore an
# empty string allows all messages through.
env.client1.subscribe(topic_filter=b"")
env.client2.subscribe(topic_filter=b"")
# The ZMQ Subscribe client takes a bit longer to actually connect, no
# connection results in dropped messages There is no way to verify the
# connection currently so we add a small delay after start.
time.sleep(1)
@testsuite
class ZMQTestsuite:
def setup(self, env):
self.timeout = 5
# The clients must be flushed after each test to remove any extra messages.
# This would occur when running many_publish_one_subscribe before
# one_publish_many_subscribe on client 2.
def post_testcase(self, name, env, result):
env.client1.flush()
env.client2.flush()
# As above the sleep is to verify the clients have reconnected.
time.sleep(1)
@testcase
def many_publish_one_subscribe(self, env, result):
# Many publish servers send a message each to one subscription client as
# shown in the diagram below:
#
# Server1 ---msg1---+
# |
# +---msg1 & msg2---> Client1
# |
# Server2 ---msg2---+
#
# Server 1 sends a unique message to client 1.
msg1 = b"Hello 1"
result.log("Server 1 is sending: {}".format(msg1))
env.server1.send(data=msg1, timeout=self.timeout)
# Server 2 sends a unique message to client 1.
msg2 = b"Hello 2"
result.log("Server 2 is sending: {}".format(msg2))
env.server2.send(data=msg2, timeout=self.timeout)
# Client 1 receives it's first message.
received1 = env.client1.receive(timeout=self.timeout)
# Client 1 receives it's second message.
received2 = env.client1.receive(timeout=self.timeout)
# Check the sent messages are the same as the received messages. Note
# the messages may arrive in a different order.
sent_msgs = set([msg1, msg2])
received_msgs = set([received1, received2])
result.equal(received_msgs, sent_msgs, "Client 1 received")
@testcase
def one_publish_many_subscribe(self, env, result):
# One publish server sends messages to many subscription clients as
# shown in the diagram below:
#
# +---msg---> Client1
# |
# Server1 ---msg---+
# |
# +---msg---> Client2
#
# Server 1 sends a unique message to the clients it is connected to
# (clients 1 & 2).
msg = b"Hello 3"
result.log("Server 1 is sending: {}".format(msg))
env.server1.send(data=msg, timeout=self.timeout)
# Client 1 receives message from server 1.
received1 = env.client1.receive(timeout=self.timeout)
result.equal(received1, msg, "Client 1 received")
# Client 2 receives message from server 1.
received2 = env.client2.receive(timeout=self.timeout)
result.equal(received2, msg, "Client 2 received")
def get_multitest(name):
# The environment contains two ZMQServers and two ZMQClients connected as
# in the diagrams below. This allows us to send messages from one publish
# server to many subscription clients and from many subscription clients to
# one publish server as in the examples above.
#
# +------> Client1
# |
# Server1 ------+
# |
# +------> Client2
#
# Server2 -------------> Client1
test = MultiTest(
name=name,
suites=[ZMQTestsuite()],
environment=[
# Both server's message patterns are defined as ZMQ
# PUB.
ZMQServer(
name="server1",
host="127.0.0.1",
port=0,
message_pattern=zmq.PUB,
),
ZMQServer(
name="server2",
host="127.0.0.1",
port=0,
message_pattern=zmq.PUB,
),
# Both client's message patterns are defined as ZMQ
# SUB.
ZMQClient(
name="client1",
hosts=[
context("server1", "{{host}}"),
context("server2", "{{host}}"),
],
ports=[
context("server1", "{{port}}"),
context("server2", "{{port}}"),
],
message_pattern=zmq.SUB,
),
ZMQClient(
name="client2",
hosts=[context("server1", "{{host}}")],
ports=[context("server1", "{{port}}")],
message_pattern=zmq.SUB,
),
],
after_start=after_start,
)
return test