Coverage for pytest_jsonschema_snapshot/core.py: 38%

130 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-02 00:37 +0000

1""" 

2Core logic of the plugin. 

3""" 

4 

5import json 

6import logging 

7from pathlib import Path 

8from typing import TYPE_CHECKING, Any, Callable, Optional, Set 

9 

10import pathvalidate 

11 

12if TYPE_CHECKING: 

13 from jsonschema_diff import JsonSchemaDiff 

14 

15import pytest 

16from jsonschema import FormatChecker, ValidationError, validate 

17 

18from .stats import GLOBAL_STATS 

19from .tools import JsonToSchemaConverter, NameMaker 

20 

21 

22class SchemaShot: 

23 def __init__( 

24 self, 

25 root_dir: Path, 

26 differ: "JsonSchemaDiff", 

27 callable_regex: str = "{class_method=.}", 

28 format_mode: str = "on", 

29 update_mode: bool = False, 

30 reset_mode: bool = False, 

31 update_actions: dict[str, bool] = {}, 

32 save_original: bool = False, 

33 debug_mode: bool = False, 

34 snapshot_dir_name: str = "__snapshots__", 

35 ): 

36 """ 

37 Initializes SchemaShot. 

38 

39 Args: 

40 root_dir: Project root directory 

41 update_mode: Update mode (--schema-update) 

42 snapshot_dir_name: Name of the directory for snapshots 

43 """ 

44 self.root_dir: Path = root_dir 

45 self.differ: "JsonSchemaDiff" = differ 

46 self.callable_regex: str = callable_regex 

47 self.format_mode: str = format_mode 

48 # self.examples_limit: int = examples_limit 

49 self.update_mode: bool = update_mode 

50 self.reset_mode: bool = reset_mode 

51 self.update_actions: dict[str, bool] = update_actions 

52 self.save_original: bool = save_original 

53 self.debug_mode: bool = debug_mode 

54 self.snapshot_dir: Path = root_dir / snapshot_dir_name 

55 self.used_schemas: Set[str] = set() 

56 

57 self.logger = logging.getLogger(__name__) 

58 # добавляем вывод в stderr 

59 handler = logging.StreamHandler() 

60 handler.setFormatter(logging.Formatter("%(levelname)s %(name)s: %(message)s")) 

61 self.logger.addHandler(handler) 

62 # и поднимаем уровень, чтобы INFO/DEBUG прошли через handler 

63 self.logger.setLevel(logging.INFO) 

64 

65 # Создаем директорию для снэпшотов, если её нет 

66 if not self.snapshot_dir.exists(): 

67 self.snapshot_dir.mkdir(parents=True) 

68 

69 def _process_name(self, name: str | int | Callable | list[str | int | Callable]) -> str: 

70 """ 

71 1. Converts callable to string 

72 2. Checks for validity 

73 

74 Returns: 

75 str 

76 Raises: 

77 ValueError 

78 """ 

79 

80 __tracebackhide__ = not self.debug_mode # прячем из стека pytest 

81 

82 def process_name_part(part: str | int | Callable) -> str: 

83 if callable(part): 

84 return NameMaker.format(part, self.callable_regex) 

85 else: 

86 return str(part) 

87 

88 if isinstance(name, (list, tuple)): 

89 name = ".".join([process_name_part(part) for part in name]) 

90 else: 

91 name = process_name_part(name) 

92 

93 if not isinstance(name, str) or not name: 

94 raise ValueError("Schema name must be a non-empty string") 

95 

96 try: 

97 # auto подберёт правила под текущую ОС 

98 pathvalidate.validate_filename( 

99 name, platform="auto" 

100 ) # allow_reserved=False по умолчанию 

101 except ValidationError as e: 

102 raise ValueError(f"Invalid schema name: {e}") from None 

103 

104 return name 

105 

106 def _save_process_original(self, real_name: str, status: Optional[bool], data: dict) -> None: 

107 json_name = f"{real_name}.json" 

108 json_path = self.snapshot_dir / json_name 

109 

110 if self.save_original: 

111 available_to_create = not json_path.exists() or status is None 

112 available_to_update = status is True 

113 

114 if (available_to_create and self.update_actions.get("add")) or ( 

115 available_to_update and self.update_actions.get("update") 

116 ): 

117 with open(json_path, "w", encoding="utf-8") as f: 

118 json.dump(data, f, indent=2, ensure_ascii=False) 

119 

120 if available_to_create: 

121 GLOBAL_STATS.add_created(json_name) 

122 elif available_to_update: 

123 GLOBAL_STATS.add_updated(json_name) 

124 else: 

125 raise ValueError(f"Unexpected status: {status}") 

126 elif json_path.exists() and self.update_actions.get("delete"): 

127 # удаляем 

128 json_path.unlink() 

129 GLOBAL_STATS.add_deleted(json_name) 

130 

131 def assert_json_match( 

132 self, 

133 data: dict, 

134 name: str | int | Callable | list[str | int | Callable], 

135 ) -> Optional[bool]: 

136 """ 

137 Asserts for JSON, converts it to schema and then compares. 

138 

139 Returns: 

140 True – the schema has been updated, 

141 False – the schema has not changed, 

142 None – a new schema has been created. 

143 """ 

144 

145 real_name = self._process_name(name) 

146 

147 builder = JsonToSchemaConverter( 

148 format_mode=self.format_mode # type: ignore[arg-type] 

149 ) # , examples=self.examples_limit) 

150 builder.add_object(data) 

151 current_schema = builder.to_schema() 

152 

153 real_name, status = self._base_match(data, current_schema, real_name) 

154 

155 if self.update_mode or self.reset_mode: 

156 self._save_process_original(real_name=real_name, status=status, data=data) 

157 

158 return status 

159 

160 def assert_schema_match( 

161 self, 

162 schema: dict[str, Any], 

163 name: str | int | Callable | list[str | int | Callable], 

164 *, 

165 data: Optional[dict] = None, 

166 ) -> Optional[bool]: 

167 """ 

168 Accepts a JSON-schema directly and compares it immediately. 

169 

170 Returns: 

171 True – the schema has been updated, 

172 False – the schema has not changed, 

173 None – a new schema has been created. 

174 """ 

175 

176 real_name = self._process_name(name) 

177 

178 real_name, status = self._base_match(data, schema, real_name) 

179 

180 if self.update_mode and data is not None: 

181 self._save_process_original(real_name=real_name, status=status, data=data) 

182 

183 return status 

184 

185 def _base_match( 

186 self, 

187 data: Optional[dict], 

188 current_schema: dict, 

189 name: str, 

190 ) -> tuple[str, Optional[bool]]: 

191 """ 

192 Checks if data matches the JSON schema, creates/updates it if needed, 

193 and writes statistics to GLOBAL_STATS. 

194 

195 Returns: 

196 True – the schema has been updated, 

197 False – the schema has not changed, 

198 None – a new schema has been created. 

199 """ 

200 __tracebackhide__ = not self.debug_mode # прячем из стека pytest 

201 

202 # Проверка имени 

203 name = self._process_name(name) 

204 

205 schema_path = self.snapshot_dir / f"{name}.schema.json" 

206 self.used_schemas.add(schema_path.name) 

207 

208 # --- состояние ДО проверки --- 

209 schema_exists_before = schema_path.exists() 

210 

211 # --- когда схемы ещё нет --- 

212 if not schema_exists_before: 

213 if not self.update_mode and not self.reset_mode: 

214 raise pytest.fail.Exception( 

215 f"Schema `{name}` not found." 

216 "Run the test with the --schema-update option to create it." 

217 ) 

218 elif not self.update_actions.get("add"): 

219 raise pytest.fail.Exception( 

220 f"Schema `{name}` not found and adding new schemas is disabled." 

221 ) 

222 

223 with open(schema_path, "w", encoding="utf-8") as f: 

224 json.dump(current_schema, f, indent=2, ensure_ascii=False) 

225 

226 self.logger.info(f"New schema `{name}` has been created.") 

227 GLOBAL_STATS.add_created(schema_path.name) # статистика «создана» 

228 return name, None 

229 else: 

230 with open(schema_path, "r", encoding="utf-8") as f: 

231 existing_schema = json.load(f) 

232 

233 # --- схема уже была: сравнение и валидация -------------------------------- 

234 schema_updated = False 

235 

236 if existing_schema != current_schema: # есть отличия 

237 if (self.update_mode or self.reset_mode) and self.update_actions.get("update"): 

238 # обновляем файл 

239 if self.reset_mode and not self.update_mode: 

240 differences = self.differ.compare( 

241 dict(existing_schema), current_schema 

242 ).render() 

243 GLOBAL_STATS.add_updated(schema_path.name, differences) 

244 

245 with open(schema_path, "w", encoding="utf-8") as f: 

246 json.dump(current_schema, f, indent=2, ensure_ascii=False) 

247 self.logger.warning(f"Schema `{name}` updated (reset).\n\n{differences}") 

248 elif self.update_mode and not self.reset_mode: 

249 builder = JsonToSchemaConverter( 

250 format_mode=self.format_mode # type: ignore[arg-type] 

251 ) # , examples=self.examples_limit) 

252 builder.add_schema(existing_schema) 

253 builder.add_schema(current_schema) 

254 merged_schema = builder.to_schema() 

255 

256 differences = self.differ.compare( 

257 dict(existing_schema), merged_schema 

258 ).render() 

259 GLOBAL_STATS.add_updated(schema_path.name, differences) 

260 

261 with open(schema_path, "w", encoding="utf-8") as f: 

262 json.dump(merged_schema, f, indent=2, ensure_ascii=False) 

263 

264 self.logger.warning(f"Schema `{name}` updated (update).\n\n{differences}") 

265 else: # both update_mode and reset_mode are True 

266 raise ValueError( 

267 "Both update_mode and reset_mode cannot be True at the same time." 

268 ) 

269 schema_updated = True 

270 elif data is not None: 

271 differences = self.differ.compare( 

272 dict(existing_schema), current_schema 

273 ).render() 

274 GLOBAL_STATS.add_uncommitted(schema_path.name, differences) 

275 

276 # только валидируем по старой схеме 

277 try: 

278 validate( 

279 instance=data, 

280 schema=existing_schema, 

281 format_checker=FormatChecker(), 

282 ) 

283 except ValidationError as e: 

284 pytest.fail( 

285 f"\n\n{differences}\n\nValidation error in `{name}`: {e.message}" 

286 ) 

287 elif data is not None: 

288 # схемы совпали – всё равно валидируем на случай формальных ошибок 

289 try: 

290 validate( 

291 instance=data, 

292 schema=existing_schema, 

293 format_checker=FormatChecker(), 

294 ) 

295 except ValidationError as e: 

296 differences = self.differ.compare( 

297 dict(existing_schema), current_schema 

298 ).render() 

299 pytest.fail(f"\n\n{differences}\n\nValidation error in `{name}`: {e.message}") 

300 

301 return name, schema_updated