Coverage for jsonschema_diff / core / custom_compare / list.py: 61%
158 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-26 14:58 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-11-26 14:58 +0000
1import difflib
2from dataclasses import dataclass
3from typing import TYPE_CHECKING, Any, Dict, Optional
5from ..abstraction import Statuses
6from ..compare_base import Compare
7from ..property import Property
9if TYPE_CHECKING:
10 from ..compare_base import LEGEND_RETURN_TYPE
11 from ..config import Config
14@dataclass
15class CompareListElement:
16 config: "Config"
17 my_config: dict
18 value: Any
19 status: Statuses
20 compared_property: Optional[Property] = None
22 def compare(self) -> None:
23 # Если элемент списка — словарь, рендерим его как Property
24 if isinstance(self.value, dict):
25 # Подбираем old/new под статус элемента
26 if self.status == Statuses.DELETED:
27 old_schema = self.value
28 new_schema = None
29 elif self.status == Statuses.ADDED:
30 old_schema = None
31 new_schema = self.value
32 else:
33 # NO_DIFF и прочие — считаем, что значение одинаково слева и справа
34 old_schema = self.value
35 new_schema = self.value
37 self.compared_property = Property(
38 config=self.config,
39 name=None,
40 schema_path=[],
41 json_path=[],
42 old_schema=old_schema,
43 new_schema=new_schema,
44 )
45 self.compared_property.compare()
47 def replace_penultimate_space(self, tab_level: int, s: str, repl: str) -> str:
48 position = (
49 len(self.config.TAB) * tab_level
50 ) # 1 + (len(self.config.TAB) * tab_level) - 1 # PREFIX + TAB * COUNT - 1
51 return s[:position] + repl + s[position:]
53 def _real_render(self, tab_level: int = 0) -> str:
54 if self.compared_property is not None:
55 render_lines, _render_compares = self.compared_property.render(tab_level=tab_level)
57 return "\n".join(render_lines)
59 # Иначе — старое поведение (строка/число/пр. выводим как есть)
60 return f"{self.status.value} {self.config.TAB * tab_level}{self.value}"
62 def render(self, tab_level: int = 0) -> Optional[str]:
63 lines = [
64 line
65 for line in self._real_render(tab_level=tab_level).split("\n")
66 if line.strip() != ""
67 ]
68 # первая строка = START_LINE, последняя = END_LINE, остальное = MIDDLE_LINE
69 if len(lines) > 1:
70 prepare = []
71 for i, line in enumerate(lines):
72 if i == 0:
73 prepare.append(
74 self.replace_penultimate_space(
75 tab_level=tab_level, s=line, repl=self.my_config.get("START_LINE", " ")
76 )
77 )
78 elif i == len(lines) - 1:
79 prepare.append(
80 self.replace_penultimate_space(
81 tab_level=tab_level, s=line, repl=self.my_config.get("END_LINE", " ")
82 )
83 )
84 else:
85 prepare.append(
86 self.replace_penultimate_space(
87 tab_level=tab_level, s=line, repl=self.my_config.get("MIDDLE_LINE", " ")
88 )
89 )
91 return "\n".join(prepare)
92 elif len(lines) == 1:
93 return self.replace_penultimate_space(
94 tab_level=tab_level, s=lines[0], repl=self.my_config.get("SINGLE_LINE", " ")
95 )
96 else:
97 # В крайне редких случаях, длина списка == 0
98 # мне лень разбираться, так что легализуем
99 return None
102class CompareList(Compare):
103 def __init__(self, *args: Any, **kwargs: Any) -> None:
104 super().__init__(*args, **kwargs)
105 self.elements: list[CompareListElement] = []
106 self.changed_elements: list[CompareListElement] = []
108 # --- вспомогательное: score ∈ [0..1] из Property.calc_diff()
109 def _score_from_stats(self, stats: Dict[str, int]) -> float:
110 unchanged = stats.get("NO_DIFF", 0) + stats.get("UNKNOWN", 0)
111 changed = (
112 stats.get("ADDED", 0) + stats.get("DELETED", 0) + stats.get("REPLACED", 0)
113 ) # модификации не в счет + stats.get("MODIFIED", 0)
114 denom = unchanged + changed
115 if denom == 0:
116 return 1.0
117 return unchanged / float(denom)
119 def compare(self) -> Statuses:
120 super().compare()
122 if self.status == Statuses.NO_DIFF:
123 return self.status
124 elif self.status in [Statuses.ADDED, Statuses.DELETED]: # add
125 for v in self.value:
126 element = CompareListElement(self.config, self.my_config, v, self.status)
127 element.compare()
128 self.elements.append(element)
129 self.changed_elements.append(element)
130 elif self.status == Statuses.REPLACED: # replace or no-diff
131 # ------------------------------
132 # 1) Матричное сопоставление dict↔dict (order-independent)
133 # ------------------------------
134 old_list = self.old_value if isinstance(self.old_value, list) else [self.old_value]
135 new_list = self.new_value if isinstance(self.new_value, list) else [self.new_value]
137 old_dicts: list[tuple[int, dict]] = [
138 (i, v) for i, v in enumerate(old_list) if isinstance(v, dict)
139 ]
140 new_dicts: list[tuple[int, dict]] = [
141 (j, v) for j, v in enumerate(new_list) if isinstance(v, dict)
142 ]
144 threshold = float(self.my_config.get("DICT_MATCH_THRESHOLD", 0.10))
146 matched_old: set[int] = set()
147 matched_new: set[int] = set()
149 # сформируем все кандидаты (score, i, j, prop), отсортируем по score по убыванию
150 candidates: list[tuple[float, int, int, Property]] = []
151 for oi, ov in old_dicts:
152 for nj, nv in new_dicts:
153 prop = Property(
154 config=self.config,
155 name=None,
156 schema_path=[],
157 json_path=[],
158 old_schema=ov,
159 new_schema=nv,
160 )
161 prop.compare()
162 score = self._score_from_stats(prop.calc_diff())
163 candidates.append((score, oi, nj, prop))
164 candidates.sort(key=lambda t: t[0], reverse=True)
166 # жадный матч по убыванию score с порогом
167 for score, oi, nj, prop in candidates:
168 if score < threshold:
169 break
170 if oi in matched_old or nj in matched_new:
171 continue
172 matched_old.add(oi)
173 matched_new.add(nj)
175 # добавляем как один элемент списка с compared_property
176 # статус NO_DIFF, если проперти без отличий, иначе MODIFIED
177 status = Statuses.NO_DIFF if prop.status == Statuses.NO_DIFF else Statuses.MODIFIED
178 el = CompareListElement(
179 self.config, self.my_config, value=None, status=status, compared_property=prop
180 )
181 self.elements.append(el)
182 if status != Statuses.NO_DIFF:
183 self.changed_elements.append(el)
185 # все старые dict, что не подобрались → DELETED
186 for oi, ov in old_dicts:
187 if oi not in matched_old:
188 el = CompareListElement(
189 self.config, self.my_config, value=ov, status=Statuses.DELETED
190 )
191 el.compare()
192 self.elements.append(el)
193 self.changed_elements.append(el)
195 # все новые dict, что не подобрались → ADDED
196 for nj, nv in new_dicts:
197 if nj not in matched_new:
198 el = CompareListElement(
199 self.config, self.my_config, value=nv, status=Statuses.ADDED
200 )
201 el.compare()
202 self.elements.append(el)
203 self.changed_elements.append(el)
205 # ------------------------------
206 # 2) Прежняя логика для НЕ-словарей (order-sensitive) — через SequenceMatcher
207 # ВАЖНО: словари из сравнения исключаем, чтобы не дублировать их как insert/delete
208 # ------------------------------
209 def filter_non_dict(src: list[Any]) -> list[Any]:
210 return [v for v in src if not isinstance(v, dict)]
212 old_rest = filter_non_dict(old_list)
213 new_rest = filter_non_dict(new_list)
215 def get_str_list(v: Any) -> list[str] | str:
216 if isinstance(v, list):
217 return [str(i) for i in v]
218 return str(v)
220 real_old_value = get_str_list(old_rest)
221 real_new_value = get_str_list(new_rest)
223 sm = difflib.SequenceMatcher(a=real_old_value, b=real_new_value, autojunk=False)
224 for tag, i1, i2, j1, j2 in sm.get_opcodes():
226 def add_element(
227 source: list[Any], status: Statuses, from_index: int, to_index: int
228 ) -> None:
229 is_change = status != Statuses.NO_DIFF
230 for v in source[from_index:to_index]:
231 element = CompareListElement(self.config, self.my_config, v, status)
232 element.compare()
233 self.elements.append(element)
234 if is_change:
235 self.changed_elements.append(element)
237 match tag:
238 case "equal":
239 add_element(old_rest, Statuses.NO_DIFF, i1, i2)
240 case "delete":
241 add_element(old_rest, Statuses.DELETED, i1, i2)
242 case "insert":
243 add_element(new_rest, Statuses.ADDED, j1, j2)
244 case "replace":
245 add_element(old_rest, Statuses.DELETED, i1, i2)
246 add_element(new_rest, Statuses.ADDED, j1, j2)
247 case _:
248 raise ValueError(f"Unknown tag: {tag}")
250 if len(self.changed_elements) > 0:
251 self.status = Statuses.MODIFIED
252 else:
253 self.status = Statuses.NO_DIFF
254 else:
255 raise ValueError("Unsupported keys combination")
257 return self.status
259 def is_for_rendering(self) -> bool:
260 return super().is_for_rendering() or len(self.changed_elements) > 0
262 def render(
263 self, tab_level: int = 0, with_path: bool = True, to_crop: tuple[int, int] = (0, 0)
264 ) -> str:
265 to_return = self._render_start_line(
266 tab_level=tab_level, with_path=with_path, to_crop=to_crop
267 )
269 for i in self.elements:
270 to_i_render = i.render(tab_level + 1)
271 if to_i_render:
272 to_return += f"\n{to_i_render}"
273 return to_return
275 @staticmethod
276 def legend() -> "LEGEND_RETURN_TYPE":
277 return {
278 "element": "Arrays\nLists",
279 "description": (
280 "Arrays are always displayed fully, with statuses of all elements "
281 "separately (left to them).\nIn example:\n"
282 '["Masha", "Misha", "Vasya"] replace to ["Masha", "Olya", "Misha"]'
283 ),
284 "example": {
285 "old_value": {"some_list": ["Masha", "Misha", "Vasya"]},
286 "new_value": {"some_list": ["Masha", "Olya", "Misha"]},
287 },
288 }