Coverage for genschema / comparators / enum.py: 97%

109 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-25 09:44 +0000

"""Enum inference comparator.

This module contains :class:`EnumComparator`, a comparator that promotes
low-cardinality string fields to JSON Schema ``enum`` definitions.
It is designed to work with mixed input sources:

- raw JSON instances added via :meth:`genschema.pipeline.Converter.add_json`
- existing JSON Schemas added via :meth:`genschema.pipeline.Converter.add_schema`

The comparator is intentionally conservative. If a field looks unsafe for enum
inference, it stores a reject flag directly in the generated schema so the same
field will not be reconsidered as an enum candidate on future runs.
"""

14 

15from __future__ import annotations 

16 

17import re 

18from dataclasses import dataclass, field 

19from typing import Any 

20 

21from .template import Comparator, ComparatorResult, ProcessingContext 

22 

# Schema key written on nodes for which enum inference was rejected; used as
# the default value of ``EnumComparator.reject_flag``.
ENUM_REJECT_FLAG = "j2sEnumRejected"

# Matches strings that look like a plain decimal number with an optional sign:
# "12", "-3.5", "7." and ".25". Exponent notation such as "1e5" is not matched.
NUMERIC_LIKE_STRING_RE = re.compile(r"^[+-]?(?:\d+|\d+\.\d+|\d+\.|\.\d+)$")

25 

26 

@dataclass
class EnumComparator(Comparator):
    """Infer ``enum`` for compact string fields and persist rejection decisions.

    Integer support is intentionally excluded. In practice it is very hard to
    build a reliable heuristic that consistently distinguishes real numeric
    enums from ordinary identifiers, counters, years, status codes, and other
    non-enum integer fields. A false positive here is much more damaging than a
    missed enum, so the comparator only handles strings.
    """

    name = "enum"

    max_unique_values: int = 16
    """Maximum number of distinct values allowed for enum inference."""

    max_avg_string_length: int = 20
    """Maximum average length of unique string values before the field is treated as free text."""

    excluded_field_names: set[str] = field(
        default_factory=lambda: {
            "name",
            "title",
            "description",
            "message",
            "text",
        }
    )
    """Field names that must be excluded from enum inference."""

    reject_flag: str = ENUM_REJECT_FLAG
    """Schema flag that persists enum rejection across repeated runs."""

    def _extract_field_name(self, env: str) -> str | None:
        """Extract the innermost property name from a pipeline path.

        Parameters
        ----------
        env:
            Internal path used by the converter, for example
            ``"/properties/status"`` or
            ``"/properties/meta/properties/status"``.

        Returns
        -------
        str | None
            The path segment that follows the last ``/properties/`` marker,
            or ``None`` when ``env`` contains no such marker. Any segments
            after that name are ignored, so ``"/properties/tags/items"``
            yields ``"tags"``.
        """
        marker = "/properties/"
        if marker not in env:
            return None
        # Take everything after the last marker, then cut at the next "/".
        return env.rsplit(marker, 1)[-1].split("/", 1)[0]

    def _schema_type_matches(self, schema: Any, expected_type: str) -> bool:
        """Return ``True`` when a schema node explicitly matches the target type.

        Only a dict whose ``type`` value equals ``expected_type`` matches;
        anything else (including a list-valued ``type``) does not.
        """
        return isinstance(schema, dict) and schema.get("type") == expected_type

    def _collect_schema_values(self, ctx: ProcessingContext) -> list[str]:
        """Collect candidate enum values from input schemas.

        Only explicit schema enums from nodes whose ``type`` is ``"string"``
        are considered. Non-list ``enum`` entries and non-string members are
        skipped. Order of appearance is preserved and duplicates are kept.
        """
        values: list[str] = []
        for schema in ctx.schemas:
            content = schema.content
            if not self._schema_type_matches(content, "string"):
                continue
            enum_values = content.get("enum")
            if not isinstance(enum_values, list):
                continue
            values.extend(value for value in enum_values if isinstance(value, str))
        return values

    def _collect_json_values(self, ctx: ProcessingContext) -> list[str]:
        """Collect candidate enum values from raw JSON resources.

        Every resource whose ``content`` is a string contributes that string;
        other content types are ignored.
        """
        return [
            resource.content
            for resource in ctx.jsons
            if isinstance(resource.content, str)
        ]

    def _has_blank_string_value(self, values: list[str]) -> bool:
        """Return ``True`` when string candidates contain blank values.

        A value is blank when it is empty or consists only of whitespace.
        """
        # The isinstance guard is defensive: non-string items are skipped
        # rather than raising.
        return any(isinstance(value, str) and value.strip() == "" for value in values)

    def _has_digit_only_string_value(self, values: list[str]) -> bool:
        """Return ``True`` when string candidates contain digit-only values.

        Relies on :meth:`str.isdigit`.
        """
        return any(value.isdigit() for value in values)

    def _has_float_like_string_value(self, values: list[str]) -> bool:
        """Return ``True`` when string candidates contain float-like values.

        A value qualifies when it fully matches ``NUMERIC_LIKE_STRING_RE``.
        """
        return any(NUMERIC_LIKE_STRING_RE.fullmatch(value) is not None for value in values)

    def _has_schema_flag(self, ctx: ProcessingContext, flag_name: str) -> bool:
        """Check whether any input schema has ``flag_name`` set to ``True``.

        Only the literal value ``True`` counts; other truthy values do not.
        """
        for schema in ctx.schemas:
            if isinstance(schema.content, dict) and schema.content.get(flag_name) is True:
                return True
        return False

    def _first_schema_format(self, ctx: ProcessingContext) -> str | None:
        """Return the first explicit schema format declared for the current node.

        Schemas are scanned in order; the first string-valued ``format`` wins.
        Returns ``None`` when no input schema declares one.
        """
        for schema in ctx.schemas:
            content = schema.content
            if not isinstance(content, dict):
                continue
            format_value = content.get("format")
            if isinstance(format_value, str):
                return format_value
        return None

    def _reject(self, extra: dict[str, Any] | None = None) -> ComparatorResult:
        """Build a rejection result that persists the enum reject flag.

        Parameters
        ----------
        extra:
            Optional additional keys merged into the update after the reject
            flag is set.

        Returns
        -------
        ComparatorResult
            A ``(update, None)`` pair whose update maps ``reject_flag`` to
            ``True`` plus any ``extra`` keys.
        """
        result: dict[str, Any] = {self.reject_flag: True}
        if extra:
            result.update(extra)
        return result, None

    def can_process(self, ctx: ProcessingContext | None, env: str, prev_result: dict) -> bool:
        """Decide whether enum inference should run for the current node.

        The comparator only participates for nodes already typed as
        ``"string"`` that carry no reject flag, ``enum``, ``format``, or
        ``anyOf``/``oneOf``/``allOf`` combinator. Excluded field names are
        intentionally *not* filtered here, because they still need to be
        processed in order to write the persistent reject flag.

        ``ctx`` and ``env`` are accepted for interface compatibility and are
        not inspected by this method.
        """
        if prev_result.get("type") != "string":
            return False
        if prev_result.get(self.reject_flag) is True:
            return False
        if "enum" in prev_result:
            return False
        if "format" in prev_result:
            return False
        if any(key in prev_result for key in ("anyOf", "oneOf", "allOf")):
            return False

        return True

    def process(self, ctx: ProcessingContext, env: str, prev_result: dict) -> ComparatorResult:
        """Infer enum values or persist a rejection marker.

        The method merges candidate values from schema enums and JSON payloads,
        deduplicates them while preserving order, and applies the configured
        heuristics. On rejection the returned update contains the reject marker
        (plus ``format`` when an input schema declared one) and no ``enum`` key.

        Returns
        -------
        ComparatorResult
            ``({"enum": [...]}, None)`` on success, a rejection update built by
            :meth:`_reject`, or ``(None, None)`` when there are no candidate
            values to judge.
        """
        # A declared format means the field is not a free enum candidate; the
        # format is carried through alongside the reject flag.
        schema_format = self._first_schema_format(ctx)
        if schema_format is not None:
            return self._reject({"format": schema_format})

        # A previous run already rejected this field.
        if self._has_schema_flag(ctx, self.reject_flag):
            return self._reject()

        field_name = self._extract_field_name(env)
        if field_name in self.excluded_field_names:
            return self._reject()

        values = self._collect_schema_values(ctx)
        values.extend(self._collect_json_values(ctx))

        # Nothing to judge: neither accept nor reject.
        if not values:
            return None, None

        if self._has_blank_string_value(values):
            return self._reject()
        if self._has_digit_only_string_value(values):
            return self._reject()
        if self._has_float_like_string_value(values):
            return self._reject()

        # dict.fromkeys removes duplicates while keeping first-seen order.
        unique_values = list(dict.fromkeys(values))
        if len(unique_values) > self.max_unique_values:
            return self._reject()

        avg_length = sum(len(value) for value in unique_values) / len(unique_values)
        if avg_length > self.max_avg_string_length:
            return self._reject()

        return {"enum": unique_values}, None