Coverage for genschema / cli.py: 71%
112 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-25 09:44 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-25 09:44 +0000
1import argparse
2import json
3import sys
4import time
6from rich.console import Console
8from . import Converter, PseudoArrayHandler
9from .comparators import (
10 DeleteElement,
11 EmptyComparator,
12 EnumComparator,
13 FormatComparator,
14 RequiredComparator,
15 SchemaVersionComparator,
16)
17from .postprocessing import (
18 SchemaReferenceExtractionConfig,
19 SchemaReferencePostprocessor,
20)
# Module-level Rich console shared by the CLI; main() routes its user-facing messages through it.
console = Console()
25def _build_parser() -> argparse.ArgumentParser:
26 parser = argparse.ArgumentParser(
27 description="Generate JSON Schema from JSON input using genschema.",
28 formatter_class=argparse.RawDescriptionHelpFormatter,
29 epilog="""
30Examples:
31 genschema input.json -o schema.json
32 genschema input1.json input2.json --base-of oneOf
33 genschema input.json --extract-refs -o schema.json
34 cat input.json | genschema -
35 genschema --base-of anyOf < input.json
36 genschema dir/file1.json dir/file2.json -o schema.json
37 """,
38 )
39 parser.add_argument(
40 "inputs",
41 nargs="*",
42 help="Paths to input JSON files. Use '-' for stdin. "
43 "If no arguments are provided, show this help message.",
44 )
45 parser.add_argument(
46 "-o",
47 "--output",
48 help="Path to output JSON Schema file. If not specified, output to stdout.",
49 )
50 parser.add_argument(
51 "--base-of",
52 choices=["anyOf", "oneOf"],
53 default="anyOf",
54 help="Combinator for differing types (default: anyOf).",
55 )
56 parser.add_argument(
57 "--no-pseudo-array", action="store_true", help="Disable pseudo-array handling."
58 )
59 parser.add_argument("--no-format", action="store_true", help="Disable FormatComparator.")
60 parser.add_argument("--no-enum", action="store_true", help="Disable EnumComparator.")
61 parser.add_argument("--no-required", action="store_true", help="Disable RequiredComparator.")
62 parser.add_argument("--no-empty", action="store_true", help="Disable EmptyComparator.")
63 parser.add_argument(
64 "--no-schema-version",
65 action="store_true",
66 help="Disable SchemaVersionComparator.",
67 )
68 parser.add_argument(
69 "--no-delete-element", action="store_true", help="Disable DeleteElement comparators."
70 )
71 parser.add_argument(
72 "--extract-refs",
73 action="store_true",
74 help="Run reference-extraction postprocessing and emit shared $defs/$ref blocks.",
75 )
76 parser.add_argument(
77 "--refs-similarity-threshold",
78 type=float,
79 default=0.85,
80 help="Similarity threshold for grouping shared-reference candidates (default: 0.85).",
81 )
82 parser.add_argument(
83 "--refs-min-total-keys",
84 type=int,
85 default=3,
86 help="Minimum total number of structural keys before extraction is applied (default: 3).",
87 )
88 parser.add_argument(
89 "--refs-min-occurrences",
90 type=int,
91 default=2,
92 help="Minimum number of similar occurrences required for extraction (default: 2).",
93 )
94 parser.add_argument(
95 "--refs-defs-key",
96 default="$defs",
97 help="Definition container key used for extracted shared refs (default: $defs).",
98 )
99 return parser
def _print_error(message: str) -> None:
    """Print *message* in red on the shared console.

    The text is rendered with ``markup=False`` so that square brackets inside
    file paths or exception messages are shown literally instead of being
    parsed as Rich markup tags.
    """
    console.print(message, style="red", markup=False)


def _load_json_inputs(input_paths: list[str]) -> list:
    """Parse every CLI input into a Python object.

    Args:
        input_paths: Positional ``inputs`` from the command line. ``"-"``
            means "read one JSON document from stdin".

    Returns:
        The parsed documents, in command-line order (always at least one).

    Raises:
        SystemExit: With status 1 after printing an error, as soon as one
            input cannot be read or parsed.
    """
    datas = []
    # Options without positional inputs (e.g. ``genschema --base-of anyOf <
    # input.json``) pass the "no arguments" check in main(), so an empty list
    # is a supported case: it means a single document read from stdin.
    for input_path in input_paths or ["-"]:
        if input_path == "-":
            try:
                datas.append(json.load(sys.stdin))
            except (json.JSONDecodeError, UnicodeDecodeError) as e:
                _print_error(f"Error reading JSON from stdin: {e}")
                sys.exit(1)
            continue

        try:
            with open(input_path, "r", encoding="utf-8") as f:
                datas.append(json.load(f))
        except FileNotFoundError:
            _print_error(f"File not found: {input_path}")
            sys.exit(1)
        except json.JSONDecodeError as e:
            _print_error(f"Invalid JSON in file {input_path}: {e}")
            sys.exit(1)
        except (OSError, UnicodeDecodeError) as e:
            # Permission problems, directories, non-UTF-8 bytes, ...: report
            # them like every other input error instead of a raw traceback.
            _print_error(f"Error reading file {input_path}: {e}")
            sys.exit(1)
    return datas


def main(argv: list[str] | None = None) -> None:
    """Run the ``genschema`` command line.

    Parses the arguments, loads every JSON input, feeds them to a
    ``Converter`` with the comparators that were not disabled, optionally runs
    the reference-extraction postprocessor, then writes the schema to
    ``--output`` (or stdout) followed by a short status report.

    Args:
        argv: Argument list to parse; ``None`` means ``sys.argv[1:]``.

    Raises:
        SystemExit: With status 1 when called without arguments (help is
            printed to stderr) or when reading, generating, postprocessing or
            writing fails. ``argparse`` may also exit on invalid arguments.
    """
    parser = _build_parser()
    raw_args = sys.argv[1:] if argv is None else argv

    # If no arguments, show help and exit
    if not raw_args:
        parser.print_help(sys.stderr)
        sys.exit(1)

    args = parser.parse_args(raw_args)

    # Collect input data
    datas = _load_json_inputs(args.inputs)

    # Converter setup
    pseudo_handler = None if args.no_pseudo_array else PseudoArrayHandler()
    conv = Converter(pseudo_handler=pseudo_handler, base_of=args.base_of)

    for data in datas:
        conv.add_json(data)

    # Register comparators conditionally
    if not args.no_format:
        conv.register(FormatComparator())
    if not args.no_enum:
        conv.register(EnumComparator())
    if not args.no_schema_version:
        conv.register(SchemaVersionComparator())
    if not args.no_required:
        conv.register(RequiredComparator())
    if not args.no_empty:
        conv.register(EmptyComparator())
    if not args.no_delete_element:
        conv.register(DeleteElement())
        conv.register(DeleteElement("isPseudoArray"))

    # Generate schema; a monotonic clock keeps the elapsed time immune to
    # wall-clock adjustments.
    start_time = time.perf_counter()
    try:
        result = conv.run()
    except Exception as e:
        _print_error(f"Error generating schema: {e}")
        sys.exit(1)

    if args.extract_refs:
        try:
            refs_config = SchemaReferenceExtractionConfig(
                similarity_threshold=args.refs_similarity_threshold,
                min_total_keys=args.refs_min_total_keys,
                min_occurrences=args.refs_min_occurrences,
                defs_key=args.refs_defs_key,
                merge_base_of=args.base_of,
                merge_pseudo_handler=pseudo_handler,
            )
            result = SchemaReferencePostprocessor.process(result, refs_config)
        except Exception as e:
            _print_error(f"Error extracting schema references: {e}")
            sys.exit(1)

    elapsed = round(time.perf_counter() - start_time, 4)

    # Output result
    if args.output:
        try:
            with open(args.output, "w", encoding="utf-8") as f:
                json.dump(result, f, indent=2, ensure_ascii=False)
        except Exception as e:
            _print_error(f"Error writing file {args.output}: {e}")
            sys.exit(1)
        # Printed outside the ``try`` so a console problem is never reported
        # as a file-write failure.
        console.print(
            f"Schema successfully written to {args.output}",
            style="green",
            markup=False,
        )
    else:
        # Serialize explicitly: handing the dict to Rich would print a Python
        # repr (single quotes, True/None, wrapped lines), which is not JSON.
        print(json.dumps(result, indent=2, ensure_ascii=False))

    # Execution info
    # NOTE(review): without -o these status lines share stdout with the
    # schema; consider routing them to stderr if stdout must stay parseable.
    instances_word = "instance" if len(datas) == 1 else "instances"
    console.print(f"Generated from {len(datas)} JSON {instances_word}.")
    if args.extract_refs:
        defs = result.get(args.refs_defs_key, {}) if isinstance(result, dict) else {}
        defs_count = len(defs) if isinstance(defs, dict) else 0
        # The defs key is user-supplied, so keep it out of markup parsing.
        console.print(
            f"Extracted {defs_count} shared definitions into {args.refs_defs_key}.",
            markup=False,
        )
    console.print(f"Elapsed time: {elapsed} sec.")
# Run the CLI when this module is executed as a script.
if __name__ == "__main__":
    main()