Coverage for genschema / comparators / type.py: 98%
92 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-14 22:23 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-14 22:23 +0000
1from typing import Any
3from .template import Comparator, ComparatorResult, ProcessingContext
6def infer_json_type(v: Any) -> str:
7 if v is None:
8 return "null"
9 if isinstance(v, bool):
10 return "boolean"
11 if isinstance(v, int):
12 return "integer"
13 if isinstance(v, float):
14 return "number"
15 if isinstance(v, str):
16 return "string"
17 if isinstance(v, list):
18 return "array"
19 if isinstance(v, dict):
20 return "object"
21 return "any"
24def infer_schema_type(s: dict | str) -> None | str:
25 if not isinstance(s, dict):
26 return None
27 if "type" in s:
28 t = s["type"]
29 if isinstance(t, str):
30 return t
31 if "properties" in s:
32 return "object"
33 if "items" in s:
34 return "array"
35 return None
38def _unique_keep_order(values: list[str]) -> list[str]:
39 seen: set[str] = set()
40 out: list[str] = []
41 for value in values:
42 if value in seen:
43 continue
44 seen.add(value)
45 out.append(value)
46 return out
49def infer_schema_types(s: dict | str) -> list[str]:
50 """
51 Return all detectable schema types.
52 Unlike infer_schema_type(), this helper can extract types from unions
53 (type list / anyOf / oneOf / allOf).
54 """
56 if not isinstance(s, dict):
57 return []
59 t = s.get("type")
60 if isinstance(t, str):
61 return [t]
62 if isinstance(t, list):
63 return _unique_keep_order([item for item in t if isinstance(item, str)])
65 result: list[str] = []
67 for key in ("anyOf", "oneOf"):
68 variants = s.get(key)
69 if isinstance(variants, list):
70 for variant in variants:
71 result.extend(infer_schema_types(variant))
73 all_of = s.get("allOf")
74 if isinstance(all_of, list):
75 intersections: set[str] | None = None
76 for variant in all_of:
77 types = set(infer_schema_types(variant))
78 if not types:
79 continue
80 intersections = types if intersections is None else (intersections & types)
81 if intersections:
82 result.extend(sorted(intersections))
84 if result:
85 return _unique_keep_order(result)
87 inferred = infer_schema_type(s)
88 if inferred:
89 return [inferred]
90 return []
93class TypeComparator(Comparator):
94 name = "type"
96 def can_process(self, ctx: ProcessingContext, env: str, prev_result: dict) -> bool:
97 return "type" not in prev_result and bool(ctx.schemas or ctx.jsons)
99 def process(self, ctx: ProcessingContext, env: str, prev_result: dict) -> ComparatorResult:
100 type_map: dict[str, set[str]] = {}
102 for s in ctx.schemas:
103 for t in infer_schema_types(s.content):
104 type_map.setdefault(t, set()).add(s.id)
106 for j in ctx.jsons:
107 t = infer_json_type(j.content)
108 type_map.setdefault(t, set()).add(j.id)
110 # Нормализация: number поглощает integer
111 if "number" in type_map and "integer" in type_map:
112 type_map["number"].update(type_map["integer"])
113 del type_map["integer"]
115 if not type_map:
116 return None, None
118 variants: list[dict[str, Any]] = [
119 {"type": t, "j2sElementTrigger": sorted(ids)} for t, ids in type_map.items()
120 ]
122 if ctx.sealed:
123 # cannot create Of inside sealed context — choose first deterministic
124 return variants[0], None
126 if len(variants) == 1:
127 return variants[0], None
129 return None, variants