Source code for testplan.testing.multitest.driver.connection.connection_extractor

import socket
import sys
from typing import List
import psutil

from testplan.common.utils.logger import TESTPLAN_LOGGER
from testplan.testing.multitest.driver.connection.base import (
    BaseConnectionExtractor,
    Direction,
)
from testplan.testing.multitest.driver.connection.connection_info import (
    Protocol,
    PortConnectionInfo,
    FileConnectionInfo,
)


SOCKET_CONNECTION_MAP = {
    socket.SocketKind.SOCK_STREAM: Protocol.TCP,
    socket.SocketKind.SOCK_DGRAM: Protocol.UDP,
}

NETWORK_CONNECTION_MAP = {
    Protocol.TCP: socket.SocketKind.SOCK_STREAM,
    Protocol.UDP: socket.SocketKind.SOCK_DGRAM,
}


[docs] class ConnectionExtractor(BaseConnectionExtractor): def __init__(self, protocol: Protocol, direction: Direction) -> None: self.protocol = protocol self.direction = direction
[docs] def extract_connection(self, driver) -> List[PortConnectionInfo]: return [ PortConnectionInfo( protocol=self.protocol, direction=self.direction, identifier=driver.connection_identifier, port=getattr(driver, "local_port", None), host=getattr(driver, "local_host", None), ) ]
[docs] class SubprocessPortConnectionExtractor(BaseConnectionExtractor): def __init__( self, connections_to_check: List[Protocol] = None, connections_to_ignore: List[Protocol] = None, ): if not connections_to_check: connections_to_check = [Protocol.TCP, Protocol.UDP] if not connections_to_ignore: connections_to_ignore = [] # map the protocols to SocketKind for idx, protocol in enumerate(connections_to_check): connections_to_check[idx] = NETWORK_CONNECTION_MAP[protocol] for idx, protocol in enumerate(connections_to_ignore): connections_to_ignore[idx] = NETWORK_CONNECTION_MAP[protocol] connections_to_ignore.append(socket.SocketKind.SOCK_SEQPACKET) self.connections_to_check = connections_to_check self.connections_to_ignore = connections_to_ignore
[docs] def extract_connection(self, driver) -> List[PortConnectionInfo]: connections = [] try: proc = psutil.Process(driver.pid) listening_addresses = [] # update to net_connections when psutil is updated to 6.0.0 for conn in proc.connections(): # first loop to determine which is listening if conn.status == psutil.CONN_LISTEN: # TODO: account for host types listening_addresses.append(conn.laddr.port) for conn in proc.connections(): # second loop to get connections if ( sys.platform != "win32" and conn.family == socket.AddressFamily.AF_UNIX ): # ignore unix sockets for now continue if ( conn.type not in self.connections_to_check or conn.type in self.connections_to_ignore ): continue if conn.status == psutil.CONN_NONE: # UDP sockets connections.append( PortConnectionInfo( protocol=SOCKET_CONNECTION_MAP[conn.type], identifier=conn.laddr.port, direction=Direction.LISTENING, port=conn.laddr.port, host=conn.laddr.ip, ) ) elif conn.status == psutil.CONN_ESTABLISHED: if conn.laddr.port in listening_addresses: connections.append( PortConnectionInfo( protocol=SOCKET_CONNECTION_MAP[conn.type], identifier=conn.laddr.port, direction=Direction.LISTENING, port=conn.laddr.port, host=conn.laddr.ip, ) ) else: connections.append( PortConnectionInfo( protocol=SOCKET_CONNECTION_MAP[conn.type], identifier=conn.raddr.port, direction=Direction.CONNECTING, port=conn.laddr.port, host=conn.laddr.ip, ) ) except ( psutil.NoSuchProcess, psutil.AccessDenied, ) as err: TESTPLAN_LOGGER.info( f"Error getting metadata for driver {str(driver)}: {err}" ) return connections
[docs] class SubprocessFileConnectionExtractor(BaseConnectionExtractor): def __init__(self, files_to_ignore: List[str] = None): if not files_to_ignore: files_to_ignore = ["stdout", "stderr"] self.files_to_ignore = files_to_ignore
[docs] def extract_connection(self, driver) -> List[FileConnectionInfo]: connections = [] try: proc = psutil.Process(driver.pid) for open_file in proc.open_files(): if open_file.path.split("/")[-1] in self.files_to_ignore: continue if sys.platform == "win32": # psutil does not show open mode in windows, assume its read/write connections.append( FileConnectionInfo( protocol=Protocol.FILE, identifier=open_file.path, direction=Direction.LISTENING, ) ) connections.append( FileConnectionInfo( protocol=Protocol.FILE, identifier=open_file.path, direction=Direction.CONNECTING, ) ) continue if open_file.mode in ["r", "r+", "a+"]: connections.append( FileConnectionInfo( protocol=Protocol.FILE, identifier=open_file.path, direction=Direction.LISTENING, ) ) if open_file.mode in ["w", "a", "r+", "a+"]: connections.append( FileConnectionInfo( protocol=Protocol.FILE, identifier=open_file.path, direction=Direction.CONNECTING, ) ) except ( psutil.NoSuchProcess, psutil.AccessDenied, ) as err: TESTPLAN_LOGGER.info( f"Error getting metadata for driver {str(driver)}: {err}" ) return connections