Coverage for pytest_jsonschema_snapshot / core.py: 46%

200 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-29 08:31 +0000

1""" 

2Core logic of the plugin. 

3""" 

4 

5import json 

6import logging 

7import shutil 

8from pathlib import Path 

9from typing import TYPE_CHECKING, Any, Callable, Literal, Optional 

10 

11import pathvalidate 

12 

13if TYPE_CHECKING: 

14 from jsonschema_diff import JsonSchemaDiff 

15 

16import pytest 

17from genschema import Converter, PseudoArrayHandler 

18from genschema.comparators import ( 

19 DeleteElement, 

20 EmptyComparator, 

21 EnumComparator, 

22 FormatComparator, 

23 RequiredComparator, 

24 SchemaVersionComparator, 

25) 

26from genschema.postprocessing import ( 

27 SchemaReferenceExtractionConfig, 

28 SchemaReferencePostprocessor, 

29) 

30from jsonschema import FormatChecker, ValidationError, validate 

31 

32from .stats import GLOBAL_STATS 

33from .tools import NameMaker 

34 

35 

36class SchemaShot: 

37 def __init__( 

38 self, 

39 root_dir: Path, 

40 differ: "JsonSchemaDiff", 

41 callable_regex: str = "{class_method=.}", 

42 format_mode: str = "on", 

43 update_mode: bool = False, 

44 ci_cd_mode: bool = False, 

45 reset_mode: bool = False, 

46 update_actions: Optional[dict[str, bool]] = {}, 

47 save_original: bool = False, 

48 debug_mode: bool = False, 

49 snapshot_dir_name: str = "__snapshots__", 

50 ): 

51 """ 

52 Initializes SchemaShot. 

53 

54 Args: 

55 root_dir: Project root directory 

56 update_mode: Update mode (--schema-update) 

57 snapshot_dir_name: Name of the directory for snapshots 

58 """ 

59 self.root_dir: Path = root_dir 

60 self.differ: "JsonSchemaDiff" = differ 

61 self.callable_regex: str = callable_regex 

62 self.format_mode: str = format_mode.lower() 

63 self.ci_cd_mode: bool = ci_cd_mode 

64 # self.examples_limit: int = examples_limit 

65 self.update_mode: bool = update_mode 

66 self.reset_mode: bool = reset_mode 

67 self.update_actions: dict[str, bool] = dict(update_actions or {}) 

68 self.save_original: bool = save_original 

69 self.debug_mode: bool = debug_mode 

70 self.snapshot_dir: Path = root_dir / snapshot_dir_name 

71 self.used_schemas: set[str] = set() 

72 

73 if self.format_mode not in {"on", "safe", "off"}: 

74 raise ValueError( 

75 "Invalid jsss_format_mode value. Expected one of: 'on', 'safe', 'off'." 

76 ) 

77 

78 self.conv = Converter( 

79 pseudo_handler=PseudoArrayHandler(), 

80 base_of="anyOf", 

81 ) 

82 if self._is_format_annotation_enabled(): 

83 self.conv.register(FormatComparator()) 

84 self.conv.register(EnumComparator()) 

85 self.conv.register(RequiredComparator()) 

86 # self.conv.register(EmptyComparator()) 

87 self.conv.register(SchemaVersionComparator()) 

88 self.conv.register(DeleteElement()) 

89 self.conv.register(DeleteElement("isPseudoArray")) 

90 self.reference_extraction_config = SchemaReferenceExtractionConfig( 

91 merge_base_of="anyOf", 

92 merge_pseudo_handler=PseudoArrayHandler(), 

93 merge_comparator_factories=self._make_reference_extraction_comparator_factories(), 

94 ) 

95 

96 self.logger = logging.getLogger(__name__) 

97 # добавляем вывод в stderr 

98 handler = logging.StreamHandler() 

99 handler.setFormatter(logging.Formatter("%(levelname)s %(name)s: %(message)s")) 

100 self.logger.addHandler(handler) 

101 # и поднимаем уровень, чтобы INFO/DEBUG прошли через handler 

102 self.logger.setLevel(logging.INFO) 

103 

104 # ci.cd is only needed in dedicated CI/CD mode. Touching it in every worker 

105 # causes avoidable races under pytest-xdist. 

106 self.snapshot_dir.mkdir(parents=True, exist_ok=True) 

107 if self.ci_cd_mode: 

108 cicd = self.snapshot_dir / "ci.cd" 

109 shutil.rmtree(cicd, ignore_errors=True) 

110 cicd.mkdir(parents=True, exist_ok=True) 

111 

112 def _is_format_annotation_enabled(self) -> bool: 

113 return self.format_mode in {"on", "safe"} 

114 

115 def _is_format_validation_enabled(self) -> bool: 

116 return self.format_mode == "on" 

117 

118 def _make_reference_extraction_comparator_factories(self) -> tuple[Callable[[], Any], ...]: 

119 factories: list[Callable[[], Any]] = [] 

120 if self._is_format_annotation_enabled(): 

121 factories.append(FormatComparator) 

122 factories.extend( 

123 ( 

124 EnumComparator, 

125 RequiredComparator, 

126 EmptyComparator, 

127 DeleteElement, 

128 lambda: DeleteElement("isPseudoArray"), 

129 ) 

130 ) 

131 return tuple(factories) 

132 

133 def _finalize_generated_schema(self, schema: dict[str, Any]) -> dict[str, Any]: 

134 return SchemaReferencePostprocessor.process(schema, self.reference_extraction_config) 

135 

136 def _validate_instance(self, instance: Any, schema: dict[str, Any]) -> None: 

137 validate_kwargs: dict[str, Any] = {} 

138 if self._is_format_validation_enabled(): 

139 validate_kwargs["format_checker"] = FormatChecker() 

140 validate(instance=instance, schema=schema, **validate_kwargs) 

141 

142 def _process_name(self, name: str | int | Callable | list[str | int | Callable]) -> str: 

143 """ 

144 1. Converts callable to string 

145 2. Checks for validity 

146 

147 Returns: 

148 str 

149 Raises: 

150 ValueError 

151 """ 

152 

153 __tracebackhide__ = not self.debug_mode # прячем из стека pytest 

154 

155 def process_name_part(part: str | int | Callable) -> str: 

156 if callable(part): 

157 return NameMaker.format(part, self.callable_regex) 

158 else: 

159 return str(part) 

160 

161 if isinstance(name, (list, tuple)): 

162 name = ".".join([process_name_part(part) for part in name]) 

163 else: 

164 name = process_name_part(name) 

165 

166 if not isinstance(name, str) or not name: 

167 raise ValueError("Schema name must be a non-empty string") 

168 

169 try: 

170 # auto подберёт правила под текущую ОС 

171 pathvalidate.validate_filename( 

172 name, platform="auto" 

173 ) # allow_reserved=False по умолчанию 

174 except pathvalidate.ValidationError as e: 

175 raise ValueError(f"Invalid schema name: {e}") from None 

176 

177 return name 

178 

179 def _save_process_original(self, real_name: str, status: Optional[bool], data: dict) -> None: 

180 json_name = f"{real_name}.json" 

181 schema_name = f"{real_name}.schema.json" 

182 base_j_path = self.snapshot_dir / json_name 

183 base_s_path = self.snapshot_dir / json_name 

184 if not self.ci_cd_mode: 

185 json_path = base_j_path 

186 schema_path = base_s_path 

187 else: 

188 json_path = self.snapshot_dir / "ci.cd" / json_name 

189 schema_path = self.snapshot_dir / "ci.cd" / schema_name 

190 

191 if self.save_original: 

192 available_to_create = ( 

193 (not json_path.exists() or status is None) and not self.ci_cd_mode 

194 ) or (schema_path.exists() and not base_s_path.exists() and self.ci_cd_mode) 

195 available_to_update = (status is True and not self.ci_cd_mode) or ( 

196 schema_path.exists() and base_s_path.exists() and self.ci_cd_mode 

197 ) 

198 

199 if (available_to_create and self.update_actions.get("add")) or ( 

200 available_to_update and self.update_actions.get("update") 

201 ): 

202 with open(json_path, "w", encoding="utf-8") as f: 

203 json.dump(data, f, indent=2, ensure_ascii=False) 

204 

205 if available_to_create: 

206 GLOBAL_STATS.add_created(json_name) 

207 elif available_to_update: 

208 GLOBAL_STATS.add_updated(json_name) 

209 else: 

210 raise ValueError(f"Unexpected status: {status}") 

211 elif not self.ci_cd_mode and json_path.exists() and self.update_actions.get("delete"): 

212 # удаляем 

213 json_path.unlink() 

214 GLOBAL_STATS.add_deleted(json_name) 

215 

216 def assert_json_match( 

217 self, 

218 data: dict, 

219 name: str | int | Callable | list[str | int | Callable], 

220 ) -> Optional[bool]: 

221 """ 

222 Asserts for JSON, converts it to schema and then compares. 

223 

224 Returns: 

225 True – the schema has been updated, 

226 False – the schema has not changed, 

227 None – a new schema has been created. 

228 """ 

229 

230 real_name = self._process_name(name) 

231 

232 real_name, status = self._base_match(data, data, "json", real_name) 

233 

234 if self.update_mode or self.reset_mode or self.ci_cd_mode: 

235 self._save_process_original(real_name=real_name, status=status, data=data) 

236 

237 return status 

238 

239 def assert_schema_match( 

240 self, 

241 schema: dict[str, Any], 

242 name: str | int | Callable | list[str | int | Callable], 

243 *, 

244 data: Optional[dict] = None, 

245 ) -> Optional[bool]: 

246 """ 

247 Accepts a JSON-schema directly and compares it immediately. 

248 

249 Returns: 

250 True – the schema has been updated, 

251 False – the schema has not changed, 

252 None – a new schema has been created. 

253 """ 

254 

255 real_name = self._process_name(name) 

256 

257 real_name, status = self._base_match(data, schema, "schema", real_name) 

258 

259 if self.update_mode and data is not None: 

260 self._save_process_original(real_name=real_name, status=status, data=data) 

261 

262 return status 

263 

264 def _base_match( 

265 self, 

266 data: Optional[dict], 

267 current_data: dict, 

268 type_data: Literal["json", "schema"], 

269 name: str, 

270 ) -> tuple[str, Optional[bool]]: 

271 """ 

272 Checks if data matches the JSON schema, creates/updates it if needed, 

273 and writes statistics to GLOBAL_STATS. 

274 

275 Returns: 

276 True – the schema has been updated, 

277 False – the schema has not changed, 

278 None – a new schema has been created. 

279 """ 

280 __tracebackhide__ = not self.debug_mode # прячем из стека pytest 

281 

282 # Проверка имени 

283 name = self._process_name(name) 

284 

285 base_path = self.snapshot_dir / f"{name}.schema.json" 

286 if not self.ci_cd_mode: 

287 schema_path = base_path 

288 else: 

289 schema_path = self.snapshot_dir / "ci.cd" / f"{name}.schema.json" 

290 self.used_schemas.add(schema_path.name) 

291 

292 # --- состояние ДО проверки --- 

293 schema_exists_before = base_path.exists() 

294 

295 def make_schema(current_data: dict | list, type_data: Literal["json", "schema"]) -> dict: 

296 if type_data == "schema": 

297 return dict(current_data) 

298 elif type_data == "json": 

299 self.conv.clear_data() 

300 self.conv.add_json(current_data) 

301 return self._finalize_generated_schema(self.conv.run()) 

302 else: 

303 raise ValueError("Not correct type argument") 

304 

305 # --- когда схемы ещё нет --- 

306 if not schema_exists_before: 

307 if not self.update_mode and not self.reset_mode and not self.ci_cd_mode: 

308 raise pytest.fail.Exception( 

309 f"Schema `{name}` not found." 

310 "Run the test with the --schema-update option to create it." 

311 ) 

312 elif not self.update_actions.get("add"): 

313 raise pytest.fail.Exception( 

314 f"Schema `{name}` not found and adding new schemas is disabled." 

315 ) 

316 

317 current_schema = make_schema(current_data, type_data) 

318 

319 with open(schema_path, "w", encoding="utf-8") as f: 

320 json.dump(current_schema, f, indent=2, ensure_ascii=False) 

321 

322 self.logger.info(f"New schema `{name}` has been created.") 

323 GLOBAL_STATS.add_created(schema_path.name) # статистика «создана» 

324 return name, None 

325 else: 

326 with open(base_path, "r", encoding="utf-8") as f: 

327 existing_schema = json.load(f) 

328 

329 # --- схема уже была: сравнение и валидация -------------------------------- 

330 schema_updated = False 

331 

332 def merge_schemas( 

333 old: dict, new: dict | list, type_data: Literal["json", "schema"] 

334 ) -> dict: 

335 self.conv.clear_data() 

336 self.conv.add_schema(old) 

337 if type_data == "schema": 

338 self.conv.add_schema(dict(new)) 

339 elif type_data == "json": 

340 self.conv.add_json(new) 

341 else: 

342 raise ValueError("Not correct type argument") 

343 result = self.conv.run() 

344 if type_data == "json": 

345 result = self._finalize_generated_schema(result) 

346 return result 

347 

348 if ( 

349 type_data == "json" or existing_schema != current_data 

350 ): # есть отличия или могут быть 

351 if ( 

352 self.update_mode or self.ci_cd_mode or self.reset_mode 

353 ) and self.update_actions.get("update"): 

354 # обновляем файл 

355 if self.reset_mode and not self.update_mode and not self.ci_cd_mode: 

356 current_schema = make_schema(current_data, type_data) 

357 

358 if existing_schema != current_schema: 

359 differences = self.differ.compare( 

360 dict(existing_schema), current_schema 

361 ).render() 

362 GLOBAL_STATS.add_updated(schema_path.name, differences) 

363 

364 with open(schema_path, "w", encoding="utf-8") as f: 

365 json.dump(current_schema, f, indent=2, ensure_ascii=False) 

366 self.logger.warning(f"Schema `{name}` reseted.\n\n{differences}") 

367 elif self.update_mode or self.ci_cd_mode and not self.reset_mode: 

368 merged_schema = merge_schemas(existing_schema, current_data, type_data) 

369 

370 if existing_schema != merged_schema: 

371 differences = self.differ.compare( 

372 dict(existing_schema), merged_schema 

373 ).render() 

374 GLOBAL_STATS.add_updated(schema_path.name, differences) 

375 

376 with open(schema_path, "w", encoding="utf-8") as f: 

377 json.dump(merged_schema, f, indent=2, ensure_ascii=False) 

378 

379 self.logger.warning(f"Schema `{name}` updated.\n\n{differences}") 

380 else: # both update_mode and reset_mode are True 

381 raise ValueError( 

382 "update_mode, ci_cd_mode and reset_mode" 

383 " cannot be True at the same time." 

384 ) 

385 schema_updated = True 

386 elif data is not None: 

387 merged_schema = merge_schemas(existing_schema, current_data, type_data) 

388 

389 differences = "" 

390 if existing_schema != merged_schema: 

391 differences = self.differ.compare( 

392 dict(existing_schema), merged_schema 

393 ).render() 

394 GLOBAL_STATS.add_uncommitted(schema_path.name, differences) 

395 

396 # только валидируем по старой схеме 

397 try: 

398 self._validate_instance(instance=data, schema=existing_schema) 

399 except ValidationError as e: 

400 pytest.fail( 

401 f"\n\n{differences}\n\nValidation error in `{name}`: {e.message}" 

402 ) 

403 elif data is not None and type_data == "schema": 

404 # схемы совпали – всё равно валидируем на случай формальных ошибок 

405 try: 

406 self._validate_instance(instance=data, schema=existing_schema) 

407 except ValidationError as e: 

408 merged_schema = merge_schemas(existing_schema, current_data, type_data) 

409 

410 differences = "" 

411 if existing_schema != merged_schema: 

412 differences = self.differ.compare( 

413 dict(existing_schema), merged_schema 

414 ).render() 

415 pytest.fail(f"\n\n{differences}\n\nValidation error in `{name}`: {e.message}") 

416 

417 return name, schema_updated