Source code for testplan.testing.multitest.driver.http.server

"""HTTPServer Driver."""

import time
import queue
import json
import http.server as http_server
from threading import Thread
from typing import Optional

from schema import Use

from testplan.common.config import ConfigOption
from testplan.common.utils.documentation_helper import emphasized
from testplan.common.utils.strings import slugify

from ..base import Driver, DriverConfig

_CONTENT_TYPE_KEY = "Content-Type"
_CONTENT_LENGTH_KEY = "Content-Length"


[docs]class ReceivedRequest: """ Stores information of requests received by the HTTP Server. """ def __init__( self, method: str, path_url: str, headers: dict, raw_data: bytes, raw_requestline: bytes, requestline: str, request_version: str, ) -> None: """ Constructs a new received request :param method: method of the request, eg.: GET, POST, HEAD... :param path_url: URL of the request :param headers: Dictionary of HTTP Headers sent with the request :param raw_data: Data sent in the body of the request in original bytes format :param raw_requestline: The first line of the request containing the method, URL and version in original bytes format :param requestline: The first line of the request containing the method, URL and version in string format :param request_version: Human-readable string of the protocol version """ # Default empty dict for headers. headers = {} if headers is None else headers self.method = method self.path_url = path_url self.headers = headers self.raw_data = raw_data self.raw_requestline = raw_requestline self.requestline = requestline self.request_version = request_version def __str__(self) -> str: """ String representation of the first line of the request. """ return self.requestline @property def content_type(self) -> Optional[str]: """ Returns the request's content type :return Content type header or None """ if _CONTENT_TYPE_KEY in self.headers: return self.headers[_CONTENT_TYPE_KEY] else: return None @property def json(self) -> Optional[dict]: """ Returns the request's data in JSON format if exists :return Request's data in JSON format or None """ if ( self.raw_data is not None and self.content_type == "application/json" ): return json.loads(self.raw_data) else: return None
[docs]class HTTPResponse: """ HTTPResponse containing the status code, headers and content. """ def __init__( self, status_code: int = None, headers: dict = None, content: list = None, ) -> None: """ Constructs a HTTPResponse :param status_code: The returned status code. :param headers: A dictionary containing the header keywords and values. :param content: A list of strings to be sent back. """ self.status_code = status_code or 200 self.headers = headers or {_CONTENT_TYPE_KEY: "text/plain"} self.content = content or []
[docs]class HTTPRequestHandler(http_server.BaseHTTPRequestHandler): """ Responds to any HTTP request with response in queue. If empty send an error message in response. """ def _send_header( self, status_code: int = 200, headers: dict = None ) -> None: """ Send a header response. :param status_code: The returned status code. :param headers: The returned headers. """ if headers is None: headers = {_CONTENT_TYPE_KEY: "text/plain"} self.send_response(code=int(status_code)) for keyword, value in headers.items(): self.send_header(keyword, value) self.end_headers() def _send_content(self, content): """ Send the content response. :param content: A list of strings to be sent back. :type content: ``list`` """ for line in content: try: line = line.encode("utf-8") except AttributeError: pass self.wfile.write(line) def _parse_request(self) -> None: """ Put the request into the HTTPServer's requests queue. Get the response from the HTTPServer's response queue. If there is no response send an error message back. """ raw_content = None if _CONTENT_LENGTH_KEY in self.headers: raw_content = self.rfile.read( int(self.headers[_CONTENT_LENGTH_KEY]) ) response = self.get_response( ReceivedRequest( headers=self.headers, path_url=self.path, raw_data=raw_content, raw_requestline=self.raw_requestline, requestline=self.requestline, method=self.command, request_version=self.request_version, ) ) if not isinstance(response, HTTPResponse): raise TypeError("Response must be of type HTTPResponse") self._send_header( status_code=response.status_code, headers=response.headers ) self._send_content(response.content) self.server.log_callback( "Sent response with:\n status code {}\n headers {}".format( response.status_code, response.headers ) )
[docs] def log_message(self, format, *args): """Log messages from the BaseHTTPRequestHandler class.""" self.server.log_callback("BASE CLASS: {}".format(format % args))
[docs] def get_response(self, request: ReceivedRequest) -> HTTPResponse: """ Parse the request and return the response. :param request: The request path. :return: Http response. """ self.server.requests.put(request) timeout = time.time() + self.server.timeout while time.time() < timeout: try: response = self.server.responses.get(False) except queue.Empty: response = HTTPResponse( status_code=500, content=["No response in driver queue."] ) self.server.log_callback("No response found in queue.") else: self.server.log_callback("Response popped from queue.") break time.sleep(self.server.interval) if response.status_code == 500: self.server.log_callback("Responding with 500 error.") return response
[docs] def do_HEAD(self): """Handles a HEAD request.""" self._send_header() self.server.log_callback("Sending response to HEAD request.")
[docs] def do_GET(self): """Handles a GET request.""" self._parse_request() self.server.log_callback("Sending response to GET request.")
[docs] def do_POST(self): """Handles a POST request.""" self._parse_request() self.server.log_callback("Sending response to POST request.")
[docs] def do_PUT(self): """Handles a PUT request.""" self._parse_request() self.server.log_callback("Sending response to PUT request.")
[docs] def do_DELETE(self): """Handles a DELETE request.""" self._parse_request() self.server.log_callback("Sending response to DELETE request.")
[docs] def do_PATCH(self): """Handles a PATCH request.""" self._parse_request() self.server.log_callback("Sending response to PATCH request.")
[docs] def do_OPTIONS(self): """Handles a OPTIONS request.""" self._parse_request() self.server.log_callback("Sending response to OPTIONS request.")
[docs]class HTTPServerConfig(DriverConfig): """ Configuration object for :py:class:`~testplan.testing.multitest.driver.http.server.HTTPServer` driver. """
[docs] @classmethod def get_options(cls): """ Schema for options validation and assignment of default values. """ return { ConfigOption("host", default="localhost"): str, ConfigOption("port", default=0): Use(int), ConfigOption( "request_handler", default=HTTPRequestHandler ): lambda v: issubclass(v, http_server.BaseHTTPRequestHandler), ConfigOption("handler_attributes", default={}): dict, ConfigOption("timeout", default=5): Use(int), ConfigOption("interval", default=0.01): Use(float), }
[docs]class HTTPServer(Driver): """ Driver for a server that can accept connection and send/receive messages using HTTP protocol. {emphasized_members_docs} :param name: Name of HTTPServer. :type name: ``str`` :param host: Hostname to connect to. :type host: ``str`` or ``ContextValue`` :param port: Port to connect to. :type port: ``str`` or ``ContextValue`` :param request_handler: Handles requests and responses for the server. :type request_handler: subclass of ``http.server.BaseHTTPRequestHandler`` :param handler_attributes: Dictionary of attributes to be accessed from the request_handler. :type handler_attributes: ``dict`` :param timeout: Number of seconds to wait for a response from the queue in the request_handler. :type timeout: ``int`` :param interval: Time to wait between each attempt to get a response. :type interval: ``int`` Also inherits all :py:class:`~testplan.testing.multitest.driver.base.Driver` options. """ CONFIG = HTTPServerConfig def __init__( self, name, host: str = "localhost", port: int = 0, request_handler=HTTPRequestHandler, handler_attributes=None, timeout=5, interval=0.01, **options ): options.update(self.filter_locals(locals())) options.setdefault("file_logger", "{}.log".format(slugify(name))) super(HTTPServer, self).__init__(**options) self._host: str = None self._port: int = None self.request_handler = None self.handler_attributes = None self.timeout = None self.interval = None self.requests = None self.responses = None self._server_thread = None @emphasized @property def host(self): """Host name.""" return self._host @emphasized @property def port(self): """Port number assigned.""" return self._port
[docs] def queue_response(self, response): """ Put an HTTPResponse on to the end of the response queue. :param response: A response to be sent. :type response: ``HTTPResponse`` """ if not isinstance(response, HTTPResponse): raise TypeError("Response must be of type HTTPResponse") self.responses.put(response) self.logger.debug("Added response to HTTPServer response queue.")
[docs] def respond(self, response): """ Put an HTTPResponse on to the end of the response queue. :param response: A response to be sent. :type response: ``HTTPResponse`` """ self.queue_response(response)
[docs] def get_request(self) -> Optional[str]: """ Get a request sent to the HTTPServer, if the requests queue is empty return None. :return: A request from the queue or ``None`` """ try: return self.requests.get(False).path_url except queue.Empty: return None
[docs] def get_full_request(self) -> Optional[ReceivedRequest]: """ Get a request sent to the HTTPServer, if the requests queue is empty return None. :return: A request from the queue or ``None`` """ try: return self.requests.get(False) except queue.Empty: return None
[docs] def receive(self, timeout: int = None) -> Optional[ReceivedRequest]: """ Wait to receive a request. :param timeout: Number of seconds to wait for a request, overrides timeout from init. :return: A request or ``None`` """ timeout = timeout if timeout is not None else self.timeout timeout += time.time() request = None while time.time() < timeout: try: request = self.requests.get(False) except queue.Empty: self.logger.debug("Waiting for request...") request = None else: self.requests.task_done() self.logger.debug("Received request.") break time.sleep(self.interval) return request
[docs] def starting(self): """Start the HTTPServer.""" super(HTTPServer, self).starting() self.request_handler = self.cfg.request_handler self.handler_attributes = self.cfg.handler_attributes self.timeout = self.cfg.timeout self.interval = self.cfg.interval self.requests = queue.Queue() self.responses = queue.Queue() self._server_thread = _HTTPServerThread( host=self.cfg.host, port=self.cfg.port, requests_queue=self.requests, responses_queue=self.responses, handler_attributes=self.handler_attributes, request_handler=self.request_handler, timeout=self.timeout, logger=self.logger, ) self._server_thread.setName(self.name) self._server_thread.start() while not hasattr(self._server_thread.server, "server_port"): time.sleep(0.1) self._host, self._port = self._server_thread.server.server_address self.logger.info( "Started HTTPServer listening on http://%s:%s", self.host, self.port, )
def _stop(self): """Stop the HTTPServer.""" if self._server_thread: self._server_thread.stop()
[docs] def stopping(self): """Stop the HTTPServer.""" super(HTTPServer, self).stopping() self._stop()
[docs] def aborting(self): """Abort logic that stops the server.""" super(HTTPServer, self).aborting() self._stop()
[docs] def flush_request_queue(self) -> None: """Flush the received messages queue.""" timeout = time.time() + (5 * self.timeout) while not self.requests.empty() and time.time() < timeout: try: self.requests.get(block=False) except queue.Empty: self.logger.debug("Requests queue flushed.") else: self.requests.task_done()
class _HTTPServerThread(Thread): """ HTTP server running on a separate thread. :param host: Host address for HTTP server. :type host: ``str`` :param port: Port for HTTP server. :type port: ``int`` :param requests_queue: Requests queue for HTTP server. :type requests_queue: ``Queue.Queue`` :param responses_queue: Responses queue for HTTP server. :type responses_queue: ``Queue.Queue`` :param handler_attributes: Dictionary of attributes to be accessed from the request_handler. :type handler_attributes: ``dict`` :param request_handler: Handles requests and responses for the server. :type request_handler: subclass of ``http.server.BaseHTTPRequestHandler`` :param timeout: Number of seconds to wait for a response from the queue in the request_handler. :param interval: Time to wait between each attempt to get a response. :type interval: ``int`` :type timeout: ``int`` :param logger: Logger for the driver. :type logger: ``logging.Logger`` """ def __init__( self, host, port, requests_queue, responses_queue, handler_attributes, request_handler=None, timeout=5, interval=0.01, logger=None, ): super(_HTTPServerThread, self).__init__() self.host = host self.port = port self.requests_queue = requests_queue self.responses_queue = responses_queue self.handler_attributes = handler_attributes self.request_handler = request_handler or HTTPRequestHandler self.timeout = timeout self.interval = interval self.logger = logger self.server = None def run(self): """Start the HTTP server thread.""" self.server = http_server.HTTPServer( server_address=(self.host, self.port), RequestHandlerClass=self.request_handler, ) self.server.requests = self.requests_queue self.server.responses = self.responses_queue self.server.handler_attributes = self.handler_attributes self.server.timeout = self.timeout self.server.interval = self.interval nothing = lambda msg: None self.server.log_callback = ( self.logger.debug if self.logger else nothing ) self.server.serve_forever() def stop(self): """Stop the HTTP server thread.""" if self.server is not None: self.server.shutdown()