Coverage for pytest_jsonschema_snapshot / core.py: 42%
174 statements
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-08 01:59 +0000
« prev ^ index » next coverage.py v7.13.4, created at 2026-03-08 01:59 +0000
1"""
2Core logic of the plugin.
3"""
5import json
6import logging
7import shutil
8from pathlib import Path
9from typing import TYPE_CHECKING, Any, Callable, Literal, Optional
11import pathvalidate
13if TYPE_CHECKING:
14 from jsonschema_diff import JsonSchemaDiff
16import pytest
17from genschema import Converter, PseudoArrayHandler
18from genschema.comparators import (
19 DeleteElement,
20 FormatComparator,
21 RequiredComparator,
22 SchemaVersionComparator,
23)
24from jsonschema import FormatChecker, ValidationError, validate
26from .stats import GLOBAL_STATS
27from .tools import NameMaker
30class SchemaShot:
31 def __init__(
32 self,
33 root_dir: Path,
34 differ: "JsonSchemaDiff",
35 callable_regex: str = "{class_method=.}",
36 format_mode: str = "on",
37 update_mode: bool = False,
38 ci_cd_mode: bool = False,
39 reset_mode: bool = False,
40 update_actions: Optional[dict[str, bool]] = {},
41 save_original: bool = False,
42 debug_mode: bool = False,
43 snapshot_dir_name: str = "__snapshots__",
44 ):
45 """
46 Initializes SchemaShot.
48 Args:
49 root_dir: Project root directory
50 update_mode: Update mode (--schema-update)
51 snapshot_dir_name: Name of the directory for snapshots
52 """
53 self.root_dir: Path = root_dir
54 self.differ: "JsonSchemaDiff" = differ
55 self.callable_regex: str = callable_regex
56 self.format_mode: str = format_mode
57 self.ci_cd_mode: bool = ci_cd_mode
58 # self.examples_limit: int = examples_limit
59 self.update_mode: bool = update_mode
60 self.reset_mode: bool = reset_mode
61 self.update_actions: dict[str, bool] = dict(update_actions or {})
62 self.save_original: bool = save_original
63 self.debug_mode: bool = debug_mode
64 self.snapshot_dir: Path = root_dir / snapshot_dir_name
65 self.used_schemas: set[str] = set()
67 self.conv = Converter(
68 pseudo_handler=PseudoArrayHandler(),
69 base_of="anyOf",
70 )
71 self.conv.register(FormatComparator())
72 self.conv.register(RequiredComparator())
73 # self.conv.register(EmptyComparator())
74 self.conv.register(SchemaVersionComparator())
75 self.conv.register(DeleteElement())
76 self.conv.register(DeleteElement("isPseudoArray"))
78 self.logger = logging.getLogger(__name__)
79 # добавляем вывод в stderr
80 handler = logging.StreamHandler()
81 handler.setFormatter(logging.Formatter("%(levelname)s %(name)s: %(message)s"))
82 self.logger.addHandler(handler)
83 # и поднимаем уровень, чтобы INFO/DEBUG прошли через handler
84 self.logger.setLevel(logging.INFO)
86 # Создаем директорию для снэпшотов, если её нет
87 if not self.snapshot_dir.exists():
88 self.snapshot_dir.mkdir(parents=True)
89 cicd = self.snapshot_dir / "ci.cd"
90 if cicd.exists():
91 shutil.rmtree(cicd)
92 cicd.mkdir(parents=True)
94 def _process_name(self, name: str | int | Callable | list[str | int | Callable]) -> str:
95 """
96 1. Converts callable to string
97 2. Checks for validity
99 Returns:
100 str
101 Raises:
102 ValueError
103 """
105 __tracebackhide__ = not self.debug_mode # прячем из стека pytest
107 def process_name_part(part: str | int | Callable) -> str:
108 if callable(part):
109 return NameMaker.format(part, self.callable_regex)
110 else:
111 return str(part)
113 if isinstance(name, (list, tuple)):
114 name = ".".join([process_name_part(part) for part in name])
115 else:
116 name = process_name_part(name)
118 if not isinstance(name, str) or not name:
119 raise ValueError("Schema name must be a non-empty string")
121 try:
122 # auto подберёт правила под текущую ОС
123 pathvalidate.validate_filename(
124 name, platform="auto"
125 ) # allow_reserved=False по умолчанию
126 except pathvalidate.ValidationError as e:
127 raise ValueError(f"Invalid schema name: {e}") from None
129 return name
131 def _save_process_original(self, real_name: str, status: Optional[bool], data: dict) -> None:
132 json_name = f"{real_name}.json"
133 schema_name = f"{real_name}.schema.json"
134 base_j_path = self.snapshot_dir / json_name
135 base_s_path = self.snapshot_dir / json_name
136 if not self.ci_cd_mode:
137 json_path = base_j_path
138 schema_path = base_s_path
139 else:
140 json_path = self.snapshot_dir / "ci.cd" / json_name
141 schema_path = self.snapshot_dir / "ci.cd" / schema_name
143 if self.save_original:
144 available_to_create = (
145 (not json_path.exists() or status is None) and not self.ci_cd_mode
146 ) or (schema_path.exists() and not base_s_path.exists() and self.ci_cd_mode)
147 available_to_update = (status is True and not self.ci_cd_mode) or (
148 schema_path.exists() and base_s_path.exists() and self.ci_cd_mode
149 )
151 if (available_to_create and self.update_actions.get("add")) or (
152 available_to_update and self.update_actions.get("update")
153 ):
154 with open(json_path, "w", encoding="utf-8") as f:
155 json.dump(data, f, indent=2, ensure_ascii=False)
157 if available_to_create:
158 GLOBAL_STATS.add_created(json_name)
159 elif available_to_update:
160 GLOBAL_STATS.add_updated(json_name)
161 else:
162 raise ValueError(f"Unexpected status: {status}")
163 elif not self.ci_cd_mode and json_path.exists() and self.update_actions.get("delete"):
164 # удаляем
165 json_path.unlink()
166 GLOBAL_STATS.add_deleted(json_name)
168 def assert_json_match(
169 self,
170 data: dict,
171 name: str | int | Callable | list[str | int | Callable],
172 ) -> Optional[bool]:
173 """
174 Asserts for JSON, converts it to schema and then compares.
176 Returns:
177 True – the schema has been updated,
178 False – the schema has not changed,
179 None – a new schema has been created.
180 """
182 real_name = self._process_name(name)
184 real_name, status = self._base_match(data, data, "json", real_name)
186 if self.update_mode or self.reset_mode or self.ci_cd_mode:
187 self._save_process_original(real_name=real_name, status=status, data=data)
189 return status
191 def assert_schema_match(
192 self,
193 schema: dict[str, Any],
194 name: str | int | Callable | list[str | int | Callable],
195 *,
196 data: Optional[dict] = None,
197 ) -> Optional[bool]:
198 """
199 Accepts a JSON-schema directly and compares it immediately.
201 Returns:
202 True – the schema has been updated,
203 False – the schema has not changed,
204 None – a new schema has been created.
205 """
207 real_name = self._process_name(name)
209 real_name, status = self._base_match(data, schema, "schema", real_name)
211 if self.update_mode and data is not None:
212 self._save_process_original(real_name=real_name, status=status, data=data)
214 return status
216 def _base_match(
217 self,
218 data: Optional[dict],
219 current_data: dict,
220 type_data: Literal["json", "schema"],
221 name: str,
222 ) -> tuple[str, Optional[bool]]:
223 """
224 Checks if data matches the JSON schema, creates/updates it if needed,
225 and writes statistics to GLOBAL_STATS.
227 Returns:
228 True – the schema has been updated,
229 False – the schema has not changed,
230 None – a new schema has been created.
231 """
232 __tracebackhide__ = not self.debug_mode # прячем из стека pytest
234 # Проверка имени
235 name = self._process_name(name)
237 base_path = self.snapshot_dir / f"{name}.schema.json"
238 if not self.ci_cd_mode:
239 schema_path = base_path
240 else:
241 schema_path = self.snapshot_dir / "ci.cd" / f"{name}.schema.json"
242 self.used_schemas.add(schema_path.name)
244 # --- состояние ДО проверки ---
245 schema_exists_before = base_path.exists()
247 def make_schema(current_data: dict | list, type_data: Literal["json", "schema"]) -> dict:
248 if type_data == "schema":
249 return dict(current_data)
250 elif type_data == "json":
251 self.conv.clear_data()
252 self.conv.add_json(current_data)
253 return self.conv.run()
254 else:
255 raise ValueError("Not correct type argument")
257 # --- когда схемы ещё нет ---
258 if not schema_exists_before:
259 if not self.update_mode and not self.reset_mode and not self.ci_cd_mode:
260 raise pytest.fail.Exception(
261 f"Schema `{name}` not found."
262 "Run the test with the --schema-update option to create it."
263 )
264 elif not self.update_actions.get("add"):
265 raise pytest.fail.Exception(
266 f"Schema `{name}` not found and adding new schemas is disabled."
267 )
269 current_schema = make_schema(current_data, type_data)
271 with open(schema_path, "w", encoding="utf-8") as f:
272 json.dump(current_schema, f, indent=2, ensure_ascii=False)
274 self.logger.info(f"New schema `{name}` has been created.")
275 GLOBAL_STATS.add_created(schema_path.name) # статистика «создана»
276 return name, None
277 else:
278 with open(base_path, "r", encoding="utf-8") as f:
279 existing_schema = json.load(f)
281 # --- схема уже была: сравнение и валидация --------------------------------
282 schema_updated = False
284 def merge_schemas(
285 old: dict, new: dict | list, type_data: Literal["json", "schema"]
286 ) -> dict:
287 self.conv.clear_data()
288 self.conv.add_schema(old)
289 if type_data == "schema":
290 self.conv.add_schema(dict(new))
291 elif type_data == "json":
292 self.conv.add_json(new)
293 else:
294 raise ValueError("Not correct type argument")
295 result = self.conv.run()
296 return result
298 if (
299 type_data == "json" or existing_schema != current_data
300 ): # есть отличия или могут быть
301 if (
302 self.update_mode or self.ci_cd_mode or self.reset_mode
303 ) and self.update_actions.get("update"):
304 # обновляем файл
305 if self.reset_mode and not self.update_mode and not self.ci_cd_mode:
306 current_schema = make_schema(current_data, type_data)
308 differences = self.differ.compare(
309 dict(existing_schema), current_schema
310 ).render()
311 diff_count = self.differ.property.calc_diff()
312 if any(diff_count[key] > 0 for key in diff_count if key != "UNKNOWN"):
313 GLOBAL_STATS.add_updated(schema_path.name, differences)
315 with open(schema_path, "w", encoding="utf-8") as f:
316 json.dump(current_schema, f, indent=2, ensure_ascii=False)
317 self.logger.warning(f"Schema `{name}` reseted.\n\n{differences}")
318 elif self.update_mode or self.ci_cd_mode and not self.reset_mode:
319 merged_schema = merge_schemas(existing_schema, current_data, type_data)
321 differences = self.differ.compare(
322 dict(existing_schema), merged_schema
323 ).render()
324 diff_count = self.differ.property.calc_diff()
325 if any(
326 diff_count[key] > 0
327 for key in diff_count
328 if key not in ["UNKNOWN", "NO_DIFF"]
329 ):
330 GLOBAL_STATS.add_updated(schema_path.name, differences)
332 with open(schema_path, "w", encoding="utf-8") as f:
333 json.dump(merged_schema, f, indent=2, ensure_ascii=False)
335 self.logger.warning(f"Schema `{name}` updated.\n\n{differences}")
336 else: # both update_mode and reset_mode are True
337 raise ValueError(
338 "update_mode, ci_cd_mode and reset_mode"
339 " cannot be True at the same time."
340 )
341 schema_updated = True
342 elif data is not None:
343 merged_schema = merge_schemas(existing_schema, current_data, type_data)
345 differences = self.differ.compare(dict(existing_schema), merged_schema).render()
346 GLOBAL_STATS.add_uncommitted(schema_path.name, differences)
348 # только валидируем по старой схеме
349 try:
350 validate(
351 instance=data,
352 schema=existing_schema,
353 format_checker=FormatChecker(),
354 )
355 except ValidationError as e:
356 pytest.fail(
357 f"\n\n{differences}\n\nValidation error in `{name}`: {e.message}"
358 )
359 elif data is not None and type_data == "schema":
360 # схемы совпали – всё равно валидируем на случай формальных ошибок
361 try:
362 validate(
363 instance=data,
364 schema=existing_schema,
365 format_checker=FormatChecker(),
366 )
367 except ValidationError as e:
368 merged_schema = merge_schemas(existing_schema, current_data, type_data)
370 differences = self.differ.compare(dict(existing_schema), merged_schema).render()
371 pytest.fail(f"\n\n{differences}\n\nValidation error in `{name}`: {e.message}")
373 return name, schema_updated