Coverage for pytest_jsonschema_snapshot / core.py: 42%

158 statements  

« prev     ^ index     » next       coverage.py v7.13.3, created at 2026-02-04 20:33 +0000

1""" 

2Core logic of the plugin. 

3""" 

4 

5import json 

6import logging 

7from pathlib import Path 

8from typing import TYPE_CHECKING, Any, Callable, Literal, Optional 

9 

10import pathvalidate 

11 

12if TYPE_CHECKING: 

13 from jsonschema_diff import JsonSchemaDiff 

14 

15import pytest 

16from genschema import Converter, PseudoArrayHandler 

17from genschema.comparators import ( 

18 DeleteElement, 

19 FormatComparator, 

20 RequiredComparator, 

21 SchemaVersionComparator, 

22) 

23from jsonschema import FormatChecker, ValidationError, validate 

24 

25from .stats import GLOBAL_STATS 

26from .tools import NameMaker 

27 

28 

29class SchemaShot: 

def __init__(
    self,
    root_dir: Path,
    differ: "JsonSchemaDiff",
    callable_regex: str = "{class_method=.}",
    format_mode: str = "on",
    update_mode: bool = False,
    reset_mode: bool = False,
    update_actions: Optional[dict[str, bool]] = None,
    save_original: bool = False,
    debug_mode: bool = False,
    snapshot_dir_name: str = "__snapshots__",
):
    """
    Initializes SchemaShot.

    Args:
        root_dir: Project root directory; snapshots are kept in
            ``root_dir / snapshot_dir_name``.
        differ: Schema differ used to render differences between schemas.
        callable_regex: Format handed to ``NameMaker.format`` when a
            callable is used as (part of) a schema name.
        format_mode: Stored on the instance as ``format_mode``.
        update_mode: Update mode (--schema-update)
        reset_mode: Reset mode; stored on the instance as ``reset_mode``.
        update_actions: Mapping of action name to enabled flag (the keys
            ``"add"``, ``"update"`` and ``"delete"`` are consulted). A
            copy is stored, so the caller's mapping is never shared.
            ``None`` or an empty mapping leaves every action disabled.
        save_original: Whether the original JSON data is stored next to
            the schema as ``<name>.json``.
        debug_mode: When true, plugin frames are not hidden from pytest
            tracebacks.
        snapshot_dir_name: Name of the directory for snapshots
    """
    self.root_dir: Path = root_dir
    self.differ: "JsonSchemaDiff" = differ
    self.callable_regex: str = callable_regex
    self.format_mode: str = format_mode
    # self.examples_limit: int = examples_limit
    self.update_mode: bool = update_mode
    self.reset_mode: bool = reset_mode
    # Always keep a private copy; ``None`` means "no actions enabled".
    self.update_actions: dict[str, bool] = dict(update_actions or {})
    self.save_original: bool = save_original
    self.debug_mode: bool = debug_mode
    self.snapshot_dir: Path = root_dir / snapshot_dir_name
    self.used_schemas: set[str] = set()

    self.conv = Converter(
        pseudo_handler=PseudoArrayHandler(),
        base_of="anyOf",
    )
    self.conv.register(FormatComparator())
    self.conv.register(RequiredComparator())
    # self.conv.register(EmptyComparator())
    self.conv.register(SchemaVersionComparator())
    self.conv.register(DeleteElement())
    self.conv.register(DeleteElement("isPseudoArray"))

    self.logger = logging.getLogger(__name__)
    # logging.getLogger() hands back the same logger object on every call,
    # so the stderr handler must be attached only once. Without this guard
    # each additional SchemaShot instance would emit every record again.
    handler_attached = any(
        getattr(existing, "_schemashot_handler", False)
        for existing in self.logger.handlers
    )
    if not handler_attached:
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter("%(levelname)s %(name)s: %(message)s"))
        setattr(handler, "_schemashot_handler", True)  # marker for the guard above
        self.logger.addHandler(handler)
    # Raise the level so INFO records make it through to the handler.
    self.logger.setLevel(logging.INFO)

    # Create the snapshot directory if it is missing. ``exist_ok`` keeps a
    # concurrent creation between the check and the mkdir from raising.
    if not self.snapshot_dir.exists():
        self.snapshot_dir.mkdir(parents=True, exist_ok=True)

86 

87 def _process_name(self, name: str | int | Callable | list[str | int | Callable]) -> str: 

88 """ 

89 1. Converts callable to string 

90 2. Checks for validity 

91 

92 Returns: 

93 str 

94 Raises: 

95 ValueError 

96 """ 

97 

98 __tracebackhide__ = not self.debug_mode # прячем из стека pytest 

99 

100 def process_name_part(part: str | int | Callable) -> str: 

101 if callable(part): 

102 return NameMaker.format(part, self.callable_regex) 

103 else: 

104 return str(part) 

105 

106 if isinstance(name, (list, tuple)): 

107 name = ".".join([process_name_part(part) for part in name]) 

108 else: 

109 name = process_name_part(name) 

110 

111 if not isinstance(name, str) or not name: 

112 raise ValueError("Schema name must be a non-empty string") 

113 

114 try: 

115 # auto подберёт правила под текущую ОС 

116 pathvalidate.validate_filename( 

117 name, platform="auto" 

118 ) # allow_reserved=False по умолчанию 

119 except pathvalidate.ValidationError as e: 

120 raise ValueError(f"Invalid schema name: {e}") from None 

121 

122 return name 

123 

124 def _save_process_original(self, real_name: str, status: Optional[bool], data: dict) -> None: 

125 json_name = f"{real_name}.json" 

126 json_path = self.snapshot_dir / json_name 

127 

128 if self.save_original: 

129 available_to_create = not json_path.exists() or status is None 

130 available_to_update = status is True 

131 

132 if (available_to_create and self.update_actions.get("add")) or ( 

133 available_to_update and self.update_actions.get("update") 

134 ): 

135 with open(json_path, "w", encoding="utf-8") as f: 

136 json.dump(data, f, indent=2, ensure_ascii=False) 

137 

138 if available_to_create: 

139 GLOBAL_STATS.add_created(json_name) 

140 elif available_to_update: 

141 GLOBAL_STATS.add_updated(json_name) 

142 else: 

143 raise ValueError(f"Unexpected status: {status}") 

144 elif json_path.exists() and self.update_actions.get("delete"): 

145 # удаляем 

146 json_path.unlink() 

147 GLOBAL_STATS.add_deleted(json_name) 

148 

149 def assert_json_match( 

150 self, 

151 data: dict, 

152 name: str | int | Callable | list[str | int | Callable], 

153 ) -> Optional[bool]: 

154 """ 

155 Asserts for JSON, converts it to schema and then compares. 

156 

157 Returns: 

158 True – the schema has been updated, 

159 False – the schema has not changed, 

160 None – a new schema has been created. 

161 """ 

162 

163 real_name = self._process_name(name) 

164 

165 real_name, status = self._base_match(data, data, "json", real_name) 

166 

167 if self.update_mode or self.reset_mode: 

168 self._save_process_original(real_name=real_name, status=status, data=data) 

169 

170 return status 

171 

172 def assert_schema_match( 

173 self, 

174 schema: dict[str, Any], 

175 name: str | int | Callable | list[str | int | Callable], 

176 *, 

177 data: Optional[dict] = None, 

178 ) -> Optional[bool]: 

179 """ 

180 Accepts a JSON-schema directly and compares it immediately. 

181 

182 Returns: 

183 True – the schema has been updated, 

184 False – the schema has not changed, 

185 None – a new schema has been created. 

186 """ 

187 

188 real_name = self._process_name(name) 

189 

190 real_name, status = self._base_match(data, schema, "schema", real_name) 

191 

192 if self.update_mode and data is not None: 

193 self._save_process_original(real_name=real_name, status=status, data=data) 

194 

195 return status 

196 

197 def _base_match( 

198 self, 

199 data: Optional[dict], 

200 current_data: dict, 

201 type_data: Literal["json", "schema"], 

202 name: str, 

203 ) -> tuple[str, Optional[bool]]: 

204 """ 

205 Checks if data matches the JSON schema, creates/updates it if needed, 

206 and writes statistics to GLOBAL_STATS. 

207 

208 Returns: 

209 True – the schema has been updated, 

210 False – the schema has not changed, 

211 None – a new schema has been created. 

212 """ 

# NOTE: the value actually returned is the tuple (name, status); the
# True/False/None values listed in the docstring describe its second element.
213 __tracebackhide__ = not self.debug_mode # hide this frame from pytest tracebacks 

214 

215 # Validate the name 

216 name = self._process_name(name) 

217 

218 schema_path = self.snapshot_dir / f"{name}.schema.json" 

219 self.used_schemas.add(schema_path.name) 

220 

221 # --- state BEFORE the check --- 

222 schema_exists_before = schema_path.exists() 

223 

# Helper: a "schema" input is copied as-is, a "json" input is converted
# to a schema through self.conv.
224 def make_schema(current_data: dict | list, type_data: Literal["json", "schema"]) -> dict: 

225 if type_data == "schema": 

226 return dict(current_data) 

227 elif type_data == "json": 

228 self.conv.clear_data() 

229 self.conv.add_json(current_data) 

230 return self.conv.run() 

231 else: 

232 raise ValueError("Not correct type argument") 

233 

234 # --- the schema does not exist yet --- 

235 if not schema_exists_before: 

236 if not self.update_mode and not self.reset_mode: 

# NOTE(review): the two adjacent string literals below are concatenated
# without a separator, so the message reads "...not found.Run the test..."
# - confirm and add a space.
237 raise pytest.fail.Exception( 

238 f"Schema `{name}` not found." 

239 "Run the test with the --schema-update option to create it." 

240 ) 

241 elif not self.update_actions.get("add"): 

242 raise pytest.fail.Exception( 

243 f"Schema `{name}` not found and adding new schemas is disabled." 

244 ) 

245 

246 current_schema = make_schema(current_data, type_data) 

247 

248 with open(schema_path, "w", encoding="utf-8") as f: 

249 json.dump(current_schema, f, indent=2, ensure_ascii=False) 

250 

251 self.logger.info(f"New schema `{name}` has been created.") 

252 GLOBAL_STATS.add_created(schema_path.name) # stats: "created" 

253 return name, None 

254 else: 

255 with open(schema_path, "r", encoding="utf-8") as f: 

256 existing_schema = json.load(f) 

257 

258 # --- schema already existed: compare and validate -------------------------------- 

259 schema_updated = False 

260 

# Helper: feeds the stored schema plus the new schema/JSON into self.conv
# and returns the converter's result.
261 def merge_schemas( 

262 old: dict, new: dict | list, type_data: Literal["json", "schema"] 

263 ) -> dict: 

264 self.conv.clear_data() 

265 self.conv.add_schema(old) 

266 if type_data == "schema": 

267 self.conv.add_schema(dict(new)) 

268 elif type_data == "json": 

269 self.conv.add_json(new) 

270 else: 

271 raise ValueError("Not correct type argument") 

272 result = self.conv.run() 

273 return result 

274 

275 if ( 

276 type_data == "json" or existing_schema != current_data 

277 ): # differences exist or may exist 

278 if (self.update_mode or self.reset_mode) and self.update_actions.get("update"): 

279 # update the file 

# Reset mode: the stored schema is replaced by one built from the current
# input alone.
280 if self.reset_mode and not self.update_mode: 

281 current_schema = make_schema(current_data, type_data) 

282 

283 differences = self.differ.compare( 

284 dict(existing_schema), current_schema 

285 ).render() 

286 diff_count = self.differ.property.calc_diff() 

# The "UNKNOWN" bucket of the diff counters is ignored by this check.
287 if any(diff_count[key] > 0 for key in diff_count if key != "UNKNOWN"): 

288 GLOBAL_STATS.add_updated(schema_path.name, differences) 

289 

290 with open(schema_path, "w", encoding="utf-8") as f: 

291 json.dump(current_schema, f, indent=2, ensure_ascii=False) 

292 self.logger.warning(f"Schema `{name}` reseted.\n\n{differences}") 

# Update mode: the stored schema is merged with the current input.
293 elif self.update_mode and not self.reset_mode: 

294 merged_schema = merge_schemas(existing_schema, current_data, type_data) 

295 

296 differences = self.differ.compare( 

297 dict(existing_schema), merged_schema 

298 ).render() 

299 diff_count = self.differ.property.calc_diff() 

300 if any(diff_count[key] > 0 for key in diff_count if key != "UNKNOWN"): 

301 GLOBAL_STATS.add_updated(schema_path.name, differences) 

302 

303 with open(schema_path, "w", encoding="utf-8") as f: 

304 json.dump(merged_schema, f, indent=2, ensure_ascii=False) 

305 

306 self.logger.warning(f"Schema `{name}` updated.\n\n{differences}") 

307 else: # both update_mode and reset_mode are True 

308 raise ValueError( 

309 "Both update_mode and reset_mode cannot be True at the same time." 

310 ) 

311 schema_updated = True 

# NOTE(review): when no update is applied and data is None, this branch is
# skipped, so differing schemas fall through without any report - confirm
# this is intended.
312 elif data is not None: 

313 merged_schema = merge_schemas(existing_schema, current_data, type_data) 

314 

315 differences = self.differ.compare(dict(existing_schema), merged_schema).render() 

316 GLOBAL_STATS.add_uncommitted(schema_path.name, differences) 

317 

318 # only validate against the old schema 

319 try: 

320 validate( 

321 instance=data, 

322 schema=existing_schema, 

323 format_checker=FormatChecker(), 

324 ) 

325 except ValidationError as e: 

326 pytest.fail( 

327 f"\n\n{differences}\n\nValidation error in `{name}`: {e.message}" 

328 ) 

329 elif data is not None and type_data == "schema": 

330 # schemas are equal – still validate in case of formal errors 

331 try: 

332 validate( 

333 instance=data, 

334 schema=existing_schema, 

335 format_checker=FormatChecker(), 

336 ) 

337 except ValidationError as e: 

338 merged_schema = merge_schemas(existing_schema, current_data, type_data) 

339 

340 differences = self.differ.compare(dict(existing_schema), merged_schema).render() 

341 pytest.fail(f"\n\n{differences}\n\nValidation error in `{name}`: {e.message}") 

342 

343 return name, schema_updated

342 

343 return name, schema_updated