Source code for testplan.testing.multitest.driver.tcp.client

"""TCPClient driver classes."""

import socket
from typing import Union, Tuple, Optional

from schema import Or
from testplan.common.utils import networking
from testplan.common.utils.timing import TimeoutException, TimeoutExceptionInfo
from testplan.common.config import ConfigOption
from testplan.common.utils.context import expand, ContextValue
from testplan.common.utils.sockets import Client

from ..base import Driver, DriverConfig


[docs]class TCPClientConfig(DriverConfig): """ Configuration object for :py:class:`~testplan.testing.multitest.driver.tcp.client.TCPClient` driver. """
[docs] @classmethod def get_options(cls): """ Schema for options validation and assignment of default values. """ return { "host": Or(str, ContextValue), "port": Or(int, str, ContextValue), ConfigOption("interface", default=None): Or(None, tuple), ConfigOption("connect_at_start", default=True): bool, }
[docs]class TCPClient(Driver): """ TCP client driver. This is built on top of the :py:class:`testplan.common.utils.sockets.client.Client` class, which provides equivalent functionality and may be used outside of MultiTest. {emphasized_members_docs} :param name: Name of TCPClient. :param host: Target host name. This can be a :py:class:`~testplan.common.utils.context.ContextValue` and will be expanded on runtime. :param port: Target port number. This can be a :py:class:`~testplan.common.utils.context.ContextValue` and will be expanded on runtime. :param interface: Interface to bind to. :param connect_at_start: Connect to server on start. Default: True Also inherits all :py:class:`~testplan.testing.multitest.driver.base.Driver` options. """ CONFIG = TCPClientConfig def __init__( self, name: str, host: Union[str, ContextValue], port: Union[int, str, ContextValue], interface: Union[Tuple[str, int], None] = None, connect_at_start: bool = True, **options ): options.update(self.filter_locals(locals())) super(TCPClient, self).__init__(**options) self._host: Optional[str] = None self._port: Optional[int] = None self._client = None self._server_host = None self._server_port = None @property def host(self) -> str: """Target host name.""" return self._host @property def port(self) -> int: """Client port number assigned.""" return self._port @property def server_port(self) -> int: return self._server_port
[docs] def connect(self) -> None: """ Connect client. """ self._client.connect() self._host, self._port = self._client.address
[docs] def send_text(self, msg: str, standard: str = "utf-8") -> int: """ Encodes to bytes and calls :py:meth:`TCPClient.send <testplan.testing.multitest.driver.tcp.client.TCPClient.send>`. """ return self.send(msg.encode(standard))
[docs] def send(self, msg: bytes) -> int: """ Sends bytes. :param msg: Message to be sent :return: Number of bytes sent """ return self._client.send(msg)[1]
[docs] def send_tsp(self, msg: bytes) -> Tuple[float, int]: """ Sends bytes and returns also timestamp sent. :param msg: Message to be sent :return: Timestamp when msg sent (in microseconds from epoch) and number of bytes sent """ return self._client.send(msg)
[docs] def receive_text(self, standard: str = "utf-8", **kwargs) -> str: """ Calls :py:meth:`TCPClient.receive <testplan.testing.multitest.driver.tcp.server.TCPClient.receive>` and decodes received bytes. """ return self.receive(**kwargs).decode(standard)
[docs] def receive(self, size: int = 1024, timeout: int = 30) -> Optional[bytes]: """Receive bytes from the given connection.""" received = None timeout_info = TimeoutExceptionInfo() try: received = self._client.receive(size, timeout=timeout or 0) except socket.timeout: if timeout is not None: raise TimeoutException( "Timed out waiting for message on {0}. {1}".format( self.cfg.name, timeout_info.msg() ) ) return received
[docs] def reconnect(self) -> None: """Client reconnect.""" self.close() self.connect()
[docs] def starting(self) -> None: """Start the TCP client and optionally connect to host/post.""" super(TCPClient, self).starting() self._server_host = expand(self.cfg.host, self.context) self._server_port = networking.port_to_int( expand(self.cfg.port, self.context) ) self._client = Client( host=self._server_host, port=self._server_port, interface=self.cfg.interface, ) if self.cfg.connect_at_start: self.connect()
[docs] def stopping(self) -> None: """Close the client connection.""" super(TCPClient, self).stopping() self.close()
[docs] def close(self) -> None: """ Close connection. """ if self._client: self._client.close()
[docs] def aborting(self) -> None: """Abort logic that stops the client.""" self.close()