Coverage for jsonschema_diff/core/property.py: 98%
125 statements
« prev ^ index » next coverage.py v7.10.5, created at 2025-08-25 07:00 +0000
« prev ^ index » next coverage.py v7.10.5, created at 2025-08-25 07:00 +0000
1from typing import TYPE_CHECKING
3from .abstraction import Statuses, ToCompare
4from .parameter_base import Compare
5from .tools import CompareRules, LogicCombinerHandler, RenderContextHandler
6from .tools import RenderTool as RT
8if TYPE_CHECKING:
9 from .config import Config
12class Property:
13 def __init__(
14 self,
15 config: "Config",
16 schema_path: list[str | int],
17 json_path: list[str | int],
18 name: str | int | None,
19 old_schema: dict | None,
20 new_schema: dict | None,
21 ):
22 self.status: Statuses = Statuses.UNKNOWN
23 self.parameters: dict[str, "Compare"] = {}
24 self.propertys: dict[str | int, "Property"] = {}
26 self.config = config
27 self.name = name
28 self.schema_path = schema_path
29 self.json_path = json_path
31 self.old_schema = {} if old_schema is None else old_schema
32 self.new_schema = {} if new_schema is None else new_schema
34 @property
35 def json_path_with_name(self) -> list[str | int]:
36 json_path_with_name = self.json_path
37 if self.name is not None:
38 json_path_with_name = self.json_path + [self.name]
40 return json_path_with_name
42 @property
43 def schema_path_with_name(self) -> list[str | int]:
44 schema_path_with_name = self.schema_path
45 if self.name is not None:
46 schema_path_with_name = self.schema_path + [self.name]
48 return schema_path_with_name
50 def _get_keys(self, old: dict | None, new: dict | None) -> list[str]:
51 """
52 Детерминированное объединение ключей:
53 1) все ключи из old в их исходном порядке;
54 2) затем ключи из new, которых не было в old, в их порядке.
55 """
56 old_keys = list(old.keys()) if isinstance(old, dict) else []
57 new_keys = list(new.keys()) if isinstance(new, dict) else []
58 seen = set()
59 merged = []
60 for k in old_keys:
61 if k not in seen:
62 merged.append(k)
63 seen.add(k)
64 for k in new_keys:
65 if k not in seen:
66 merged.append(k)
67 seen.add(k)
68 return merged
70 def compare(self) -> None:
71 if len(self.old_schema) <= 0:
72 self.status = Statuses.ADDED
73 elif len(self.new_schema) <= 0:
74 self.status = Statuses.DELETED
76 parameters_subset = {}
77 keys = self._get_keys(self.old_schema, self.new_schema)
78 for key in keys:
79 old_key = key if key in self.old_schema else None
80 old_value = self.old_schema.get(key, None)
82 new_key = key if key in self.new_schema else None
83 new_value = self.new_schema.get(key, None)
85 if key in ["properties", "$defs"]:
86 prop_keys = self._get_keys(old_value, new_value)
87 for prop_key in prop_keys:
88 old_to_prop = None if old_value is None else old_value.get(prop_key, None)
89 new_to_prop = None if new_value is None else new_value.get(prop_key, None)
91 prop = Property(
92 config=self.config,
93 schema_path=self.schema_path_with_name + [key],
94 json_path=self.json_path_with_name,
95 name=prop_key,
96 old_schema=old_to_prop,
97 new_schema=new_to_prop,
98 )
99 prop.compare()
100 self.propertys[prop_key] = prop
101 elif key in ["prefixItems", "items"]:
102 if not isinstance(old_value, list):
103 old_value = [old_value]
104 old_len = len(old_value)
105 if not isinstance(new_value, list):
106 new_value = [new_value]
107 new_len = len(new_value)
109 for i in range(max(new_len, old_len)):
110 old_to_prop = None if i >= old_len else old_value[i]
111 new_to_prop = None if i >= new_len else new_value[i]
113 prop = Property(
114 config=self.config,
115 schema_path=self.schema_path_with_name + [key],
116 json_path=self.json_path_with_name,
117 name=i,
118 old_schema=old_to_prop,
119 new_schema=new_to_prop,
120 )
121 prop.compare()
122 self.propertys[i] = prop
123 else:
124 parameters_subset[key] = {
125 "comparator": CompareRules.get_comparator_from_values(
126 rules=self.config.COMPARE_RULES,
127 default=Compare,
128 key=key,
129 old=old_value,
130 new=new_value,
131 ),
132 "to_compare": ToCompare(
133 old_key=old_key,
134 old_value=old_value,
135 new_key=new_key,
136 new_value=new_value,
137 ),
138 }
140 result_combine = LogicCombinerHandler.combine(
141 subset=parameters_subset,
142 rules=self.config.COMBINE_RULES,
143 inner_key_field="comparator",
144 inner_value_field="to_compare",
145 )
147 for key_tuple, values in result_combine.items():
148 comparator_cls = values["comparator"]
149 comparator = comparator_cls(
150 self.config,
151 self.schema_path_with_name,
152 self.json_path_with_name,
153 values["to_compare"],
154 )
156 comparator.compare()
158 if comparator.is_for_rendering() and self.status == Statuses.UNKNOWN:
159 self.status = Statuses.MODIFIED
161 self.parameters[comparator.get_name()] = comparator
163 if self.status == Statuses.UNKNOWN:
164 self.status = Statuses.NO_DIFF
166 def is_for_rendering(self) -> bool:
167 return self.status in [
168 Statuses.ADDED,
169 Statuses.DELETED,
170 Statuses.REPLACED,
171 Statuses.MODIFIED,
172 ]
174 def get_for_rendering(self) -> list["Compare"]:
175 # Определение что рендерить
176 not_for_render = {}
177 for_render = {}
178 for param_name, param in self.parameters.items():
179 if param.is_for_rendering():
180 for_render[param_name] = param
181 else:
182 not_for_render[param_name] = param
184 with_context = RenderContextHandler.resolve(
185 pair_context_rules=self.config.PAIR_CONTEXT_RULES,
186 context_rules=self.config.CONTEXT_RULES,
187 for_render=for_render,
188 not_for_render=not_for_render,
189 )
191 return list(with_context.values())
193 def self_render(self, tab_level: int = 0) -> tuple[str, list[type["Compare"]]]:
194 # Определение что рендерить
195 to_render_count = self.get_for_rendering()
197 # Рендер заголовка / пути
198 my_to_render = []
199 property_line_render = self.name is not None and (
200 self.status != Statuses.MODIFIED or len(to_render_count) > 1
201 )
202 params_tab_level = tab_level
203 if property_line_render:
204 rendered_path = RT.make_path(
205 self.schema_path + [self.name],
206 self.json_path + [self.name],
207 ignore=self.config.PATH_MAKER_IGNORE,
208 )
210 my_to_render.append(
211 f"{RT.make_prefix(self.status)} "
212 f"{RT.make_tab(self.config, tab_level)}"
213 f"{rendered_path}:"
214 )
215 params_tab_level += 1
217 # Рендер параметров
218 for param in to_render_count:
219 my_to_render.append(param.render(params_tab_level, not property_line_render))
221 to_render = "\n".join(my_to_render)
223 compare_list = []
224 for compare in to_render_count:
225 compare_list.append(type(compare))
227 return to_render, list(dict.fromkeys([*compare_list]))
229 def render(self, tab_level: int = 0) -> tuple[list[str], list[type["Compare"]]]:
230 to_return: list[str] = []
231 compare_list: list[type["Compare"]] = []
233 if self.is_for_rendering():
234 start_line, start_compare = self.self_render(tab_level=tab_level)
235 to_return.append(start_line)
236 compare_list = list(dict.fromkeys([*compare_list, *start_compare]))
238 for prop in self.propertys.values():
239 part_lines, part_compare = prop.render(tab_level=tab_level)
240 to_return += part_lines
241 compare_list = list(dict.fromkeys([*compare_list, *part_compare]))
243 return to_return, compare_list