Source code for testplan.testing.multitest.driver.kafka

"""
Driver for Kafka server
"""
import os
import re
import socket
from typing import List, Optional

from schema import Or

from testplan.common.config import ConfigOption
from testplan.common.utils.documentation_helper import emphasized
from testplan.common.utils.path import instantiate, makedirs, makeemptydirs
from testplan.testing.multitest.driver import app

KAFKA_START = "/opt/kafka/bin/kafka-server-start.sh"


[docs]class KafkaStandaloneConfig(app.AppConfig): """ Configuration object for :py:class:`~testplan.testing.multitest.driver.kafka.KafkaStandalone` resource. """
[docs] @classmethod def get_options(cls): return { ConfigOption("cfg_template"): str, ConfigOption("host"): Or(str, None), ConfigOption("port"): int, }
[docs]class KafkaStandalone(app.App): """ Driver for starting a Kafka instance in standalone mode. {emphasized_members_docs} :param cfg_template: Kafka config file template. :type cfg_template: ``str`` :param binary: kafka-server-start.sh file path. :type binary: ``str`` :param port: Kafka listen port. :type port: ``int`` :param env: Environmental variables to be made available to Zookeeper process. :type env: ``dict`` """ CONFIG = KafkaStandaloneConfig def __init__( self, name: str, cfg_template: str, binary: str = KAFKA_START, host: Optional[str] = None, port: int = 0, **options ): stdout_regexps = [ re.compile( r".*Awaiting socket connections on\s*(?P<host>[^:]+):(?P<port>[0-9]+).*" ), re.compile(".*started.*"), ] super(KafkaStandalone, self).__init__( name=name, cfg_template=cfg_template, binary=binary, host=host, port=port, stdout_regexps=stdout_regexps, **options ) self.log_path = None self.etc_path = None self.config = None self._host = host self._port = port @emphasized @property def host(self) -> str: """Host to bind to.""" if self._host is None: raise RuntimeError( "Host not resolved yet, shouldn't be accessed now." ) return self._host @emphasized @property def port(self) -> int: """Port to listen on.""" return self._port
[docs] def pre_start(self): super(KafkaStandalone, self).pre_start() self._host = self._host or socket.getfqdn() self.log_path = os.path.join(self.runpath, "log") self.etc_path = os.path.join(self.runpath, "etc") for directory in (self.log_path, self.etc_path): if self.cfg.path_cleanup is False: makedirs(directory) else: makeemptydirs(directory) self.config = os.path.join(self.runpath, "etc", "server.properties") instantiate(self.cfg.cfg_template, self.context_input(), self.config)
@property def cmd(self) -> List[str]: return [self.cfg.binary, self.config]
[docs] def post_start(self): super().post_start() self._port = int(self.extracts["port"]) self.logger.info("%s listening on %s:%s", self, self._host, self._port)