Coverage for genschema/pipeline.py: 86%
207 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-14 22:23 +0000
1import json
2import logging
3import re
4from typing import Literal, Optional
6from .comparators import TypeComparator
7from .comparators.template import Comparator, ProcessingContext, Resource, ToDelete
8from .pseudo_arrays import PseudoArrayHandlerBase
# Module-level logging setup.
# NOTE(review): calling logging.basicConfig at import time configures the
# root logger for the entire application; library modules normally leave
# configuration to the caller — confirm this side effect is intended.
logging.basicConfig(level=logging.ERROR)
logger = logging.getLogger(__name__)
class Converter:
    """Merge JSON documents and JSON Schemas into one combined JSON Schema.

    Resources are registered via :meth:`add_schema` / :meth:`add_json` and
    merged level by level by :meth:`run`.  Field comparison is delegated to
    the core type comparator plus any comparators added with :meth:`register`.
    """

    def __init__(
        self,
        pseudo_handler: Optional[PseudoArrayHandlerBase] = None,
        base_of: Literal["anyOf", "oneOf", "allOf"] = "anyOf",
        core_comparator: Optional[TypeComparator] = None,
    ):
        """
        Converter of JSON + JSON Schema structures into a JSON Schema.

        :param pseudo_handler: Pseudo-array handler
            (large dicts whose values share one pattern and whose keys are identifiers).
        :type pseudo_handler: Optional[PseudoArrayHandlerBase]

        :param base_of: Base operator used to combine alternative schemas.
            There is no logic for choosing a specific "Of" keyword per case.
        :type base_of: Literal["anyOf", "oneOf", "allOf"]

        :param core_comparator: Base type comparator.  Kept separate because
            ``type`` is the only field without which the Converter cannot
            build a structure.
        :type core_comparator: TypeComparator
        """
        self._schemas: list[Resource] = []
        self._jsons: list[Resource] = []
        self._comparators: list[Comparator] = []
        self._core_comparator = core_comparator or TypeComparator()
        self._id = 0  # monotonically increasing id shared by schemas and jsons
        self._pseudo_handler = pseudo_handler
        self._base_of = base_of

    def add_schema(self, s: dict | str) -> None:
        """Register a schema; ``s`` may be a dict or a path to a JSON file."""
        if isinstance(s, str):
            # JSON is UTF-8 per RFC 8259; do not rely on the platform default.
            with open(s, "r", encoding="utf-8") as f:
                s = json.load(f)

        self._schemas.append(Resource(str(self._id), "schema", s))
        self._id += 1

    def add_json(self, j: dict | list | str) -> None:
        """Register a JSON document; ``j`` may be a dict/list or a file path."""
        if isinstance(j, str):
            # JSON is UTF-8 per RFC 8259; do not rely on the platform default.
            with open(j, "r", encoding="utf-8") as f:
                j = json.load(f)

        self._jsons.append(Resource(str(self._id), "json", j))
        self._id += 1

    def clear_data(self) -> None:
        """Drop all registered resources (registered comparators are kept)."""
        self._id = 0
        self._jsons = []
        self._schemas = []

    def register(self, c: Comparator) -> None:
        """Register an extra comparator; TypeComparator-likes are rejected."""
        if isinstance(c, TypeComparator):
            raise UserWarning(
                "A TypeComparator-like comparator must be provided during initialization "
                "using the core_comparator attribute."
            )
        self._comparators.append(c)

    # ---------------- utils ----------------

    @staticmethod
    def _pseudo_array_order(keys: list[str]) -> list[str]:
        """Order pseudo-array keys numerically; non-numeric keys sort first."""
        return sorted(keys, key=lambda k: int(k) if k.isdigit() else -1)

    def _collect_prop_names(self, schemas: list[Resource], jsons: list[Resource]) -> list[str]:
        """Collect the sorted union of property names across schemas and jsons."""
        names = set()
        for s in schemas:
            c = s.content
            if isinstance(c, dict) and isinstance(c.get("properties"), dict):
                names.update(c["properties"].keys())
        for j in jsons:
            if isinstance(j.content, dict):
                names.update(j.content.keys())
        return sorted(names)

    def _gather_property_candidates(
        self, schemas: list[Resource], jsons: list[Resource], prop: str
    ) -> tuple[list[Resource], list[Resource]]:
        """Extract per-property sub-resources for ``prop`` from both kinds."""
        s_out, j_out = [], []

        for s in schemas:
            c = s.content
            if isinstance(c, dict) and prop in c.get("properties", {}):
                s_out.append(Resource(f"{s.id}/{prop}", "schema", c["properties"][prop]))

        for j in jsons:
            if isinstance(j.content, dict) and prop in j.content:
                j_out.append(Resource(f"{j.id}/{prop}", "json", j.content[prop]))

        return s_out, j_out

    def _keys_matched_by_pattern(self, pattern: str, keys: list[str]) -> set[str]:
        """Return the subset of ``keys`` fully matched by regex ``pattern``.

        An invalid pattern yields an empty set instead of raising.
        """
        try:
            regex = re.compile(pattern)
        except re.error:
            return set()
        return {key for key in keys if regex.fullmatch(key)}

    def _split_array_ctx(
        self, ctx: ProcessingContext
    ) -> tuple[ProcessingContext, ProcessingContext]:
        """Split ``ctx`` into an "object" context and an "array items" context.

        JSON lists and pseudo-array dicts contribute their elements to the
        items context; everything else stays on the object context.  Schemas
        with ``items``, pseudo-array ``properties``, or fully matching
        ``patternProperties`` contribute item schemas likewise.
        """
        obj_jsons = []
        item_jsons = []

        for j in ctx.jsons:
            c = j.content
            if isinstance(c, list):
                for i, el in enumerate(c):
                    item_jsons.append(Resource(f"{j.id}/{i}", "json", el))
            elif isinstance(c, dict):
                keys = self._collect_prop_names([], [j])
                is_pseudo_array = False
                if self._pseudo_handler:
                    is_pseudo_array, _ = self._pseudo_handler.is_pseudo_array(keys, ctx)
                if is_pseudo_array:
                    for i, k in enumerate(self._pseudo_array_order(keys)):
                        item_jsons.append(Resource(f"{j.id}/{i}", "json", c[k]))
                else:
                    obj_jsons.append(j)
            else:
                obj_jsons.append(j)

        obj_schemas = []
        item_schemas = []

        for s in ctx.schemas:
            c = s.content
            if isinstance(c, dict):
                t = c.get("type")
                if t == "array" and "items" in c:
                    item_schemas.append(Resource(f"{s.id}/items", "schema", c["items"]))
                elif t == "object" and "properties" in c:
                    keys = sorted(c["properties"].keys())
                    is_pseudo_array = False
                    if self._pseudo_handler:
                        is_pseudo_array, _ = self._pseudo_handler.is_pseudo_array(keys, ctx)
                    if is_pseudo_array:
                        for i, k in enumerate(self._pseudo_array_order(keys)):
                            item_schemas.append(
                                Resource(f"{s.id}/{i}", "schema", c["properties"][k])
                            )
                    else:
                        obj_schemas.append(s)
                elif t == "object" and isinstance(c.get("patternProperties"), dict):
                    pattern_props = c["patternProperties"]
                    if not self._pseudo_handler:
                        obj_schemas.append(s)
                        continue

                    keys = self._collect_prop_names([], ctx.jsons)
                    is_pseudo_array, _ = self._pseudo_handler.is_pseudo_array(keys, ctx)

                    # If a branch is not pseudo-array, keep schema on object path.
                    if not is_pseudo_array or not keys:
                        obj_schemas.append(s)
                        continue

                    matched_patterns: list[str] = []
                    covered_keys: set[str] = set()
                    for pattern in pattern_props:
                        matched_keys = self._keys_matched_by_pattern(pattern, keys)
                        if not matched_keys:
                            continue
                        matched_patterns.append(pattern)
                        covered_keys.update(matched_keys)

                    # Branch contains non-pattern keys: do not map patternProperties as items.
                    has_non_pattern_keys = any(key not in covered_keys for key in keys)
                    if has_non_pattern_keys or not matched_patterns:
                        obj_schemas.append(s)
                        continue

                    for index, pattern in enumerate(matched_patterns):
                        item_schemas.append(
                            Resource(
                                f"{s.id}/patternProperties/{index}",
                                "schema",
                                pattern_props[pattern],
                            )
                        )
                else:
                    obj_schemas.append(s)
            else:
                obj_schemas.append(s)

        return (
            ProcessingContext(obj_schemas, obj_jsons, ctx.sealed),
            ProcessingContext(item_schemas, item_jsons, ctx.sealed),
        )

    def _filter_ctx_by_ids(self, ctx: ProcessingContext, ids: set) -> ProcessingContext:
        """Restrict ``ctx`` to the given resource ids; empty ids = no filter."""
        if not ids:
            return ctx
        schemas = [s for s in ctx.schemas if s.id in ids]
        jsons = [j for j in ctx.jsons if j.id in ids]
        return ProcessingContext(schemas, jsons, ctx.sealed)

    # ---------------- core ----------------

    def _run_level(self, ctx: ProcessingContext, env: str, prev: dict) -> dict:
        """Merge one level of the structure, recursing via the type handlers.

        :param ctx: Resources relevant at this level.
        :param env: JSON-pointer-like path used for logging and child ids.
        :param prev: Partially built node to extend (copied, never mutated).
        """
        logger.debug("Entering _run_level: env=%s, prev_result=%s", env, prev)
        node = dict(prev)

        def use_comp(comp: Comparator) -> bool:
            # Apply one comparator: merge its global patch and Of-alternatives.
            if not comp.can_process(ctx, env, node):
                return False

            g, alts = comp.process(ctx, env, node)
            if g:
                node.update(g)
            if alts:
                node.setdefault(self._base_of, []).extend(alts)
            return True

        # Run the base comparator first: it is responsible for "type".
        use_comp(self._core_comparator)

        # Decide whether the object is a pseudo-array.
        # BUGFIX: defaults are bound unconditionally so the recursion step at
        # the bottom cannot raise NameError when a comparator registered via
        # register() flips node["type"] to "object" *after* this check.
        is_pseudo_array = False
        pattern = None
        if node.get("type") == "object":
            props = self._collect_prop_names(ctx.schemas, ctx.jsons)
            if self._pseudo_handler:
                is_pseudo_array, pattern = self._pseudo_handler.is_pseudo_array(props, ctx)
                node["isPseudoArray"] = is_pseudo_array

        # Run the remaining comparators.
        for comp in self._comparators:
            use_comp(comp)

        # Remove attributes marked for deletion.
        to_delete_keys = [key for key, element in node.items() if isinstance(element, ToDelete)]
        for key in to_delete_keys:
            del node[key]

        # If an Of-operator is present, process each alternative recursively,
        # narrowing the context to the resources that triggered it.
        if self._base_of in node:
            new_of = []
            for idx, alt in enumerate(node[self._base_of]):
                alt_ids = set(alt.get("j2sElementTrigger", []))
                alt_ctx = self._filter_ctx_by_ids(ctx, alt_ids) if alt_ids else ctx
                processed_alt = self._run_level(alt_ctx, env + f"/{self._base_of}/{idx}", alt)
                new_of.append(processed_alt)
            node[self._base_of] = new_of
            logger.debug(
                "Exiting _run_level (%s handled): env=%s, node=%s", self._base_of, env, node
            )
            return node

        # Recurse based on the resolved type.
        if node.get("type") == "object":
            if is_pseudo_array:
                node = self._run_pseudo_array(ctx, env, node, str(pattern))
            else:
                node = self._run_object(ctx, env, node)
        elif node.get("type") == "array":
            node = self._run_array(ctx, env, node)

        logger.debug("Exiting _run_level: env=%s, node=%s", env, node)
        return node

    # ---------------- object ----------------

    def _run_object(self, ctx: ProcessingContext, env: str, node: dict) -> dict:
        """Build ``properties`` by recursing into every observed property name."""
        node = dict(node)
        node.setdefault("properties", {})

        props = self._collect_prop_names(ctx.schemas, ctx.jsons)
        for name in props:
            s, j = self._gather_property_candidates(ctx.schemas, ctx.jsons, name)
            sub_ctx = ProcessingContext(s, j, ctx.sealed)
            node["properties"][name] = self._run_level(
                sub_ctx, f"{env}/properties/{name}", node["properties"].get(name, {})
            )

        if not node["properties"]:
            node.pop("properties", None)

        return node

    # ---------------- pseudo array ----------------

    def _run_pseudo_array(self, ctx: ProcessingContext, env: str, node: dict, pattern: str) -> dict:
        """Build ``patternProperties[pattern]`` from the pseudo-array items."""
        node = dict(node)
        node.setdefault("patternProperties", {})
        _, items_ctx = self._split_array_ctx(ctx)
        node["patternProperties"][pattern] = self._run_level(
            items_ctx, f"{env}/patternProperties/{pattern}", {}
        )
        if not node["patternProperties"]:
            node.pop("patternProperties", None)
        return node

    # ---------------- array ----------------

    def _run_array(self, ctx: ProcessingContext, env: str, node: dict) -> dict:
        """Build ``items`` from the element resources of all arrays in ``ctx``."""
        node = dict(node)
        node.setdefault("items", {})

        _, items_ctx = self._split_array_ctx(ctx)
        # setdefault above guarantees the key exists; no second default needed.
        node["items"] = self._run_level(items_ctx, f"{env}/items", node["items"])

        return node

    # ---------------- entry ----------------

    def run(self) -> dict:
        """Merge all registered resources and return the resulting schema."""
        ctx = ProcessingContext(self._schemas, self._jsons, sealed=False)
        return self._run_level(ctx, "/", {})