Source code for testplan.testing.multitest.driver.http.client

"""HTTPClient Driver."""

import time
import queue
from threading import Thread, Event
from typing import Any, Dict, List, Optional, Tuple, Union

try:
    from typing import Literal
except ImportError:
    from typing_extensions import Literal

import requests
from schema import Use, Or

from testplan.common.config import ConfigOption
from testplan.common.utils.context import expand, is_context, ContextValue
from testplan.common.utils.strings import slugify

from ..base import (
    Driver,
    DriverConfig,
)
from ..connection import (
    Direction,
    Protocol,
    ConnectionExtractor,
)


class HTTPClientConfig(DriverConfig):
    """
    Configuration object for
    :py:class:`~testplan.testing.multitest.driver.http.client.HTTPClient`
    driver.
    """

    @classmethod
    def get_options(cls) -> Dict[Any, Any]:
        """
        Schema for options validation and assignment of default values.

        ``host`` is the only mandatory entry; every other option is wrapped
        in a ``ConfigOption`` that carries its default value.
        """
        schema: Dict[Any, Any] = {}

        # Target address: literal values or context values resolved later.
        schema["host"] = Or(str, lambda x: is_context(x))
        schema[ConfigOption("port", default=None)] = Or(
            None, Use(int), lambda x: is_context(x)
        )

        # URL scheme used when building request URLs.
        schema[ConfigOption("protocol", default="http")] = str

        # Timing options, coerced to numeric types by the schema.
        schema[ConfigOption("timeout", default=5)] = Use(int)
        schema[ConfigOption("interval", default=0.01)] = Use(float)

        return schema
class HTTPClient(Driver):
    """
    Driver for a client that can connect to a server and send/receive messages
    using HTTP protocol.

    {emphasized_members_docs}

    :param name: Name of HTTPClient.
    :type name: ``str``
    :param host: Hostname to connect to.
    :type host: ``str`` or ``ContextValue``
    :param port: Port to connect to. If None URL won't specify a port.
    :type port: ``int`` or ``ContextValue``
    :param protocol: Use HTTP or HTTPS protocol.
    :type protocol: ``str``
    :param timeout: Number of seconds to wait for a request.
    :type timeout: ``int``
    :param interval: Number of seconds to sleep whilst trying to receive a
      message.
    :type interval: ``float``

    Also inherits all
    :py:class:`~testplan.testing.multitest.driver.base.Driver` options.
    """

    CONFIG = HTTPClientConfig
    EXTRACTORS = [ConnectionExtractor(Protocol.TCP, Direction.CONNECTING)]

    def __init__(
        self,
        name: str,
        host: Union[str, ContextValue],
        port: Optional[Union[int, ContextValue]] = None,
        protocol: str = "http",
        timeout: int = 5,
        interval: float = 0.01,
        **options: Any,
    ) -> None:
        # ``locals()`` is captured before any other local name is bound so
        # that it only holds this method's arguments. The two-argument
        # ``super(...)`` form below is kept on purpose: zero-argument
        # ``super()`` would add ``__class__`` to ``locals()``.
        options.update(self.filter_locals(locals()))
        options.setdefault("file_logger", "{}.log".format(slugify(name)))
        super(HTTPClient, self).__init__(**options)

        # Runtime values; these stay ``None`` until ``starting`` resolves
        # them from the configuration.
        self._host: Optional[str] = None
        self._port: Optional[Union[int, Literal[""]]] = None
        self.protocol: Optional[str] = None
        self.timeout: Optional[int] = None
        self.interval: Optional[float] = None

        # Responses put by request threads, consumed by ``receive``.
        self.responses: queue.Queue[Any] = queue.Queue()
        # One (thread, drop-response event) pair per in-flight request.
        self.request_threads: List[Tuple[Thread, Event]] = []

    @property
    def host(self) -> Optional[str]:
        """Target host name."""
        return self._host

    @property
    def connection_identifier(self) -> Optional[Union[int, Literal[""]]]:
        """Identifier of the connection, which is the target port."""
        return self.port

    @property
    def port(self) -> Optional[Union[int, Literal[""]]]:
        """Client port number assigned."""
        return self._port

    def starting(self) -> None:
        """
        Start the HTTPClient.

        Resolves host, port and protocol from the configuration (expanding
        context values), copies the timing options and installs a fresh
        response queue.
        """
        super(HTTPClient, self).starting()
        self._host = expand(self.cfg.host, self.context)
        context_port = expand(self.cfg.port, self.context, int)
        # A missing port is stored as "" so it formats to nothing in URLs.
        self._port = context_port if context_port else ""
        self.protocol = expand(self.cfg.protocol, self.context)
        self.timeout = self.cfg.timeout
        self.interval = self.cfg.interval
        self.responses = queue.Queue()

        port_ = ":{}".format(self.port) if self.port else self.port
        self.logger.info(
            "Started HTTPClient sending requests to %s://%s%s",
            self.protocol,
            self.host,
            port_,
        )

    def stopping(self) -> None:
        """
        Stop the HTTPClient.
        """
        super(HTTPClient, self).stopping()

    def aborting(self) -> None:
        """Abort logic that stops the client."""
        super(HTTPClient, self).aborting()

    def _prune_request_threads(self) -> None:
        """Forget request threads that have already finished."""
        self.request_threads = [
            (thread, drop_response)
            for thread, drop_response in self.request_threads
            if thread.is_alive()
        ]

    def _send_request(
        self,
        method: str,
        api: str,
        drop_response: Event,
        timeout: int,
        **kwargs: Any,
    ) -> None:
        """
        Send a request using the requests module.

        Runs in a request thread started by :py:meth:`send`. Exceptions
        raised by the request are not handled here; in that case no response
        is queued.

        :param method: HTTP method to be used in request (e.g. GET, POST
          etc.). The name is matched case-insensitively; a name that the
          requests module does not provide falls back to GET.
        :type method: ``str``
        :param api: API to send request to.
        :type api: ``str``
        :param drop_response: Whether to drop the response message (called by
          flush).
        :type drop_response: ``threading.Event``
        :param timeout: Number of seconds to wait for a request.
        :type timeout: ``int``
        :param kwargs: Optional arguments for the request, look at the
          requests modules docs for these arguments.
        :type kwargs: Depends on the argument.
        """
        # The helpers in ``requests`` are lower-case (``requests.post``);
        # normalise so that "POST" is not silently sent as a GET.
        http_method = getattr(requests, method.lower(), requests.get)

        # Avoid a double slash between the host part and the API path.
        api = api[1:] if api.startswith("/") else api
        url = "{protocol}://{host}{port}/{api}".format(
            protocol=self.protocol,
            host=self.host,
            port=":{}".format(self.port) if self.port else self.port,
            api=api,
        )

        # A per-request ``timeout`` keyword overrides the driver timeout.
        timeout = kwargs.pop("timeout", timeout)
        self.logger.info(
            "Sending %s request: %s", http_method.__name__.upper(), url
        )
        response = http_method(url=url, timeout=timeout, **kwargs)

        # ``flush`` sets the event to discard responses of in-flight requests.
        if not drop_response.is_set():
            self.responses.put(response)

    def send(self, method: str, api: str, **kwargs: Any) -> None:
        """
        Send a non blocking HTTP request.

        The request is performed in a daemon thread; its response is queued
        and can be retrieved with :py:meth:`receive`.

        :param method: HTTP method to be used in request (e.g. GET, POST
          etc.).
        :type method: ``str``
        :param api: API to send request to.
        :type api: ``str``
        :param kwargs: Optional arguments for the request, look at the
          requests modules docs for these arguments.
        :type kwargs: Depends on the argument.
        """
        # Keep the bookkeeping list bounded by the number of in-flight
        # requests instead of the number of requests ever sent.
        self._prune_request_threads()

        drop_response = Event()
        request_thread = Thread(
            target=self._send_request,
            args=(method, api, drop_response, self.timeout),
            kwargs=kwargs,
        )
        request_thread.daemon = True
        request_thread.start()
        self.request_threads.append((request_thread, drop_response))

    def head(self, api: str, **kwargs: Any) -> None:
        """
        Send HEAD request.

        :param api: API to send request to.
        :type api: ``str``
        :param kwargs: Optional arguments for the request, look at the
          requests modules docs for these arguments.
        :type kwargs: Depends on the argument.
        """
        self.send("head", api, **kwargs)

    def get(
        self, api: str, params: Optional[Dict[str, Any]] = None, **kwargs: Any
    ) -> None:
        """
        Send GET request.

        :param api: API to send request to.
        :type api: ``str``
        :param params: Parameters to append to HTTP request after ?.
        :type params: ``dict``
        :param kwargs: Optional arguments for the request, look at the
          requests modules docs for these arguments.
        :type kwargs: Depends on the argument.
        """
        self.send("get", api, params=params, **kwargs)

    def post(
        self,
        api: str,
        data: Optional[Any] = None,
        json: Optional[Any] = None,
        **kwargs: Any,
    ) -> None:
        """
        Send POST request.

        :param api: API to send request to.
        :type api: ``str``
        :param data: Dictionary to send in the body of the request.
        :type data: ``dict``
        :param json: JSON data to send in the body of the request.
        :type json: ``dict``
        :param kwargs: Optional arguments for the request, look at the
          requests modules docs for these arguments.
        :type kwargs: Depends on the argument.
        """
        self.send("post", api, data=data, json=json, **kwargs)

    def put(self, api: str, data: Optional[Any] = None, **kwargs: Any) -> None:
        """
        Send PUT request.

        :param api: API to send request to.
        :type api: ``str``
        :param data: Dictionary to send in the body of the request.
        :type data: ``dict``
        :param kwargs: Optional arguments for the request, look at the
          requests modules docs for these arguments.
        :type kwargs: Depends on the argument.
        """
        self.send("put", api, data=data, **kwargs)

    def delete(self, api: str, **kwargs: Any) -> None:
        """
        Send DELETE request.

        :param api: API to send request to.
        :type api: ``str``
        :param kwargs: Optional arguments for the request, look at the
          requests modules docs for these arguments.
        :type kwargs: Depends on the argument.
        """
        self.send("delete", api, **kwargs)

    def patch(
        self, api: str, data: Optional[Any] = None, **kwargs: Any
    ) -> None:
        """
        Send PATCH request.

        :param api: API to send request to.
        :type api: ``str``
        :param data: Dictionary to send in the body of the request.
        :type data: ``dict``
        :param kwargs: Optional arguments for the request, look at the
          requests modules docs for these arguments.
        :type kwargs: Depends on the argument.
        """
        self.send("patch", api, data=data, **kwargs)

    def options(self, api: str, **kwargs: Any) -> None:
        """
        Send OPTIONS request.

        :param api: API to send request to.
        :type api: ``str``
        :param kwargs: Optional arguments for the request, look at the
          requests modules docs for these arguments.
        :type kwargs: Depends on the argument.
        """
        self.send("options", api, **kwargs)

    def receive(
        self, timeout: Optional[int] = None
    ) -> Optional[requests.Response]:
        """
        Wait to receive a response.

        The response queue is polled every ``interval`` seconds. The queue is
        always checked at least once, so an already queued response is
        returned even when ``timeout`` is ``0``.

        :param timeout: Number of seconds to wait for a response, overrides
          timeout from init.
        :type timeout: ``int``
        :return: A request response or ``None``
        :rtype: ``requests.models.Response`` or ``NoneType``
        """
        effective_timeout: int = (
            timeout
            if timeout is not None
            else self.timeout  # type: ignore[assignment]
        )
        deadline = time.time() + effective_timeout

        while True:
            try:
                response = self.responses.get(block=False)
            except queue.Empty:
                # Check the deadline only after polling, so a response that
                # arrived during the previous sleep is never missed.
                remaining = deadline - time.time()
                if remaining <= 0:
                    return None
                self.logger.debug("Waiting for response...")
                # Never sleep past the deadline.
                time.sleep(
                    min(self.interval, remaining)  # type: ignore[type-var]
                )
            else:
                self.responses.task_done()
                self.logger.info("Received response.")
                return response

    def flush(self) -> None:
        """
        Drop any currently incoming messages and flush the received messages
        queue.
        """
        # Tell every tracked request thread to discard its response.
        for _, drop_message in self.request_threads:
            drop_message.set()
            self.logger.debug("Request thread set to drop response.")

        # Finished threads need no further tracking.
        self._prune_request_threads()

        # Drain what is already queued, bounded by five times the timeout.
        deadline = time.time() + (5 * self.timeout)  # type: ignore[operator]
        while not self.responses.empty() and time.time() < deadline:
            try:
                self.responses.get(block=False)
            except queue.Empty:
                self.logger.debug("Responses queue flushed.")
            else:
                self.responses.task_done()