Coverage for genschema / comparators / preserve_common_keywords.py: 93%
28 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-25 09:44 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-25 09:44 +0000
1from __future__ import annotations
3import copy
4from dataclasses import dataclass, field
6from .template import Comparator, ComparatorResult, ProcessingContext
8DEFAULT_MERGE_OWNED_KEYWORDS = {
9 "$defs",
10 "$ref",
11 "$schema",
12 "allOf",
13 "anyOf",
14 "contains",
15 "else",
16 "enum",
17 "format",
18 "if",
19 "isPseudoArray",
20 "items",
21 "j2sElementTrigger",
22 "not",
23 "oneOf",
24 "patternProperties",
25 "prefixItems",
26 "properties",
27 "propertyNames",
28 "required",
29 "then",
30 "type",
31 "unevaluatedItems",
32 "unevaluatedProperties",
33}
36@dataclass
37class PreserveCommonKeywordsComparator(Comparator):
38 """
39 Restores shared schema-only keywords that the main merge pipeline does not
40 rebuild on its own.
42 The comparator is intended to run last in a chain. It inspects only input
43 schema fragments from ``ctx.schemas`` and copies back identical keys that:
45 - are present in every schema at the current level
46 - are equal across all those schemas
47 - are still absent from the merged node
49 Structural keywords that belong to the merge itself (``type``,
50 ``properties``, ``required``, ``anyOf``, etc.) are intentionally excluded.
51 """
53 name = "preserve-common-keywords"
55 excluded_keywords: set[str] = field(default_factory=lambda: set(DEFAULT_MERGE_OWNED_KEYWORDS))
57 def can_process(self, ctx: ProcessingContext, env: str, node: dict) -> bool:
58 return any(isinstance(schema.content, dict) for schema in ctx.schemas)
60 def process(self, ctx: ProcessingContext, env: str, node: dict) -> ComparatorResult:
61 schema_dicts = [
62 schema.content for schema in ctx.schemas if isinstance(schema.content, dict)
63 ]
64 if not schema_dicts:
65 return None, None
67 shared_keys = set(schema_dicts[0].keys())
68 for schema in schema_dicts[1:]:
69 shared_keys &= set(schema.keys())
71 if not shared_keys:
72 return None, None
74 updates: dict = {}
75 for key in sorted(shared_keys):
76 if key in self.excluded_keywords or key in node:
77 continue
79 reference_value = schema_dicts[0][key]
80 if all(schema.get(key) == reference_value for schema in schema_dicts[1:]):
81 updates[key] = copy.deepcopy(reference_value)
83 return (updates or None), None