Coverage for genschema / comparators / type.py: 98%

92 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-14 22:23 +0000

1from typing import Any 

2 

3from .template import Comparator, ComparatorResult, ProcessingContext 

4 

5 

def infer_json_type(v: Any) -> str:
    """Return the JSON Schema type name matching a Python value.

    Args:
        v: Any Python value, typically one produced by decoding JSON.

    Returns:
        One of "null", "boolean", "integer", "number", "string", "array"
        or "object"; "any" when the value matches none of these.
    """
    if v is None:
        return "null"
    # bool is a subclass of int, so it must be tested before int.
    if isinstance(v, bool):
        return "boolean"
    if isinstance(v, int):
        return "integer"
    if isinstance(v, float):
        return "number"
    if isinstance(v, str):
        return "string"
    if isinstance(v, list):
        return "array"
    if isinstance(v, dict):
        return "object"
    return "any"

22 

23 

24def infer_schema_type(s: dict | str) -> None | str: 

25 if not isinstance(s, dict): 

26 return None 

27 if "type" in s: 

28 t = s["type"] 

29 if isinstance(t, str): 

30 return t 

31 if "properties" in s: 

32 return "object" 

33 if "items" in s: 

34 return "array" 

35 return None 

36 

37 

38def _unique_keep_order(values: list[str]) -> list[str]: 

39 seen: set[str] = set() 

40 out: list[str] = [] 

41 for value in values: 

42 if value in seen: 

43 continue 

44 seen.add(value) 

45 out.append(value) 

46 return out 

47 

48 

49def infer_schema_types(s: dict | str) -> list[str]: 

50 """ 

51 Return all detectable schema types. 

52 Unlike infer_schema_type(), this helper can extract types from unions 

53 (type list / anyOf / oneOf / allOf). 

54 """ 

55 

56 if not isinstance(s, dict): 

57 return [] 

58 

59 t = s.get("type") 

60 if isinstance(t, str): 

61 return [t] 

62 if isinstance(t, list): 

63 return _unique_keep_order([item for item in t if isinstance(item, str)]) 

64 

65 result: list[str] = [] 

66 

67 for key in ("anyOf", "oneOf"): 

68 variants = s.get(key) 

69 if isinstance(variants, list): 

70 for variant in variants: 

71 result.extend(infer_schema_types(variant)) 

72 

73 all_of = s.get("allOf") 

74 if isinstance(all_of, list): 

75 intersections: set[str] | None = None 

76 for variant in all_of: 

77 types = set(infer_schema_types(variant)) 

78 if not types: 

79 continue 

80 intersections = types if intersections is None else (intersections & types) 

81 if intersections: 

82 result.extend(sorted(intersections)) 

83 

84 if result: 

85 return _unique_keep_order(result) 

86 

87 inferred = infer_schema_type(s) 

88 if inferred: 

89 return [inferred] 

90 return [] 

91 

92 

class TypeComparator(Comparator):
    """Comparator that derives the "type" keyword from schemas and JSON samples."""

    name = "type"

    def can_process(self, ctx: ProcessingContext, env: str, prev_result: dict) -> bool:
        """Return True when no "type" has been produced yet and there is input.

        Args:
            ctx: Processing context; its ``schemas`` and ``jsons`` are checked.
            env: Not used by this comparator.
            prev_result: Result accumulated so far; must not contain "type".
        """
        return "type" not in prev_result and bool(ctx.schemas or ctx.jsons)

    def process(self, ctx: ProcessingContext, env: str, prev_result: dict) -> ComparatorResult:
        """Collect the types seen in the context and report them.

        Args:
            ctx: Processing context providing ``schemas``, ``jsons`` (items
                with ``content`` and ``id``) and the ``sealed`` flag.
            env: Not used by this comparator.
            prev_result: Not used by this comparator.

        Returns:
            A two-element tuple:
            ``(None, None)`` when no type was detected;
            ``(variant, None)`` when exactly one type was detected or the
            context is sealed (the first variant is taken);
            ``(None, variants)`` when several types were detected.
            Each variant is ``{"type": ..., "j2sElementTrigger": [...]}``
            with the sorted ids of the sources that produced the type.
        """
        # Maps type name -> ids of the schemas / JSON samples that yielded it.
        type_map: dict[str, set[str]] = {}

        for s in ctx.schemas:
            for t in infer_schema_types(s.content):
                type_map.setdefault(t, set()).add(s.id)

        for j in ctx.jsons:
            t = infer_json_type(j.content)
            type_map.setdefault(t, set()).add(j.id)

        # Normalization: number absorbs integer
        if "number" in type_map and "integer" in type_map:
            type_map["number"].update(type_map["integer"])
            del type_map["integer"]

        if not type_map:
            return None, None

        variants: list[dict[str, Any]] = [
            {"type": t, "j2sElementTrigger": sorted(ids)} for t, ids in type_map.items()
        ]

        if ctx.sealed:
            # cannot create Of inside sealed context — choose first deterministic
            # (type_map keeps insertion order: schemas first, then jsons)
            return variants[0], None

        if len(variants) == 1:
            return variants[0], None

        return None, variants

127 return variants[0], None 

128 

129 return None, variants