Source code for testplan.testing.multitest.driver.zookeeper

"""
Driver for Zookeeper server
"""
import os
import socket
from typing import Optional, Dict

from schema import Or

from testplan.common.config import ConfigOption
from testplan.common.utils.documentation_helper import emphasized
from testplan.common.utils.path import (
    StdFiles,
    instantiate,
    makedirs,
    makeemptydirs,
)
from testplan.common.utils.process import execute_cmd
from testplan.testing.multitest.driver.base import Driver, DriverConfig

ZK_SERVER = "/usr/share/zookeeper/bin/zkServer.sh"


[docs]class ZookeeperStandaloneConfig(DriverConfig): """ Configuration object for :py:class:`~testplan.testing.multitest.driver.zookeeper.ZookeeperStandalone` resource. """
[docs] @classmethod def get_options(cls): return { ConfigOption("cfg_template"): str, ConfigOption("binary"): str, ConfigOption("host"): Or(None, str), ConfigOption("port"): int, ConfigOption("env", default=None): Or(None, dict), }
[docs]class ZookeeperStandalone(Driver): """ Driver for starting a Zookeeper instance in standalone mode. {emphasized_members_docs} :param template: Zookeeper config file template. :type template: ``str`` :param binary: zkServer.sh file path. :type binary: ``str`` :param port: Zookeeper listen port. Zookeeper doesn't support random port :type port: ``int`` :param env: Environmental variables to be made available to Zookeeper process. :type env: ``dict`` """ CONFIG = ZookeeperStandaloneConfig def __init__( self, name: str, cfg_template: str, binary: str = ZK_SERVER, host: Optional[str] = None, port: int = 2181, env: Optional[dict] = None, **options, ): super(ZookeeperStandalone, self).__init__( name=name, cfg_template=cfg_template, binary=binary, host=host, port=port, env=env, **options, ) self._host = host self._port = port self._env = None self.config = None self.zkdata_path = None self.zklog_path = None self.etc_path = None self.pid_file = None self.std = None @emphasized @property def host(self) -> str: """Host to bind to.""" if self._host is None: raise RuntimeError( "Host not resolved yet, shouldn't be accessed now." ) return self._host @emphasized @property def port(self) -> int: """Port to listen on.""" return self._port @emphasized @property def env(self) -> Dict[str, str]: """Environment variables.""" if self._env is not None: return self._env self._env = self.cfg.env.copy() if self.cfg.env else {} return self._env @emphasized @property def connection_str(self) -> str: """Connection string.""" if self._host is None: raise RuntimeError( "Host not resolved yet, shouldn't be accessed now." ) return "{}:{}".format(self._host, self._port)
[docs] def pre_start(self): """ Create mandatory directories and install files from given templates using the drivers context before starting zookeeper. """ super(ZookeeperStandalone, self).pre_start() self.zkdata_path = os.path.join(self.runpath, "zkdata") self.zklog_path = os.path.join(self.runpath, "zklog") self.etc_path = os.path.join(self.runpath, "etc") self.env["ZOO_LOG_DIR"] = self.zklog_path for directory in (self.zkdata_path, self.zklog_path, self.etc_path): if self.cfg.path_cleanup is False: makedirs(directory) else: makeemptydirs(directory) self.config = os.path.join(self.runpath, "etc", "zookeeper.cfg") self._host = self._host or socket.getfqdn() if self._port == 0: raise RuntimeError("Zookeeper doesn't support random port") instantiate(self.cfg.cfg_template, self.context_input(), self.config)
[docs] def starting(self): """Starts the Zookeeper instance.""" super(ZookeeperStandalone, self).starting() start_cmd = [self.cfg.binary, "start", self.config] self.std = StdFiles(self.runpath) execute_cmd( start_cmd, label=self.uid(), check=True, stdout=self.std.out, stderr=self.std.err, logger=self.logger, env=self.env, )
[docs] def post_start(self): super().post_start() self.logger.info("%s listening on %s:%s", self, self.host, self.port)
[docs] def stopping(self): """Stops the Zookeeper instance.""" stop_cmd = [self.cfg.binary, "stop", self.config] try: execute_cmd( stop_cmd, label=self.uid(), check=True, stdout=self.std.out, stderr=self.std.err, logger=self.logger, env=self.env, ) finally: self.std.close()