Coverage for genschema / comparators / required.py: 97%
30 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-14 22:23 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-14 22:23 +0000
1import logging
3from .template import Comparator, ComparatorResult, ProcessingContext
5logger = logging.getLogger(__name__)
8class RequiredComparator(Comparator):
9 """
10 Компаратор для определения обязательных полей.
11 Устанавливает "required" на основе наличия ключей в JSON на текущем уровне.
12 """
14 def can_process(self, ctx: ProcessingContext, env: str, node: dict) -> bool:
15 # обрабатываем только объекты
16 return (
17 (node.get("type") == "object" and not node.get("isPseudoArray", False))
18 or node.get("type") is None
19 or not ctx.jsons
20 )
22 def process(self, ctx: ProcessingContext, env: str, node: dict) -> ComparatorResult:
23 required_sets: list[set[str]] = []
25 # Если есть хотя бы один JSON, который не является объектом,
26 # мы не можем корректно определить обязательные ключи.
27 if ctx.jsons and any(not isinstance(j.content, dict) for j in ctx.jsons):
28 return None, None
30 # ---------- из json ----------
31 objects = [j.content for j in ctx.jsons if isinstance(j.content, dict)]
32 if objects:
33 keys: set[str] = set()
34 for obj in objects:
35 keys.update(obj.keys())
37 required_from_json = {k for k in keys if all(k in obj for obj in objects)}
38 required_sets.append(required_from_json)
40 # ---------- из схем ----------
41 for schema in ctx.schemas:
42 content = schema.content
43 if not isinstance(content, dict):
44 continue
45 req = content.get("required")
46 if isinstance(req, list):
47 required_sets.append(set(req))
49 if not required_sets:
50 return None, None
52 # ---------- минимальное пересечение ----------
53 required = sorted(set.intersection(*required_sets))
55 if required:
56 return {"required": required}, None
57 return None, None