Source code for jsonschema_diff.core.custom_compare.list

import difflib
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, Optional

from ..abstraction import Statuses
from ..compare_base import Compare
from ..property import Property

if TYPE_CHECKING:
    from ..compare_base import LEGEND_RETURN_TYPE
    from ..config import Config


@dataclass
[docs] class CompareListElement:
[docs] config: "Config"
[docs] my_config: dict
[docs] value: Any
[docs] status: Statuses
[docs] compared_property: Optional[Property] = None
[docs] def compare(self) -> None: # Если элемент списка — словарь, рендерим его как Property if isinstance(self.value, dict): # Подбираем old/new под статус элемента if self.status == Statuses.DELETED: old_schema = self.value new_schema = None elif self.status == Statuses.ADDED: old_schema = None new_schema = self.value else: # NO_DIFF и прочие — считаем, что значение одинаково слева и справа old_schema = self.value new_schema = self.value self.compared_property = Property( config=self.config, name=None, schema_path=[], json_path=[], old_schema=old_schema, new_schema=new_schema, ) self.compared_property.compare()
[docs] def replace_penultimate_space(self, tab_level: int, s: str, repl: str) -> str: position = ( len(self.config.TAB) * tab_level ) # 1 + (len(self.config.TAB) * tab_level) - 1 # PREFIX + TAB * COUNT - 1 return s[:position] + repl + s[position:]
def _real_render(self, tab_level: int = 0) -> str: if self.compared_property is not None: render_lines, _render_compares = self.compared_property.render(tab_level=tab_level) return "\n".join(render_lines) # Иначе — старое поведение (строка/число/пр. выводим как есть) return f"{self.status.value} {self.config.TAB * tab_level}{self.value}"
[docs] def render(self, tab_level: int = 0) -> str: lines = [ line for line in self._real_render(tab_level=tab_level).split("\n") if line.strip() != "" ] # первая строка = START_LINE, последняя = END_LINE, остальное = MIDDLE_LINE if len(lines) > 1: prepare = [] for i, line in enumerate(lines): if i == 0: prepare.append( self.replace_penultimate_space( tab_level=tab_level, s=line, repl=self.my_config.get("START_LINE", " ") ) ) elif i == len(lines) - 1: prepare.append( self.replace_penultimate_space( tab_level=tab_level, s=line, repl=self.my_config.get("END_LINE", " ") ) ) else: prepare.append( self.replace_penultimate_space( tab_level=tab_level, s=line, repl=self.my_config.get("MIDDLE_LINE", " ") ) ) return "\n".join(prepare) else: return self.replace_penultimate_space( tab_level=tab_level, s=lines[0], repl=self.my_config.get("SINGLE_LINE", " ") )
[docs] class CompareList(Compare): def __init__(self, *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs)
[docs] self.elements: list[CompareListElement] = []
[docs] self.changed_elements: list[CompareListElement] = []
# --- вспомогательное: score ∈ [0..1] из Property.calc_diff() def _score_from_stats(self, stats: Dict[str, int]) -> float: unchanged = stats.get("NO_DIFF", 0) + stats.get("UNKNOWN", 0) changed = ( stats.get("ADDED", 0) + stats.get("DELETED", 0) + stats.get("REPLACED", 0) ) # модификации не в счет + stats.get("MODIFIED", 0) denom = unchanged + changed if denom == 0: return 1.0 return unchanged / float(denom)
[docs] def compare(self) -> Statuses: super().compare() if self.status == Statuses.NO_DIFF: return self.status elif self.status in [Statuses.ADDED, Statuses.DELETED]: # add for v in self.value: element = CompareListElement(self.config, self.my_config, v, self.status) element.compare() self.elements.append(element) self.changed_elements.append(element) elif self.status == Statuses.REPLACED: # replace or no-diff # ------------------------------ # 1) Матричное сопоставление dict↔dict (order-independent) # ------------------------------ old_list = self.old_value if isinstance(self.old_value, list) else [self.old_value] new_list = self.new_value if isinstance(self.new_value, list) else [self.new_value] old_dicts: list[tuple[int, dict]] = [ (i, v) for i, v in enumerate(old_list) if isinstance(v, dict) ] new_dicts: list[tuple[int, dict]] = [ (j, v) for j, v in enumerate(new_list) if isinstance(v, dict) ] threshold = float(self.my_config.get("DICT_MATCH_THRESHOLD", 0.10)) matched_old: set[int] = set() matched_new: set[int] = set() # сформируем все кандидаты (score, i, j, prop), отсортируем по score по убыванию candidates: list[tuple[float, int, int, Property]] = [] for oi, ov in old_dicts: for nj, nv in new_dicts: prop = Property( config=self.config, name=None, schema_path=[], json_path=[], old_schema=ov, new_schema=nv, ) prop.compare() score = self._score_from_stats(prop.calc_diff()) candidates.append((score, oi, nj, prop)) candidates.sort(key=lambda t: t[0], reverse=True) # жадный матч по убыванию score с порогом for score, oi, nj, prop in candidates: if score < threshold: break if oi in matched_old or nj in matched_new: continue matched_old.add(oi) matched_new.add(nj) # добавляем как один элемент списка с compared_property # статус NO_DIFF, если проперти без отличий, иначе MODIFIED status = Statuses.NO_DIFF if prop.status == Statuses.NO_DIFF else Statuses.MODIFIED el = CompareListElement( self.config, self.my_config, value=None, status=status, compared_property=prop ) self.elements.append(el) if status != Statuses.NO_DIFF: self.changed_elements.append(el) # все старые dict, что не подобрались → DELETED for oi, ov in old_dicts: if oi not in matched_old: el = CompareListElement( self.config, self.my_config, value=ov, status=Statuses.DELETED ) el.compare() self.elements.append(el) self.changed_elements.append(el) # все новые dict, что не подобрались → ADDED for nj, nv in new_dicts: if nj not in matched_new: el = CompareListElement( self.config, self.my_config, value=nv, status=Statuses.ADDED ) el.compare() self.elements.append(el) self.changed_elements.append(el) # ------------------------------ # 2) Прежняя логика для НЕ-словарей (order-sensitive) — через SequenceMatcher # ВАЖНО: словари из сравнения исключаем, чтобы не дублировать их как insert/delete # ------------------------------ def filter_non_dict(src: list[Any]) -> list[Any]: return [v for v in src if not isinstance(v, dict)] old_rest = filter_non_dict(old_list) new_rest = filter_non_dict(new_list) def get_str_list(v: Any) -> list[str] | str: if isinstance(v, list): return [str(i) for i in v] return str(v) real_old_value = get_str_list(old_rest) real_new_value = get_str_list(new_rest) sm = difflib.SequenceMatcher(a=real_old_value, b=real_new_value, autojunk=False) for tag, i1, i2, j1, j2 in sm.get_opcodes(): def add_element( source: list[Any], status: Statuses, from_index: int, to_index: int ) -> None: is_change = status != Statuses.NO_DIFF for v in source[from_index:to_index]: element = CompareListElement(self.config, self.my_config, v, status) element.compare() self.elements.append(element) if is_change: self.changed_elements.append(element) match tag: case "equal": add_element(old_rest, Statuses.NO_DIFF, i1, i2) case "delete": add_element(old_rest, Statuses.DELETED, i1, i2) case "insert": add_element(new_rest, Statuses.ADDED, j1, j2) case "replace": add_element(old_rest, Statuses.DELETED, i1, i2) add_element(new_rest, Statuses.ADDED, j1, j2) case _: raise ValueError(f"Unknown tag: {tag}") if len(self.changed_elements) > 0: self.status = Statuses.MODIFIED else: self.status = Statuses.NO_DIFF else: raise ValueError("Unsupported keys combination") return self.status
[docs] def is_for_rendering(self) -> bool: return super().is_for_rendering() or len(self.changed_elements) > 0
[docs] def render( self, tab_level: int = 0, with_path: bool = True, to_crop: tuple[int, int] = (0, 0) ) -> str: to_return = self._render_start_line( tab_level=tab_level, with_path=with_path, to_crop=to_crop ) for i in self.elements: to_return += f"\n{i.render(tab_level + 1)}" return to_return
@staticmethod
[docs] def legend() -> "LEGEND_RETURN_TYPE": return { "element": "Arrays\nLists", "description": ( "Arrays are always displayed fully, with statuses of all elements " "separately (left to them).\nIn example:\n" '["Masha", "Misha", "Vasya"] replace to ["Masha", "Olya", "Misha"]' ), "example": { "old_value": {"some_list": ["Masha", "Misha", "Vasya"]}, "new_value": {"some_list": ["Masha", "Olya", "Misha"]}, }, }