Coverage for jsonschema_diff/core/custom_compare/list.py: 60%

154 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-15 18:01 +0000

1import difflib 

2from dataclasses import dataclass 

3from typing import TYPE_CHECKING, Any, Dict, Optional 

4 

5from ..abstraction import Statuses 

6from ..compare_base import Compare 

7from ..property import Property 

8 

9if TYPE_CHECKING: 

10 from ..compare_base import LEGEND_RETURN_TYPE 

11 from ..config import Config 

12 

13 

@dataclass
class CompareListElement:
    """A single entry of a compared list together with its diff status.

    Attributes are documented inline below. ``render`` produces the text for
    this entry and decorates each output line with a marker taken from
    ``my_config`` (``SINGLE_LINE``, ``START_LINE``, ``MIDDLE_LINE``,
    ``END_LINE``).
    """

    # Global configuration object; only ``config.TAB`` is read in this class.
    config: "Config"
    # Comparator-specific options; the line-marker strings are looked up here.
    my_config: dict
    # Raw list item (may be ``None`` when ``compared_property`` is supplied).
    value: Any
    # Diff status of this entry; ``status.value`` is used as the line prefix.
    status: Statuses
    # Nested comparison used for rendering instead of ``value`` when set.
    compared_property: Optional[Property] = None

    def compare(self) -> None:
        """Build and run a nested ``Property`` comparison for dict values.

        Non-dict values are left untouched; they are rendered verbatim by
        ``_real_render``.
        """
        if not isinstance(self.value, dict):
            return

        # Choose which side(s) of the comparison the dict belongs to.
        if self.status == Statuses.DELETED:
            old_schema, new_schema = self.value, None
        elif self.status == Statuses.ADDED:
            old_schema, new_schema = None, self.value
        else:
            # NO_DIFF and everything else: same value on both sides.
            old_schema = new_schema = self.value

        self.compared_property = Property(
            config=self.config,
            name=None,
            schema_path=[],
            json_path=[],
            old_schema=old_schema,
            new_schema=new_schema,
        )
        self.compared_property.compare()

    def replace_penultimate_space(self, tab_level: int, s: str, repl: str) -> str:
        """Insert ``repl`` into ``s`` at offset ``len(config.TAB) * tab_level``.

        Despite the name, no character is removed: everything from the
        computed offset onwards is shifted right by ``len(repl)``.
        """
        position = len(self.config.TAB) * tab_level
        return s[:position] + repl + s[position:]

    def _real_render(self, tab_level: int = 0) -> str:
        """Return the undecorated text of this entry.

        Uses the nested property's rendered lines when one is present,
        otherwise formats ``"<status> <indent><value>"``.
        """
        if self.compared_property is not None:
            render_lines, _render_compares = self.compared_property.render(tab_level=tab_level)
            return "\n".join(render_lines)

        # Plain values (strings, numbers, ...) are printed as they are.
        return f"{self.status.value} {self.config.TAB * tab_level}{self.value}"

    def render(self, tab_level: int = 0) -> str:
        """Render the entry and mark each line with its position marker.

        Blank lines are dropped. A one-line result gets the ``SINGLE_LINE``
        marker; otherwise the first line gets ``START_LINE``, the last one
        ``END_LINE`` and the rest ``MIDDLE_LINE``. A missing marker key
        defaults to a single space.
        """
        raw = self._real_render(tab_level=tab_level)
        lines = [line for line in raw.split("\n") if line.strip()]
        if not lines:
            # Everything was blank (e.g. an empty-string value with a
            # whitespace status prefix): keep the first raw line so the entry
            # still renders instead of raising IndexError.
            lines = raw.split("\n")[:1]

        last = len(lines) - 1

        def marker_key(index: int) -> str:
            if last == 0:
                return "SINGLE_LINE"
            if index == 0:
                return "START_LINE"
            if index == last:
                return "END_LINE"
            return "MIDDLE_LINE"

        return "\n".join(
            self.replace_penultimate_space(
                tab_level=tab_level,
                s=line,
                repl=self.my_config.get(marker_key(index), " "),
            )
            for index, line in enumerate(lines)
        )

96 

97 

98class CompareList(Compare): 

99 def __init__(self, *args: Any, **kwargs: Any) -> None: 

100 super().__init__(*args, **kwargs) 

101 self.elements: list[CompareListElement] = [] 

102 self.changed_elements: list[CompareListElement] = [] 

103 

104 # --- вспомогательное: score ∈ [0..1] из Property.calc_diff() 

105 def _score_from_stats(self, stats: Dict[str, int]) -> float: 

106 unchanged = stats.get("NO_DIFF", 0) + stats.get("UNKNOWN", 0) 

107 changed = ( 

108 stats.get("ADDED", 0) + stats.get("DELETED", 0) + stats.get("REPLACED", 0) 

109 ) # модификации не в счет + stats.get("MODIFIED", 0) 

110 denom = unchanged + changed 

111 if denom == 0: 

112 return 1.0 

113 return unchanged / float(denom) 

114 

115 def compare(self) -> Statuses: 

116 super().compare() 

117 

118 if self.status == Statuses.NO_DIFF: 

119 return self.status 

120 elif self.status in [Statuses.ADDED, Statuses.DELETED]: # add 

121 for v in self.value: 

122 element = CompareListElement(self.config, self.my_config, v, self.status) 

123 element.compare() 

124 self.elements.append(element) 

125 self.changed_elements.append(element) 

126 elif self.status == Statuses.REPLACED: # replace or no-diff 

127 # ------------------------------ 

128 # 1) Матричное сопоставление dict↔dict (order-independent) 

129 # ------------------------------ 

130 old_list = self.old_value if isinstance(self.old_value, list) else [self.old_value] 

131 new_list = self.new_value if isinstance(self.new_value, list) else [self.new_value] 

132 

133 old_dicts: list[tuple[int, dict]] = [ 

134 (i, v) for i, v in enumerate(old_list) if isinstance(v, dict) 

135 ] 

136 new_dicts: list[tuple[int, dict]] = [ 

137 (j, v) for j, v in enumerate(new_list) if isinstance(v, dict) 

138 ] 

139 

140 threshold = float(self.my_config.get("DICT_MATCH_THRESHOLD", 0.10)) 

141 

142 matched_old: set[int] = set() 

143 matched_new: set[int] = set() 

144 

145 # сформируем все кандидаты (score, i, j, prop), отсортируем по score по убыванию 

146 candidates: list[tuple[float, int, int, Property]] = [] 

147 for oi, ov in old_dicts: 

148 for nj, nv in new_dicts: 

149 prop = Property( 

150 config=self.config, 

151 name=None, 

152 schema_path=[], 

153 json_path=[], 

154 old_schema=ov, 

155 new_schema=nv, 

156 ) 

157 prop.compare() 

158 score = self._score_from_stats(prop.calc_diff()) 

159 candidates.append((score, oi, nj, prop)) 

160 candidates.sort(key=lambda t: t[0], reverse=True) 

161 

162 # жадный матч по убыванию score с порогом 

163 for score, oi, nj, prop in candidates: 

164 if score < threshold: 

165 break 

166 if oi in matched_old or nj in matched_new: 

167 continue 

168 matched_old.add(oi) 

169 matched_new.add(nj) 

170 

171 # добавляем как один элемент списка с compared_property 

172 # статус NO_DIFF, если проперти без отличий, иначе MODIFIED 

173 status = Statuses.NO_DIFF if prop.status == Statuses.NO_DIFF else Statuses.MODIFIED 

174 el = CompareListElement( 

175 self.config, self.my_config, value=None, status=status, compared_property=prop 

176 ) 

177 self.elements.append(el) 

178 if status != Statuses.NO_DIFF: 

179 self.changed_elements.append(el) 

180 

181 # все старые dict, что не подобрались → DELETED 

182 for oi, ov in old_dicts: 

183 if oi not in matched_old: 

184 el = CompareListElement( 

185 self.config, self.my_config, value=ov, status=Statuses.DELETED 

186 ) 

187 el.compare() 

188 self.elements.append(el) 

189 self.changed_elements.append(el) 

190 

191 # все новые dict, что не подобрались → ADDED 

192 for nj, nv in new_dicts: 

193 if nj not in matched_new: 

194 el = CompareListElement( 

195 self.config, self.my_config, value=nv, status=Statuses.ADDED 

196 ) 

197 el.compare() 

198 self.elements.append(el) 

199 self.changed_elements.append(el) 

200 

201 # ------------------------------ 

202 # 2) Прежняя логика для НЕ-словарей (order-sensitive) — через SequenceMatcher 

203 # ВАЖНО: словари из сравнения исключаем, чтобы не дублировать их как insert/delete 

204 # ------------------------------ 

205 def filter_non_dict(src: list[Any]) -> list[Any]: 

206 return [v for v in src if not isinstance(v, dict)] 

207 

208 old_rest = filter_non_dict(old_list) 

209 new_rest = filter_non_dict(new_list) 

210 

211 def get_str_list(v: Any) -> list[str] | str: 

212 if isinstance(v, list): 

213 return [str(i) for i in v] 

214 return str(v) 

215 

216 real_old_value = get_str_list(old_rest) 

217 real_new_value = get_str_list(new_rest) 

218 

219 sm = difflib.SequenceMatcher(a=real_old_value, b=real_new_value, autojunk=False) 

220 for tag, i1, i2, j1, j2 in sm.get_opcodes(): 

221 

222 def add_element( 

223 source: list[Any], status: Statuses, from_index: int, to_index: int 

224 ) -> None: 

225 is_change = status != Statuses.NO_DIFF 

226 for v in source[from_index:to_index]: 

227 element = CompareListElement(self.config, self.my_config, v, status) 

228 element.compare() 

229 self.elements.append(element) 

230 if is_change: 

231 self.changed_elements.append(element) 

232 

233 match tag: 

234 case "equal": 

235 add_element(old_rest, Statuses.NO_DIFF, i1, i2) 

236 case "delete": 

237 add_element(old_rest, Statuses.DELETED, i1, i2) 

238 case "insert": 

239 add_element(new_rest, Statuses.ADDED, j1, j2) 

240 case "replace": 

241 add_element(old_rest, Statuses.DELETED, i1, i2) 

242 add_element(new_rest, Statuses.ADDED, j1, j2) 

243 case _: 

244 raise ValueError(f"Unknown tag: {tag}") 

245 

246 if len(self.changed_elements) > 0: 

247 self.status = Statuses.MODIFIED 

248 else: 

249 self.status = Statuses.NO_DIFF 

250 else: 

251 raise ValueError("Unsupported keys combination") 

252 

253 return self.status 

254 

255 def is_for_rendering(self) -> bool: 

256 return super().is_for_rendering() or len(self.changed_elements) > 0 

257 

258 def render( 

259 self, tab_level: int = 0, with_path: bool = True, to_crop: tuple[int, int] = (0, 0) 

260 ) -> str: 

261 to_return = self._render_start_line( 

262 tab_level=tab_level, with_path=with_path, to_crop=to_crop 

263 ) 

264 

265 for i in self.elements: 

266 to_return += f"\n{i.render(tab_level + 1)}" 

267 return to_return 

268 

269 @staticmethod 

270 def legend() -> "LEGEND_RETURN_TYPE": 

271 return { 

272 "element": "Arrays\nLists", 

273 "description": ( 

274 "Arrays are always displayed fully, with statuses of all elements " 

275 "separately (left to them).\nIn example:\n" 

276 '["Masha", "Misha", "Vasya"] replace to ["Masha", "Olya", "Misha"]' 

277 ), 

278 "example": { 

279 "old_value": {"some_list": ["Masha", "Misha", "Vasya"]}, 

280 "new_value": {"some_list": ["Masha", "Olya", "Misha"]}, 

281 }, 

282 }