Source code for testplan.common.utils.match

"""
Module of utility types and functions that perform matching.
"""
import os
import re
import sys
import time
import warnings
from contextlib import closing
from typing import Dict, List, Match, Optional, Pattern, Tuple, Union, AnyStr

from . import logger, timing
from .logfile import (
    LogPosition,
    FileLogStream,
    BinaryFileLogStream,
    TextFileLogStream,
    RotatedBinaryFileLogStream,
    RotatedTextFileLogStream,
    MTimeBasedLogRotationStrategy,
)

# Seconds LogMatcher.match sleeps between read attempts when no new line is
# available and a positive timeout is in effect.
LOG_MATCHER_INTERVAL = 0.25
# Default timeout, in seconds, of LogMatcher.match / not_match / match_all.
LOG_MATCHER_DEFAULT_TIMEOUT = 5.0


def match_regexps_in_file(
    logpath: os.PathLike, log_extracts: List[Pattern]
) -> Tuple[bool, Dict[str, str], List[Pattern]]:
    """
    Return a boolean, dict, list triple indicating whether all log extracts
    matched, any named groups they might have matched, and the patterns
    that did not match any line.

    Every pattern is applied with ``Pattern.match`` (anchored at the start
    of the line) to every line of the file. If at least one pattern is a
    bytes regex, the file is read in binary mode and every string regex is
    re-compiled as a bytes regex (UTF-8 encoded, flags preserved); in that
    case the extracted group values are ``bytes``.

    :param logpath: Log file path.
    :param log_extracts: Regex list.
    :return: Match result: ``(all_matched, extracted_values, unmatched)``.
        If ``logpath`` does not exist the result is
        ``(False, {}, log_extracts)``.
    """
    extracted_values = {}

    if not os.path.exists(logpath):
        return False, extracted_values, log_extracts

    extracts_status = [False for _ in log_extracts]

    # If log_extracts contain bytes regex, will convert all log_extracts to
    # bytes regex.
    if not all(isinstance(x.pattern, str) for x in log_extracts):
        read_mode = "rb"
        # re.UNICODE is implicitly set on every str pattern but is rejected
        # for bytes patterns, so it is masked out while all other flags
        # (IGNORECASE, MULTILINE, ...) are carried over.
        bytes_safe_mask = ~int(re.UNICODE)
        _log_extracts = [
            regex
            if isinstance(regex.pattern, bytes)
            else re.compile(
                regex.pattern.encode("utf-8"), regex.flags & bytes_safe_mask
            )
            for regex in log_extracts
        ]
    else:
        read_mode = "r"
        _log_extracts = log_extracts

    with open(logpath, read_mode) as log:
        for line in log:
            for pos, regexp in enumerate(_log_extracts):
                match = regexp.match(line)
                if match:
                    extracted_values.update(match.groupdict())
                    extracts_status[pos] = True

    # Report the caller's original pattern objects, not the converted ones.
    unmatched = [
        exc
        for idx, exc in enumerate(log_extracts)
        if not extracts_status[idx]
    ]
    return all(extracts_status), extracted_values, unmatched
class LogMatcher(logger.Loggable):
    """
    Single line matcher for text files (usually log files). Once matched, it
    remembers the position of the match and subsequent matches are scanned
    from that position. This can be useful when matched lines are not
    unique for the entire log file. Supports simple cases of log rotation.
    """

    def __init__(
        self, log_path: Union[os.PathLike, str], binary: bool = False
    ):
        """
        :param log_path: Path to the log file. log_path can be a glob, in
            which case LogMatcher supports rotated logfiles matching the
            glob.
        :param binary: if True the logfile is treated as a binary file, and
            binary regexps need to be used
        """
        self.log_path = log_path
        self.binary = binary
        # mark name -> log position stored by ``mark``
        self.marks = {}
        # None means "start of file" (see ``seek``)
        self.position: Optional[LogPosition] = None
        self.log_stream: FileLogStream = self._create_log_stream()

        # deprecation helpers
        # set once the matcher has switched between text and binary mode
        # because of an incompatible regexp (see ``_prepare_regexp``)
        self.had_transformed = False

        super(LogMatcher, self).__init__()

    def _create_log_stream(self):
        """
        Create the log stream used for reading ``log_path``.

        :return: a rotated log stream by default, or a non-rotated one when
            the ``TESTPLAN_NO_LOGROTATION_IN_LOGMATCHER`` environment
            variable is set to a truthy value.
        """
        # as rotated logstream should be able to handle non-rotated streams
        # without overhead we use that by default, but give an envvar to be
        # able to turn it off. We not yet expose the possibility to select the
        # logstream through the api.
        if os.environ.get("TESTPLAN_NO_LOGROTATION_IN_LOGMATCHER") in (
            "true",
            "True",
            "1",
            "yes",
            "Yes",
        ):
            return self._create_non_rotated_log_stream()
        return self._create_rotated_log_stream()

    def _create_non_rotated_log_stream(self):
        """Create a plain binary or text stream according to ``binary``."""
        return (
            BinaryFileLogStream(self.log_path)
            if self.binary
            else TextFileLogStream(self.log_path)
        )

    def _create_rotated_log_stream(self):
        """
        Create a rotation aware binary or text stream according to
        ``binary``, using the mtime based rotation strategy.
        """
        return (
            RotatedBinaryFileLogStream(
                self.log_path, MTimeBasedLogRotationStrategy()
            )
            if self.binary
            else RotatedTextFileLogStream(
                self.log_path, MTimeBasedLogRotationStrategy()
            )
        )

    def _prepare_regexp(
        self, regexp: Union[Pattern[AnyStr], str, bytes]
    ) -> Pattern[AnyStr]:
        """
        Compile ``regexp`` if needed and make sure its type (str or bytes)
        is in sync with the mode of the matcher.

        On the first mismatch the matcher is switched to the mode of the
        regexp (its log stream is re-created) and a warning is emitted; any
        later mismatch raises.

        :param regexp: Regex string/bytes or compiled regular expression.
        :return: The compiled regular expression.
        :raises TypeError: if the regexp type does not match the mode of
            the matcher and the matcher has already been transformed once.
        """
        if isinstance(regexp, (str, bytes)):
            regexp = re.compile(regexp)

        if self.binary and isinstance(regexp.pattern, str):
            error = TypeError(
                f"LogMatcher is configured for binary match but string regexp was provided. Pattern: {regexp}"
            )
        elif not self.binary and isinstance(regexp.pattern, bytes):
            error = TypeError(
                f"LogMatcher is configured for text match but bytes regexp was provided. Pattern: {regexp}"
            )
        else:
            return regexp

        if self.had_transformed:
            raise error

        # one-off fallback: flip the mode and rebuild the stream accordingly
        self.had_transformed = True
        self.binary = not self.binary
        self.log_stream = self._create_log_stream()
        warnings.warn(
            f"Incompatible regexp is used. "
            f"{error} "
            f"Transforming LogMatcher to {'binary' if self.binary else 'text'} "
            f"This fallback will be soon removed please update your LogMatcher or regexps to be in sync."
        )
        return regexp

    def seek(self, mark: Optional[str] = None):
        """
        Sets current file position to the specified mark. The mark has to
        exist. If the mark is None sets current file position to beginning
        of file.

        :param mark: Name of the mark.
        :raises KeyError: if the mark does not exist.
        """
        if mark is None:
            self.position = None
        else:
            self.position = self.marks[mark]

    def seek_eof(self):
        """Sets current file position to the current end of file."""
        self.position = self.log_stream.flush()

    def seek_sof(self):
        """Sets current file position to the start of file."""
        self.seek()

    def mark(self, name: str):
        """
        Marks the current file position with the specified name. The mark
        name can later be used to set the file position.

        :param name: Name of the mark.
        """
        self.marks[name] = self.position

    def match(
        self,
        regex: Union[str, bytes, Pattern],
        timeout: float = LOG_MATCHER_DEFAULT_TIMEOUT,
        raise_on_timeout: bool = True,
    ) -> Optional[Match]:
        """
        Matches each line in the log file from the current position to the
        end of the file. If a match is found the position is stored and the
        match is returned. Can be configured to raise an exception if no
        match is found.

        :param regex: Regex string or compiled regular expression
            (``re.compile``)
        :param timeout: Timeout in seconds to wait for matching process,
            0 means matching till EOF and not waiting for new lines, any
            value greater than 0 means doing matching up to such seconds,
            defaults to 5 seconds
        :param raise_on_timeout: To raise TimeoutException or not
        :return: The regex match or None if no match is found
        :raises timing.TimeoutException: if no match is found, ``timeout``
            is non-zero and ``raise_on_timeout`` is True.
        """
        match = None
        start_time = time.time()
        end_time = start_time + timeout

        regex = self._prepare_regexp(regex)

        with closing(self.log_stream) as log:
            log.seek(self.position)

            while True:
                if timeout > 0 and time.time() > end_time:
                    break
                line = log.readline()
                if line:
                    match = regex.match(line)
                    if match:
                        break
                elif timeout > 0:
                    # nothing new to read yet, wait for the writer
                    time.sleep(LOG_MATCHER_INTERVAL)
                else:
                    # zero timeout: stop at EOF instead of waiting
                    break

            # remember how far we got, whether or not a match was found
            self.position = log.position

        if match is not None:
            self.logger.debug(
                "Match[%s] found in %.2fs",
                regex.pattern,
                time.time() - start_time,
            )
        elif timeout and raise_on_timeout:
            raise timing.TimeoutException(
                "No match[{}] found in {}s".format(regex.pattern, timeout)
            )

        return match

    def not_match(
        self,
        regex: Union[str, bytes, Pattern],
        timeout: float = LOG_MATCHER_DEFAULT_TIMEOUT,
    ):
        """
        Opposite of :py:meth:`~testplan.common.utils.match.LogMatcher.match`
        which raises an exception if a match is found. Matching is performed
        from the current file position. If match is not found within timeout
        period then no exception is raised.

        :param regex: Regex string or compiled regular expression
            (``re.compile``)
        :param timeout: Timeout in seconds to wait for matching process,
            0 means should not wait and return whatever matched on initial
            scan, defaults to 5 seconds
        :raises Exception: if a match is found.
        """
        match = self.match(regex, timeout, raise_on_timeout=False)
        if match is not None:
            # ``regex`` may be a plain str/bytes without a ``pattern``
            # attribute; the match always references the compiled pattern.
            raise Exception(
                f"Unexpected match[{match.re.pattern}] found in {timeout}s"
            )

    def match_all(
        self,
        regex: Union[str, bytes, Pattern],
        timeout: float = LOG_MATCHER_DEFAULT_TIMEOUT,
        raise_on_timeout: bool = True,
    ) -> List[Match]:
        """
        Similar to match, but returns all occurrences of regex. Can be
        configured to raise an exception if no match is found.

        :param regex: Regex string or compiled regular expression
            (``re.compile``)
        :param timeout: Timeout in seconds to find out all matches in file,
            defaults to 5 seconds.
        :param raise_on_timeout: To raise TimeoutException or not
        :return: A list of regex matches
        :raises timing.TimeoutException: if no match at all is found within
            a non-zero ``timeout`` and ``raise_on_timeout`` is True.
        """
        matches = []
        end_time = time.time() + timeout

        try:
            while timeout >= 0:
                found = self.match(regex, timeout, raise_on_timeout=True)
                if found is None:
                    # ``match`` only returns None instead of raising when it
                    # ran with a zero timeout and hit EOF: nothing to collect
                    break
                matches.append(found)
                # the remaining budget is shared by all subsequent matches
                timeout = end_time - time.time()
        except timing.TimeoutException:
            if not matches and raise_on_timeout:
                raise

        return matches

    def match_between(self, regex, mark1, mark2):
        """
        Matches file against passed in regex. Matching is performed from
        file position denoted by mark1 and ends before file position denoted
        by mark2. If a match is not found then None is returned.

        :param regex: regex string or compiled regular expression
            (``re.compile``)
        :type regex: ``Union[str, re.Pattern, bytes]``
        :param mark1: mark name of start position (None for beginning of
            file)
        :type mark1: ``str``
        :param mark2: mark name of end position
        :type mark2: ``str``
        :return: The regex match or None if no match is found
        """
        match = None

        regex = self._prepare_regexp(regex)

        with closing(self.log_stream) as log:
            log.seek(self.marks[mark1] if mark1 is not None else None)
            endpos = self.marks[mark2]

            while not log.reached_position(endpos):
                line = log.readline()
                if not line:
                    # EOF before the end mark
                    break
                match = regex.match(line)
                if match:
                    break

        return match

    def not_match_between(self, regex, mark1, mark2):
        """
        Opposite of
        :py:meth:`~testplan.common.utils.match.LogMatcher.match_between`
        which returns None if a match is not found. Matching is performed
        from file position denoted by mark1 and ends before file position
        denoted by mark2. If a match is found then False is returned
        otherwise True.

        :param regex: regex string or compiled regular expression
            (``re.compile``)
        :type regex: ``Union[str, re.Pattern, bytes]``
        :param mark1: mark name of start position (None for beginning of
            file)
        :type mark1: ``str``
        :param mark2: mark name of end position
        :type mark2: ``str``
        :return: True if no match is found between the marks, else False
        :rtype: ``bool``
        """
        return not self.match_between(regex, mark1, mark2)

    def get_between(self, mark1=None, mark2=None):
        """
        Returns the content of the file from the start marker to the end
        marker. It is possible to omit either marker to receive everything
        from start to end of file.

        .. note::

            Since markers point to the byte position immediately after
            match, this function will not return what was matched for mark1,
            but will return the contents of what was matched for mark2.

        :param mark1: mark name of start position (None for beginning of
            file)
        :type mark1: ``str``
        :param mark2: mark name of end position (None for end of file)
        :type mark2: ``str``
        :return: The content between mark1 and mark2.
        :rtype: ``str`` (``bytes`` for a binary matcher)
        :raises ValueError: if both marks are given and mark1 is not
            located before mark2.
        """
        if mark1 is not None and mark2 is not None:
            if (
                self.log_stream.compare(self.marks[mark1], self.marks[mark2])
                >= 0
            ):
                raise ValueError(
                    'Mark "{}" must be present before mark "{}"'.format(
                        mark1, mark2
                    )
                )

        with closing(self.log_stream) as log:
            start_pos = self.marks[mark1] if mark1 is not None else None
            end_pos = self.marks[mark2] if mark2 is not None else None

            log.seek(start_pos)
            if not end_pos:
                return log.read()

            lines_between = []
            while not log.reached_position(end_pos):
                line = log.readline()
                if not line:
                    # EOF reached before the end mark (e.g. the file got
                    # truncated); stop instead of looping forever
                    break
                lines_between.append(line)

            separator = b"" if self.binary else ""
            return separator.join(lines_between)