testplan.testing.multitest.driver.connection package

Submodules

testplan.testing.multitest.driver.connection.base module

class testplan.testing.multitest.driver.connection.base.BaseConnectionExtractor[source]

Bases: object

extract_connection(driver) -> List[BaseConnectionInfo][source]
class testplan.testing.multitest.driver.connection.base.BaseConnectionInfo(protocol: str, identifier: str, direction: testplan.testing.multitest.driver.connection.base.Direction)[source]

Bases: object

property connection_rep
direction: Direction
identifier: str
promote_to_connection()[source]
protocol: str
class testplan.testing.multitest.driver.connection.base.BaseDriverConnectionGroup(connection_rep: str, in_drivers: DefaultDict[str, Set], out_drivers: DefaultDict[str, Set])[source]

Bases: object

Base class to show connection between drivers. Each specific type (protocol) of connection should have its own subclass.

Stores the drivers involved in the connection as well as the logic of whether to add a driver into the connection.

in_drivers stores incoming connections (e.g. a server listening for connections); out_drivers stores outgoing connections (e.g. a client connecting to a server).

add_driver_if_in_connection(driver_name: str, driver_connection_info: BaseConnectionInfo)[source]
connection_rep: str
classmethod from_connection_info(driver_connection_info: BaseConnectionInfo)[source]
in_drivers: DefaultDict[str, Set]
out_drivers: DefaultDict[str, Set]
should_include()[source]
class testplan.testing.multitest.driver.connection.base.Direction(*values)[source]

Bases: Enum

CONNECTING = 'connecting'
LISTENING = 'listening'

testplan.testing.multitest.driver.connection.connection_info module

class testplan.testing.multitest.driver.connection.connection_info.DriverConnectionGraph(drivers)[source]

Bases: object

add_connection(driver_name: str, conn_info: BaseConnectionInfo)[source]
property edges
property nodes
set_nodes_and_edges()[source]
class testplan.testing.multitest.driver.connection.connection_info.FileConnectionInfo(protocol: str, identifier: str, direction: Direction)[source]

Bases: BaseConnectionInfo

ConnectionInfo for file-based communication between drivers.

property connection_rep
direction: Direction
identifier: str
promote_to_connection()[source]
protocol: str
class testplan.testing.multitest.driver.connection.connection_info.FileDriverConnectionGroup(connection_rep: str, in_drivers: DefaultDict[str, Set], out_drivers: DefaultDict[str, Set])[source]

Bases: BaseDriverConnectionGroup

ConnectionGroup for file-based communication between drivers.

Stores the drivers involved in the connection as well as the logic of whether to add a driver into the connection.

add_driver_if_in_connection(driver_name: str, driver_connection_info: FileConnectionInfo)[source]
connection_rep: str
classmethod from_connection_info(driver_connection_info: BaseConnectionInfo)
in_drivers: DefaultDict[str, Set]
out_drivers: DefaultDict[str, Set]
should_include()
class testplan.testing.multitest.driver.connection.connection_info.PortConnectionInfo(protocol: str, identifier: str, direction: Direction, port: int | None = None, host: str | None = None)[source]

Bases: BaseConnectionInfo

ConnectionInfo for port communication (e.g. TCP/UDP) between drivers.

property connection_rep
direction: Direction
host: str | None = None
identifier: str
port: int | None = None
promote_to_connection()[source]
protocol: str
class testplan.testing.multitest.driver.connection.connection_info.PortDriverConnectionGroup(connection_rep: str, in_drivers: DefaultDict[str, Set], out_drivers: DefaultDict[str, Set])[source]

Bases: BaseDriverConnectionGroup

ConnectionGroup for port communication (e.g. TCP/UDP) between drivers.

Stores the drivers involved in the connection as well as the logic of whether to add a driver into the connection.

add_driver_if_in_connection(driver_name: str, driver_connection_info: PortConnectionInfo)[source]
connection_rep: str
classmethod from_connection_info(driver_connection_info: BaseConnectionInfo)
in_drivers: DefaultDict[str, Set]
out_drivers: DefaultDict[str, Set]
should_include()
class testplan.testing.multitest.driver.connection.connection_info.Protocol[source]

Bases: object

FILE = 'file'
TCP = 'tcp'
UDP = 'udp'

testplan.testing.multitest.driver.connection.connection_extractor module

class testplan.testing.multitest.driver.connection.connection_extractor.ConnectionExtractor(protocol: Protocol, direction: Direction)[source]

Bases: BaseConnectionExtractor

extract_connection(driver) -> List[PortConnectionInfo][source]
class testplan.testing.multitest.driver.connection.connection_extractor.SubprocessFileConnectionExtractor(files_to_ignore: List[str] = None)[source]

Bases: BaseConnectionExtractor

extract_connection(driver) -> List[FileConnectionInfo][source]
class testplan.testing.multitest.driver.connection.connection_extractor.SubprocessPortConnectionExtractor(connections_to_check: List[Protocol] = None, connections_to_ignore: List[Protocol] = None)[source]

Bases: BaseConnectionExtractor

extract_connection(driver) -> List[PortConnectionInfo][source]

Module contents