"""HTTPClient Driver."""
import time
import queue
from threading import Thread, Event
from typing import Union
try:
from typing import Literal
except ImportError:
from typing_extensions import Literal
import requests
from schema import Use, Or
from testplan.common.config import ConfigOption
from testplan.common.utils.context import expand, is_context, ContextValue
from testplan.common.utils.strings import slugify
from ..base import Driver, DriverConfig
[docs]class HTTPClientConfig(DriverConfig):
"""
Configuration object for
:py:class:`~testplan.testing.multitest.driver.http.client.HTTPClient`
driver.
"""
[docs] @classmethod
def get_options(cls):
"""
Schema for options validation and assignment of default values.
"""
return {
"host": Or(str, lambda x: is_context(x)),
ConfigOption("port", default=None): Or(
None, Use(int), lambda x: is_context(x)
),
ConfigOption("protocol", default="http"): str,
ConfigOption("timeout", default=5): Use(int),
ConfigOption("interval", default=0.01): Use(float),
}
[docs]class HTTPClient(Driver):
"""
Driver for a client that can connect to a server and send/receive messages
using HTTP protocol.
{emphasized_members_docs}
:param name: Name of HTTPClient.
:type name: ``str``
:param host: Hostname to connect to.
:type host: ``str`` or ``ContextValue``
:param port: Port to connect to. If None URL won't specify a port.
:type port: ``int`` or ``ContextValue``
:param protocol: Use HTTP or HTTPS protocol.
:type protocol: ``str``
:param timeout: Number of seconds to wait for a request.
:type timeout: ``int``
:param interval: Number of seconds to sleep whilst trying to receive a
message.
:type interval: ``int``
Also inherits all
:py:class:`~testplan.testing.multitest.driver.base.Driver` options.
"""
CONFIG = HTTPClientConfig
def __init__(
self,
name: str,
host: Union[str, ContextValue],
port: Union[int, ContextValue] = None,
protocol: str = "http",
timeout: int = 5,
interval: float = 0.01,
**options
):
options.update(self.filter_locals(locals()))
options.setdefault("file_logger", "{}.log".format(slugify(name)))
super(HTTPClient, self).__init__(**options)
self._host: str = None
self._port: Union[int, Literal[""]] = None
self.protocol = None
self.timeout = None
self.interval = None
self.responses = None
self.request_threads = []
@property
def host(self):
"""Target host name."""
return self._host
@property
def port(self):
"""Client port number assigned."""
return self._port
[docs] def starting(self):
"""
Start the HTTPClient.
"""
super(HTTPClient, self).starting()
self._host = expand(self.cfg.host, self.context)
context_port = expand(self.cfg.port, self.context, int)
self._port = context_port if context_port else ""
self.protocol = expand(self.cfg.protocol, self.context)
self.timeout = self.cfg.timeout
self.interval = self.cfg.interval
self.responses = queue.Queue()
port_ = ":{}".format(self.port) if self.port else self.port
self.logger.info(
"Started HTTPClient sending requests to %s://%s%s",
self.protocol,
self.host,
port_,
)
[docs] def stopping(self):
"""
Stop the HTTPClient.
"""
super(HTTPClient, self).stopping()
[docs] def aborting(self):
"""Abort logic that stops the client."""
super(HTTPClient, self).aborting()
def _send_request(self, method, api, drop_response, timeout, **kwargs):
"""
Send a request using the requests module.
:param method: HTTP method to be used in request (e.g. GET, POST etc.).
:type method: ``str``
:param api: API to send request to.
:type api: ``str``
:param drop_response: Whether to drop the response message (called by
flush).
:type drop_response: ``threading._Event``
:param timeout: Number of seconds to wait for a request.
:type timeout: ``int``
:param kwargs: Optional arguments for the request, look at the requests
modules docs for these arguments.
:type kwargs: Depends on the argument.
"""
http_method = getattr(requests, method, requests.get)
api = api[1:] if api.startswith("/") else api
url = "{protocol}://{host}{port}/{api}".format(
protocol=self.protocol,
host=self.host,
port=":{}".format(self.port) if self.port else self.port,
api=api,
)
timeout = kwargs.pop("timeout", timeout)
self.logger.info(
"Sending %s request: %s", http_method.__name__.upper(), url
)
response = http_method(url=url, timeout=timeout, **kwargs)
if not drop_response.is_set():
self.responses.put(response)
[docs] def send(self, method, api, **kwargs):
"""
Send a non blocking HTTP request.
:param method: HTTP method to be used in request (e.g. GET, POST etc.).
:type method: ``str``
:param api: API to send request to.
:type api: ``str``
:param kwargs: Optional arguments for the request, look at the requests
modules docs for these arguments.
:type kwargs: Depends on the argument.
"""
drop_response = Event()
request_thread = Thread(
target=self._send_request,
args=(method, api, drop_response, self.timeout),
kwargs=kwargs,
)
request_thread.setDaemon(True)
request_thread.start()
self.request_threads.append((request_thread, drop_response))
[docs] def head(self, api, **kwargs):
"""
Send HEAD request.ZMQClient
:param api: API to send request to.
:type api: ``str``
:param kwargs: Optional arguments for the request, look at the requests
modules docs for these arguments.
:type kwargs: Depends on the argument.
"""
self.send("head", api, **kwargs)
[docs] def get(self, api, params=None, **kwargs):
"""
Send GET request.
:param api: API to send request to.
:type api: ``str``
:param params: Parameters to append to HTTP request after ?.
:type params: ``dict``
:param kwargs: Optional arguments for the request, look at the requests
modules docs for these arguments.
:type kwargs: Depends on the argument.
"""
self.send("get", api, params=params, **kwargs)
[docs] def post(self, api, data=None, json=None, **kwargs):
"""
Send POST request.
:param api: API to send request to.
:type api: ``str``
:param data: Dictionary to send in the body of the request.
:type data: ``dict``
:param json: JSON data to send in the body of the request.
:type json: ``dict``
:param kwargs: Optional arguments for the request, look at the requests
modules docs for these arguments.
:type kwargs: Depends on the argument.
"""
self.send("post", api, data=data, json=json, **kwargs)
[docs] def put(self, api, data=None, **kwargs):
"""
Send PUT request.
:param api: API to send request to.
:type api: ``str``
:param data: Dictionary to send in the body of the request.
:type data: ``dict``
:param kwargs: Optional arguments for the request, look at the requests
modules docs for these arguments.
:type kwargs: Depends on the argument.
"""
self.send("put", api, data=data, **kwargs)
[docs] def delete(self, api, **kwargs):
"""
Send DELETE request.
:param api: API to send request to.
:type api: ``str``
:param kwargs: Optional arguments for the request, look at the requests
modules docs for these arguments.
:type kwargs: Depends on the argument.
"""
self.send("delete", api, **kwargs)
[docs] def patch(self, api, data=None, **kwargs):
"""
Send PATCH request.
:param api: API to send request to.
:type api: ``str``
:param data: Dictionary to send in the body of the request.
:type data: ``dict``
:param kwargs: Optional arguments for the request, look at the requests
modules docs for these arguments.
:type kwargs: Depends on the argument.
"""
self.send("patch", api, data=data, **kwargs)
[docs] def options(self, api, **kwargs):
"""
Send OPTIONS request.
:param api: API to send request to.
:type api: ``str``
:param kwargs: Optional arguments for the request, look at the requests
modules docs for these arguments.
:type kwargs: Depends on the argument.
"""
self.send("options", api, **kwargs)
[docs] def receive(self, timeout=None):
"""
Wait to receive a response.
:param timeout: Number of seconds to wait for a response,
overrides timeout from init.
:type timeout: ``int``
:return: A request response or ``None``
:rtype: ``requests.models.Response`` or ``NoneType``
"""
timeout = timeout if timeout is not None else self.timeout
timeout += time.time()
response = None
while time.time() < timeout:
try:
response = self.responses.get(False)
except queue.Empty:
self.logger.debug("Waiting for response...")
response = None
else:
self.responses.task_done()
self.logger.info("Received response.")
break
time.sleep(self.interval)
return response
[docs] def flush(self):
"""
Drop any currently incoming messages and flush the received messages
queue.
"""
for _, drop_message in self.request_threads:
drop_message.set()
self.logger.debug("Request thread set to drop response.")
timeout = time.time() + (5 * self.timeout)
while not self.responses.empty() and time.time() < timeout:
try:
self.responses.get(block=False)
except queue.Empty:
self.logger.debug("Responses queue flushed.")
else:
self.responses.task_done()