Coverage for genschema / cli.py: 71%

112 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-25 09:44 +0000

1import argparse 

2import json 

3import sys 

4import time 

5 

6from rich.console import Console 

7 

8from . import Converter, PseudoArrayHandler 

9from .comparators import ( 

10 DeleteElement, 

11 EmptyComparator, 

12 EnumComparator, 

13 FormatComparator, 

14 RequiredComparator, 

15 SchemaVersionComparator, 

16) 

17from .postprocessing import ( 

18 SchemaReferenceExtractionConfig, 

19 SchemaReferencePostprocessor, 

20) 

21 

# Module-level Rich console; main() sends its results, status lines and
# error messages through this single instance.
console = Console()

23 

24 

25def _build_parser() -> argparse.ArgumentParser: 

26 parser = argparse.ArgumentParser( 

27 description="Generate JSON Schema from JSON input using genschema.", 

28 formatter_class=argparse.RawDescriptionHelpFormatter, 

29 epilog=""" 

30Examples: 

31 genschema input.json -o schema.json 

32 genschema input1.json input2.json --base-of oneOf 

33 genschema input.json --extract-refs -o schema.json 

34 cat input.json | genschema - 

35 genschema --base-of anyOf < input.json 

36 genschema dir/file1.json dir/file2.json -o schema.json 

37 """, 

38 ) 

39 parser.add_argument( 

40 "inputs", 

41 nargs="*", 

42 help="Paths to input JSON files. Use '-' for stdin. " 

43 "If no arguments are provided, show this help message.", 

44 ) 

45 parser.add_argument( 

46 "-o", 

47 "--output", 

48 help="Path to output JSON Schema file. If not specified, output to stdout.", 

49 ) 

50 parser.add_argument( 

51 "--base-of", 

52 choices=["anyOf", "oneOf"], 

53 default="anyOf", 

54 help="Combinator for differing types (default: anyOf).", 

55 ) 

56 parser.add_argument( 

57 "--no-pseudo-array", action="store_true", help="Disable pseudo-array handling." 

58 ) 

59 parser.add_argument("--no-format", action="store_true", help="Disable FormatComparator.") 

60 parser.add_argument("--no-enum", action="store_true", help="Disable EnumComparator.") 

61 parser.add_argument("--no-required", action="store_true", help="Disable RequiredComparator.") 

62 parser.add_argument("--no-empty", action="store_true", help="Disable EmptyComparator.") 

63 parser.add_argument( 

64 "--no-schema-version", 

65 action="store_true", 

66 help="Disable SchemaVersionComparator.", 

67 ) 

68 parser.add_argument( 

69 "--no-delete-element", action="store_true", help="Disable DeleteElement comparators." 

70 ) 

71 parser.add_argument( 

72 "--extract-refs", 

73 action="store_true", 

74 help="Run reference-extraction postprocessing and emit shared $defs/$ref blocks.", 

75 ) 

76 parser.add_argument( 

77 "--refs-similarity-threshold", 

78 type=float, 

79 default=0.85, 

80 help="Similarity threshold for grouping shared-reference candidates (default: 0.85).", 

81 ) 

82 parser.add_argument( 

83 "--refs-min-total-keys", 

84 type=int, 

85 default=3, 

86 help="Minimum total number of structural keys before extraction is applied (default: 3).", 

87 ) 

88 parser.add_argument( 

89 "--refs-min-occurrences", 

90 type=int, 

91 default=2, 

92 help="Minimum number of similar occurrences required for extraction (default: 2).", 

93 ) 

94 parser.add_argument( 

95 "--refs-defs-key", 

96 default="$defs", 

97 help="Definition container key used for extracted shared refs (default: $defs).", 

98 ) 

99 return parser 

100 

101 

def _load_json_inputs(paths: list[str]) -> list:
    """Load one JSON document per entry of *paths*; ``"-"`` means stdin.

    On the first input that cannot be read or parsed, an error is printed
    through the shared console and the process exits with status 1.
    """
    documents = []
    for path in paths:
        if path == "-":
            try:
                documents.append(json.load(sys.stdin))
            except (json.JSONDecodeError, UnicodeDecodeError) as e:
                console.print(f"[red]Error reading JSON from stdin: {e}[/red]")
                sys.exit(1)
            continue

        try:
            with open(path, "r", encoding="utf-8") as f:
                documents.append(json.load(f))
        except FileNotFoundError:
            console.print(f"[red]File not found: {path}[/red]")
            sys.exit(1)
        except json.JSONDecodeError as e:
            console.print(f"[red]Invalid JSON in file {path}: {e}[/red]")
            sys.exit(1)
        except (OSError, UnicodeDecodeError) as e:
            # Permission problems, directories, bad encodings, etc. used to
            # escape as raw tracebacks; report them like the other failures.
            console.print(f"[red]Error reading file {path}: {e}[/red]")
            sys.exit(1)
    return documents


def main(argv: list[str] | None = None) -> None:
    """Run the genschema command line.

    Parses the arguments, loads every input JSON document, feeds them to a
    ``Converter`` with the enabled comparators, optionally runs reference
    extraction, and finally writes the schema to ``--output`` or prints it.

    Args:
        argv: Argument list to parse. ``None`` means ``sys.argv[1:]``.

    Raises:
        SystemExit: With status 1 when no arguments are given, when an input
            cannot be read or parsed, when schema generation or reference
            extraction fails, or when the output file cannot be written.
    """
    parser = _build_parser()
    raw_args = sys.argv[1:] if argv is None else argv

    # If no arguments, show help and exit
    if not raw_args:
        parser.print_help(sys.stderr)
        sys.exit(1)

    args = parser.parse_args(raw_args)

    # Options given without positional inputs (for example
    # ``genschema --base-of anyOf < input.json``) read a document from stdin.
    datas = _load_json_inputs(args.inputs or ["-"])

    # Converter setup
    pseudo_handler = None if args.no_pseudo_array else PseudoArrayHandler()
    conv = Converter(pseudo_handler=pseudo_handler, base_of=args.base_of)

    for data in datas:
        conv.add_json(data)

    # Register comparators conditionally
    if not args.no_format:
        conv.register(FormatComparator())
    if not args.no_enum:
        conv.register(EnumComparator())
    if not args.no_schema_version:
        conv.register(SchemaVersionComparator())
    if not args.no_required:
        conv.register(RequiredComparator())
    if not args.no_empty:
        conv.register(EmptyComparator())
    if not args.no_delete_element:
        conv.register(DeleteElement())
        conv.register(DeleteElement("isPseudoArray"))

    # Generate schema; perf_counter is monotonic, so the reported duration
    # cannot be skewed by system clock adjustments.
    start_time = time.perf_counter()
    try:
        result = conv.run()
    except Exception as e:
        console.print(f"[red]Error generating schema: {e}[/red]")
        sys.exit(1)

    if args.extract_refs:
        try:
            refs_config = SchemaReferenceExtractionConfig(
                similarity_threshold=args.refs_similarity_threshold,
                min_total_keys=args.refs_min_total_keys,
                min_occurrences=args.refs_min_occurrences,
                defs_key=args.refs_defs_key,
                merge_base_of=args.base_of,
                merge_pseudo_handler=pseudo_handler,
            )
            result = SchemaReferencePostprocessor.process(result, refs_config)
        except Exception as e:
            console.print(f"[red]Error extracting schema references: {e}[/red]")
            sys.exit(1)

    elapsed = round(time.perf_counter() - start_time, 4)

    # Output result
    if args.output:
        try:
            with open(args.output, "w", encoding="utf-8") as f:
                json.dump(result, f, indent=2, ensure_ascii=False)
            console.print(f"[green]Schema successfully written to {args.output}[/green]")
        except Exception as e:
            console.print(f"[red]Error writing file {args.output}: {e}[/red]")
            sys.exit(1)
    else:
        # Emit real JSON: printing the dict directly renders a Python repr
        # (single quotes, True/None), which is not a valid JSON Schema.
        console.print_json(data=result)

    # Execution info
    instances_word = "instance" if len(datas) == 1 else "instances"
    console.print(f"Generated from {len(datas)} JSON {instances_word}.")
    if args.extract_refs:
        defs = result.get(args.refs_defs_key, {})
        defs_count = len(defs) if isinstance(defs, dict) else 0
        console.print(f"Extracted {defs_count} shared definitions into {args.refs_defs_key}.")
    console.print(f"Elapsed time: {elapsed} sec.")

215 

216 

# Run the CLI only when this module is executed as a script, not on import.
if __name__ == "__main__":
    main()