Coverage for jsonschema_diff / core / custom_compare / list.py: 89%

252 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-18 17:05 +0000

1import hashlib 

2import json 

3from collections import defaultdict, deque 

4from dataclasses import dataclass 

5from typing import TYPE_CHECKING, Any, Dict, Optional 

6 

7from ..abstraction import Statuses 

8from ..compare_base import Compare 

9from ..property import Property 

10 

11if TYPE_CHECKING: 

12 from ..compare_base import LEGEND_RETURN_TYPE 

13 from ..config import Config 

14 

15 

16@dataclass 

17class CompareListElement: 

18 config: "Config" 

19 my_config: dict 

20 value: Any 

21 status: Statuses 

22 compared_property: Optional[Property] = None 

23 

24 def compare(self) -> None: 

25 # Если элемент списка — словарь, рендерим его как Property 

26 if isinstance(self.value, dict): 

27 # Подбираем old/new под статус элемента 

28 if self.status == Statuses.DELETED: 

29 old_schema = self.value 

30 new_schema = None 

31 elif self.status == Statuses.ADDED: 

32 old_schema = None 

33 new_schema = self.value 

34 else: 

35 # NO_DIFF и прочие — считаем, что значение одинаково слева и справа 

36 old_schema = self.value 

37 new_schema = self.value 

38 

39 self.compared_property = Property( 

40 config=self.config, 

41 name=None, 

42 schema_path=[], 

43 json_path=[], 

44 old_schema=old_schema, 

45 new_schema=new_schema, 

46 ) 

47 self.compared_property.compare() 

48 

49 def replace_penultimate_space(self, tab_level: int, s: str, repl: str) -> str: 

50 position = ( 

51 len(self.config.TAB) * tab_level 

52 ) # 1 + (len(self.config.TAB) * tab_level) - 1 # PREFIX + TAB * COUNT - 1 

53 return s[:position] + repl + s[position:] 

54 

55 def _raw_pairs(self, tab_level: int = 0) -> list[tuple[Statuses, str]]: 

56 if self.compared_property is not None: 

57 render_lines, _render_compares = self.compared_property._render_pairs( 

58 tab_level=tab_level 

59 ) 

60 return render_lines 

61 

62 # Иначе — старое поведение (строка/число/пр. выводим как есть) 

63 return [(self.status, f"{self.status.value} {self.config.TAB * tab_level}{self.value}")] 

64 

65 def _render_pairs(self, tab_level: int = 0) -> list[tuple[Statuses, str]]: 

66 lines = [ 

67 (status, line) 

68 for status, line in self._raw_pairs(tab_level=tab_level) 

69 if line.strip() != "" 

70 ] 

71 if len(lines) <= 0: 

72 # В крайне редких случаях, длина списка == 0 

73 # мне лень разбираться, так что легализуем 

74 return [] 

75 

76 to_return: list[tuple[Statuses, str]] = [] 

77 i = 0 

78 # Рендерим «рамку» по группам одинаковых статусов. 

79 # Так границы не «перетекают» между DELETED/ADDED/NO_DIFF блоками. 

80 while i < len(lines): 

81 status = lines[i][0] 

82 j = i + 1 

83 while j < len(lines) and lines[j][0] == status: 

84 j += 1 

85 

86 group = lines[i:j] 

87 if len(group) == 1: 

88 to_return.append( 

89 ( 

90 group[0][0], 

91 self.replace_penultimate_space( 

92 tab_level=tab_level, 

93 s=group[0][1], 

94 repl=self.my_config.get("SINGLE_LINE", " "), 

95 ), 

96 ) 

97 ) 

98 else: 

99 for k, (line_status, line) in enumerate(group): 

100 if k == 0: 

101 repl = self.my_config.get("START_LINE", " ") 

102 elif k == len(group) - 1: 

103 repl = self.my_config.get("END_LINE", " ") 

104 else: 

105 repl = self.my_config.get("MIDDLE_LINE", " ") 

106 

107 to_return.append( 

108 ( 

109 line_status, 

110 self.replace_penultimate_space(tab_level=tab_level, s=line, repl=repl), 

111 ) 

112 ) 

113 

114 i = j 

115 

116 return to_return 

117 

118 def render(self, tab_level: int = 0) -> Optional[str]: 

119 lines_with_status = self._render_pairs(tab_level=tab_level) 

120 if len(lines_with_status) <= 0: 

121 return None 

122 return "\n".join([line for _status, line in lines_with_status]) 

123 

124 

125class CompareList(Compare): 

126 def __init__(self, *args: Any, **kwargs: Any) -> None: 

127 super().__init__(*args, **kwargs) 

128 self.elements: list[CompareListElement] = [] 

129 self.changed_elements: list[CompareListElement] = [] 

130 

131 # --- вспомогательное: score ∈ [0..1] из Property.calc_diff() 

132 def _score_from_stats(self, stats: Dict[str, int]) -> float: 

133 unchanged = stats.get("NO_DIFF", 0) + stats.get("UNKNOWN", 0) 

134 changed = ( 

135 stats.get("ADDED", 0) + stats.get("DELETED", 0) + stats.get("REPLACED", 0) 

136 ) # модификации не в счет + stats.get("MODIFIED", 0) 

137 denom = unchanged + changed 

138 if denom == 0: 

139 return 1.0 

140 return unchanged / float(denom) 

141 

142 @staticmethod 

143 def _stable_repr(value: Any) -> str: 

144 try: 

145 return json.dumps(value, ensure_ascii=True, sort_keys=True, separators=(",", ":")) 

146 except TypeError: 

147 return str(value) 

148 

149 @staticmethod 

150 def _scalar_match_key(value: Any) -> str: 

151 return f"{type(value).__qualname__}:{CompareList._stable_repr(value)}" 

152 

153 @staticmethod 

154 def _stable_tie_break(old_repr: str, new_repr: str) -> float: 

155 digest = hashlib.sha1(f"{old_repr}|{new_repr}".encode("utf-8")).digest() 

156 return (int.from_bytes(digest[:8], byteorder="big", signed=False) / (2**64)) * 1e-9 

157 

158 @staticmethod 

159 def _hungarian_max(weights: list[list[float]]) -> list[int]: 

160 row_count = len(weights) 

161 if row_count == 0: 

162 return [] 

163 

164 column_count = len(weights[0]) 

165 if column_count < row_count: 

166 raise ValueError("Hungarian solver expects columns >= rows") 

167 

168 max_weight = max(max(row) for row in weights) 

169 cost = [[max_weight - w for w in row] for row in weights] 

170 

171 u = [0.0] * (row_count + 1) 

172 v = [0.0] * (column_count + 1) 

173 p = [0] * (column_count + 1) 

174 way = [0] * (column_count + 1) 

175 

176 for i in range(1, row_count + 1): 

177 p[0] = i 

178 j0 = 0 

179 minv = [float("inf")] * (column_count + 1) 

180 used = [False] * (column_count + 1) 

181 

182 while True: 

183 used[j0] = True 

184 i0 = p[j0] 

185 delta = float("inf") 

186 j1 = 0 

187 

188 for j in range(1, column_count + 1): 

189 if used[j]: 

190 continue 

191 cur = cost[i0 - 1][j - 1] - u[i0] - v[j] 

192 if cur < minv[j]: 

193 minv[j] = cur 

194 way[j] = j0 

195 if minv[j] < delta: 

196 delta = minv[j] 

197 j1 = j 

198 

199 for j in range(column_count + 1): 

200 if used[j]: 

201 u[p[j]] += delta 

202 v[j] -= delta 

203 else: 

204 minv[j] -= delta 

205 

206 j0 = j1 

207 if p[j0] == 0: 

208 break 

209 

210 while True: 

211 j1 = way[j0] 

212 p[j0] = p[j1] 

213 j0 = j1 

214 if j0 == 0: 

215 break 

216 

217 assignment = [-1] * row_count 

218 for j in range(1, column_count + 1): 

219 if p[j] != 0: 

220 assignment[p[j] - 1] = j - 1 

221 

222 return assignment 

223 

224 def compare(self) -> Statuses: 

225 super().compare() 

226 

227 if self.status == Statuses.NO_DIFF: 

228 return self.status 

229 elif self.status in [Statuses.ADDED, Statuses.DELETED]: # add 

230 for v in self.value: 

231 element = CompareListElement(self.config, self.my_config, v, self.status) 

232 element.compare() 

233 self.elements.append(element) 

234 self.changed_elements.append(element) 

235 elif self.status == Statuses.REPLACED: # replace or no-diff 

236 # ------------------------------ 

237 # 1) Матричное сопоставление dict↔dict (order-independent) 

238 # ------------------------------ 

239 old_list = self.old_value if isinstance(self.old_value, list) else [self.old_value] 

240 new_list = self.new_value if isinstance(self.new_value, list) else [self.new_value] 

241 

242 old_dicts: list[tuple[int, dict]] = [ 

243 (i, v) for i, v in enumerate(old_list) if isinstance(v, dict) 

244 ] 

245 new_dicts: list[tuple[int, dict]] = [ 

246 (j, v) for j, v in enumerate(new_list) if isinstance(v, dict) 

247 ] 

248 

249 threshold = float(self.my_config.get("DICT_MATCH_THRESHOLD", 0.10)) 

250 

251 matched_old: set[int] = set() 

252 matched_new: set[int] = set() 

253 

254 if len(old_dicts) > 0 and len(new_dicts) > 0: 

255 row_count = len(old_dicts) 

256 real_column_count = len(new_dicts) 

257 

258 old_repr = [self._stable_repr(item[1]) for item in old_dicts] 

259 new_repr = [self._stable_repr(item[1]) for item in new_dicts] 

260 

261 # Добавляем "dummy" колонки (unmatched old), чтобы не форсировать плохие пары. 

262 score_matrix: list[list[float]] = [ 

263 [0.0 for _ in range(real_column_count + row_count)] for _ in range(row_count) 

264 ] 

265 properties: dict[tuple[int, int], tuple[float, Property]] = {} 

266 

267 disallowed_score = -1_000_000.0 

268 for old_row, (_oi, ov) in enumerate(old_dicts): 

269 for new_col, (_nj, nv) in enumerate(new_dicts): 

270 prop = Property( 

271 config=self.config, 

272 name=None, 

273 schema_path=[], 

274 json_path=[], 

275 old_schema=ov, 

276 new_schema=nv, 

277 ) 

278 prop.compare() 

279 score = self._score_from_stats(prop.calc_diff()) 

280 properties[(old_row, new_col)] = (score, prop) 

281 

282 if score >= threshold: 

283 score_matrix[old_row][new_col] = score + self._stable_tie_break( 

284 old_repr[old_row], 

285 new_repr[new_col], 

286 ) 

287 else: 

288 score_matrix[old_row][new_col] = disallowed_score 

289 

290 assignment = self._hungarian_max(score_matrix) 

291 matched_by_old_row: dict[int, tuple[int, Property]] = {} 

292 

293 for old_row, matched_col in enumerate(assignment): 

294 if matched_col < 0 or matched_col >= real_column_count: 

295 continue 

296 

297 score, prop = properties[(old_row, matched_col)] 

298 if score >= threshold: 

299 matched_by_old_row[old_row] = (matched_col, prop) 

300 

301 for old_row, (oi, _ov) in enumerate(old_dicts): 

302 pair = matched_by_old_row.get(old_row) 

303 if pair is None: 

304 continue 

305 

306 matched_col, prop = pair 

307 nj, _nv = new_dicts[matched_col] 

308 

309 matched_old.add(oi) 

310 matched_new.add(nj) 

311 

312 # добавляем как один элемент списка с compared_property 

313 # статус NO_DIFF, если проперти без отличий, иначе MODIFIED 

314 status = ( 

315 Statuses.NO_DIFF if prop.status == Statuses.NO_DIFF else Statuses.MODIFIED 

316 ) 

317 el = CompareListElement( 

318 self.config, 

319 self.my_config, 

320 value=None, 

321 status=status, 

322 compared_property=prop, 

323 ) 

324 self.elements.append(el) 

325 if status != Statuses.NO_DIFF: 

326 self.changed_elements.append(el) 

327 

328 # все старые dict, что не подобрались → DELETED 

329 for oi, ov in old_dicts: 

330 if oi not in matched_old: 

331 el = CompareListElement( 

332 self.config, self.my_config, value=ov, status=Statuses.DELETED 

333 ) 

334 el.compare() 

335 self.elements.append(el) 

336 self.changed_elements.append(el) 

337 

338 # все новые dict, что не подобрались → ADDED 

339 for nj, nv in new_dicts: 

340 if nj not in matched_new: 

341 el = CompareListElement( 

342 self.config, self.my_config, value=nv, status=Statuses.ADDED 

343 ) 

344 el.compare() 

345 self.elements.append(el) 

346 self.changed_elements.append(el) 

347 

348 # ------------------------------ 

349 # 2) НЕ-словари: order-insensitive multiset diff 

350 # ВАЖНО: словари из сравнения исключаем, чтобы не дублировать их как insert/delete 

351 # ------------------------------ 

352 def filter_non_dict(src: list[Any]) -> list[Any]: 

353 return [v for v in src if not isinstance(v, dict)] 

354 

355 old_rest = filter_non_dict(old_list) 

356 new_rest = filter_non_dict(new_list) 

357 

358 old_pool: dict[str, deque[int]] = defaultdict(deque) 

359 for old_idx, old_value in enumerate(old_rest): 

360 old_pool[self._scalar_match_key(old_value)].append(old_idx) 

361 

362 matched_old_indices: set[int] = set() 

363 

364 def add_element(value: Any, status: Statuses) -> None: 

365 element = CompareListElement(self.config, self.my_config, value, status) 

366 element.compare() 

367 self.elements.append(element) 

368 if status != Statuses.NO_DIFF: 

369 self.changed_elements.append(element) 

370 

371 # Рендерим в порядке new: совпавшие элементы считаем NO_DIFF, "лишние" справа — ADDED. 

372 for new_value in new_rest: 

373 key = self._scalar_match_key(new_value) 

374 bucket = old_pool.get(key) 

375 if bucket is not None and len(bucket) > 0: 

376 old_idx = bucket.popleft() 

377 matched_old_indices.add(old_idx) 

378 add_element(old_rest[old_idx], Statuses.NO_DIFF) 

379 else: 

380 add_element(new_value, Statuses.ADDED) 

381 

382 # Все непокрытые элементы old — DELETED (в старом порядке). 

383 for old_idx, old_value in enumerate(old_rest): 

384 if old_idx not in matched_old_indices: 

385 add_element(old_value, Statuses.DELETED) 

386 

387 if len(self.changed_elements) > 0: 

388 self.status = Statuses.MODIFIED 

389 else: 

390 self.status = Statuses.NO_DIFF 

391 else: 

392 raise ValueError("Unsupported keys combination") 

393 

394 return self.status 

395 

396 def is_for_rendering(self) -> bool: 

397 return super().is_for_rendering() or len(self.changed_elements) > 0 

398 

399 def render( 

400 self, tab_level: int = 0, with_path: bool = True, to_crop: tuple[int, int] = (0, 0) 

401 ) -> str: 

402 lines = self._render_pairs(tab_level=tab_level, with_path=with_path, to_crop=to_crop) 

403 return "\n".join([line for _status, line in lines]) 

404 

405 def _render_pairs( 

406 self, tab_level: int = 0, with_path: bool = True, to_crop: tuple[int, int] = (0, 0) 

407 ) -> list[tuple[Statuses, str]]: 

408 to_return: list[tuple[Statuses, str]] = [ 

409 ( 

410 self.status, 

411 self._render_start_line(tab_level=tab_level, with_path=with_path, to_crop=to_crop), 

412 ) 

413 ] 

414 

415 for i in self.elements: 

416 to_return += i._render_pairs(tab_level + 1) 

417 return to_return 

418 

419 @staticmethod 

420 def legend() -> "LEGEND_RETURN_TYPE": 

421 return { 

422 "element": "Arrays\nLists", 

423 "description": ( 

424 "Arrays are always displayed fully, with statuses of all elements " 

425 "separately (left to them).\nIn example:\n" 

426 '["Masha", "Misha", "Vasya"] replace to ["Masha", "Olya", "Misha"]' 

427 ), 

428 "example": { 

429 "old_value": {"some_list": ["Masha", "Misha", "Vasya"]}, 

430 "new_value": {"some_list": ["Masha", "Olya", "Misha"]}, 

431 }, 

432 }