Source code for testplan.testing.multitest.driver.tcp.server

"""
TCPServer driver classes.
"""

import socket
from typing import Optional, Union

from schema import Or
from testplan.common.config import ConfigOption
from testplan.common.utils import networking
from testplan.common.utils.context import ContextValue, expand
from testplan.common.utils.documentation_helper import emphasized
from testplan.common.utils.sockets import Server
from testplan.common.utils.timing import TimeoutException, TimeoutExceptionInfo

from ..base import Driver, DriverConfig


[docs]class TCPServerConfig(DriverConfig): """ Configuration object for :py:class:`~testplan.testing.multitest.driver.tcp.server.TCPServer` driver. """
[docs] @classmethod def get_options(cls): """ Schema for options validation and assignment of default values. """ return { ConfigOption("host", default="localhost"): str, ConfigOption("port", default=0): Or(int, str, ContextValue), }
[docs]class TCPServer(Driver): """ Driver for a server that can send and receive messages over TCP. Supports multiple connections. This is built on top of the :py:class:`testplan.common.utils.sockets.server.Server` class, which provides equivalent functionality and may be used outside of MultiTest. {emphasized_members_docs} :param name: Name of TCPServer. :type name: ``str`` :param host: Host name to bind to. Default: 'localhost' :type host: ``str`` :param port: Port number to bind to. Default: 0 (Random port) :type port: ``int`` Also inherits all :py:class:`~testplan.testing.multitest.driver.base.Driver` options. """ CONFIG = TCPServerConfig def __init__( self, name: str, host: Optional[Union[str, ContextValue]] = "localhost", port: Optional[Union[int, str, ContextValue]] = 0, **options ): options.update(self.filter_locals(locals())) super(TCPServer, self).__init__(**options) self._host: str = None self._port: int = None self._server = None @emphasized @property def host(self): """Target host name.""" return self._host @emphasized @property def port(self): """Port number assigned.""" return self._port @property def socket(self): """ Returns the underlying ``socket`` object """ return self._server.socket
[docs] def accept_connection(self, timeout=10): """Doc from Server.""" return self._server.accept_connection(timeout=timeout)
accept_connection.__doc__ = Server.accept_connection.__doc__
[docs] def close_connection(self, conn_idx): """ Docstring from Server.close_connection """ self._server.close_connection(conn_idx)
close_connection.__doc__ = Server.close_connection.__doc__
[docs] def send_text(self, msg, standard="utf-8", **kwargs): """ Encodes to bytes and calls :py:meth:`TCPServer.send <testplan.testing.multitest.driver.tcp.server.TCPServer.send>`. """ return self.send(bytes(msg.encode(standard)), **kwargs)
[docs] def send(self, msg, conn_idx=None, timeout=30): """Doc from Server.""" return self._server.send(msg=msg, conn_idx=conn_idx, timeout=timeout)
send.__doc__ = Server.send.__doc__
[docs] def receive_text(self, standard="utf-8", **kwargs): """ Calls :py:meth:`TCPServer.receive <testplan.testing.multitest.driver.tcp.server.TCPServer.receive>` and decodes received bytes. """ return self.receive(**kwargs).decode(standard)
[docs] def receive(self, size=None, conn_idx=None, timeout=30): """Receive bytes from the given connection.""" received = None timeout_info = TimeoutExceptionInfo() try: receive_kwargs = dict(conn_idx=conn_idx, timeout=timeout or 0) if size is None: receive_kwargs["size"] = 1024 receive_kwargs["wait_full_size"] = False else: receive_kwargs["size"] = size receive_kwargs["wait_full_size"] = True received = self._server.receive(**receive_kwargs) except socket.timeout: if timeout is not None: raise TimeoutException( "Timed out waiting for message on {0}. {1}".format( self.cfg.name, timeout_info.msg() ) ) return received
[docs] def starting(self): """Starts the TCP server.""" super(TCPServer, self).starting() self._server = Server( host=self.cfg.host, port=networking.port_to_int(expand(self.cfg.port, self.context)), ) self._server.bind() self._server.serve() self._host = self.cfg.host self._port = self._server.port self.logger.info( "%s listening on %s:%s", self, self.host, self.port, )
def _stop_logic(self): if self._server: self._server.close()
[docs] def stopping(self): """Stops the TCP server.""" super(TCPServer, self).stopping() self._stop_logic()
[docs] def aborting(self): """Abort logic that stops the server.""" self._stop_logic()