Coverage for jsonschema_diff / core / custom_compare / list.py: 89%
252 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 17:05 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-18 17:05 +0000
1import hashlib
2import json
3from collections import defaultdict, deque
4from dataclasses import dataclass
5from typing import TYPE_CHECKING, Any, Dict, Optional
7from ..abstraction import Statuses
8from ..compare_base import Compare
9from ..property import Property
11if TYPE_CHECKING:
12 from ..compare_base import LEGEND_RETURN_TYPE
13 from ..config import Config
@dataclass
class CompareListElement:
    """One rendered entry of a compared list.

    A scalar entry is printed as a single line. A ``dict`` entry is delegated
    to a nested ``Property`` so it renders like any other schema fragment.
    """

    # Global diff configuration; ``TAB`` is read from it for indentation.
    config: "Config"
    # Comparator-specific settings; the frame markers are looked up here.
    my_config: dict
    # Raw list item (may be None when ``compared_property`` is supplied).
    value: Any
    # Diff status of this item.
    status: Statuses
    # Nested comparison used for rendering dict items.
    compared_property: Optional[Property] = None

    def compare(self) -> None:
        """Build and run the nested ``Property`` when the value is a dict.

        Non-dict values need no preparation and are left untouched.
        """
        if not isinstance(self.value, dict):
            return

        # Pick the old/new sides from the element status: an ADDED item has no
        # old side, a DELETED item has no new side, and any other status is
        # treated as "same value on both sides".
        old_schema = None if self.status == Statuses.ADDED else self.value
        new_schema = None if self.status == Statuses.DELETED else self.value

        nested = Property(
            config=self.config,
            name=None,
            schema_path=[],
            json_path=[],
            old_schema=old_schema,
            new_schema=new_schema,
        )
        # Store before comparing so the attribute is set even if compare() raises.
        self.compared_property = nested
        nested.compare()

    def replace_penultimate_space(self, tab_level: int, s: str, repl: str) -> str:
        """Return ``s`` with ``repl`` inserted at the frame-marker offset.

        The offset is ``len(config.TAB) * tab_level`` (status prefix + indent
        width - 1). Despite the method name, no character of ``s`` is removed:
        ``repl`` is inserted between the two halves.
        """
        cut = tab_level * len(self.config.TAB)
        head, tail = s[:cut], s[cut:]
        return head + repl + tail

    def _raw_pairs(self, tab_level: int = 0) -> list[tuple[Statuses, str]]:
        """Return ``(status, line)`` pairs before frame markers are applied."""
        nested = self.compared_property
        if nested is None:
            # Plain value (string, number, ...): print it as-is on one line.
            indent = self.config.TAB * tab_level
            return [(self.status, f"{self.status.value} {indent}{self.value}")]

        rendered, _compares = nested._render_pairs(tab_level=tab_level)
        return rendered

    def _render_pairs(self, tab_level: int = 0) -> list[tuple[Statuses, str]]:
        """Return ``(status, line)`` pairs with frame markers inserted.

        Blank lines are dropped. The remaining lines are split into runs of
        consecutive equal statuses, and each run is framed on its own so that
        borders do not bleed between DELETED / ADDED / NO_DIFF blocks.
        """
        runs: list[list[tuple[Statuses, str]]] = []
        for pair in self._raw_pairs(tab_level=tab_level):
            if pair[1].strip() == "":
                continue
            if runs and pair[0] == runs[-1][0][0]:
                runs[-1].append(pair)
            else:
                runs.append([pair])

        # With nothing left to show (rare), ``runs`` is empty and so is the result.
        framed: list[tuple[Statuses, str]] = []
        for run in runs:
            last = len(run) - 1
            for offset, (line_status, text) in enumerate(run):
                if last == 0:
                    marker_key = "SINGLE_LINE"
                elif offset == 0:
                    marker_key = "START_LINE"
                elif offset == last:
                    marker_key = "END_LINE"
                else:
                    marker_key = "MIDDLE_LINE"

                marker = self.my_config.get(marker_key, " ")
                framed.append(
                    (
                        line_status,
                        self.replace_penultimate_space(
                            tab_level=tab_level, s=text, repl=marker
                        ),
                    )
                )

        return framed

    def render(self, tab_level: int = 0) -> Optional[str]:
        """Return the element as newline-joined text, or None if nothing renders."""
        pairs = self._render_pairs(tab_level=tab_level)
        if not pairs:
            return None
        return "\n".join(text for _status, text in pairs)
class CompareList(Compare):
    """Order-insensitive comparator for list values.

    Dict items of the old and new lists are paired by similarity (maximum
    weight assignment); all other items are compared as a multiset. The
    resulting per-item elements are collected in ``elements`` (everything, in
    render order) and ``changed_elements`` (only items whose status is not
    NO_DIFF).
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        super().__init__(*args, **kwargs)
        self.elements: list[CompareListElement] = []
        self.changed_elements: list[CompareListElement] = []

    def _score_from_stats(self, stats: Dict[str, int]) -> float:
        """Convert ``Property.calc_diff()`` counters into a score in [0, 1].

        The score is the share of unchanged entries (NO_DIFF + UNKNOWN) among
        unchanged + ADDED + DELETED + REPLACED. MODIFIED entries are
        deliberately not counted. An empty tally scores 1.0.
        """
        unchanged = stats.get("NO_DIFF", 0) + stats.get("UNKNOWN", 0)
        changed = stats.get("ADDED", 0) + stats.get("DELETED", 0) + stats.get("REPLACED", 0)
        denom = unchanged + changed
        if denom == 0:
            return 1.0
        return unchanged / float(denom)

    @staticmethod
    def _stable_repr(value: Any) -> str:
        """Return a deterministic text form of ``value``.

        Canonical JSON (sorted keys, compact separators, ASCII only) is used
        when possible. Values ``json.dumps`` rejects fall back to ``str()``:
        ``TypeError`` for unserialisable objects and ``ValueError`` for
        circular references.
        """
        try:
            return json.dumps(value, ensure_ascii=True, sort_keys=True, separators=(",", ":"))
        except (TypeError, ValueError):
            return str(value)

    @staticmethod
    def _scalar_match_key(value: Any) -> str:
        """Return the multiset key of a non-dict item.

        The type name is part of the key, so e.g. ``True`` and ``1`` are
        treated as different items.
        """
        return f"{type(value).__qualname__}:{CompareList._stable_repr(value)}"

    @staticmethod
    def _stable_tie_break(old_repr: str, new_repr: str) -> float:
        """Return a deterministic bonus in [0, 1e-9) for an (old, new) pair.

        The first 8 bytes of SHA-1 over ``"old|new"`` are scaled down so the
        bonus only orders otherwise equal scores and the chosen pairing does
        not depend on incidental ordering.
        """
        digest = hashlib.sha1(f"{old_repr}|{new_repr}".encode("utf-8")).digest()
        return (int.from_bytes(digest[:8], byteorder="big", signed=False) / (2**64)) * 1e-9

    @staticmethod
    def _hungarian_max(weights: list[list[float]]) -> list[int]:
        """Solve the maximum-weight assignment problem for ``weights``.

        Args:
            weights: matrix with one row per item to assign and at least as
                many columns as rows. Rows are assumed to have equal length
                (not validated here).

        Returns:
            ``assignment`` where ``assignment[row]`` is the 0-based column
            chosen for that row. An empty matrix yields ``[]``.

        Raises:
            ValueError: if the first row has fewer columns than there are rows.
        """
        row_count = len(weights)
        if row_count == 0:
            return []

        column_count = len(weights[0])
        if column_count < row_count:
            raise ValueError("Hungarian solver expects columns >= rows")

        # Turn maximisation into minimisation: cost = max_weight - weight >= 0.
        max_weight = max(max(row) for row in weights)
        cost = [[max_weight - w for w in row] for row in weights]

        # Potentials formulation with 1-based indices; index 0 is a sentinel.
        # u / v: row / column potentials.
        # p[j]:  row currently assigned to column j (0 = free).
        # way[j]: previous column on the augmenting path that reaches j.
        u = [0.0] * (row_count + 1)
        v = [0.0] * (column_count + 1)
        p = [0] * (column_count + 1)
        way = [0] * (column_count + 1)

        for i in range(1, row_count + 1):
            # Insert row i via the sentinel column and grow an augmenting path.
            p[0] = i
            j0 = 0
            minv = [float("inf")] * (column_count + 1)
            used = [False] * (column_count + 1)

            while True:
                used[j0] = True
                i0 = p[j0]
                delta = float("inf")
                j1 = 0

                # Relax reduced costs from row i0 and pick the cheapest unused column.
                for j in range(1, column_count + 1):
                    if used[j]:
                        continue
                    cur = cost[i0 - 1][j - 1] - u[i0] - v[j]
                    if cur < minv[j]:
                        minv[j] = cur
                        way[j] = j0
                    if minv[j] < delta:
                        delta = minv[j]
                        j1 = j

                # Shift potentials by delta so the chosen edge becomes tight.
                for j in range(column_count + 1):
                    if used[j]:
                        u[p[j]] += delta
                        v[j] -= delta
                    else:
                        minv[j] -= delta

                j0 = j1
                if p[j0] == 0:
                    # Reached a free column: the augmenting path is complete.
                    break

            # Walk the path backwards, flipping assignments along it.
            while True:
                j1 = way[j0]
                p[j0] = p[j1]
                j0 = j1
                if j0 == 0:
                    break

        assignment = [-1] * row_count
        for j in range(1, column_count + 1):
            if p[j] != 0:
                assignment[p[j] - 1] = j - 1

        return assignment

    @staticmethod
    def _coerce_to_list(value: Any) -> list[Any]:
        """Return ``value`` itself if it is a list, else wrap it in a one-item list."""
        return value if isinstance(value, list) else [value]

    def _add_list_element(self, value: Any, status: Statuses) -> None:
        """Create, compare and register an element for a raw list item."""
        element = CompareListElement(self.config, self.my_config, value, status)
        element.compare()
        self.elements.append(element)
        if status != Statuses.NO_DIFF:
            self.changed_elements.append(element)

    def _match_dict_items(self, old_list: list[Any], new_list: list[Any]) -> None:
        """Pair dict items of both lists by similarity, ignoring order.

        Every old dict is compared with every new dict. Pairs scoring at least
        ``DICT_MATCH_THRESHOLD`` (default 0.10) are candidates, and a
        maximum-weight assignment picks the final pairing. Matched pairs become
        NO_DIFF or MODIFIED elements; leftover old dicts become DELETED and
        leftover new dicts ADDED.
        """
        old_dicts: list[tuple[int, dict]] = [
            (i, v) for i, v in enumerate(old_list) if isinstance(v, dict)
        ]
        new_dicts: list[tuple[int, dict]] = [
            (j, v) for j, v in enumerate(new_list) if isinstance(v, dict)
        ]

        threshold = float(self.my_config.get("DICT_MATCH_THRESHOLD", 0.10))

        matched_old: set[int] = set()
        matched_new: set[int] = set()

        if old_dicts and new_dicts:
            row_count = len(old_dicts)
            real_column_count = len(new_dicts)

            old_repr = [self._stable_repr(item[1]) for item in old_dicts]
            new_repr = [self._stable_repr(item[1]) for item in new_dicts]

            # One extra zero-weight "dummy" column per old dict lets an old
            # item stay unmatched instead of being forced into a bad pair.
            score_matrix: list[list[float]] = [
                [0.0] * (real_column_count + row_count) for _ in range(row_count)
            ]
            properties: dict[tuple[int, int], tuple[float, Property]] = {}

            disallowed_score = -1_000_000.0
            for old_row, (_oi, ov) in enumerate(old_dicts):
                for new_col, (_nj, nv) in enumerate(new_dicts):
                    prop = Property(
                        config=self.config,
                        name=None,
                        schema_path=[],
                        json_path=[],
                        old_schema=ov,
                        new_schema=nv,
                    )
                    prop.compare()
                    score = self._score_from_stats(prop.calc_diff())
                    properties[(old_row, new_col)] = (score, prop)

                    if score >= threshold:
                        score_matrix[old_row][new_col] = score + self._stable_tie_break(
                            old_repr[old_row],
                            new_repr[new_col],
                        )
                    else:
                        score_matrix[old_row][new_col] = disallowed_score

            assignment = self._hungarian_max(score_matrix)

            for old_row, matched_col in enumerate(assignment):
                # Unassigned rows and rows sent to a dummy column stay unmatched.
                if matched_col < 0 or matched_col >= real_column_count:
                    continue

                score, prop = properties[(old_row, matched_col)]
                if score < threshold:
                    continue

                matched_old.add(old_dicts[old_row][0])
                matched_new.add(new_dicts[matched_col][0])

                # A matched pair is a single list element backed by the already
                # compared property: NO_DIFF if identical, otherwise MODIFIED.
                status = Statuses.NO_DIFF if prop.status == Statuses.NO_DIFF else Statuses.MODIFIED
                el = CompareListElement(
                    self.config,
                    self.my_config,
                    value=None,
                    status=status,
                    compared_property=prop,
                )
                self.elements.append(el)
                if status != Statuses.NO_DIFF:
                    self.changed_elements.append(el)

        # Old dicts without a partner are DELETED.
        for oi, ov in old_dicts:
            if oi not in matched_old:
                self._add_list_element(ov, Statuses.DELETED)

        # New dicts without a partner are ADDED.
        for nj, nv in new_dicts:
            if nj not in matched_new:
                self._add_list_element(nv, Statuses.ADDED)

    def _diff_non_dict_items(self, old_list: list[Any], new_list: list[Any]) -> None:
        """Diff the non-dict items of both lists as order-insensitive multisets.

        Dicts are excluded so they are not reported a second time as
        insertions or deletions.
        """
        old_rest = [v for v in old_list if not isinstance(v, dict)]
        new_rest = [v for v in new_list if not isinstance(v, dict)]

        # For every key, a queue of still-unmatched positions in ``old_rest``.
        old_pool: dict[str, deque[int]] = defaultdict(deque)
        for old_idx, old_value in enumerate(old_rest):
            old_pool[self._scalar_match_key(old_value)].append(old_idx)

        matched_old_indices: set[int] = set()

        # Emit in new-list order: matched items are NO_DIFF, extras are ADDED.
        for new_value in new_rest:
            bucket = old_pool.get(self._scalar_match_key(new_value))
            if bucket:
                old_idx = bucket.popleft()
                matched_old_indices.add(old_idx)
                self._add_list_element(old_rest[old_idx], Statuses.NO_DIFF)
            else:
                self._add_list_element(new_value, Statuses.ADDED)

        # Whatever is left in the old list is DELETED (kept in old order).
        for old_idx, old_value in enumerate(old_rest):
            if old_idx not in matched_old_indices:
                self._add_list_element(old_value, Statuses.DELETED)

    def compare(self) -> Statuses:
        """Compare the old and new values and populate the element lists.

        * NO_DIFF from the base comparison is returned unchanged.
        * ADDED / DELETED: every item of the value gets that status.
        * REPLACED: dict items are paired by similarity, other items are
          diffed as a multiset; the final status is MODIFIED if any item
          changed, otherwise NO_DIFF.

        A value that is not a list is treated as a one-item list in every
        branch.

        Raises:
            ValueError: if the base comparison yields any other status.
        """
        super().compare()

        if self.status == Statuses.NO_DIFF:
            return self.status

        if self.status in (Statuses.ADDED, Statuses.DELETED):
            for item in self._coerce_to_list(self.value):
                self._add_list_element(item, self.status)
        elif self.status == Statuses.REPLACED:
            old_list = self._coerce_to_list(self.old_value)
            new_list = self._coerce_to_list(self.new_value)

            self._match_dict_items(old_list, new_list)
            self._diff_non_dict_items(old_list, new_list)

            self.status = Statuses.MODIFIED if self.changed_elements else Statuses.NO_DIFF
        else:
            raise ValueError("Unsupported keys combination")

        return self.status

    def is_for_rendering(self) -> bool:
        """Return True if the base class says so or any element changed."""
        return super().is_for_rendering() or bool(self.changed_elements)

    def render(
        self, tab_level: int = 0, with_path: bool = True, to_crop: tuple[int, int] = (0, 0)
    ) -> str:
        """Return the header line and all elements as newline-joined text."""
        lines = self._render_pairs(tab_level=tab_level, with_path=with_path, to_crop=to_crop)
        return "\n".join(line for _status, line in lines)

    def _render_pairs(
        self, tab_level: int = 0, with_path: bool = True, to_crop: tuple[int, int] = (0, 0)
    ) -> list[tuple[Statuses, str]]:
        """Return ``(status, line)`` pairs: the header, then every element one level deeper."""
        to_return: list[tuple[Statuses, str]] = [
            (
                self.status,
                self._render_start_line(tab_level=tab_level, with_path=with_path, to_crop=to_crop),
            )
        ]

        for element in self.elements:
            to_return.extend(element._render_pairs(tab_level + 1))
        return to_return

    @staticmethod
    def legend() -> "LEGEND_RETURN_TYPE":
        """Return the legend entry (title, description, example) for lists."""
        return {
            "element": "Arrays\nLists",
            "description": (
                "Arrays are always displayed fully, with statuses of all elements "
                "separately (left to them).\nIn example:\n"
                '["Masha", "Misha", "Vasya"] replace to ["Masha", "Olya", "Misha"]'
            ),
            "example": {
                "old_value": {"some_list": ["Masha", "Misha", "Vasya"]},
                "new_value": {"some_list": ["Masha", "Olya", "Misha"]},
            },
        }