"""Interactive code reloader module."""
import sys
import os
import io
import time
import inspect
import importlib
import modulefinder
import collections
import functools
import warnings
from testplan.common.utils import path as path_utils
from testplan.common.utils import logger
from testplan.common.utils import strings
from testplan.common.utils.package import import_tmp_module
from testplan.testing.multitest import suite, MultiTest
[docs]class ModuleReloader(logger.Loggable):
"""
Reloads modules and their dependencies if there was any file modification.
:param extra_deps: Modules to register as extra dependencies to reload,
despite not being directly imported by __main__, or paths of these
modules.
:type extra_deps: ``Iterable[ModuleType]`` or ``Iterable[str]``
:param scheduled_modules: Name of module which has schedule tests, with
the module path registered as extra dependencies to reload.
:type scheduled_modules: ``Dict[str, str]``
"""
def __init__(self, extra_deps=None, scheduled_modules=None):
super(ModuleReloader, self).__init__()
self._extra_deps = extra_deps or []
self._scheduled_modules = scheduled_modules or {}
if isinstance(self._scheduled_modules, (tuple, list)):
counter = collections.Counter(
module_info[0] for module_info in self._scheduled_modules
)
duplicates = [mod for mod, cnt in counter.items() if cnt > 1]
for dup in duplicates:
paths = set(
os.path.realpath(path)
for mod, path in self._scheduled_modules
if mod == dup
)
if len(paths) > 1:
paths_to_print = os.linesep.join(
f" -- {path}" for path in paths
)
warnings.warn(
f"Module `{dup}` imported from different places, it"
" makes Testplan not able to reload tests properly:"
f"{os.linesep}{paths_to_print}"
)
self._scheduled_modules = dict(self._scheduled_modules)
# Import modules that have scheduled tests
for mod, path in self._scheduled_modules.items():
with import_tmp_module(
mod, path, delete=False, warn_if_exist=False
):
pass
(
self._reload_dirs,
self._dep_graph,
self._watched_modules,
) = self._build_dependencies()
# Last recorded reload time for watched modules.
self._last_reload_time = {} # type: Dict[str, float]
self._init_time = time.time()
[docs] def reload(self, tests, rebuild_dependencies=False):
"""
Reload code and update testsuites in given tests with new code.
:param tests: Iterable of Tests (e.g. MultiTest).
:type tests: ``Iterable[Test]``
:param rebuild_dependencies: Hard re-calculate all file dependencies.
:type rebuild_dependencies: ``bool``
"""
start_time = time.time()
suite_instances = self._suites_by_class(tests)
if rebuild_dependencies:
(
self._reload_dirs,
self._dep_graph,
self._watched_modules,
) = self._build_dependencies()
modified_modules = self._modified_modules
if modified_modules:
self.logger.debug(
"Watched files have been modified - reloading dependencies."
)
self._reload_modified_modules(modified_modules, suite_instances)
self.logger.info(
"Took %.2f seconds to reload dependencies.",
time.time() - start_time,
)
else:
self.logger.debug("No watched files have been modified.")
def _build_dependencies(self):
"""
Build a list of directories to reload code from and a tree of
dependencies.
"""
main_module_file = sys.modules["__main__"].__file__
if not main_module_file:
raise RuntimeError(
"Can only use interactive reloader when the __main__ module "
"is a file."
)
reload_dirs = {os.path.abspath(os.path.dirname(main_module_file))}
# Add extra reload source directories if required.
reload_dirs = reload_dirs.union(
self._extra_reload_dirs(
self._extra_deps + list(set(self._scheduled_modules.values()))
)
)
dep_graph, watched_modules = self._build_dep_graph(
main_module_file, reload_dirs
)
return reload_dirs, dep_graph, watched_modules
def _extra_reload_dirs(self, deps):
"""
Build and return a set of reload directories used by extra
dependencies.
:param deps: Extra modules to add as dependencies of __main__.
:type deps ``Iterable[ModuleType]``
:return: Reload directories of extra dependencies
:rtype: ``set[str]``
"""
reload_dirs = set()
for dep in deps:
if isinstance(dep, str):
dirpath = path_utils.fix_home_prefix(os.path.abspath(dep))
# Add it to `sys.path` for reloading
if dirpath not in sys.path:
sys.path.append(dirpath)
self.logger.debug("Adding extra dependent path: %s", dirpath)
reload_dirs.add(dirpath)
else:
filepath = _module_filepath(dep)
dirpath = os.path.dirname(filepath) if filepath else None
# Find the path where module or package can be imported
if _has_package(dep):
package = sys.modules.get(dep.__package__.split(".")[0])
if package is not None:
filepath = _module_filepath(package)
if filepath:
dirpath = os.path.dirname(
os.path.dirname(filepath)
)
if dirpath:
# Even though this module has been imported, its directory
# may have been removed from `sys.path`. In that case it
# needs to be added back so the module can be reloaded.
if dirpath not in sys.path:
sys.path.append(dirpath)
self.logger.debug(
"Adding extra dependent path for module %s: %s",
dep.__name__,
dirpath,
)
reload_dirs.add(dirpath)
return reload_dirs
def _build_dep_graph(self, main_module_file, reload_dirs):
"""
Build the graph of dependencies starting from the main module.
:param main_module_file: File path to the python script being executed
as __main__.
:type main_module_file: ``str``
:param reload_dirs: Directories to reload modules from.
:type reload_dirs: ``Iterable[str]``
"""
finder = _GraphModuleFinder(path=self._filtered_syspath(reload_dirs))
try:
with io.open(main_module_file, "r") as fp:
text = fp.read()
except OSError as exc:
raise RuntimeError(
"Could not run main module {} as a script: {}.".format(
main_module_file,
exc,
)
)
else:
imports = "".join(
f"import {module_name}{os.linesep}"
for module_name in sorted(self._scheduled_modules.keys())
)
finder.load_module(
"__main__",
io.StringIO( # Instance of `TextIOBase` with a `read` method
text + os.linesep * 2 + imports if imports else text
),
main_module_file,
("", "r", 1), # In deprecated module `imp`: PY_SOURCE == 1
)
return finder.build_dep_graph()
def _filtered_syspath(self, reload_dirs):
"""
:return: sys.path filtered by paths that are in a reload dir.
:rtype: ``List[str]``
"""
return list(
{
path
for path in sys.path
if any(
os.path.abspath(path).startswith(os.path.abspath(d))
for d in reload_dirs
)
}
)
@property
def _modified_modules(self):
"""
Calls `os.stat` on all watched files to check which ones have been
modified and require a reload.
:return: Set of all modules whose filepaths that have been modified
and require reloading.
:rtype: ``set[_ModuleNode]``
"""
return set(
mod
for mod in self._watched_modules
if os.stat(mod.filepath).st_mtime
> self._last_reload_time.get(mod.name, self._init_time)
)
def _suites_by_class(self, tests):
"""
Creates a {module_name: {class_name: [suite_instance, ...]}} mapping.
:param tests: iterator of Tests.
:type tests: ``Iterable[Test]``
:return: Mapping of module and class name to list of suite instances.
:rtype: ``Dict[str, Dict[str, List[Any]]]``
"""
suite_dict = collections.defaultdict(
functools.partial(collections.defaultdict, list)
)
for test in tests:
if isinstance(test, MultiTest):
for suite in test.cfg.suites:
suite_dict[suite.__module__][
suite.__class__.__name__
].append(suite)
return suite_dict
def _reload_modified_modules(self, modified_modules, suite_instances):
"""
Reload all files that have been modified. If a module has been reloaded,
all modules that depend on it should also be reloaded. We ensure this
by walking the graph of dependencies in depth-first order.
:param modified_modules: Set of modules that have been modified and
require a reload.
:type modified_modules: ``set[_ModuleNode]``
:param suite_instances: Mapping of module and class names to list of
suite instances.
:type suite_instances: ``Dict[str, Dict[str, List[Any]]]``
"""
# Walk the graph using depth-first search order.
visited_nodes = set()
reloaded_nodes = set()
# We never want to reload the __main__ module, so we start the
# recursion from each immediate dependency in turn.
for dep in self._dep_graph.dependencies:
if dep not in visited_nodes:
self._reload_recur(
dep,
modified_modules,
suite_instances,
visited_nodes,
reloaded_nodes,
)
def _reload_recur(
self,
mod_node,
modified_modules,
suite_instances,
visited_nodes,
reloaded_nodes,
):
"""
Recursively walk the graph of dependencies, reloading all modified
files.
:param mod_node: Current module we are processing.
:type mod_node: ``_ModuleNode``
:param modified_modules: Set of modules that have been modified and
require a reload.
:type modified_modules: ``set[_ModuleNode]``
:param suite_instances: Mapping of module and class names to list of
suite instances.
:type suite_instances: ``Dict[str, Dict[str, List[Any]]]``
:param visited_nodes: Set of modules already visited.
:type visited_nodes: ``set[_ModuleNode]``
:return: Whether the module was reloaded.
:rtype: ``bool``
"""
if mod_node in reloaded_nodes:
return True # we already reloaded this
if mod_node in visited_nodes:
return False # we already decided not need reload
visited_nodes.add(mod_node)
# Reload this module's dependencies if necessary. Check if any
# dependencies were reloaded - we will need to reload our current
# module too if so. Note that we must use a list comprehension inside
# the any call to avoid short-circuiting as soon as a dependency is
# reloaded.
dep_reloaded = any(
[
self._reload_recur(
dep,
modified_modules,
suite_instances,
visited_nodes,
reloaded_nodes,
)
for dep in mod_node.dependencies
]
)
# Reload this module if there are file modifications or if any of its
# dependencies have been reloaded.
if mod_node in modified_modules or dep_reloaded:
self.logger.info("Reloading %s", mod_node.name)
mod_node.reload()
if mod_node.name in suite_instances:
mod_node.update_suites(suite_instances)
self._last_reload_time[mod_node.name] = time.time()
reloaded_nodes.add(mod_node)
return True
else:
return False
class _GraphModuleFinder(modulefinder.ModuleFinder, logger.Loggable):
r"""
Variant of the standard library ModuleFinder that is able to produce a
directed acyclic graph of dependencies. The root node corresponds to the
main module passed as a script, its child nodes correspond to its
direct import dependencies, and so on.
An example of how such a graph
with a main module and dependencies A, B, C and D could look is:
main
/ \
/ \
V V
A ----> B
/ \ \
/ --- \
V | V
C --> D
:param args: passed to ModuleFinder
:param kwargs: passed to ModuleFinder
"""
def __init__(self, *args, **kwargs):
# In the Python 2 stdlib, ModuleFinder is an old-style class that
# does not inherit from object, therefore we cannot use super().
modulefinder.ModuleFinder.__init__(self, *args, **kwargs)
logger.Loggable.__init__(self) # Enable logging via self.logger
# Mapping of module object to a set of its dependencies. We store the
# dependencies in a list instead of a set to preserve ordering. While
# not strictly necessary, it makes testing easier when the ordering
# of nodes in the graph can be relied on. However, we must ensure that
# each dependency is unique, so we check before adding each new
# dependency below. This should be a significant performance issue,
# as we only expect each module to have a relatively small number
# of dependencies (around 10s) so O(n) adding is acceptable.
self._module_deps = collections.defaultdict(list)
self._curr_caller = None
self._module_nodes = {}
def import_hook(self, name, caller=None, *args, **kwargs):
"""
Hook called when ``caller`` imports module ``name``. Store off the
caller so we can use it later.
:param name: Name of the module being imported.
:type name: ``str``
:param caller: Module object that is importing ``name``.
:type caller: ``modulefinder.Module``
:param args: Other args are passed down to
``modulefinder.ModuleFinder.import_hook``
:param kwargs: Other kwargs are passed down to
``modulefinder.ModuleFinder.import_hook``
:return: Return value from ``modulefinder.ModuleFinder.import_hook``
"""
previous_caller = self._curr_caller
if caller is None:
caller_frame = inspect.stack()[2]
self._curr_caller = caller_frame.frame.f_locals.get("m")
assert (
self._curr_caller is not None
), "Source code of `modulefinder` library has changed !!"
else:
self._curr_caller = caller
mod = modulefinder.ModuleFinder.import_hook(
self, name, caller, *args, **kwargs
)
self._curr_caller = previous_caller
return mod
def import_module(self, *args, **kwargs):
"""
Called a bit further down the stack when a module is imported. Update
the internal mapping of module dependencies.
:param args: all args are passed down to
``modulefinder.ModuleFinder.import_module``
:param kwargs: all kwargs are passed down to
``modulefinder.ModuleFinder.import_module``
:return: Imported module object.
:rtype: ``modulefinder.Module``
"""
caller = self._curr_caller
mod = modulefinder.ModuleFinder.import_module(self, *args, **kwargs)
if (
caller is not None
and mod is not None
and _has_file(mod)
and mod not in self._module_deps[caller]
):
self.logger.debug(
"Adding %(mod)s to %(caller)s's dependencies",
{"mod": mod.__name__, "caller": caller.__name__},
)
self._module_deps[caller].append(mod)
return mod
def build_dep_graph(self):
"""
Build a directed graph of dependencies, made up of ``_ModuleNode``s.
:return: Root node of dependency graph and a set of all nodes in the
graph.
:rtype: ``Tuple[_ModuleNode, set[_ModuleNode]``
"""
main_mod = self.modules["__main__"]
root_node = self._produce_graph(main_mod, [])
return root_node, set(self._module_nodes.values())
def _produce_graph(self, mod, processed):
"""
Recursive function to build a graph of dependencies.
:param mod: Module to use for the tree root.
:type mod: ``modulefinder.Module``
:param processed: List of modules processed so far in this branch, in
order of processing. Used to avoid infinite recursion in the case
of circular dependencies.
:type processed: ``List[modulefinder.Module]``
:raises RuntimeError: If ``mod`` is in ``processed`` - the calling
function should ensure it is not asking to process a module that
has already been processed.
:return: Root node of dependency graph.
:rtype: ``_ModuleNode``
"""
self.logger.debug(
"Creating node in dependency graph for module %s", mod.__name__
)
if mod in processed:
raise RuntimeError(
"Module {mod} has already been processed in this branch. "
"Already processed = {processed}".format(
mod=mod.__name__, processed=processed
)
)
# Create a new list of processed modules to avoid mutating the list
# we were passed.
new_processed = processed + [mod]
# If this module has already been processed in another branch, we
# can short circuit and return the existing node.
if mod in self._module_nodes:
return self._module_nodes[mod]
# Now for the recursive step - produce a sub-graph of the dependencies
# of each of our dependencies first, then attach each one to the set
# of dependencies for the current node. Check that each dependency
# has not already been processed in this branch to avoid circular
# dependencies causing infinite recursion.
dependencies = [
self._produce_graph(mod=dep, processed=new_processed)
for dep in self._module_deps[mod]
if dep not in new_processed
]
node = _ModuleNode(mod, dependencies)
self._module_nodes[mod] = node
return node
class _ModuleNode:
"""
Node in the directed acyclic graph of dependencies produced by
_GraphModuleFinder.
:param mod: Module object this node represents
:type mod: ``modulefinder.Module``
:param dependencies: List of nodes for modules this node is dependent on.
:type dependencies: ``List[_ModuleNode]``
"""
def __init__(self, mod, dependencies):
self.mod = mod
if not all(isinstance(d, self.__class__) for d in dependencies):
raise TypeError(
"Dependencies must all be of type {}".format(self.__class__)
)
self.dependencies = dependencies
self.filepath = _module_filepath(mod)
self._native_mod = None
def __repr__(self):
return "ModuleNode[{}]".format(self.name)
def __hash__(self):
return hash(self.mod)
@property
def graph_string(self):
"""
:return: a multi-line string representing the graph from this node
downwards.
:rtype: ``str``
"""
if self.dependencies:
ret_str = "{mod} ->\n{children}".format(
mod=self.name,
children=strings.indent(
"\n".join(c.graph_string for c in self.dependencies)
),
)
else:
ret_str = self.name
return ret_str
@property
def name(self):
return self.mod.__name__
def reload(self):
"""Reload this module from file."""
try:
if self._native_mod is None:
self._native_mod = sys.modules[self.mod.__name__]
self._native_mod = importlib.reload(self._native_mod)
except (KeyError, ModuleNotFoundError):
# ignore dynamic import module
pass
def update_suites(self, suite_instances):
"""
Update suite objects to new classes.
:param suite_instances: Mapping of module and class name to test suite
instances.
:type suite_instances: ``Dict[str, Dict[str, List[Any]]``
"""
for suite_cls in self._iter_suites():
for suite_obj in suite_instances[self.name].get(
suite_cls.__name__, []
):
suite_obj.__class__ = suite_cls
suite.set_testsuite_testcases(suite_obj)
def _iter_suites(self):
"""Generate classes of test suites defined in this module."""
for key in self.mod.globalnames.keys():
attr = getattr(self._native_mod, key, None)
if attr and _is_testsuite(attr):
yield attr
def _has_file(mod):
"""
:param mod: Module object. Can be any of the multiple types used to
represent a module, we just check for a __file__ attribute.
:type mod: ``Any``
:return: If given module has a not None __file__ attribute.
:rtype: ``bool``
"""
return hasattr(mod, "__file__") and mod.__file__ is not None
def _has_package(mod):
"""
:param mod: Module object. Can be any of the multiple types used to
represent a module, we just check for a __package__ attribute.
:type mod: ``Any``
:return: If given module has a valid __package__ attribute.
:rtype: ``bool``
"""
return hasattr(mod, "__package__") and mod.__package__ != ""
def _module_filepath(mod):
"""
:param mod: Module object - either a module itself of its modulefinder
proxy.
:type mod: ``Union[module, modulefinder.module]``
:return: the normalised filepath to a module, or None if it has no __file__
attribute.
:rtype: ``Optional[str]``
"""
if not _has_file(mod):
return None
ret_path = path_utils.fix_home_prefix(os.path.abspath(mod.__file__))
if ret_path.endswith("c"):
return ret_path[:-1]
return ret_path
def _is_testsuite(attr):
"""
:return: If given attribute is a testsuite class.
:rtype: ``bool``
"""
return inspect.isclass(attr) and hasattr(attr, "__testcases__")