Coverage for jsonschema_diff/core/property.py: 90%
147 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-15 18:01 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-15 18:01 +0000
1from typing import TYPE_CHECKING
3from .abstraction import Statuses, ToCompare
4from .compare_base import Compare
5from .tools import CompareRules, LogicCombinerHandler, RenderContextHandler
6from .tools import RenderTool as RT
8if TYPE_CHECKING:
9 from .config import Config
12class Property:
13 def __init__(
14 self,
15 config: "Config",
16 schema_path: list[str | int],
17 json_path: list[str | int],
18 name: str | int | None,
19 old_schema: dict | None,
20 new_schema: dict | None,
21 ):
22 self.status: Statuses = Statuses.UNKNOWN
23 self.parameters: dict[str, "Compare"] = {}
24 self.propertys: dict[str | int, "Property"] = {}
26 self.config = config
27 self.name = name
28 self.schema_path = schema_path
29 self.json_path = json_path
31 self.old_schema = {} if old_schema is None else old_schema
32 self.new_schema = {} if new_schema is None else new_schema
34 @property
35 def json_path_with_name(self) -> list[str | int]:
36 json_path_with_name = self.json_path
37 if self.name is not None:
38 json_path_with_name = self.json_path + [self.name]
40 return json_path_with_name
42 @property
43 def schema_path_with_name(self) -> list[str | int]:
44 schema_path_with_name = self.schema_path
45 if self.name is not None:
46 schema_path_with_name = self.schema_path + [self.name]
48 return schema_path_with_name
50 def _get_keys(self, old: dict | None, new: dict | None) -> list[str]:
51 """
52 Детерминированное объединение ключей:
53 1) все ключи из old в их исходном порядке;
54 2) затем ключи из new, которых не было в old, в их порядке.
55 """
56 old_keys = list(old.keys()) if isinstance(old, dict) else []
57 new_keys = list(new.keys()) if isinstance(new, dict) else []
58 seen = set()
59 merged = []
60 for k in old_keys:
61 if k not in seen:
62 merged.append(k)
63 seen.add(k)
64 for k in new_keys:
65 if k not in seen:
66 merged.append(k)
67 seen.add(k)
68 return merged
70 def compare(self) -> None:
71 if len(self.old_schema) <= 0 and len(self.new_schema) > 0:
72 self.status = Statuses.ADDED
73 elif len(self.new_schema) <= 0: # безопасное разрешение конфликта когда пара пустая
74 self.status = Statuses.DELETED
76 parameters_subset = {}
77 keys = self._get_keys(self.old_schema, self.new_schema)
78 for key in keys:
79 old_key = key if key in self.old_schema else None
80 old_value = self.old_schema.get(key, None)
82 new_key = key if key in self.new_schema else None
83 new_value = self.new_schema.get(key, None)
85 if key in self.config.PROPERTY_KEY_GROUPS[dict]: # словари содержащие Property
86 prop_keys = self._get_keys(old_value, new_value)
87 for prop_key in prop_keys:
88 old_to_prop = None if old_value is None else old_value.get(prop_key, None)
89 new_to_prop = None if new_value is None else new_value.get(prop_key, None)
91 prop = Property(
92 config=self.config,
93 schema_path=self.schema_path_with_name + [key],
94 json_path=self.json_path_with_name,
95 name=prop_key,
96 old_schema=old_to_prop,
97 new_schema=new_to_prop,
98 )
99 prop.compare()
100 self.propertys[prop_key] = prop
101 elif key in self.config.PROPERTY_KEY_GROUPS[list]: # массивы содержащие Property
102 if not isinstance(old_value, list):
103 old_value = [old_value]
104 old_len = len(old_value)
105 if not isinstance(new_value, list):
106 new_value = [new_value]
107 new_len = len(new_value)
109 for i in range(max(new_len, old_len)):
110 old_to_prop = None if i >= old_len else old_value[i]
111 new_to_prop = None if i >= new_len else new_value[i]
113 prop = Property(
114 config=self.config,
115 schema_path=self.schema_path_with_name + [key],
116 json_path=self.json_path_with_name,
117 name=i,
118 old_schema=old_to_prop,
119 new_schema=new_to_prop,
120 )
121 prop.compare()
122 self.propertys[i] = prop
123 else:
124 parameters_subset[key] = {
125 "comparator": CompareRules.get_comparator_from_values(
126 rules=self.config.COMPARE_RULES,
127 default=Compare,
128 key=key,
129 old=old_value,
130 new=new_value,
131 ),
132 "to_compare": ToCompare(
133 old_key=old_key,
134 old_value=old_value,
135 new_key=new_key,
136 new_value=new_value,
137 ),
138 }
140 result_combine = LogicCombinerHandler.combine(
141 subset=parameters_subset,
142 rules=self.config.COMBINE_RULES,
143 inner_key_field="comparator",
144 inner_value_field="to_compare",
145 )
147 for values in result_combine.values():
148 comparator_cls = values["comparator"]
149 comparator = comparator_cls(
150 self.config,
151 self.schema_path_with_name,
152 self.json_path_with_name,
153 values["to_compare"],
154 )
156 comparator.compare()
158 if comparator.is_for_rendering() and self.status == Statuses.UNKNOWN:
159 self.status = Statuses.MODIFIED
161 self.parameters[comparator.get_name()] = comparator
163 if self.status == Statuses.UNKNOWN:
164 self.status = Statuses.NO_DIFF
166 def is_for_rendering(self) -> bool:
167 return self.status in [
168 Statuses.ADDED,
169 Statuses.DELETED,
170 Statuses.REPLACED,
171 Statuses.MODIFIED,
172 ]
174 def calc_diff(self) -> dict[str, int]:
175 """
176 Summarizes the difference statistics:
177 - all parameters (Compare.calc_diff)
178 - child properties (Property.calc_diff)
179 - plus the status of the current Property (as a single observation)
180 """
181 stats: dict[str, int] = {
182 "ADDED": 0,
183 "DELETED": 0,
184 "REPLACED": 0,
185 "MODIFIED": 0,
186 "NO_DIFF": 0,
187 "UNKNOWN": 0,
188 }
189 # current Property status
190 stats[self.status.name] += 1
192 def _merge_stats(dst: dict[str, int], src: dict[str, int]) -> None:
193 for key, value in src.items():
194 dst[key] = dst.get(key, 0) + value
196 # parameters (Compare)
197 for cmp in self.parameters.values():
198 _merge_stats(stats, cmp.calc_diff())
200 # child properties
201 for prop in self.propertys.values():
202 _merge_stats(stats, prop.calc_diff())
204 return stats
206 def get_for_rendering(self) -> list["Compare"]:
207 # Определение что рендерить
208 not_for_render = {}
209 for_render = {}
210 for param_name, param in self.parameters.items():
211 if param.is_for_rendering():
212 for_render[param_name] = param
213 else:
214 not_for_render[param_name] = param
216 with_context = RenderContextHandler.resolve(
217 pair_context_rules=self.config.PAIR_CONTEXT_RULES,
218 context_rules=self.config.CONTEXT_RULES,
219 for_render=for_render,
220 not_for_render=not_for_render,
221 )
223 return list(with_context.values())
225 def _make_path_line(self, tab_level: int = 0, to_crop: tuple[int, int] = (0, 0)) -> str:
226 rendered_path = RT.make_path(
227 self.schema_path[to_crop[0] :] + [self.name],
228 self.json_path[to_crop[1] :] + [self.name],
229 ignore=self.config.PATH_MAKER_IGNORE,
230 )
232 return (
233 f"{RT.make_prefix(self.status)} "
234 f"{RT.make_tab(self.config, tab_level)}"
235 f"{rendered_path}:"
236 )
238 def self_render(
239 self,
240 tab_level: int = 0,
241 to_crop: tuple[int, int] = (0, 0),
242 force_multiline: bool = False,
243 ) -> tuple[str, list[type["Compare"]]]:
244 # Определение что рендерить
245 to_render_count = (
246 self.get_for_rendering()
247 if not self.config.ALL_FOR_RENDERING
248 else list(self.parameters.values())
249 )
251 # Рендер заголовка / пути
252 my_to_render = []
253 property_line_render = self.name is not None and (
254 len(to_render_count) > 1 or force_multiline # or self.status == Statuses.MODIFIED
255 )
256 params_tab_level = tab_level
257 if property_line_render:
258 my_to_render.append(self._make_path_line(tab_level, to_crop))
259 params_tab_level += 1
261 # Рендер компараторов
262 for compare in to_render_count:
263 my_to_render.append(compare.render(params_tab_level, not property_line_render, to_crop))
265 to_render = "\n".join(my_to_render)
267 compare_list = []
268 for compare in to_render_count:
269 compare_list.append(type(compare))
271 return to_render, list(dict.fromkeys([*compare_list]))
273 def render(
274 self,
275 tab_level: int = 0,
276 _to_crop: tuple[int, int] = (0, 0), # [schema, json]
277 ) -> tuple[list[str], list[type["Compare"]]]:
278 to_return: list[str] = []
279 compare_list: list[type["Compare"]] = []
281 children_for_rendering = []
282 for prop in self.propertys.values():
283 if prop.is_for_rendering() or self.config.ALL_FOR_RENDERING:
284 children_for_rendering.append(prop)
286 if self.config.ALL_FOR_RENDERING or self.is_for_rendering():
287 start_line, start_compare = self.self_render(
288 tab_level=tab_level,
289 to_crop=_to_crop,
290 force_multiline=len(children_for_rendering) > 0 and self.config.CROP_PATH,
291 )
292 to_return.append(start_line)
293 compare_list = list(dict.fromkeys([*compare_list, *start_compare]))
295 next_to_crop: bool = (
296 (len(children_for_rendering) > 0) and self.config.CROP_PATH and self.name is not None
297 )
299 if next_to_crop:
300 if not (self.config.ALL_FOR_RENDERING or self.is_for_rendering()):
301 to_return.append(self._make_path_line(tab_level=tab_level, to_crop=_to_crop))
302 _to_crop = (len(self.schema_path) + 1, len(self.json_path) + 1)
304 for prop in self.propertys.values():
305 part_lines, part_compare = prop.render(
306 tab_level=tab_level + (1 if next_to_crop else 0),
307 _to_crop=_to_crop,
308 )
309 to_return += part_lines
310 compare_list = list(dict.fromkeys([*compare_list, *part_compare]))
312 return to_return, compare_list