Coverage for jsonschema_diff/core/property.py: 90%

147 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-15 18:01 +0000

1from typing import TYPE_CHECKING 

2 

3from .abstraction import Statuses, ToCompare 

4from .compare_base import Compare 

5from .tools import CompareRules, LogicCombinerHandler, RenderContextHandler 

6from .tools import RenderTool as RT 

7 

8if TYPE_CHECKING: 

9 from .config import Config 

10 

11 

12class Property: 

13 def __init__( 

14 self, 

15 config: "Config", 

16 schema_path: list[str | int], 

17 json_path: list[str | int], 

18 name: str | int | None, 

19 old_schema: dict | None, 

20 new_schema: dict | None, 

21 ): 

22 self.status: Statuses = Statuses.UNKNOWN 

23 self.parameters: dict[str, "Compare"] = {} 

24 self.propertys: dict[str | int, "Property"] = {} 

25 

26 self.config = config 

27 self.name = name 

28 self.schema_path = schema_path 

29 self.json_path = json_path 

30 

31 self.old_schema = {} if old_schema is None else old_schema 

32 self.new_schema = {} if new_schema is None else new_schema 

33 

34 @property 

35 def json_path_with_name(self) -> list[str | int]: 

36 json_path_with_name = self.json_path 

37 if self.name is not None: 

38 json_path_with_name = self.json_path + [self.name] 

39 

40 return json_path_with_name 

41 

42 @property 

43 def schema_path_with_name(self) -> list[str | int]: 

44 schema_path_with_name = self.schema_path 

45 if self.name is not None: 

46 schema_path_with_name = self.schema_path + [self.name] 

47 

48 return schema_path_with_name 

49 

50 def _get_keys(self, old: dict | None, new: dict | None) -> list[str]: 

51 """ 

52 Детерминированное объединение ключей: 

53 1) все ключи из old в их исходном порядке; 

54 2) затем ключи из new, которых не было в old, в их порядке. 

55 """ 

56 old_keys = list(old.keys()) if isinstance(old, dict) else [] 

57 new_keys = list(new.keys()) if isinstance(new, dict) else [] 

58 seen = set() 

59 merged = [] 

60 for k in old_keys: 

61 if k not in seen: 

62 merged.append(k) 

63 seen.add(k) 

64 for k in new_keys: 

65 if k not in seen: 

66 merged.append(k) 

67 seen.add(k) 

68 return merged 

69 

70 def compare(self) -> None: 

71 if len(self.old_schema) <= 0 and len(self.new_schema) > 0: 

72 self.status = Statuses.ADDED 

73 elif len(self.new_schema) <= 0: # безопасное разрешение конфликта когда пара пустая 

74 self.status = Statuses.DELETED 

75 

76 parameters_subset = {} 

77 keys = self._get_keys(self.old_schema, self.new_schema) 

78 for key in keys: 

79 old_key = key if key in self.old_schema else None 

80 old_value = self.old_schema.get(key, None) 

81 

82 new_key = key if key in self.new_schema else None 

83 new_value = self.new_schema.get(key, None) 

84 

85 if key in self.config.PROPERTY_KEY_GROUPS[dict]: # словари содержащие Property 

86 prop_keys = self._get_keys(old_value, new_value) 

87 for prop_key in prop_keys: 

88 old_to_prop = None if old_value is None else old_value.get(prop_key, None) 

89 new_to_prop = None if new_value is None else new_value.get(prop_key, None) 

90 

91 prop = Property( 

92 config=self.config, 

93 schema_path=self.schema_path_with_name + [key], 

94 json_path=self.json_path_with_name, 

95 name=prop_key, 

96 old_schema=old_to_prop, 

97 new_schema=new_to_prop, 

98 ) 

99 prop.compare() 

100 self.propertys[prop_key] = prop 

101 elif key in self.config.PROPERTY_KEY_GROUPS[list]: # массивы содержащие Property 

102 if not isinstance(old_value, list): 

103 old_value = [old_value] 

104 old_len = len(old_value) 

105 if not isinstance(new_value, list): 

106 new_value = [new_value] 

107 new_len = len(new_value) 

108 

109 for i in range(max(new_len, old_len)): 

110 old_to_prop = None if i >= old_len else old_value[i] 

111 new_to_prop = None if i >= new_len else new_value[i] 

112 

113 prop = Property( 

114 config=self.config, 

115 schema_path=self.schema_path_with_name + [key], 

116 json_path=self.json_path_with_name, 

117 name=i, 

118 old_schema=old_to_prop, 

119 new_schema=new_to_prop, 

120 ) 

121 prop.compare() 

122 self.propertys[i] = prop 

123 else: 

124 parameters_subset[key] = { 

125 "comparator": CompareRules.get_comparator_from_values( 

126 rules=self.config.COMPARE_RULES, 

127 default=Compare, 

128 key=key, 

129 old=old_value, 

130 new=new_value, 

131 ), 

132 "to_compare": ToCompare( 

133 old_key=old_key, 

134 old_value=old_value, 

135 new_key=new_key, 

136 new_value=new_value, 

137 ), 

138 } 

139 

140 result_combine = LogicCombinerHandler.combine( 

141 subset=parameters_subset, 

142 rules=self.config.COMBINE_RULES, 

143 inner_key_field="comparator", 

144 inner_value_field="to_compare", 

145 ) 

146 

147 for values in result_combine.values(): 

148 comparator_cls = values["comparator"] 

149 comparator = comparator_cls( 

150 self.config, 

151 self.schema_path_with_name, 

152 self.json_path_with_name, 

153 values["to_compare"], 

154 ) 

155 

156 comparator.compare() 

157 

158 if comparator.is_for_rendering() and self.status == Statuses.UNKNOWN: 

159 self.status = Statuses.MODIFIED 

160 

161 self.parameters[comparator.get_name()] = comparator 

162 

163 if self.status == Statuses.UNKNOWN: 

164 self.status = Statuses.NO_DIFF 

165 

166 def is_for_rendering(self) -> bool: 

167 return self.status in [ 

168 Statuses.ADDED, 

169 Statuses.DELETED, 

170 Statuses.REPLACED, 

171 Statuses.MODIFIED, 

172 ] 

173 

174 def calc_diff(self) -> dict[str, int]: 

175 """ 

176 Summarizes the difference statistics: 

177 - all parameters (Compare.calc_diff) 

178 - child properties (Property.calc_diff) 

179 - plus the status of the current Property (as a single observation) 

180 """ 

181 stats: dict[str, int] = { 

182 "ADDED": 0, 

183 "DELETED": 0, 

184 "REPLACED": 0, 

185 "MODIFIED": 0, 

186 "NO_DIFF": 0, 

187 "UNKNOWN": 0, 

188 } 

189 # current Property status 

190 stats[self.status.name] += 1 

191 

192 def _merge_stats(dst: dict[str, int], src: dict[str, int]) -> None: 

193 for key, value in src.items(): 

194 dst[key] = dst.get(key, 0) + value 

195 

196 # parameters (Compare) 

197 for cmp in self.parameters.values(): 

198 _merge_stats(stats, cmp.calc_diff()) 

199 

200 # child properties 

201 for prop in self.propertys.values(): 

202 _merge_stats(stats, prop.calc_diff()) 

203 

204 return stats 

205 

206 def get_for_rendering(self) -> list["Compare"]: 

207 # Определение что рендерить 

208 not_for_render = {} 

209 for_render = {} 

210 for param_name, param in self.parameters.items(): 

211 if param.is_for_rendering(): 

212 for_render[param_name] = param 

213 else: 

214 not_for_render[param_name] = param 

215 

216 with_context = RenderContextHandler.resolve( 

217 pair_context_rules=self.config.PAIR_CONTEXT_RULES, 

218 context_rules=self.config.CONTEXT_RULES, 

219 for_render=for_render, 

220 not_for_render=not_for_render, 

221 ) 

222 

223 return list(with_context.values()) 

224 

225 def _make_path_line(self, tab_level: int = 0, to_crop: tuple[int, int] = (0, 0)) -> str: 

226 rendered_path = RT.make_path( 

227 self.schema_path[to_crop[0] :] + [self.name], 

228 self.json_path[to_crop[1] :] + [self.name], 

229 ignore=self.config.PATH_MAKER_IGNORE, 

230 ) 

231 

232 return ( 

233 f"{RT.make_prefix(self.status)} " 

234 f"{RT.make_tab(self.config, tab_level)}" 

235 f"{rendered_path}:" 

236 ) 

237 

238 def self_render( 

239 self, 

240 tab_level: int = 0, 

241 to_crop: tuple[int, int] = (0, 0), 

242 force_multiline: bool = False, 

243 ) -> tuple[str, list[type["Compare"]]]: 

244 # Определение что рендерить 

245 to_render_count = ( 

246 self.get_for_rendering() 

247 if not self.config.ALL_FOR_RENDERING 

248 else list(self.parameters.values()) 

249 ) 

250 

251 # Рендер заголовка / пути 

252 my_to_render = [] 

253 property_line_render = self.name is not None and ( 

254 len(to_render_count) > 1 or force_multiline # or self.status == Statuses.MODIFIED 

255 ) 

256 params_tab_level = tab_level 

257 if property_line_render: 

258 my_to_render.append(self._make_path_line(tab_level, to_crop)) 

259 params_tab_level += 1 

260 

261 # Рендер компараторов 

262 for compare in to_render_count: 

263 my_to_render.append(compare.render(params_tab_level, not property_line_render, to_crop)) 

264 

265 to_render = "\n".join(my_to_render) 

266 

267 compare_list = [] 

268 for compare in to_render_count: 

269 compare_list.append(type(compare)) 

270 

271 return to_render, list(dict.fromkeys([*compare_list])) 

272 

273 def render( 

274 self, 

275 tab_level: int = 0, 

276 _to_crop: tuple[int, int] = (0, 0), # [schema, json] 

277 ) -> tuple[list[str], list[type["Compare"]]]: 

278 to_return: list[str] = [] 

279 compare_list: list[type["Compare"]] = [] 

280 

281 children_for_rendering = [] 

282 for prop in self.propertys.values(): 

283 if prop.is_for_rendering() or self.config.ALL_FOR_RENDERING: 

284 children_for_rendering.append(prop) 

285 

286 if self.config.ALL_FOR_RENDERING or self.is_for_rendering(): 

287 start_line, start_compare = self.self_render( 

288 tab_level=tab_level, 

289 to_crop=_to_crop, 

290 force_multiline=len(children_for_rendering) > 0 and self.config.CROP_PATH, 

291 ) 

292 to_return.append(start_line) 

293 compare_list = list(dict.fromkeys([*compare_list, *start_compare])) 

294 

295 next_to_crop: bool = ( 

296 (len(children_for_rendering) > 0) and self.config.CROP_PATH and self.name is not None 

297 ) 

298 

299 if next_to_crop: 

300 if not (self.config.ALL_FOR_RENDERING or self.is_for_rendering()): 

301 to_return.append(self._make_path_line(tab_level=tab_level, to_crop=_to_crop)) 

302 _to_crop = (len(self.schema_path) + 1, len(self.json_path) + 1) 

303 

304 for prop in self.propertys.values(): 

305 part_lines, part_compare = prop.render( 

306 tab_level=tab_level + (1 if next_to_crop else 0), 

307 _to_crop=_to_crop, 

308 ) 

309 to_return += part_lines 

310 compare_list = list(dict.fromkeys([*compare_list, *part_compare])) 

311 

312 return to_return, compare_list