Coverage for pytest_jsonschema_snapshot / core.py: 42%

174 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-08 01:59 +0000

1""" 

2Core logic of the plugin. 

3""" 

4 

5import json 

6import logging 

7import shutil 

8from pathlib import Path 

9from typing import TYPE_CHECKING, Any, Callable, Literal, Optional 

10 

11import pathvalidate 

12 

13if TYPE_CHECKING: 

14 from jsonschema_diff import JsonSchemaDiff 

15 

16import pytest 

17from genschema import Converter, PseudoArrayHandler 

18from genschema.comparators import ( 

19 DeleteElement, 

20 FormatComparator, 

21 RequiredComparator, 

22 SchemaVersionComparator, 

23) 

24from jsonschema import FormatChecker, ValidationError, validate 

25 

26from .stats import GLOBAL_STATS 

27from .tools import NameMaker 

28 

29 

30class SchemaShot: 

31 def __init__( 

32 self, 

33 root_dir: Path, 

34 differ: "JsonSchemaDiff", 

35 callable_regex: str = "{class_method=.}", 

36 format_mode: str = "on", 

37 update_mode: bool = False, 

38 ci_cd_mode: bool = False, 

39 reset_mode: bool = False, 

40 update_actions: Optional[dict[str, bool]] = {}, 

41 save_original: bool = False, 

42 debug_mode: bool = False, 

43 snapshot_dir_name: str = "__snapshots__", 

44 ): 

45 """ 

46 Initializes SchemaShot. 

47 

48 Args: 

49 root_dir: Project root directory 

50 update_mode: Update mode (--schema-update) 

51 snapshot_dir_name: Name of the directory for snapshots 

52 """ 

53 self.root_dir: Path = root_dir 

54 self.differ: "JsonSchemaDiff" = differ 

55 self.callable_regex: str = callable_regex 

56 self.format_mode: str = format_mode 

57 self.ci_cd_mode: bool = ci_cd_mode 

58 # self.examples_limit: int = examples_limit 

59 self.update_mode: bool = update_mode 

60 self.reset_mode: bool = reset_mode 

61 self.update_actions: dict[str, bool] = dict(update_actions or {}) 

62 self.save_original: bool = save_original 

63 self.debug_mode: bool = debug_mode 

64 self.snapshot_dir: Path = root_dir / snapshot_dir_name 

65 self.used_schemas: set[str] = set() 

66 

67 self.conv = Converter( 

68 pseudo_handler=PseudoArrayHandler(), 

69 base_of="anyOf", 

70 ) 

71 self.conv.register(FormatComparator()) 

72 self.conv.register(RequiredComparator()) 

73 # self.conv.register(EmptyComparator()) 

74 self.conv.register(SchemaVersionComparator()) 

75 self.conv.register(DeleteElement()) 

76 self.conv.register(DeleteElement("isPseudoArray")) 

77 

78 self.logger = logging.getLogger(__name__) 

79 # добавляем вывод в stderr 

80 handler = logging.StreamHandler() 

81 handler.setFormatter(logging.Formatter("%(levelname)s %(name)s: %(message)s")) 

82 self.logger.addHandler(handler) 

83 # и поднимаем уровень, чтобы INFO/DEBUG прошли через handler 

84 self.logger.setLevel(logging.INFO) 

85 

86 # Создаем директорию для снэпшотов, если её нет 

87 if not self.snapshot_dir.exists(): 

88 self.snapshot_dir.mkdir(parents=True) 

89 cicd = self.snapshot_dir / "ci.cd" 

90 if cicd.exists(): 

91 shutil.rmtree(cicd) 

92 cicd.mkdir(parents=True) 

93 

94 def _process_name(self, name: str | int | Callable | list[str | int | Callable]) -> str: 

95 """ 

96 1. Converts callable to string 

97 2. Checks for validity 

98 

99 Returns: 

100 str 

101 Raises: 

102 ValueError 

103 """ 

104 

105 __tracebackhide__ = not self.debug_mode # прячем из стека pytest 

106 

107 def process_name_part(part: str | int | Callable) -> str: 

108 if callable(part): 

109 return NameMaker.format(part, self.callable_regex) 

110 else: 

111 return str(part) 

112 

113 if isinstance(name, (list, tuple)): 

114 name = ".".join([process_name_part(part) for part in name]) 

115 else: 

116 name = process_name_part(name) 

117 

118 if not isinstance(name, str) or not name: 

119 raise ValueError("Schema name must be a non-empty string") 

120 

121 try: 

122 # auto подберёт правила под текущую ОС 

123 pathvalidate.validate_filename( 

124 name, platform="auto" 

125 ) # allow_reserved=False по умолчанию 

126 except pathvalidate.ValidationError as e: 

127 raise ValueError(f"Invalid schema name: {e}") from None 

128 

129 return name 

130 

131 def _save_process_original(self, real_name: str, status: Optional[bool], data: dict) -> None: 

132 json_name = f"{real_name}.json" 

133 schema_name = f"{real_name}.schema.json" 

134 base_j_path = self.snapshot_dir / json_name 

135 base_s_path = self.snapshot_dir / json_name 

136 if not self.ci_cd_mode: 

137 json_path = base_j_path 

138 schema_path = base_s_path 

139 else: 

140 json_path = self.snapshot_dir / "ci.cd" / json_name 

141 schema_path = self.snapshot_dir / "ci.cd" / schema_name 

142 

143 if self.save_original: 

144 available_to_create = ( 

145 (not json_path.exists() or status is None) and not self.ci_cd_mode 

146 ) or (schema_path.exists() and not base_s_path.exists() and self.ci_cd_mode) 

147 available_to_update = (status is True and not self.ci_cd_mode) or ( 

148 schema_path.exists() and base_s_path.exists() and self.ci_cd_mode 

149 ) 

150 

151 if (available_to_create and self.update_actions.get("add")) or ( 

152 available_to_update and self.update_actions.get("update") 

153 ): 

154 with open(json_path, "w", encoding="utf-8") as f: 

155 json.dump(data, f, indent=2, ensure_ascii=False) 

156 

157 if available_to_create: 

158 GLOBAL_STATS.add_created(json_name) 

159 elif available_to_update: 

160 GLOBAL_STATS.add_updated(json_name) 

161 else: 

162 raise ValueError(f"Unexpected status: {status}") 

163 elif not self.ci_cd_mode and json_path.exists() and self.update_actions.get("delete"): 

164 # удаляем 

165 json_path.unlink() 

166 GLOBAL_STATS.add_deleted(json_name) 

167 

168 def assert_json_match( 

169 self, 

170 data: dict, 

171 name: str | int | Callable | list[str | int | Callable], 

172 ) -> Optional[bool]: 

173 """ 

174 Asserts for JSON, converts it to schema and then compares. 

175 

176 Returns: 

177 True – the schema has been updated, 

178 False – the schema has not changed, 

179 None – a new schema has been created. 

180 """ 

181 

182 real_name = self._process_name(name) 

183 

184 real_name, status = self._base_match(data, data, "json", real_name) 

185 

186 if self.update_mode or self.reset_mode or self.ci_cd_mode: 

187 self._save_process_original(real_name=real_name, status=status, data=data) 

188 

189 return status 

190 

191 def assert_schema_match( 

192 self, 

193 schema: dict[str, Any], 

194 name: str | int | Callable | list[str | int | Callable], 

195 *, 

196 data: Optional[dict] = None, 

197 ) -> Optional[bool]: 

198 """ 

199 Accepts a JSON-schema directly and compares it immediately. 

200 

201 Returns: 

202 True – the schema has been updated, 

203 False – the schema has not changed, 

204 None – a new schema has been created. 

205 """ 

206 

207 real_name = self._process_name(name) 

208 

209 real_name, status = self._base_match(data, schema, "schema", real_name) 

210 

211 if self.update_mode and data is not None: 

212 self._save_process_original(real_name=real_name, status=status, data=data) 

213 

214 return status 

215 

216 def _base_match( 

217 self, 

218 data: Optional[dict], 

219 current_data: dict, 

220 type_data: Literal["json", "schema"], 

221 name: str, 

222 ) -> tuple[str, Optional[bool]]: 

223 """ 

224 Checks if data matches the JSON schema, creates/updates it if needed, 

225 and writes statistics to GLOBAL_STATS. 

226 

227 Returns: 

228 True – the schema has been updated, 

229 False – the schema has not changed, 

230 None – a new schema has been created. 

231 """ 

232 __tracebackhide__ = not self.debug_mode # прячем из стека pytest 

233 

234 # Проверка имени 

235 name = self._process_name(name) 

236 

237 base_path = self.snapshot_dir / f"{name}.schema.json" 

238 if not self.ci_cd_mode: 

239 schema_path = base_path 

240 else: 

241 schema_path = self.snapshot_dir / "ci.cd" / f"{name}.schema.json" 

242 self.used_schemas.add(schema_path.name) 

243 

244 # --- состояние ДО проверки --- 

245 schema_exists_before = base_path.exists() 

246 

247 def make_schema(current_data: dict | list, type_data: Literal["json", "schema"]) -> dict: 

248 if type_data == "schema": 

249 return dict(current_data) 

250 elif type_data == "json": 

251 self.conv.clear_data() 

252 self.conv.add_json(current_data) 

253 return self.conv.run() 

254 else: 

255 raise ValueError("Not correct type argument") 

256 

257 # --- когда схемы ещё нет --- 

258 if not schema_exists_before: 

259 if not self.update_mode and not self.reset_mode and not self.ci_cd_mode: 

260 raise pytest.fail.Exception( 

261 f"Schema `{name}` not found." 

262 "Run the test with the --schema-update option to create it." 

263 ) 

264 elif not self.update_actions.get("add"): 

265 raise pytest.fail.Exception( 

266 f"Schema `{name}` not found and adding new schemas is disabled." 

267 ) 

268 

269 current_schema = make_schema(current_data, type_data) 

270 

271 with open(schema_path, "w", encoding="utf-8") as f: 

272 json.dump(current_schema, f, indent=2, ensure_ascii=False) 

273 

274 self.logger.info(f"New schema `{name}` has been created.") 

275 GLOBAL_STATS.add_created(schema_path.name) # статистика «создана» 

276 return name, None 

277 else: 

278 with open(base_path, "r", encoding="utf-8") as f: 

279 existing_schema = json.load(f) 

280 

281 # --- схема уже была: сравнение и валидация -------------------------------- 

282 schema_updated = False 

283 

284 def merge_schemas( 

285 old: dict, new: dict | list, type_data: Literal["json", "schema"] 

286 ) -> dict: 

287 self.conv.clear_data() 

288 self.conv.add_schema(old) 

289 if type_data == "schema": 

290 self.conv.add_schema(dict(new)) 

291 elif type_data == "json": 

292 self.conv.add_json(new) 

293 else: 

294 raise ValueError("Not correct type argument") 

295 result = self.conv.run() 

296 return result 

297 

298 if ( 

299 type_data == "json" or existing_schema != current_data 

300 ): # есть отличия или могут быть 

301 if ( 

302 self.update_mode or self.ci_cd_mode or self.reset_mode 

303 ) and self.update_actions.get("update"): 

304 # обновляем файл 

305 if self.reset_mode and not self.update_mode and not self.ci_cd_mode: 

306 current_schema = make_schema(current_data, type_data) 

307 

308 differences = self.differ.compare( 

309 dict(existing_schema), current_schema 

310 ).render() 

311 diff_count = self.differ.property.calc_diff() 

312 if any(diff_count[key] > 0 for key in diff_count if key != "UNKNOWN"): 

313 GLOBAL_STATS.add_updated(schema_path.name, differences) 

314 

315 with open(schema_path, "w", encoding="utf-8") as f: 

316 json.dump(current_schema, f, indent=2, ensure_ascii=False) 

317 self.logger.warning(f"Schema `{name}` reseted.\n\n{differences}") 

318 elif self.update_mode or self.ci_cd_mode and not self.reset_mode: 

319 merged_schema = merge_schemas(existing_schema, current_data, type_data) 

320 

321 differences = self.differ.compare( 

322 dict(existing_schema), merged_schema 

323 ).render() 

324 diff_count = self.differ.property.calc_diff() 

325 if any( 

326 diff_count[key] > 0 

327 for key in diff_count 

328 if key not in ["UNKNOWN", "NO_DIFF"] 

329 ): 

330 GLOBAL_STATS.add_updated(schema_path.name, differences) 

331 

332 with open(schema_path, "w", encoding="utf-8") as f: 

333 json.dump(merged_schema, f, indent=2, ensure_ascii=False) 

334 

335 self.logger.warning(f"Schema `{name}` updated.\n\n{differences}") 

336 else: # both update_mode and reset_mode are True 

337 raise ValueError( 

338 "update_mode, ci_cd_mode and reset_mode" 

339 " cannot be True at the same time." 

340 ) 

341 schema_updated = True 

342 elif data is not None: 

343 merged_schema = merge_schemas(existing_schema, current_data, type_data) 

344 

345 differences = self.differ.compare(dict(existing_schema), merged_schema).render() 

346 GLOBAL_STATS.add_uncommitted(schema_path.name, differences) 

347 

348 # только валидируем по старой схеме 

349 try: 

350 validate( 

351 instance=data, 

352 schema=existing_schema, 

353 format_checker=FormatChecker(), 

354 ) 

355 except ValidationError as e: 

356 pytest.fail( 

357 f"\n\n{differences}\n\nValidation error in `{name}`: {e.message}" 

358 ) 

359 elif data is not None and type_data == "schema": 

360 # схемы совпали – всё равно валидируем на случай формальных ошибок 

361 try: 

362 validate( 

363 instance=data, 

364 schema=existing_schema, 

365 format_checker=FormatChecker(), 

366 ) 

367 except ValidationError as e: 

368 merged_schema = merge_schemas(existing_schema, current_data, type_data) 

369 

370 differences = self.differ.compare(dict(existing_schema), merged_schema).render() 

371 pytest.fail(f"\n\n{differences}\n\nValidation error in `{name}`: {e.message}") 

372 

373 return name, schema_updated