Coverage for jsonschema_diff / core / custom_compare / list.py: 61%

158 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-11-26 14:58 +0000

1import difflib 

2from dataclasses import dataclass 

3from typing import TYPE_CHECKING, Any, Dict, Optional 

4 

5from ..abstraction import Statuses 

6from ..compare_base import Compare 

7from ..property import Property 

8 

9if TYPE_CHECKING: 

10 from ..compare_base import LEGEND_RETURN_TYPE 

11 from ..config import Config 

12 

13 

14@dataclass 

15class CompareListElement: 

16 config: "Config" 

17 my_config: dict 

18 value: Any 

19 status: Statuses 

20 compared_property: Optional[Property] = None 

21 

22 def compare(self) -> None: 

23 # Если элемент списка — словарь, рендерим его как Property 

24 if isinstance(self.value, dict): 

25 # Подбираем old/new под статус элемента 

26 if self.status == Statuses.DELETED: 

27 old_schema = self.value 

28 new_schema = None 

29 elif self.status == Statuses.ADDED: 

30 old_schema = None 

31 new_schema = self.value 

32 else: 

33 # NO_DIFF и прочие — считаем, что значение одинаково слева и справа 

34 old_schema = self.value 

35 new_schema = self.value 

36 

37 self.compared_property = Property( 

38 config=self.config, 

39 name=None, 

40 schema_path=[], 

41 json_path=[], 

42 old_schema=old_schema, 

43 new_schema=new_schema, 

44 ) 

45 self.compared_property.compare() 

46 

47 def replace_penultimate_space(self, tab_level: int, s: str, repl: str) -> str: 

48 position = ( 

49 len(self.config.TAB) * tab_level 

50 ) # 1 + (len(self.config.TAB) * tab_level) - 1 # PREFIX + TAB * COUNT - 1 

51 return s[:position] + repl + s[position:] 

52 

53 def _real_render(self, tab_level: int = 0) -> str: 

54 if self.compared_property is not None: 

55 render_lines, _render_compares = self.compared_property.render(tab_level=tab_level) 

56 

57 return "\n".join(render_lines) 

58 

59 # Иначе — старое поведение (строка/число/пр. выводим как есть) 

60 return f"{self.status.value} {self.config.TAB * tab_level}{self.value}" 

61 

62 def render(self, tab_level: int = 0) -> Optional[str]: 

63 lines = [ 

64 line 

65 for line in self._real_render(tab_level=tab_level).split("\n") 

66 if line.strip() != "" 

67 ] 

68 # первая строка = START_LINE, последняя = END_LINE, остальное = MIDDLE_LINE 

69 if len(lines) > 1: 

70 prepare = [] 

71 for i, line in enumerate(lines): 

72 if i == 0: 

73 prepare.append( 

74 self.replace_penultimate_space( 

75 tab_level=tab_level, s=line, repl=self.my_config.get("START_LINE", " ") 

76 ) 

77 ) 

78 elif i == len(lines) - 1: 

79 prepare.append( 

80 self.replace_penultimate_space( 

81 tab_level=tab_level, s=line, repl=self.my_config.get("END_LINE", " ") 

82 ) 

83 ) 

84 else: 

85 prepare.append( 

86 self.replace_penultimate_space( 

87 tab_level=tab_level, s=line, repl=self.my_config.get("MIDDLE_LINE", " ") 

88 ) 

89 ) 

90 

91 return "\n".join(prepare) 

92 elif len(lines) == 1: 

93 return self.replace_penultimate_space( 

94 tab_level=tab_level, s=lines[0], repl=self.my_config.get("SINGLE_LINE", " ") 

95 ) 

96 else: 

97 # В крайне редких случаях, длина списка == 0 

98 # мне лень разбираться, так что легализуем 

99 return None 

100 

101 

102class CompareList(Compare): 

103 def __init__(self, *args: Any, **kwargs: Any) -> None: 

104 super().__init__(*args, **kwargs) 

105 self.elements: list[CompareListElement] = [] 

106 self.changed_elements: list[CompareListElement] = [] 

107 

108 # --- вспомогательное: score ∈ [0..1] из Property.calc_diff() 

109 def _score_from_stats(self, stats: Dict[str, int]) -> float: 

110 unchanged = stats.get("NO_DIFF", 0) + stats.get("UNKNOWN", 0) 

111 changed = ( 

112 stats.get("ADDED", 0) + stats.get("DELETED", 0) + stats.get("REPLACED", 0) 

113 ) # модификации не в счет + stats.get("MODIFIED", 0) 

114 denom = unchanged + changed 

115 if denom == 0: 

116 return 1.0 

117 return unchanged / float(denom) 

118 

119 def compare(self) -> Statuses: 

120 super().compare() 

121 

122 if self.status == Statuses.NO_DIFF: 

123 return self.status 

124 elif self.status in [Statuses.ADDED, Statuses.DELETED]: # add 

125 for v in self.value: 

126 element = CompareListElement(self.config, self.my_config, v, self.status) 

127 element.compare() 

128 self.elements.append(element) 

129 self.changed_elements.append(element) 

130 elif self.status == Statuses.REPLACED: # replace or no-diff 

131 # ------------------------------ 

132 # 1) Матричное сопоставление dict↔dict (order-independent) 

133 # ------------------------------ 

134 old_list = self.old_value if isinstance(self.old_value, list) else [self.old_value] 

135 new_list = self.new_value if isinstance(self.new_value, list) else [self.new_value] 

136 

137 old_dicts: list[tuple[int, dict]] = [ 

138 (i, v) for i, v in enumerate(old_list) if isinstance(v, dict) 

139 ] 

140 new_dicts: list[tuple[int, dict]] = [ 

141 (j, v) for j, v in enumerate(new_list) if isinstance(v, dict) 

142 ] 

143 

144 threshold = float(self.my_config.get("DICT_MATCH_THRESHOLD", 0.10)) 

145 

146 matched_old: set[int] = set() 

147 matched_new: set[int] = set() 

148 

149 # сформируем все кандидаты (score, i, j, prop), отсортируем по score по убыванию 

150 candidates: list[tuple[float, int, int, Property]] = [] 

151 for oi, ov in old_dicts: 

152 for nj, nv in new_dicts: 

153 prop = Property( 

154 config=self.config, 

155 name=None, 

156 schema_path=[], 

157 json_path=[], 

158 old_schema=ov, 

159 new_schema=nv, 

160 ) 

161 prop.compare() 

162 score = self._score_from_stats(prop.calc_diff()) 

163 candidates.append((score, oi, nj, prop)) 

164 candidates.sort(key=lambda t: t[0], reverse=True) 

165 

166 # жадный матч по убыванию score с порогом 

167 for score, oi, nj, prop in candidates: 

168 if score < threshold: 

169 break 

170 if oi in matched_old or nj in matched_new: 

171 continue 

172 matched_old.add(oi) 

173 matched_new.add(nj) 

174 

175 # добавляем как один элемент списка с compared_property 

176 # статус NO_DIFF, если проперти без отличий, иначе MODIFIED 

177 status = Statuses.NO_DIFF if prop.status == Statuses.NO_DIFF else Statuses.MODIFIED 

178 el = CompareListElement( 

179 self.config, self.my_config, value=None, status=status, compared_property=prop 

180 ) 

181 self.elements.append(el) 

182 if status != Statuses.NO_DIFF: 

183 self.changed_elements.append(el) 

184 

185 # все старые dict, что не подобрались → DELETED 

186 for oi, ov in old_dicts: 

187 if oi not in matched_old: 

188 el = CompareListElement( 

189 self.config, self.my_config, value=ov, status=Statuses.DELETED 

190 ) 

191 el.compare() 

192 self.elements.append(el) 

193 self.changed_elements.append(el) 

194 

195 # все новые dict, что не подобрались → ADDED 

196 for nj, nv in new_dicts: 

197 if nj not in matched_new: 

198 el = CompareListElement( 

199 self.config, self.my_config, value=nv, status=Statuses.ADDED 

200 ) 

201 el.compare() 

202 self.elements.append(el) 

203 self.changed_elements.append(el) 

204 

205 # ------------------------------ 

206 # 2) Прежняя логика для НЕ-словарей (order-sensitive) — через SequenceMatcher 

207 # ВАЖНО: словари из сравнения исключаем, чтобы не дублировать их как insert/delete 

208 # ------------------------------ 

209 def filter_non_dict(src: list[Any]) -> list[Any]: 

210 return [v for v in src if not isinstance(v, dict)] 

211 

212 old_rest = filter_non_dict(old_list) 

213 new_rest = filter_non_dict(new_list) 

214 

215 def get_str_list(v: Any) -> list[str] | str: 

216 if isinstance(v, list): 

217 return [str(i) for i in v] 

218 return str(v) 

219 

220 real_old_value = get_str_list(old_rest) 

221 real_new_value = get_str_list(new_rest) 

222 

223 sm = difflib.SequenceMatcher(a=real_old_value, b=real_new_value, autojunk=False) 

224 for tag, i1, i2, j1, j2 in sm.get_opcodes(): 

225 

226 def add_element( 

227 source: list[Any], status: Statuses, from_index: int, to_index: int 

228 ) -> None: 

229 is_change = status != Statuses.NO_DIFF 

230 for v in source[from_index:to_index]: 

231 element = CompareListElement(self.config, self.my_config, v, status) 

232 element.compare() 

233 self.elements.append(element) 

234 if is_change: 

235 self.changed_elements.append(element) 

236 

237 match tag: 

238 case "equal": 

239 add_element(old_rest, Statuses.NO_DIFF, i1, i2) 

240 case "delete": 

241 add_element(old_rest, Statuses.DELETED, i1, i2) 

242 case "insert": 

243 add_element(new_rest, Statuses.ADDED, j1, j2) 

244 case "replace": 

245 add_element(old_rest, Statuses.DELETED, i1, i2) 

246 add_element(new_rest, Statuses.ADDED, j1, j2) 

247 case _: 

248 raise ValueError(f"Unknown tag: {tag}") 

249 

250 if len(self.changed_elements) > 0: 

251 self.status = Statuses.MODIFIED 

252 else: 

253 self.status = Statuses.NO_DIFF 

254 else: 

255 raise ValueError("Unsupported keys combination") 

256 

257 return self.status 

258 

259 def is_for_rendering(self) -> bool: 

260 return super().is_for_rendering() or len(self.changed_elements) > 0 

261 

262 def render( 

263 self, tab_level: int = 0, with_path: bool = True, to_crop: tuple[int, int] = (0, 0) 

264 ) -> str: 

265 to_return = self._render_start_line( 

266 tab_level=tab_level, with_path=with_path, to_crop=to_crop 

267 ) 

268 

269 for i in self.elements: 

270 to_i_render = i.render(tab_level + 1) 

271 if to_i_render: 

272 to_return += f"\n{to_i_render}" 

273 return to_return 

274 

275 @staticmethod 

276 def legend() -> "LEGEND_RETURN_TYPE": 

277 return { 

278 "element": "Arrays\nLists", 

279 "description": ( 

280 "Arrays are always displayed fully, with statuses of all elements " 

281 "separately (left to them).\nIn example:\n" 

282 '["Masha", "Misha", "Vasya"] replace to ["Masha", "Olya", "Misha"]' 

283 ), 

284 "example": { 

285 "old_value": {"some_list": ["Masha", "Misha", "Vasya"]}, 

286 "new_value": {"some_list": ["Masha", "Olya", "Misha"]}, 

287 }, 

288 }