Coverage for genschema / comparators / required.py: 97%
30 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-25 09:44 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-25 09:44 +0000
1import logging
3from .template import Comparator, ComparatorResult, ProcessingContext
5logger = logging.getLogger(__name__)
8class RequiredComparator(Comparator):
9 """
10 Компаратор для определения обязательных полей.
11 Устанавливает "required" на основе наличия ключей в JSON на текущем уровне.
12 """
14 def can_process(self, ctx: ProcessingContext, env: str, node: dict) -> bool:
15 # обрабатываем только объекты
16 return node.get("type") == "object" and not node.get("isPseudoArray", False)
18 def process(self, ctx: ProcessingContext, env: str, node: dict) -> ComparatorResult:
19 required_sets: list[set[str]] = []
21 # Если есть хотя бы один JSON, который не является объектом,
22 # мы не можем корректно определить обязательные ключи.
23 if ctx.jsons and any(not isinstance(j.content, dict) for j in ctx.jsons):
24 return None, None
26 # ---------- из json ----------
27 objects = [j.content for j in ctx.jsons if isinstance(j.content, dict)]
28 if objects:
29 keys: set[str] = set()
30 for obj in objects:
31 keys.update(obj.keys())
33 required_from_json = {k for k in keys if all(k in obj for obj in objects)}
34 required_sets.append(required_from_json)
36 # ---------- из схем ----------
37 for schema in ctx.schemas:
38 content = schema.content
39 if not isinstance(content, dict):
40 continue
41 req = content.get("required")
42 if isinstance(req, list):
43 required_sets.append(set(req))
45 if not required_sets:
46 return None, None
48 # ---------- минимальное пересечение ----------
49 required = sorted(set.intersection(*required_sets))
51 if required:
52 return {"required": required}, None
53 return None, None