Source code for testplan.common.utils.match

"""
Module of utility types and functions that perform matching.
"""
import os
import re
import time
import getpass
import warnings
from contextlib import closing
from typing import (
    AnyStr,
    Dict,
    List,
    Match,
    Optional,
    Pattern,
    Tuple,
    Union,
)
import paramiko
from typing_extensions import TypeAlias

from . import logger, timing
from .logfile import (
    LogPosition,
    RotatedFileLogStream,
    RotatedBinaryFileLogStream,
    RemoteRotatedBinaryFileLogStream,
    RotatedTextFileLogStream,
    RemoteRotatedTextFileLogStream,
    MTimeBasedLogRotationStrategy,
    RemoteMTimeBasedLogRotationStrategy,
)


DEFAULT_PARAMIKO_CONFIG = {"username": getpass.getuser()}

LOG_MATCHER_INTERVAL = 0.25
LOG_MATCHER_DEFAULT_TIMEOUT = 5.0


Regex: TypeAlias = Union[str, bytes, Pattern]


def _format_logline(s):
    if not s:
        return "<EOF>\n"
    if len(s) <= 100:
        return s
    return f"{s[:100]} ... ({len(s) - 100} chars omitted)"


def match_regexps_in_file(
    logpath: os.PathLike, log_extracts: List[Pattern]
) -> Tuple[bool, Dict[str, str], List[Pattern]]:
    """
    Return whether all log extracts matched, any named groups they matched,
    and the list of regexes that did not match.

    :param logpath: Log file path.
    :param log_extracts: Regex list.
    :return: Match result.
    """
    extracted_values = {}

    if not os.path.exists(logpath):
        return False, extracted_values, log_extracts

    extracts_status = [False for _ in log_extracts]

    # If log_extracts contains any bytes regex, convert all log_extracts to
    # bytes regexes.
    if not all([isinstance(x.pattern, str) for x in log_extracts]):
        read_mode = "rb"
        _log_extracts = []
        for regex in log_extracts:
            if not isinstance(regex.pattern, bytes):
                _log_extracts.append(
                    re.compile(regex.pattern.encode("utf-8"))
                )
            else:
                _log_extracts.append(regex)
    else:
        read_mode = "r"
        _log_extracts = log_extracts

    with open(logpath, read_mode) as log:
        for line in log:
            for pos, regexp in enumerate(_log_extracts):
                match = regexp.match(line)
                if match:
                    extracted_values.update(match.groupdict())
                    extracts_status[pos] = True

    unmatched = [
        exc
        for idx, exc in enumerate(log_extracts)
        if not extracts_status[idx]
    ]
    return all(extracts_status), extracted_values, unmatched

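# Usage sketch (illustrative only; "server.log" and the patterns below are
# hypothetical, not part of this module):
#
#     ok, values, unmatched = match_regexps_in_file(
#         "server.log",
#         [
#             re.compile(r".*listening on port (?P<port>\d+)"),
#             re.compile(r".*started in (?P<startup_ms>\d+) ms"),
#         ],
#     )
#     # ok is True only if every regex matched some line; values holds the
#     # named groups, e.g. {"port": "8080", "startup_ms": "123"};
#     # unmatched lists the regexes that never matched.
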
class ScopedLogfileMatch:
    def __init__(
        self,
        log_matcher: "LogMatcher",
        regex: Regex,  # to be extended to accept list[Regex]
        timeout: float = LOG_MATCHER_DEFAULT_TIMEOUT,
    ):
        self.regex, self.timeout = regex, timeout
        self.match_results = []
        self.match_failure = None
        self.log_matcher = log_matcher

    def __enter__(self):
        self.log_matcher.seek_eof()
        self.match_results = []
        self.match_failure = None
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        if exc_type is not None:
            return False
        m = self.log_matcher.match(
            self.regex, self.timeout, raise_on_timeout=False
        )
        s_pos = self.log_matcher._debug_info_s[0]
        e_pos = self.log_matcher._debug_info_e[0]
        if m is not None:
            self.match_results.append((m, self.regex, s_pos, e_pos))
        else:
            self.match_failure = (None, self.regex, s_pos, e_pos)

class LogMatcher(logger.Loggable):
    """
    Single line matcher for text files (usually log files). Once matched, it
    remembers the line number of the match and subsequent matches are scanned
    from the current line number. This can be useful when matched lines are
    not unique for the entire log file. Supports simple cases of log rotation.
    """

    def __init__(
        self, log_path: Union[os.PathLike, str], binary: bool = False
    ):
        """
        :param log_path: Path to the log file. It can also be a glob pattern,
            in which case the LogMatcher supports rotated log files matching
            the glob.
        :param binary: If True the log file is treated as a binary file and
            binary regexps need to be used.
        """
        self.log_path = log_path
        self.binary = binary
        self.marks = {}
        self.position: Optional[LogPosition] = None
        self.log_stream: RotatedFileLogStream = self._create_log_stream()
        self._debug_info_s = ()
        self._debug_info_e = ()
        # deprecation helpers
        self.had_transformed = False

        super(LogMatcher, self).__init__()

    def _create_log_stream(self) -> RotatedFileLogStream:
        return (
            RotatedBinaryFileLogStream(
                self.log_path, MTimeBasedLogRotationStrategy()
            )
            if self.binary
            else RotatedTextFileLogStream(
                self.log_path, MTimeBasedLogRotationStrategy()
            )
        )

    def _prepare_regexp(self, regexp: Regex) -> Pattern[AnyStr]:
        """Compile the regex if needed and check that it is compatible with
        the matcher's text/binary mode, falling back by transforming the
        matcher (deprecated behaviour) if it is not."""
        if isinstance(regexp, (str, bytes)):
            regexp = re.compile(regexp)
        elif isinstance(regexp, re.Pattern):
            pass
        else:
            try:
                import rpyc

                if isinstance(regexp, rpyc.core.netref.BaseNetref):
                    regexp = re.compile(regexp.pattern, regexp.flags)
            except ImportError:
                pass
        try:
            if self.binary and isinstance(regexp.pattern, str):
                raise TypeError(
                    f"LogMatcher is configured for binary match but string regexp was provided. Pattern: {regexp}"
                )
            if not self.binary and isinstance(regexp.pattern, bytes):
                raise TypeError(
                    f"LogMatcher is configured for text match but bytes regexp was provided. Pattern: {regexp}"
                )
        except TypeError as error:
            if self.had_transformed:
                raise
            self.had_transformed = True
            self.binary = not self.binary
            self.log_stream = self._create_log_stream()
            warnings.warn(
                f"Incompatible regexp is used. "
                f"{error} "
                f"Transforming LogMatcher to {'binary' if self.binary else 'text'} mode. "
                f"This fallback will soon be removed; please update your LogMatcher or regexps to be in sync."
            )

        return regexp

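    # Construction sketch (hypothetical paths, shown for illustration only):
    #
    #     matcher = LogMatcher("/var/tmp/app.log")                # text matching
    #     rotated = LogMatcher("/var/tmp/app.log*")               # glob covering rotated files
    #     binary_m = LogMatcher("/var/tmp/app.bin", binary=True)  # bytes regexps required
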
    def seek(self, mark: Optional[str] = None):
        """
        Sets the current file position to the specified mark. The mark has to
        exist. If the mark is None, sets the current file position to the
        beginning of the file.

        :param mark: Name of the mark.
        """
        if mark is None:
            self.position = None
        else:
            self.position = self.marks[mark]

    def seek_eof(self):
        """Sets the current file position to the current end of file."""
        self.position = self.log_stream.flush()

    def seek_sof(self):
        """Sets the current file position to the start of the file."""
        self.seek()

    def mark(self, name: str):
        """
        Marks the current file position with the specified name. The mark
        name can later be used to set the file position.

        :param name: Name of the mark.
        """
        self.marks[name] = self.position

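    # Sketch of position handling with seek/mark (hypothetical mark name):
    #
    #     matcher.seek_eof()        # ignore everything written so far
    #     matcher.mark("startup")   # remember this position
    #     ...
    #     matcher.seek("startup")   # rewind to the remembered position
    #     matcher.seek()            # or rewind to the beginning of the file
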
    def _match(
        self,
        regex: Pattern[AnyStr],
        timeout: float,
    ) -> Optional[Match]:
        """
        Base block for ``match``, ``not_match`` & ``match_all``, as well as
        certain ``LogfileNamespace`` assertions.

        :param regex: Checked regular expression.
        :param timeout: Timeout in seconds to wait for the matching process;
            0 means matching till EOF without waiting for new lines, any value
            greater than 0 means matching for up to that many seconds,
            defaults to 5 seconds.

        :return: The regex match or None if no match is found.
        """
        match = None
        start_time = time.time()
        end_time = start_time + timeout

        with closing(self.log_stream) as log:
            log.seek(self.position)
            non_eof = ""
            while True:
                line = log.readline()
                if self._debug_info_s is None:
                    self._debug_info_s = (
                        str(self.position)
                        if self.position is not None
                        else "<BOF>",
                        start_time,
                        _format_logline(line),
                    )
                if line:
                    non_eof = line
                    match = regex.match(line)
                    if match:
                        break
                elif timeout > 0:
                    if time.time() > end_time:
                        break
                    time.sleep(LOG_MATCHER_INTERVAL)
                else:
                    break
            self.position = self.log_stream.position

        if self._debug_info_e is None:
            self._debug_info_e = (
                str(self.position),
                time.time(),
                _format_logline(non_eof),
            )

        return match

    def match(
        self,
        regex: Regex,
        timeout: float = LOG_MATCHER_DEFAULT_TIMEOUT,
        raise_on_timeout: bool = True,
    ) -> Optional[Match]:
        """
        Matches each line in the log file from the current line number to the
        end of the file. If a match is found the line number is stored and the
        match is returned. By default an exception is raised if no match is
        found.

        :param regex: Regex string or compiled regular expression
            (``re.compile``).
        :param timeout: Timeout in seconds to wait for the matching process;
            0 means matching till EOF without waiting for new lines, any value
            greater than 0 means matching for up to that many seconds,
            defaults to 5 seconds.
        :param raise_on_timeout: Whether to raise a TimeoutException when no
            match is found within the timeout.

        :return: The regex match or None if no match is found.
        """
        self._debug_info_s = None
        self._debug_info_e = None
        regex = self._prepare_regexp(regex)

        m = self._match(regex, timeout=timeout)
        if m is None:
            self.logger.debug(
                "%s: no expected match[%s] found,\nsearch starting from %s (around %s), "
                "where first line seen as:\n%s"
                "and ending at %s (around %s), where last line seen as:\n%s",
                self,
                regex.pattern,
                *self._debug_info_s,
                *self._debug_info_e,
            )
            if timeout and raise_on_timeout:
                raise timing.TimeoutException(
                    "No match[%s] found in %.2fs.", regex.pattern, timeout
                )

        return m

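    # Usage sketch for ``match`` (hypothetical patterns and timeouts):
    #
    #     m = matcher.match(r".*listening on port (\d+)", timeout=10)
    #     port = m.group(1)
    #     # Return None instead of raising when nothing matches:
    #     maybe = matcher.match(r".*optional line", timeout=1, raise_on_timeout=False)
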
    def not_match(
        self,
        regex: Regex,
        timeout: float = LOG_MATCHER_DEFAULT_TIMEOUT,
    ):
        """
        Opposite of :py:meth:`~testplan.common.utils.match.LogMatcher.match`
        which raises an exception if a match is found. Matching is performed
        from the current file position. If a match is not found within the
        timeout period then no exception is raised.

        :param regex: Regex string or compiled regular expression
            (``re.compile``).
        :param timeout: Timeout in seconds to wait for the matching process;
            0 means it should not wait and return whatever matched on the
            initial scan, defaults to 5 seconds.
        """
        self._debug_info_s = None
        self._debug_info_e = None
        regex = self._prepare_regexp(regex)

        m = self._match(regex, timeout)
        if m is not None:
            self.logger.debug(
                "%s: unexpected match[%s] found,\nsearch starting from %s (around %s), "
                "where first line seen as:\n%s"
                "and ending at %s (around %s), where last line seen as:\n%s",
                self,
                regex.pattern,
                *self._debug_info_s,
                *self._debug_info_e,
            )
            raise Exception(
                f"Unexpected match[{regex.pattern}] found in {timeout}s"
            )

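    # Usage sketch for ``not_match`` (hypothetical pattern): raises if the
    # pattern shows up within the timeout, otherwise returns quietly.
    #
    #     matcher.not_match(r".*ERROR", timeout=2)
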
    def match_all(
        self,
        regex: Regex,
        timeout: float = LOG_MATCHER_DEFAULT_TIMEOUT,
        raise_on_timeout: bool = True,
    ) -> List[Match]:
        """
        Similar to match, but returns all occurrences of the regex. By default
        an exception is raised if no match is found.

        :param regex: Regex string or compiled regular expression
            (``re.compile``).
        :param timeout: Timeout in seconds to find all matches in the file,
            defaults to 5 seconds.
        :param raise_on_timeout: Whether to raise a TimeoutException when no
            match is found within the timeout.

        :return: A list of regex matches.
        """
        matches = []
        end_time = time.time() + timeout
        self._debug_info_s = None
        regex = self._prepare_regexp(regex)

        while True:
            if timeout == 0:
                t = 0
            else:
                t = end_time - time.time()
                if t <= 0:
                    break
            self._debug_info_e = None
            m = self._match(regex, t)
            if m is not None:
                matches.append(m)
            else:
                break

        if not matches:
            self.logger.debug(
                "%s: no expected match[%s] found,\nsearch starting from %s (around %s), "
                "where first line seen as:\n%s"
                "and ending at %s (around %s), where last line seen as:\n%s",
                self,
                regex.pattern,
                *self._debug_info_s,
                *self._debug_info_e,
            )
            if timeout and raise_on_timeout:
                raise timing.TimeoutException(
                    "No match[%s] found in %.2fs.", regex.pattern, timeout
                )

        return matches

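    # Usage sketch for ``match_all`` (hypothetical pattern):
    #
    #     heartbeats = matcher.match_all(r".*heartbeat seq=(\d+)", timeout=5)
    #     seqs = [m.group(1) for m in heartbeats]
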
    def match_between(self, regex: Regex, mark1: str, mark2: str):
        """
        Matches the file against the passed in regex. Matching is performed
        from the file position denoted by mark1 and ends before the file
        position denoted by mark2. If a match is not found then None is
        returned.

        :param regex: Regex string or compiled regular expression
            (``re.compile``).
        :param mark1: Mark name of the start position (None for beginning of
            file).
        :param mark2: Mark name of the end position.

        :return: The regex match or None if no match is found.
        """
        match = None
        regex = self._prepare_regexp(regex)

        with closing(self.log_stream) as log:
            log.seek(self.marks[mark1] if mark1 is not None else None)
            endpos = self.marks[mark2]
            while not self.log_stream.reached_position(endpos):
                line = log.readline()
                if not line:
                    break
                match = regex.match(line)
                if match:
                    break

        return match

    def not_match_between(self, regex: Regex, mark1: str, mark2: str):
        """
        Opposite of
        :py:meth:`~testplan.common.utils.match.LogMatcher.match_between`
        which returns None if a match is not found. Matching is performed
        from the file position denoted by mark1 and ends before the file
        position denoted by mark2. If a match is found then False is returned,
        otherwise True.

        :param regex: Regex string or compiled regular expression
            (``re.compile``).
        :param mark1: Mark name of the start position (None for beginning of
            file).
        :param mark2: Mark name of the end position.
        """
        return not self.match_between(regex, mark1, mark2)

    def get_between(self, mark1=None, mark2=None):
        """
        Returns the content of the file from the start marker to the end
        marker. It is possible to omit either marker to receive everything
        from the start to the end of the file.

        .. note::

            Since markers point to the byte position immediately after the
            match, this function will not return what was matched for mark1,
            but will return the contents of what was matched for mark2.

        :param mark1: Mark name of the start position (None for beginning of
            file).
        :type mark1: ``str``
        :param mark2: Mark name of the end position (None for end of file).
        :type mark2: ``str``

        :return: The content between mark1 and mark2.
        :rtype: ``str``
        """
        if mark1 is not None and mark2 is not None:
            if (
                self.log_stream.compare(self.marks[mark1], self.marks[mark2])
                >= 0
            ):
                raise ValueError(
                    'Mark "{}" must be present before mark "{}"'.format(
                        mark1, mark2
                    )
                )

        with closing(self.log_stream) as log:
            start_pos = self.marks[mark1] if mark1 is not None else None
            end_pos = self.marks[mark2] if mark2 is not None else None
            log.seek(start_pos)
            if not end_pos:
                return log.read()

            lines_between = []
            while not self.log_stream.reached_position(end_pos):
                line = log.readline()
                lines_between.append(line)
            separator = b"" if self.binary else ""
            return separator.join(lines_between)

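    # Combined sketch of marks with match_between / get_between (hypothetical
    # patterns; assumes the matcher was created over an existing log file):
    #
    #     matcher.match(r".*request received")
    #     matcher.mark("req")
    #     matcher.match(r".*response sent")
    #     matcher.mark("resp")
    #     assert matcher.match_between(r".*processing", "req", "resp")
    #     body = matcher.get_between("req", "resp")  # lines between the marks
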
    def expect(
        self,
        regex: Regex,
        timeout: float = LOG_MATCHER_DEFAULT_TIMEOUT,
    ):
        """
        Context manager composing
        :py:meth:`~testplan.common.utils.match.LogMatcher.seek_eof` and
        :py:meth:`~testplan.common.utils.match.LogMatcher.match`. On entering
        it seeks to the log stream EOF, on exiting it performs log matching,
        as the expected pattern should be (indirectly) produced by the context
        manager body.

        :param regex: Regex string or compiled regular expression.
        :param timeout: Timeout in seconds as a float for regex matching.
        """
        return ScopedLogfileMatch(
            log_matcher=self,
            regex=regex,
            timeout=timeout,
        )

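    # Usage sketch for ``expect`` (``trigger_reload`` is a hypothetical action
    # that eventually causes the application to log the expected line):
    #
    #     with matcher.expect(r".*reload complete", timeout=10) as scoped:
    #         trigger_reload()
    #     assert scoped.match_results and not scoped.match_failure
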
    def __str__(self) -> str:
        return f"LogMatcher[{self.log_path}]"

class RemoteLogMatcher(LogMatcher):
    """
    Extension of LogMatcher for matching patterns in log files on remote
    hosts. Establishes an SSH connection to the remote host and uses SFTP to
    access log files for pattern matching operations.

    Similar to LogMatcher, this class supports single line matching for
    text/binary files on remote machines. It maintains file position state
    between operations and supports log rotation.

    :param host: Hostname or IP address of the remote server.
    :param log_path: Path to the log file on the remote server. Can be a glob
        pattern to support rotated log files.
    :param binary: If True, the log file is treated as a binary file and
        binary regexps must be used for matching.
    :param paramiko_config: Dictionary of configuration parameters for the
        paramiko SSH client. Defaults to using the current user for
        authentication.

    Usage:

        remote_matcher = RemoteLogMatcher(
            host='remote-server',
            log_path='/var/log/application.log',
            paramiko_config={'username': 'user', 'password': 'pass'}
        )
        # Then use all the same methods as LogMatcher
        match = remote_matcher.match(r'Error.*')
    """

    def __init__(
        self,
        host: str,
        log_path: Union[os.PathLike, str],
        binary: bool = False,
        paramiko_config: Optional[dict] = None,
    ):
        self._host = host
        self._paramiko_config: dict = (
            paramiko_config or DEFAULT_PARAMIKO_CONFIG
        )
        self._ssh_client = paramiko.SSHClient()
        self._ssh_client.set_missing_host_key_policy(
            paramiko.MissingHostKeyPolicy()
        )
        self._ssh_client.connect(hostname=self._host, **self._paramiko_config)
        self._sftp_client = self._ssh_client.open_sftp()
        super().__init__(log_path, binary)

    def _create_log_stream(self) -> RotatedFileLogStream:
        return (
            RemoteRotatedBinaryFileLogStream(
                ssh_client=self._ssh_client,
                sftp_client=self._sftp_client,
                path_pattern=self.log_path,
                rotation_strategy=RemoteMTimeBasedLogRotationStrategy(
                    self._ssh_client, self._sftp_client
                ),
            )
            if self.binary
            else RemoteRotatedTextFileLogStream(
                ssh_client=self._ssh_client,
                sftp_client=self._sftp_client,
                path_pattern=self.log_path,
                rotation_strategy=RemoteMTimeBasedLogRotationStrategy(
                    self._ssh_client, self._sftp_client
                ),
            )
        )