"""
Driver for Kafka server
"""
import os
import re
import socket
from typing import Any, Dict, List, Optional
from schema import Or
from testplan.common.config import ConfigOption
from testplan.common.utils.documentation_helper import emphasized
from testplan.common.utils.path import instantiate, makedirs, makeemptydirs
from testplan.common.utils.process import execute_cmd
from testplan.testing.multitest.driver import app
# Directory that holds the Kafka command-line scripts.
KAFKA_BIN_PATH = "/opt/kafka/bin"
# Default value of the driver's ``binary`` argument.
KAFKA_START = KAFKA_BIN_PATH + "/kafka-server-start.sh"
# Default value of the driver's ``kafka_storage`` argument.
KAFKA_STORAGE = KAFKA_BIN_PATH + "/kafka-storage.sh"
[docs]
class KafkaStandaloneConfig(app.AppConfig):
    """
    Configuration object for
    :py:class:`~testplan.testing.multitest.driver.kafka.KafkaStandalone` resource.
    """

    @classmethod
    def get_options(cls) -> Dict[Any, Any]:
        """
        Return the schema of the options specific to the Kafka driver.

        :return: mapping of ``ConfigOption`` keys to the accepted value
            type; ``controller_quorum_voters`` and ``cluster_id`` also
            accept ``None``.
        """
        return {
            ConfigOption("cfg_template"): str,
            ConfigOption("host"): str,
            ConfigOption("port"): int,
            ConfigOption("node_id"): int,
            ConfigOption("controller_port"): int,
            ConfigOption("controller_quorum_voters"): Or(str, None),
            ConfigOption("kafka_storage"): str,
            ConfigOption("cluster_id"): Or(str, None),
        }
[docs]
class KafkaStandalone(app.App):
"""
Driver for starting a Kafka instance in standalone mode.
{emphasized_members_docs}
:param name: Name of the driver.
:type name: ``str``
:param cfg_template: Kafka config file template.
:type cfg_template: ``str``
:param binary: kafka-server-start.sh file path.
:type binary: ``str``
:param host: Host name of the Kafka instance, defaults to ``localhost``.
:type host: ``str``
:param port: Kafka listen port.
:type port: ``int``
:param node_id: Node ID, defaults to 1.
:type node_id: ``int``
:param controller_port: Controller port.
:type controller_port: ``int``
:param controller_quorum_voters: Controller quorum voters; when not set,
``node_id@host:controller_port`` is used.
:type controller_quorum_voters: ``str`` or ``NoneType``
:param kafka_storage: kafka-storage.sh file path.
:type kafka_storage: ``str``
:param cluster_id: Cluster ID (optional).
:type cluster_id: ``str`` or ``NoneType``
:param env: Environmental variables to be made available to Kafka process.
:type env: ``dict``
"""
# Config class describing the options accepted by this driver.
CONFIG = KafkaStandaloneConfig
def __init__(
    self,
    name: str,
    cfg_template: str,
    binary: str = KAFKA_START,
    host: str = "localhost",
    port: int = 0,
    node_id: int = 1,
    controller_port: int = 0,
    controller_quorum_voters: Optional[str] = None,
    kafka_storage: str = KAFKA_STORAGE,
    cluster_id: Optional[str] = None,
    **options: Any,
) -> None:
    """
    Build the stdout patterns and forward all options to the base driver.

    :param name: Name of the driver.
    :param cfg_template: Kafka config file template; it is always added
        to ``install_files``.
    :param binary: kafka-server-start.sh file path.
    :param host: Host name of the Kafka instance.
    :param port: Kafka listen port.
    :param node_id: Node ID.
    :param controller_port: Controller port; a listener reported on this
        port is not captured as the Kafka listen port.
    :param controller_quorum_voters: Controller quorum voters, or ``None``
        to derive the value lazily from node id, host and controller port.
    :param kafka_storage: kafka-storage.sh file path.
    :param cluster_id: Cluster ID, or ``None``.
    :param options: Further options forwarded to the base driver;
        ``install_files`` is extended with ``cfg_template``.
    """
    stdout_regexps = [
        # Captures ``host`` and ``port`` from the listener line. The
        # negative lookahead rejects a port equal to ``controller_port``,
        # presumably so the controller listener is not taken for the
        # broker listener.
        re.compile(
            rf".*Awaiting socket connections on\s*(?P<host>[^:]+):(?P<port>(?!{controller_port}\b)[0-9]+).*"
        ),
        re.compile(".*started.*"),
    ]
    # Work on a copy: appending to the caller's own list would leak the
    # template into it (and into any other driver sharing that list).
    # ``None`` and tuples are accepted as well.
    install_files = list(options.pop("install_files", None) or [])
    install_files.append(cfg_template)
    super(KafkaStandalone, self).__init__(
        name=name,
        cfg_template=cfg_template,
        binary=binary,
        host=host,
        port=port,
        node_id=node_id,
        controller_port=controller_port,
        stdout_regexps=stdout_regexps,
        controller_quorum_voters=controller_quorum_voters,
        kafka_storage=kafka_storage,
        install_files=install_files,
        cluster_id=cluster_id,
        **options,
    )
    self._host = host
    # Configured port; replaced in post_start by the port extracted from
    # the process output.
    self._port = port
    self._controller_port = controller_port
    self._controller_quorum_voters = controller_quorum_voters
@emphasized  # type: ignore[prop-decorator]
@property
def host(self) -> str:
    """Host name given to the driver at construction."""
    configured_host: str = self._host
    return configured_host
@emphasized  # type: ignore[prop-decorator]
@property
def port(self) -> int:
    """Port to listen on."""
    current_port: int = self._port
    return current_port
@emphasized  # type: ignore[prop-decorator]
@property
def node_id(self) -> int:
    """Node ID."""
    # Read straight from the driver config rather than a cached attribute.
    configured_node_id: int = self.cfg.node_id
    return configured_node_id
@emphasized  # type: ignore[prop-decorator]
@property
def controller_port(self) -> int:
    """Controller port."""
    configured_port: int = self._controller_port
    return configured_port
@emphasized  # type: ignore[prop-decorator]
@property
def controller_quorum_voters(self) -> str:
    """
    Controller quorum voters.

    When no value was supplied (or it is empty), a single-voter value of
    the form ``node_id@host:controller_port`` is built on first access and
    remembered for later calls.
    """
    voters = self._controller_quorum_voters
    if voters:
        return voters
    self._controller_quorum_voters = (
        f"{self.cfg.node_id}@{self.host}:{self.controller_port}"
    )
    return self._controller_quorum_voters
@property
def log_path(self) -> str:
    """
    Directory under the driver runpath that is created in ``pre_start``
    as the Kafka log directory.

    :return: ``<runpath>/log``
    """
    # NOTE(review): this used to return "<runpath>/etc", which appears to
    # alias the driver's etc directory where the config file is installed.
    # A dedicated "log" directory keeps the two apart - confirm that no
    # config template relies on the old location.
    return os.path.join(self.runpath, "log")
@property
def etc_path(self) -> str:
    """
    Alias of the driver's ``etcpath`` attribute.
    """
    etc_directory: str = self.etcpath
    return etc_directory
@property
def config_path(self) -> str:
    """
    Path of the config file inside the driver's etc directory.

    The file name is the base name of the configured ``cfg_template``.
    """
    etc_directory = self.etcpath
    template_name = os.path.basename(self.cfg.cfg_template)
    return os.path.join(etc_directory, template_name)
[docs]
def pre_start(self) -> None:
    """
    Prepare the runpath before the Kafka process is launched.

    Runs the base driver's pre-start steps, creates the log directory and,
    when the config file mentions ``controller.quorum.voter``, calls
    ``format_meta``.
    """
    super().pre_start()
    makedirs(self.log_path)
    with open(self.config_path) as cfg_file:
        # Substring match, so "controller.quorum.voters" is matched too.
        has_quorum_voters = "controller.quorum.voter" in cfg_file.read()
    if has_quorum_voters:
        self.format_meta()
@property
def cmd(self) -> List[str]:
    """Command line: the configured binary followed by the config file path."""
    launcher = self.cfg.binary
    return [launcher, self.config_path]
[docs]
def post_start(self) -> None:
    """
    Record the port extracted from the process output and log it.

    The value comes from the ``port`` entry of ``self.extracts``.
    """
    super().post_start()
    extracted_port = self.extracts["port"]
    self._port = int(extracted_port)
    self.logger.info("%s listening on %s:%s", self, self._host, self._port)