Coverage for jsonschema_diff/core/property.py: 98%

125 statements  

« prev     ^ index     » next       coverage.py v7.10.5, created at 2025-08-25 07:00 +0000

1from typing import TYPE_CHECKING 

2 

3from .abstraction import Statuses, ToCompare 

4from .parameter_base import Compare 

5from .tools import CompareRules, LogicCombinerHandler, RenderContextHandler 

6from .tools import RenderTool as RT 

7 

8if TYPE_CHECKING: 

9 from .config import Config 

10 

11 

12class Property: 

13 def __init__( 

14 self, 

15 config: "Config", 

16 schema_path: list[str | int], 

17 json_path: list[str | int], 

18 name: str | int | None, 

19 old_schema: dict | None, 

20 new_schema: dict | None, 

21 ): 

22 self.status: Statuses = Statuses.UNKNOWN 

23 self.parameters: dict[str, "Compare"] = {} 

24 self.propertys: dict[str | int, "Property"] = {} 

25 

26 self.config = config 

27 self.name = name 

28 self.schema_path = schema_path 

29 self.json_path = json_path 

30 

31 self.old_schema = {} if old_schema is None else old_schema 

32 self.new_schema = {} if new_schema is None else new_schema 

33 

34 @property 

35 def json_path_with_name(self) -> list[str | int]: 

36 json_path_with_name = self.json_path 

37 if self.name is not None: 

38 json_path_with_name = self.json_path + [self.name] 

39 

40 return json_path_with_name 

41 

42 @property 

43 def schema_path_with_name(self) -> list[str | int]: 

44 schema_path_with_name = self.schema_path 

45 if self.name is not None: 

46 schema_path_with_name = self.schema_path + [self.name] 

47 

48 return schema_path_with_name 

49 

50 def _get_keys(self, old: dict | None, new: dict | None) -> list[str]: 

51 """ 

52 Детерминированное объединение ключей: 

53 1) все ключи из old в их исходном порядке; 

54 2) затем ключи из new, которых не было в old, в их порядке. 

55 """ 

56 old_keys = list(old.keys()) if isinstance(old, dict) else [] 

57 new_keys = list(new.keys()) if isinstance(new, dict) else [] 

58 seen = set() 

59 merged = [] 

60 for k in old_keys: 

61 if k not in seen: 

62 merged.append(k) 

63 seen.add(k) 

64 for k in new_keys: 

65 if k not in seen: 

66 merged.append(k) 

67 seen.add(k) 

68 return merged 

69 

70 def compare(self) -> None: 

71 if len(self.old_schema) <= 0: 

72 self.status = Statuses.ADDED 

73 elif len(self.new_schema) <= 0: 

74 self.status = Statuses.DELETED 

75 

76 parameters_subset = {} 

77 keys = self._get_keys(self.old_schema, self.new_schema) 

78 for key in keys: 

79 old_key = key if key in self.old_schema else None 

80 old_value = self.old_schema.get(key, None) 

81 

82 new_key = key if key in self.new_schema else None 

83 new_value = self.new_schema.get(key, None) 

84 

85 if key in ["properties", "$defs"]: 

86 prop_keys = self._get_keys(old_value, new_value) 

87 for prop_key in prop_keys: 

88 old_to_prop = None if old_value is None else old_value.get(prop_key, None) 

89 new_to_prop = None if new_value is None else new_value.get(prop_key, None) 

90 

91 prop = Property( 

92 config=self.config, 

93 schema_path=self.schema_path_with_name + [key], 

94 json_path=self.json_path_with_name, 

95 name=prop_key, 

96 old_schema=old_to_prop, 

97 new_schema=new_to_prop, 

98 ) 

99 prop.compare() 

100 self.propertys[prop_key] = prop 

101 elif key in ["prefixItems", "items"]: 

102 if not isinstance(old_value, list): 

103 old_value = [old_value] 

104 old_len = len(old_value) 

105 if not isinstance(new_value, list): 

106 new_value = [new_value] 

107 new_len = len(new_value) 

108 

109 for i in range(max(new_len, old_len)): 

110 old_to_prop = None if i >= old_len else old_value[i] 

111 new_to_prop = None if i >= new_len else new_value[i] 

112 

113 prop = Property( 

114 config=self.config, 

115 schema_path=self.schema_path_with_name + [key], 

116 json_path=self.json_path_with_name, 

117 name=i, 

118 old_schema=old_to_prop, 

119 new_schema=new_to_prop, 

120 ) 

121 prop.compare() 

122 self.propertys[i] = prop 

123 else: 

124 parameters_subset[key] = { 

125 "comparator": CompareRules.get_comparator_from_values( 

126 rules=self.config.COMPARE_RULES, 

127 default=Compare, 

128 key=key, 

129 old=old_value, 

130 new=new_value, 

131 ), 

132 "to_compare": ToCompare( 

133 old_key=old_key, 

134 old_value=old_value, 

135 new_key=new_key, 

136 new_value=new_value, 

137 ), 

138 } 

139 

140 result_combine = LogicCombinerHandler.combine( 

141 subset=parameters_subset, 

142 rules=self.config.COMBINE_RULES, 

143 inner_key_field="comparator", 

144 inner_value_field="to_compare", 

145 ) 

146 

147 for key_tuple, values in result_combine.items(): 

148 comparator_cls = values["comparator"] 

149 comparator = comparator_cls( 

150 self.config, 

151 self.schema_path_with_name, 

152 self.json_path_with_name, 

153 values["to_compare"], 

154 ) 

155 

156 comparator.compare() 

157 

158 if comparator.is_for_rendering() and self.status == Statuses.UNKNOWN: 

159 self.status = Statuses.MODIFIED 

160 

161 self.parameters[comparator.get_name()] = comparator 

162 

163 if self.status == Statuses.UNKNOWN: 

164 self.status = Statuses.NO_DIFF 

165 

166 def is_for_rendering(self) -> bool: 

167 return self.status in [ 

168 Statuses.ADDED, 

169 Statuses.DELETED, 

170 Statuses.REPLACED, 

171 Statuses.MODIFIED, 

172 ] 

173 

174 def get_for_rendering(self) -> list["Compare"]: 

175 # Определение что рендерить 

176 not_for_render = {} 

177 for_render = {} 

178 for param_name, param in self.parameters.items(): 

179 if param.is_for_rendering(): 

180 for_render[param_name] = param 

181 else: 

182 not_for_render[param_name] = param 

183 

184 with_context = RenderContextHandler.resolve( 

185 pair_context_rules=self.config.PAIR_CONTEXT_RULES, 

186 context_rules=self.config.CONTEXT_RULES, 

187 for_render=for_render, 

188 not_for_render=not_for_render, 

189 ) 

190 

191 return list(with_context.values()) 

192 

193 def self_render(self, tab_level: int = 0) -> tuple[str, list[type["Compare"]]]: 

194 # Определение что рендерить 

195 to_render_count = self.get_for_rendering() 

196 

197 # Рендер заголовка / пути 

198 my_to_render = [] 

199 property_line_render = self.name is not None and ( 

200 self.status != Statuses.MODIFIED or len(to_render_count) > 1 

201 ) 

202 params_tab_level = tab_level 

203 if property_line_render: 

204 rendered_path = RT.make_path( 

205 self.schema_path + [self.name], 

206 self.json_path + [self.name], 

207 ignore=self.config.PATH_MAKER_IGNORE, 

208 ) 

209 

210 my_to_render.append( 

211 f"{RT.make_prefix(self.status)} " 

212 f"{RT.make_tab(self.config, tab_level)}" 

213 f"{rendered_path}:" 

214 ) 

215 params_tab_level += 1 

216 

217 # Рендер параметров 

218 for param in to_render_count: 

219 my_to_render.append(param.render(params_tab_level, not property_line_render)) 

220 

221 to_render = "\n".join(my_to_render) 

222 

223 compare_list = [] 

224 for compare in to_render_count: 

225 compare_list.append(type(compare)) 

226 

227 return to_render, list(dict.fromkeys([*compare_list])) 

228 

229 def render(self, tab_level: int = 0) -> tuple[list[str], list[type["Compare"]]]: 

230 to_return: list[str] = [] 

231 compare_list: list[type["Compare"]] = [] 

232 

233 if self.is_for_rendering(): 

234 start_line, start_compare = self.self_render(tab_level=tab_level) 

235 to_return.append(start_line) 

236 compare_list = list(dict.fromkeys([*compare_list, *start_compare])) 

237 

238 for prop in self.propertys.values(): 

239 part_lines, part_compare = prop.render(tab_level=tab_level) 

240 to_return += part_lines 

241 compare_list = list(dict.fromkeys([*compare_list, *part_compare])) 

242 

243 return to_return, compare_list