"""Conversion utilities."""
import itertools
from typing import Callable, Iterable, List, Optional, Sequence, Tuple, Union
from .comparison import is_match_res
from .reporting import Absent
RecursiveListTuple = List[Union[Tuple, Tuple["RecursiveListTuple"]]]
[docs]
def make_tuple(
value: object,
convert_none: bool = False,
) -> Union[Tuple, object]:
"""
Converts a value into a tuple.
:param value: value to make the tuple out of
:param convert_none: whether to convert None
:return: the value or the value converted to a tuple
"""
if isinstance(value, list):
return tuple(value)
if not isinstance(value, tuple) and (convert_none or value is not None):
return (value,)
return value
[docs]
def sort_and_group(iterable: Iterable, key: Callable) -> List[Tuple]:
"""
Sorts an iterable and groups the items by the given key function.
:param iterable: iterable of items
:param key: key function to sort by
:return: groups of items sorted by key
"""
groups = [
(k, list(g))
for k, g in itertools.groupby(sorted(iterable, key=key), key=key)
]
return groups
[docs]
def nested_groups(
iterable: Iterable,
key_funcs: Sequence[Callable],
) -> RecursiveListTuple:
"""
Creates nested groups from the given ``iterable`` using ``key_funcs``
:param iterable: iterable of items
:param key_funcs: key functions to sort by, applied in a waterfall
:return: recursively nested groups of items sorted by key functions
"""
first, rest = key_funcs[0], key_funcs[1:]
grouping = sort_and_group(iterable, first)
if rest:
return [(key, nested_groups(group, rest)) for key, group in grouping]
else:
return grouping
# Below function was designed to be used when defining types in the
# configuration schema. For example make_iterables([str, ContextVale])
# will return [[str], (str,), [ContextValue], (ContextValue,)].
[docs]
def make_iterables(values: Iterable) -> List[Union[List, Tuple]]:
"""
Create a list of lists and tuples for each of the values.
:param values: an iterable of values
:return: list containing one list and tuple for each value
"""
iterables = []
for value in values:
iterables.append([value])
iterables.append((value,))
return iterables
[docs]
def expand_match_res(
rows: List[Tuple],
level: int = 0,
ignore_key: bool = False,
key_path: List = None,
):
"""
Recursively expands and yields all rows of items to display.
:param rows: comparison results
:param level: recursive parameter for level of nesting
:param ignore_key: recursive parameter for ignoring a key
:param key_path: recursive parameter to build the sequence of keys
:return: rows used in building comparison result table
"""
if key_path is None:
key_path = []
for row in rows:
# While comparing dict value (list type), dict key is ignored, thus
# a special object `Absent` is used as key, which means no key here.
key = row[0] if ignore_key is False else Absent
if key is not Absent: # `None` or empty string can also be used as key
key_path.append(key)
match = row[1]
val = row[2]
# val should be a tuple
if val[0] == 0: # value
yield (tuple(key_path), level, key, match, (val[1], val[2]))
elif is_match_res(val[0]): # ``_rec_compare``d container
yield (tuple(key_path), level, key, match, "")
yield from expand_match_res(
val[1],
level=level + 1,
ignore_key=True if val[0] == 11 else False,
key_path=key_path,
)
elif val[0] in (1, 2): # ``fmt``ed container
yield (tuple(key_path), level, key, match, "")
yield from expand_fmt_res(
val[1],
level=level + 1,
ignore_key=True if val[0] == 1 else False,
key_path=key_path,
match=match,
)
else:
raise ValueError(f"unknown type {val[0]}")
if key is not Absent:
key_path.pop()
[docs]
def expand_fmt_res(
rows: List[Tuple],
level: int,
ignore_key: bool,
key_path: List[str],
match: str,
):
for row in rows:
key = row[0] if ignore_key is False else Absent
if key is not Absent: # `None` or empty string can also be used as key
key_path.append(key)
val = row if ignore_key else row[1]
# val should be a tuple
if val[0] == 0: # value
yield (tuple(key_path), level, key, match, (val[1], val[2]))
elif val[0] in (1, 2): # container
yield (tuple(key_path), level, key, match, "")
yield from expand_fmt_res(
val[1],
level=level + 1,
ignore_key=True if val[0] == 1 else False,
key_path=key_path,
match=match,
)
else:
raise ValueError(f"unknown type {val[0]}")
if key is not Absent:
key_path.pop()
[docs]
def flatten_dict_comparison(comparison: List[Tuple]) -> List[List]:
"""
Flatten the comparison object from dictionary match into a tabular format.
:param comparison: list of comparison results
:return: result table to be used in display
"""
result_table = [] # level, key, result, left, right
left = list(
expand_match_res(map(lambda x: (x[0], x[1], x[2]), comparison))
)
right = list(
expand_match_res(map(lambda x: (x[0], x[1], x[3]), comparison))
)
while left or right:
lpart, rpart = None, None
if left and right:
# NOTE: if the left keypath is longer we entered a nested structure
# on one side only
# if the key is Absent only on left side, then we just insert an
# empty row for visual separation
if (
len(left[0][0]) > len(right[0][0])
or left[0][2] is Absent
and left[0][2] != right[0][2]
):
lpart = left.pop(0)
# NOTE: same as above but from right-hand side perspective
elif (
len(left[0][0]) < len(right[0][0])
or right[0][2] is Absent
and left[0][2] != right[0][2]
):
rpart = right.pop(0)
else:
lpart, rpart = left.pop(0), right.pop(0)
# NOTE: if any of the sides is exhausted we proceed with the other
elif left:
lpart = left.pop(0)
elif right:
rpart = right.pop(0)
level = lpart[1] if lpart else rpart[1]
key = lpart[2] if lpart else rpart[2]
if key is Absent:
level -= 1
# key = '(group)'
status = lpart[3] if lpart else rpart[3]
lval = lpart[4] if lpart else None
rval = rpart[4] if rpart else None
result_table.append(
[level, "" if key is Absent else key, status, lval, rval]
)
while True:
level_decreased = False
prev_level = 0
for idx in range(len(result_table)):
level = result_table[idx][0]
if level > prev_level + 1:
for inner_idx in range(idx, len(result_table)):
if result_table[inner_idx][0] > prev_level:
level_decreased = True
result_table[inner_idx][0] -= 1
else:
break
prev_level = level
if level_decreased is False:
break
return result_table
[docs]
def delta_encode_level(homo):
prev = 0
hetero = []
for r in homo:
level = r[0]
res = r[1:]
diff = level - prev
if diff != 0:
hetero.append(diff)
prev = level
hetero.append(res)
return hetero
[docs]
def delta_decode_level(hetero):
level = 0
homo = []
for r in hetero:
if isinstance(r, int):
level += r
continue
else:
homo.append([level, *r])
return homo