Source code for testplan.common.utils.sockets.server

"""TCP Server module."""

import time
import socket
import select
import threading

from testplan.common.utils.timing import wait


class Server:
    """
    A server that can send and receive messages based on socket interface.
    Supports multiple connections.

    Connections are identified by an integer index assigned in the order in
    which they were received by the listening thread (0, 1, 2, ...).

    :param host: The host address the server is bound to.
    :type host: ``str``
    :param port: The port the server is bound to.
    :type port: ``str`` or ``int``
    :param listen: Socket listen argument.
    :type listen: ``int``
    """

    # Upper bound (in seconds) on a single ``select`` call made by the
    # serving thread, so that the thread re-checks ``self._listening``
    # regularly and can terminate after ``close`` even when no socket
    # activity occurs.
    _SELECT_TIMEOUT = 0.05

    def __init__(self, host="localhost", port=0, listen=1):
        self._input_host = host
        self._input_port = port
        self._listen = listen

        self._ip = None
        self._port = None
        self._listening = False
        self._server = None
        self._server_thread = None
        # Serialises blocking ``receive`` loops and ``send`` calls.
        self._lock = threading.Lock()
        # file descriptor -> connection socket object
        self._connection_by_fd = {}
        # connection index -> file descriptor
        self._fds = {}
        # Number of connections received so far by the serving thread; it is
        # never decremented and doubles as the next connection index.
        self.active_connections = 0
        # Number of connections handed out through ``accept_connection``.
        self.accepted_connections = 0

    @property
    def host(self):
        """Input host provided."""
        return self._input_host

    @property
    def ip(self):
        """IP retrieved from socket."""
        return self._ip

    @property
    def port(self):
        """Port retrieved after binding."""
        return self._port

    @property
    def socket(self):
        """
        Returns the underlying ``socket`` object
        """
        return self._server

    def bind(self):
        """
        Bind to a socket.

        ``SO_REUSEADDR`` is set only when an explicit (non-zero) port was
        requested. The socket is closed again if binding fails, so a failed
        call does not leak a file descriptor.
        """
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            if self._input_port != 0:
                server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server.bind((self._input_host, self._input_port))
            self._ip, self._port = server.getsockname()
        except Exception:
            server.close()
            raise
        self._server = server

    def serve(self, loop_sleep=0.005, listening_timeout=5):
        """
        Start serving connections.

        Spawns a daemon thread running the listening loop and waits until
        that thread reports that it is listening.

        :param loop_sleep: Sleep time between iterations of the listening
            loop.
        :type loop_sleep: ``float``
        :param listening_timeout: Timeout passed to ``wait`` while waiting
            for the listening loop to start.
        :type listening_timeout: ``int``
        """
        self._server_thread = threading.Thread(
            target=self._serving, kwargs=dict(loop_sleep=loop_sleep)
        )
        self._server_thread.daemon = True
        self._server_thread.start()
        wait(lambda: self._listening, listening_timeout, raise_on_timeout=True)

    def _serving(self, loop_sleep=0.005):
        """
        Listen for new inbound connections.

        Runs until ``self._listening`` is cleared, then closes every
        registered connection and the listening socket.

        :param loop_sleep: Sleep time between iterations of the loop.
        :type loop_sleep: ``float``
        """
        self._server.listen(self._listen)
        self._listening = True
        inputs = [self._server]
        outputs = []
        try:
            while self._listening:
                try:
                    # The timeout guarantees that the loop condition is
                    # re-evaluated even if no socket becomes ready.
                    readable, _, exceptional = select.select(
                        inputs, outputs, inputs, self._SELECT_TIMEOUT
                    )
                except ValueError:
                    # Raised when a monitored socket was closed (its
                    # fileno() is -1). Build a new list rather than removing
                    # items while iterating, which would skip entries.
                    inputs = [
                        sock for sock in inputs if sock.fileno() != -1
                    ]
                    continue

                for sock in readable:
                    if sock is self._server:
                        # New connection
                        conn, _client_addr = sock.accept()
                        inputs.append(conn)
                        self._connection_by_fd[conn.fileno()] = conn
                        self._fds[self.active_connections] = conn.fileno()
                        self.active_connections += 1

                for sock in exceptional:
                    inputs.remove(sock)
                    sock.close()

                time.sleep(loop_sleep)
        finally:
            # Release all sockets even if the loop terminated with an error.
            self._remove_all_connections()
            try:
                self._server.shutdown(socket.SHUT_RDWR)
            except OSError:
                # Best effort only: shutting down the listening socket may
                # be rejected by the OS; it is closed right below anyway.
                pass
            self._server.close()

    def accept_connection(self, timeout=10, accept_connection_sleep=0.1):
        """
        Accepts a connection in the order in which they were received.
        Return the index of the connection, which can be used to send
        and receive messages using that connection.
        If no connection is already available or becomes available in the
        given timeout, then the method returns -1.

        :param timeout: Timeout to wait for receiving connection.
        :type timeout: ``int``
        :param accept_connection_sleep: Sleep time to retry accept connection.
        :type accept_connection_sleep: ``float``
        :return: Index of connection
        :rtype: ``int``
        """
        # Monotonic clock: the timeout must not be affected by system clock
        # adjustments.
        started = time.monotonic()
        while True:
            if self.accepted_connections in self._fds:
                self.accepted_connections += 1
                return self.accepted_connections - 1
            if time.monotonic() > started + timeout:
                return -1
            time.sleep(accept_connection_sleep)

    def close_connection(self, conn_idx):
        """
        Unregister, close and remove connection with given connection index

        :param conn_idx: Connection index of connection to be removed
        :type conn_idx: ``int``
        :return: ``None``
        :rtype: ``NoneType``
        :raises KeyError: If there is no connection registered for the index.
        """
        fdesc = self._fds[conn_idx]
        self._connection_by_fd[fdesc].close()
        del self._connection_by_fd[fdesc]
        del self._fds[conn_idx]

    def receive(
        self, size=1024, conn_idx=None, timeout=30, wait_full_size=True
    ):
        """
        Receive a message of given size (number of bytes) from the given
        connection.

        :param size: Number of bytes to receive
        :type size: ``int``
        :param conn_idx: Index of connection to receive from
        :type conn_idx: ``int``
        :param timeout: timeout in seconds
        :type timeout: ``int``
        :param wait_full_size: Wait until full size is received. When
            ``False``, a single non-blocking ``recv`` is performed instead.
        :type wait_full_size: ``bool``
        :return: message received
        :rtype: ``bytes``
        :raises Exception: If the connection index is not valid, or the peer
            closed the connection before ``size`` bytes were received.
        """
        conn_idx = self._validate_connection_idx(conn_idx)

        # Get file descriptor and details of connection
        fdesc = self._fds[conn_idx]
        connection = self._connection_by_fd[fdesc]
        connection.settimeout(timeout)

        if wait_full_size is False:
            connection.settimeout(0)
            try:
                msg = connection.recv(size)
            finally:
                # Restore the timeout even when no data was available and
                # recv raised, so the socket is not left non-blocking.
                connection.settimeout(timeout)
        else:
            with self._lock:
                chunks = []
                remaining = size
                try:
                    while remaining > 0:
                        chunk = connection.recv(remaining)
                        if not chunk:
                            raise Exception("Socket connection broken")
                        chunks.append(chunk)
                        remaining -= len(chunk)
                except socket.error:
                    # With a zero timeout the socket is non-blocking; report
                    # the lack of data as a timeout.
                    if timeout == 0:
                        raise socket.timeout()
                    raise
                msg = b"".join(chunks)

        return msg

    def send(self, msg, conn_idx=None, timeout=30):
        """
        Send the given message through the given connection.

        :param msg: message to be sent
        :type msg: ``bytes``
        :param conn_idx: Index of connection to send to
        :type conn_idx: ``int``
        :param timeout: Timeout in seconds for sending all bytes
        :type timeout: ``int``
        :return: Number of bytes sent
        :rtype: ``int``
        :raises Exception: If the connection index is not valid.
        """
        conn_idx = self._validate_connection_idx(conn_idx)
        connection = self._connection_by_fd[self._fds[conn_idx]]
        connection.settimeout(timeout)

        with self._lock:
            connection.sendall(msg)

        return len(msg)

    def close(self):
        """Closes the server and listen thread."""
        self._listening = False
        # The serving loop re-checks the flag at least once per select
        # timeout, after which it closes all sockets and exits; the join is
        # bounded so that this call never blocks for long.
        if self._server_thread:
            self._server_thread.join(timeout=0.1)

    def _validate_connection_idx(self, conn_idx):
        """
        Check if given connection index is valid.
        If this is None, then the connection defaults to the most recently
        accepted connection (index 0 when at most one was accepted).
        If no connection was accepted yet or the connection is no longer
        registered this will fail.

        :param conn_idx: Index of connection to send to
        :type conn_idx: ``int``
        :return: Connection index to send message to
        :rtype: ``int``
        :raises Exception: If no connection was accepted or the index is not
            an active connection.
        """
        if conn_idx is None:
            if self.accepted_connections > 1:
                conn_idx = self.accepted_connections - 1
            else:
                conn_idx = 0

        if self.accepted_connections == 0:
            raise Exception("No connection accepted")

        if conn_idx not in self._fds:
            raise Exception("Connection {} not active".format(conn_idx))

        return conn_idx

    def _remove_all_connections(self):
        """
        Unregister, close and remove all existing connections

        :return: ``None``
        :rtype: ``NoneType``
        """
        # Iterate over a snapshot: other threads may unregister connections
        # while this runs.
        for connection in list(self._connection_by_fd.values()):
            connection.close()
        self._connection_by_fd = {}
        self._fds = {}