Coverage for jsonschema_diff/core/custom_compare/list.py: 60%
154 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-15 18:01 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-15 18:01 +0000
1import difflib
2from dataclasses import dataclass
3from typing import TYPE_CHECKING, Any, Dict, Optional
5from ..abstraction import Statuses
6from ..compare_base import Compare
7from ..property import Property
9if TYPE_CHECKING:
10 from ..compare_base import LEGEND_RETURN_TYPE
11 from ..config import Config
@dataclass
class CompareListElement:
    """A single list item together with its diff status and rendering logic.

    Attributes:
        config: Global configuration; forwarded to nested ``Property`` objects
            and used for its ``TAB`` indentation unit.
        my_config: Comparator-specific options. The keys ``START_LINE``,
            ``MIDDLE_LINE``, ``END_LINE`` and ``SINGLE_LINE`` are read here
            (each defaults to a single space).
        value: The raw list item. May be ``None`` when ``compared_property``
            is supplied directly by the caller.
        status: Diff status of this item.
        compared_property: Nested ``Property`` used to render dict items;
            populated by :meth:`compare` or passed in by the caller.
    """

    config: "Config"
    my_config: dict
    value: Any
    status: Statuses
    compared_property: Optional[Property] = None

    def compare(self) -> None:
        """Build and compare a nested ``Property`` when the item is a dict.

        Items that are not dicts are left untouched and are later rendered
        as plain values.
        """
        if not isinstance(self.value, dict):
            return

        # Choose the old/new sides according to the status of the item.
        if self.status == Statuses.DELETED:
            old_schema, new_schema = self.value, None
        elif self.status == Statuses.ADDED:
            old_schema, new_schema = None, self.value
        else:
            # NO_DIFF and everything else: same value on the left and right.
            old_schema = new_schema = self.value

        self.compared_property = Property(
            config=self.config,
            name=None,
            schema_path=[],
            json_path=[],
            old_schema=old_schema,
            new_schema=new_schema,
        )
        self.compared_property.compare()

    def replace_penultimate_space(self, tab_level: int, s: str, repl: str) -> str:
        """Return *s* with *repl* spliced in at the list-marker column.

        Despite the name, no character of *s* is removed: *repl* is inserted
        at offset ``len(config.TAB) * tab_level``.

        Args:
            tab_level: Indentation depth of the rendered line.
            s: The rendered line.
            repl: Marker text to insert.
        """
        # Per the original note: 1 (status prefix) + TAB * tab_level - 1.
        position = len(self.config.TAB) * tab_level
        return s[:position] + repl + s[position:]

    def _real_render(self, tab_level: int = 0) -> str:
        """Render the item without list markers (possibly multi-line)."""
        if self.compared_property is not None:
            render_lines, _render_compares = self.compared_property.render(tab_level=tab_level)
            return "\n".join(render_lines)

        # Plain values (strings, numbers, ...) are printed as they are.
        return f"{self.status.value} {self.config.TAB * tab_level}{self.value}"

    def render(self, tab_level: int = 0) -> str:
        """Render the item with list markers inserted into every line.

        Blank lines are dropped. A single remaining line gets the
        ``SINGLE_LINE`` marker; otherwise the first line gets ``START_LINE``,
        the last ``END_LINE`` and the lines in between ``MIDDLE_LINE``.

        Returns:
            The decorated text, or an empty string when nothing but blank
            lines was rendered (previously this case raised ``IndexError``).
        """
        lines = [
            line
            for line in self._real_render(tab_level=tab_level).split("\n")
            if line.strip()
        ]
        if not lines:
            # Nothing to decorate, e.g. the nested property rendered no lines.
            return ""

        if len(lines) == 1:
            return self.replace_penultimate_space(
                tab_level=tab_level, s=lines[0], repl=self.my_config.get("SINGLE_LINE", " ")
            )

        last_index = len(lines) - 1
        decorated = []
        for index, line in enumerate(lines):
            if index == 0:
                marker_key = "START_LINE"
            elif index == last_index:
                marker_key = "END_LINE"
            else:
                marker_key = "MIDDLE_LINE"
            decorated.append(
                self.replace_penultimate_space(
                    tab_level=tab_level, s=line, repl=self.my_config.get(marker_key, " ")
                )
            )
        return "\n".join(decorated)
98class CompareList(Compare):
99 def __init__(self, *args: Any, **kwargs: Any) -> None:
100 super().__init__(*args, **kwargs)
101 self.elements: list[CompareListElement] = []
102 self.changed_elements: list[CompareListElement] = []
104 # --- вспомогательное: score ∈ [0..1] из Property.calc_diff()
105 def _score_from_stats(self, stats: Dict[str, int]) -> float:
106 unchanged = stats.get("NO_DIFF", 0) + stats.get("UNKNOWN", 0)
107 changed = (
108 stats.get("ADDED", 0) + stats.get("DELETED", 0) + stats.get("REPLACED", 0)
109 ) # модификации не в счет + stats.get("MODIFIED", 0)
110 denom = unchanged + changed
111 if denom == 0:
112 return 1.0
113 return unchanged / float(denom)
115 def compare(self) -> Statuses:
116 super().compare()
118 if self.status == Statuses.NO_DIFF:
119 return self.status
120 elif self.status in [Statuses.ADDED, Statuses.DELETED]: # add
121 for v in self.value:
122 element = CompareListElement(self.config, self.my_config, v, self.status)
123 element.compare()
124 self.elements.append(element)
125 self.changed_elements.append(element)
126 elif self.status == Statuses.REPLACED: # replace or no-diff
127 # ------------------------------
128 # 1) Матричное сопоставление dict↔dict (order-independent)
129 # ------------------------------
130 old_list = self.old_value if isinstance(self.old_value, list) else [self.old_value]
131 new_list = self.new_value if isinstance(self.new_value, list) else [self.new_value]
133 old_dicts: list[tuple[int, dict]] = [
134 (i, v) for i, v in enumerate(old_list) if isinstance(v, dict)
135 ]
136 new_dicts: list[tuple[int, dict]] = [
137 (j, v) for j, v in enumerate(new_list) if isinstance(v, dict)
138 ]
140 threshold = float(self.my_config.get("DICT_MATCH_THRESHOLD", 0.10))
142 matched_old: set[int] = set()
143 matched_new: set[int] = set()
145 # сформируем все кандидаты (score, i, j, prop), отсортируем по score по убыванию
146 candidates: list[tuple[float, int, int, Property]] = []
147 for oi, ov in old_dicts:
148 for nj, nv in new_dicts:
149 prop = Property(
150 config=self.config,
151 name=None,
152 schema_path=[],
153 json_path=[],
154 old_schema=ov,
155 new_schema=nv,
156 )
157 prop.compare()
158 score = self._score_from_stats(prop.calc_diff())
159 candidates.append((score, oi, nj, prop))
160 candidates.sort(key=lambda t: t[0], reverse=True)
162 # жадный матч по убыванию score с порогом
163 for score, oi, nj, prop in candidates:
164 if score < threshold:
165 break
166 if oi in matched_old or nj in matched_new:
167 continue
168 matched_old.add(oi)
169 matched_new.add(nj)
171 # добавляем как один элемент списка с compared_property
172 # статус NO_DIFF, если проперти без отличий, иначе MODIFIED
173 status = Statuses.NO_DIFF if prop.status == Statuses.NO_DIFF else Statuses.MODIFIED
174 el = CompareListElement(
175 self.config, self.my_config, value=None, status=status, compared_property=prop
176 )
177 self.elements.append(el)
178 if status != Statuses.NO_DIFF:
179 self.changed_elements.append(el)
181 # все старые dict, что не подобрались → DELETED
182 for oi, ov in old_dicts:
183 if oi not in matched_old:
184 el = CompareListElement(
185 self.config, self.my_config, value=ov, status=Statuses.DELETED
186 )
187 el.compare()
188 self.elements.append(el)
189 self.changed_elements.append(el)
191 # все новые dict, что не подобрались → ADDED
192 for nj, nv in new_dicts:
193 if nj not in matched_new:
194 el = CompareListElement(
195 self.config, self.my_config, value=nv, status=Statuses.ADDED
196 )
197 el.compare()
198 self.elements.append(el)
199 self.changed_elements.append(el)
201 # ------------------------------
202 # 2) Прежняя логика для НЕ-словарей (order-sensitive) — через SequenceMatcher
203 # ВАЖНО: словари из сравнения исключаем, чтобы не дублировать их как insert/delete
204 # ------------------------------
205 def filter_non_dict(src: list[Any]) -> list[Any]:
206 return [v for v in src if not isinstance(v, dict)]
208 old_rest = filter_non_dict(old_list)
209 new_rest = filter_non_dict(new_list)
211 def get_str_list(v: Any) -> list[str] | str:
212 if isinstance(v, list):
213 return [str(i) for i in v]
214 return str(v)
216 real_old_value = get_str_list(old_rest)
217 real_new_value = get_str_list(new_rest)
219 sm = difflib.SequenceMatcher(a=real_old_value, b=real_new_value, autojunk=False)
220 for tag, i1, i2, j1, j2 in sm.get_opcodes():
222 def add_element(
223 source: list[Any], status: Statuses, from_index: int, to_index: int
224 ) -> None:
225 is_change = status != Statuses.NO_DIFF
226 for v in source[from_index:to_index]:
227 element = CompareListElement(self.config, self.my_config, v, status)
228 element.compare()
229 self.elements.append(element)
230 if is_change:
231 self.changed_elements.append(element)
233 match tag:
234 case "equal":
235 add_element(old_rest, Statuses.NO_DIFF, i1, i2)
236 case "delete":
237 add_element(old_rest, Statuses.DELETED, i1, i2)
238 case "insert":
239 add_element(new_rest, Statuses.ADDED, j1, j2)
240 case "replace":
241 add_element(old_rest, Statuses.DELETED, i1, i2)
242 add_element(new_rest, Statuses.ADDED, j1, j2)
243 case _:
244 raise ValueError(f"Unknown tag: {tag}")
246 if len(self.changed_elements) > 0:
247 self.status = Statuses.MODIFIED
248 else:
249 self.status = Statuses.NO_DIFF
250 else:
251 raise ValueError("Unsupported keys combination")
253 return self.status
255 def is_for_rendering(self) -> bool:
256 return super().is_for_rendering() or len(self.changed_elements) > 0
258 def render(
259 self, tab_level: int = 0, with_path: bool = True, to_crop: tuple[int, int] = (0, 0)
260 ) -> str:
261 to_return = self._render_start_line(
262 tab_level=tab_level, with_path=with_path, to_crop=to_crop
263 )
265 for i in self.elements:
266 to_return += f"\n{i.render(tab_level + 1)}"
267 return to_return
269 @staticmethod
270 def legend() -> "LEGEND_RETURN_TYPE":
271 return {
272 "element": "Arrays\nLists",
273 "description": (
274 "Arrays are always displayed fully, with statuses of all elements "
275 "separately (left to them).\nIn example:\n"
276 '["Masha", "Misha", "Vasya"] replace to ["Masha", "Olya", "Misha"]'
277 ),
278 "example": {
279 "old_value": {"some_list": ["Masha", "Misha", "Vasya"]},
280 "new_value": {"some_list": ["Masha", "Olya", "Misha"]},
281 },
282 }