Coverage for genschema / comparators / enum.py: 97%
109 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-25 09:44 +0000
"""Enum inference comparator.

This module contains :class:`EnumComparator`, a comparator that promotes
low-cardinality string fields to JSON Schema ``enum`` definitions.
It is designed to work with mixed input sources:

- raw JSON instances added via :meth:`genschema.pipeline.Converter.add_json`
- existing JSON Schemas added via :meth:`genschema.pipeline.Converter.add_schema`

The comparator is intentionally conservative. If a field looks unsafe for enum
inference, it stores a reject flag directly in the generated schema so that the
same field will not be reconsidered as an enum candidate on future runs.
"""
15from __future__ import annotations
17import re
18from dataclasses import dataclass, field
19from typing import Any
21from .template import Comparator, ComparatorResult, ProcessingContext
# Schema key written on a node to record that enum inference was rejected for
# it; used as the default value of ``EnumComparator.reject_flag``.
ENUM_REJECT_FLAG = "j2sEnumRejected"

# Matches strings that read as plain decimal numbers: an optional sign followed
# by digits with an optional decimal point on either side ("12", "-1.5", "3.",
# ".25"). Exponent notation such as "1e5" is not matched.
NUMERIC_LIKE_STRING_RE = re.compile(r"^[+-]?(?:\d+|\d+\.\d+|\d+\.|\.\d+)$")
@dataclass
class EnumComparator(Comparator):
    """Promote small, closed sets of string values to a JSON Schema ``enum``.

    Fields that fail any heuristic are marked with a reject flag instead, so
    the decision is stored in the schema itself.

    Only strings are handled. Integers are deliberately left out: telling a
    genuine numeric enum apart from identifiers, counters, years, status codes
    and similar values is unreliable, and a wrongly inferred enum does more
    harm than a missed one.
    """

    name = "enum"

    max_unique_values: int = 16
    """Largest number of distinct values a field may have and still become an enum."""

    max_avg_string_length: int = 20
    """Largest average length of the distinct values; longer fields count as free text."""

    excluded_field_names: set[str] = field(
        default_factory=lambda: {"name", "title", "description", "message", "text"}
    )
    """Property names that are always rejected as enum candidates."""

    reject_flag: str = ENUM_REJECT_FLAG
    """Schema key used to record that enum inference was rejected for a node."""

    def _extract_field_name(self, env: str) -> str | None:
        """Return the innermost property name found in a pipeline path.

        Parameters
        ----------
        env:
            Internal converter path, for example ``"/properties/status"`` or
            ``"/properties/meta/properties/status"``.

        Returns
        -------
        str | None
            The path segment that follows the last ``/properties/`` marker,
            or ``None`` when the path contains no such marker.
        """
        _, marker, tail = env.rpartition("/properties/")
        if not marker:
            return None
        # Keep only the segment directly after the marker.
        return tail.partition("/")[0]

    def _schema_type_matches(self, schema: Any, expected_type: str) -> bool:
        """Tell whether ``schema`` is a dict whose ``type`` equals ``expected_type``."""
        if not isinstance(schema, dict):
            return False
        return schema.get("type") == expected_type

    def _collect_schema_values(self, ctx: ProcessingContext) -> list[str]:
        """Gather string members of ``enum`` lists declared by input schemas.

        Schemas whose ``type`` is not ``"string"`` are skipped, as are
        ``enum`` entries that are not lists and members that are not strings.
        """
        collected: list[str] = []
        for schema in ctx.schemas:
            node = schema.content
            if self._schema_type_matches(node, "string"):
                members = node.get("enum")
                if isinstance(members, list):
                    collected.extend(item for item in members if isinstance(item, str))
        return collected

    def _collect_json_values(self, ctx: ProcessingContext) -> list[str]:
        """Gather the string payloads of the raw JSON resources in ``ctx``."""
        payloads = (resource.content for resource in ctx.jsons)
        return [payload for payload in payloads if isinstance(payload, str)]

    def _has_blank_string_value(self, values: list[str]) -> bool:
        """Tell whether any candidate is empty or made up only of whitespace."""
        for value in values:
            if isinstance(value, str) and not value.strip():
                return True
        return False

    def _has_digit_only_string_value(self, values: list[str]) -> bool:
        """Tell whether any candidate consists solely of digit characters."""
        for value in values:
            if value.isdigit():
                return True
        return False

    def _has_float_like_string_value(self, values: list[str]) -> bool:
        """Tell whether any candidate fully matches ``NUMERIC_LIKE_STRING_RE``."""
        for value in values:
            if NUMERIC_LIKE_STRING_RE.fullmatch(value):
                return True
        return False

    def _has_schema_flag(self, ctx: ProcessingContext, flag_name: str) -> bool:
        """Tell whether any input schema dict has ``flag_name`` set to exactly ``True``."""
        nodes = (schema.content for schema in ctx.schemas)
        return any(isinstance(node, dict) and node.get(flag_name) is True for node in nodes)

    def _first_schema_format(self, ctx: ProcessingContext) -> str | None:
        """Return the first string ``format`` found among the input schemas, if any."""
        for schema in ctx.schemas:
            node = schema.content
            if isinstance(node, dict):
                declared = node.get("format")
                if isinstance(declared, str):
                    return declared
        return None

    def _reject(self, extra: dict[str, Any] | None = None) -> ComparatorResult:
        """Build a rejection result: the reject flag plus any ``extra`` keys."""
        update: dict[str, Any] = {self.reject_flag: True}
        update.update(extra or {})
        return update, None

    def can_process(self, ctx: ProcessingContext | None, env: str, prev_result: dict) -> bool:
        """Decide whether enum inference should run for the current node.

        The node must be typed ``"string"``, must not already carry the reject
        flag, and must not already contain ``enum``, ``format`` or a
        combinator keyword. Excluded field names are deliberately *not*
        filtered out here: :meth:`process` has to see them so that it can
        write the reject flag for them.
        """
        if prev_result.get("type") != "string":
            return False
        if prev_result.get(self.reject_flag) is True:
            return False
        blocking_keys = ("enum", "format", "anyOf", "oneOf", "allOf")
        return not any(key in prev_result for key in blocking_keys)

    def process(self, ctx: ProcessingContext, env: str, prev_result: dict) -> ComparatorResult:
        """Produce an ``enum`` update for the node, or a rejection marker.

        Candidate values come from schema enums first and JSON payloads
        second; duplicates are dropped while first-seen order is kept. A
        rejection result carries the reject flag (plus ``format`` when a
        schema declares one) and never an ``enum`` key. When there are no
        candidate values at all, ``(None, None)`` is returned.
        """
        declared_format = self._first_schema_format(ctx)
        if declared_format is not None:
            return self._reject({"format": declared_format})

        previously_rejected = self._has_schema_flag(ctx, self.reject_flag)
        if previously_rejected or self._extract_field_name(env) in self.excluded_field_names:
            return self._reject()

        candidates = [*self._collect_schema_values(ctx), *self._collect_json_values(ctx)]
        if not candidates:
            return None, None

        # Blank or number-like strings are treated as unsafe for enum inference.
        looks_unsafe = (
            self._has_blank_string_value(candidates)
            or self._has_digit_only_string_value(candidates)
            or self._has_float_like_string_value(candidates)
        )
        if looks_unsafe:
            return self._reject()

        # dict.fromkeys removes duplicates while preserving insertion order.
        distinct = list(dict.fromkeys(candidates))
        if len(distinct) > self.max_unique_values:
            return self._reject()

        total_length = sum(map(len, distinct))
        if total_length / len(distinct) > self.max_avg_string_length:
            return self._reject()

        return {"enum": distinct}, None