Coverage for pytest_jsonschema_snapshot / core.py: 46%
200 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-29 08:31 +0000
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-29 08:31 +0000
1"""
2Core logic of the plugin.
3"""
5import json
6import logging
7import shutil
8from pathlib import Path
9from typing import TYPE_CHECKING, Any, Callable, Literal, Optional
11import pathvalidate
13if TYPE_CHECKING:
14 from jsonschema_diff import JsonSchemaDiff
16import pytest
17from genschema import Converter, PseudoArrayHandler
18from genschema.comparators import (
19 DeleteElement,
20 EmptyComparator,
21 EnumComparator,
22 FormatComparator,
23 RequiredComparator,
24 SchemaVersionComparator,
25)
26from genschema.postprocessing import (
27 SchemaReferenceExtractionConfig,
28 SchemaReferencePostprocessor,
29)
30from jsonschema import FormatChecker, ValidationError, validate
32from .stats import GLOBAL_STATS
33from .tools import NameMaker
36class SchemaShot:
37 def __init__(
38 self,
39 root_dir: Path,
40 differ: "JsonSchemaDiff",
41 callable_regex: str = "{class_method=.}",
42 format_mode: str = "on",
43 update_mode: bool = False,
44 ci_cd_mode: bool = False,
45 reset_mode: bool = False,
46 update_actions: Optional[dict[str, bool]] = {},
47 save_original: bool = False,
48 debug_mode: bool = False,
49 snapshot_dir_name: str = "__snapshots__",
50 ):
51 """
52 Initializes SchemaShot.
54 Args:
55 root_dir: Project root directory
56 update_mode: Update mode (--schema-update)
57 snapshot_dir_name: Name of the directory for snapshots
58 """
59 self.root_dir: Path = root_dir
60 self.differ: "JsonSchemaDiff" = differ
61 self.callable_regex: str = callable_regex
62 self.format_mode: str = format_mode.lower()
63 self.ci_cd_mode: bool = ci_cd_mode
64 # self.examples_limit: int = examples_limit
65 self.update_mode: bool = update_mode
66 self.reset_mode: bool = reset_mode
67 self.update_actions: dict[str, bool] = dict(update_actions or {})
68 self.save_original: bool = save_original
69 self.debug_mode: bool = debug_mode
70 self.snapshot_dir: Path = root_dir / snapshot_dir_name
71 self.used_schemas: set[str] = set()
73 if self.format_mode not in {"on", "safe", "off"}:
74 raise ValueError(
75 "Invalid jsss_format_mode value. Expected one of: 'on', 'safe', 'off'."
76 )
78 self.conv = Converter(
79 pseudo_handler=PseudoArrayHandler(),
80 base_of="anyOf",
81 )
82 if self._is_format_annotation_enabled():
83 self.conv.register(FormatComparator())
84 self.conv.register(EnumComparator())
85 self.conv.register(RequiredComparator())
86 # self.conv.register(EmptyComparator())
87 self.conv.register(SchemaVersionComparator())
88 self.conv.register(DeleteElement())
89 self.conv.register(DeleteElement("isPseudoArray"))
90 self.reference_extraction_config = SchemaReferenceExtractionConfig(
91 merge_base_of="anyOf",
92 merge_pseudo_handler=PseudoArrayHandler(),
93 merge_comparator_factories=self._make_reference_extraction_comparator_factories(),
94 )
96 self.logger = logging.getLogger(__name__)
97 # добавляем вывод в stderr
98 handler = logging.StreamHandler()
99 handler.setFormatter(logging.Formatter("%(levelname)s %(name)s: %(message)s"))
100 self.logger.addHandler(handler)
101 # и поднимаем уровень, чтобы INFO/DEBUG прошли через handler
102 self.logger.setLevel(logging.INFO)
104 # ci.cd is only needed in dedicated CI/CD mode. Touching it in every worker
105 # causes avoidable races under pytest-xdist.
106 self.snapshot_dir.mkdir(parents=True, exist_ok=True)
107 if self.ci_cd_mode:
108 cicd = self.snapshot_dir / "ci.cd"
109 shutil.rmtree(cicd, ignore_errors=True)
110 cicd.mkdir(parents=True, exist_ok=True)
112 def _is_format_annotation_enabled(self) -> bool:
113 return self.format_mode in {"on", "safe"}
115 def _is_format_validation_enabled(self) -> bool:
116 return self.format_mode == "on"
118 def _make_reference_extraction_comparator_factories(self) -> tuple[Callable[[], Any], ...]:
119 factories: list[Callable[[], Any]] = []
120 if self._is_format_annotation_enabled():
121 factories.append(FormatComparator)
122 factories.extend(
123 (
124 EnumComparator,
125 RequiredComparator,
126 EmptyComparator,
127 DeleteElement,
128 lambda: DeleteElement("isPseudoArray"),
129 )
130 )
131 return tuple(factories)
133 def _finalize_generated_schema(self, schema: dict[str, Any]) -> dict[str, Any]:
134 return SchemaReferencePostprocessor.process(schema, self.reference_extraction_config)
136 def _validate_instance(self, instance: Any, schema: dict[str, Any]) -> None:
137 validate_kwargs: dict[str, Any] = {}
138 if self._is_format_validation_enabled():
139 validate_kwargs["format_checker"] = FormatChecker()
140 validate(instance=instance, schema=schema, **validate_kwargs)
142 def _process_name(self, name: str | int | Callable | list[str | int | Callable]) -> str:
143 """
144 1. Converts callable to string
145 2. Checks for validity
147 Returns:
148 str
149 Raises:
150 ValueError
151 """
153 __tracebackhide__ = not self.debug_mode # прячем из стека pytest
155 def process_name_part(part: str | int | Callable) -> str:
156 if callable(part):
157 return NameMaker.format(part, self.callable_regex)
158 else:
159 return str(part)
161 if isinstance(name, (list, tuple)):
162 name = ".".join([process_name_part(part) for part in name])
163 else:
164 name = process_name_part(name)
166 if not isinstance(name, str) or not name:
167 raise ValueError("Schema name must be a non-empty string")
169 try:
170 # auto подберёт правила под текущую ОС
171 pathvalidate.validate_filename(
172 name, platform="auto"
173 ) # allow_reserved=False по умолчанию
174 except pathvalidate.ValidationError as e:
175 raise ValueError(f"Invalid schema name: {e}") from None
177 return name
179 def _save_process_original(self, real_name: str, status: Optional[bool], data: dict) -> None:
180 json_name = f"{real_name}.json"
181 schema_name = f"{real_name}.schema.json"
182 base_j_path = self.snapshot_dir / json_name
183 base_s_path = self.snapshot_dir / json_name
184 if not self.ci_cd_mode:
185 json_path = base_j_path
186 schema_path = base_s_path
187 else:
188 json_path = self.snapshot_dir / "ci.cd" / json_name
189 schema_path = self.snapshot_dir / "ci.cd" / schema_name
191 if self.save_original:
192 available_to_create = (
193 (not json_path.exists() or status is None) and not self.ci_cd_mode
194 ) or (schema_path.exists() and not base_s_path.exists() and self.ci_cd_mode)
195 available_to_update = (status is True and not self.ci_cd_mode) or (
196 schema_path.exists() and base_s_path.exists() and self.ci_cd_mode
197 )
199 if (available_to_create and self.update_actions.get("add")) or (
200 available_to_update and self.update_actions.get("update")
201 ):
202 with open(json_path, "w", encoding="utf-8") as f:
203 json.dump(data, f, indent=2, ensure_ascii=False)
205 if available_to_create:
206 GLOBAL_STATS.add_created(json_name)
207 elif available_to_update:
208 GLOBAL_STATS.add_updated(json_name)
209 else:
210 raise ValueError(f"Unexpected status: {status}")
211 elif not self.ci_cd_mode and json_path.exists() and self.update_actions.get("delete"):
212 # удаляем
213 json_path.unlink()
214 GLOBAL_STATS.add_deleted(json_name)
216 def assert_json_match(
217 self,
218 data: dict,
219 name: str | int | Callable | list[str | int | Callable],
220 ) -> Optional[bool]:
221 """
222 Asserts for JSON, converts it to schema and then compares.
224 Returns:
225 True – the schema has been updated,
226 False – the schema has not changed,
227 None – a new schema has been created.
228 """
230 real_name = self._process_name(name)
232 real_name, status = self._base_match(data, data, "json", real_name)
234 if self.update_mode or self.reset_mode or self.ci_cd_mode:
235 self._save_process_original(real_name=real_name, status=status, data=data)
237 return status
239 def assert_schema_match(
240 self,
241 schema: dict[str, Any],
242 name: str | int | Callable | list[str | int | Callable],
243 *,
244 data: Optional[dict] = None,
245 ) -> Optional[bool]:
246 """
247 Accepts a JSON-schema directly and compares it immediately.
249 Returns:
250 True – the schema has been updated,
251 False – the schema has not changed,
252 None – a new schema has been created.
253 """
255 real_name = self._process_name(name)
257 real_name, status = self._base_match(data, schema, "schema", real_name)
259 if self.update_mode and data is not None:
260 self._save_process_original(real_name=real_name, status=status, data=data)
262 return status
264 def _base_match(
265 self,
266 data: Optional[dict],
267 current_data: dict,
268 type_data: Literal["json", "schema"],
269 name: str,
270 ) -> tuple[str, Optional[bool]]:
271 """
272 Checks if data matches the JSON schema, creates/updates it if needed,
273 and writes statistics to GLOBAL_STATS.
275 Returns:
276 True – the schema has been updated,
277 False – the schema has not changed,
278 None – a new schema has been created.
279 """
280 __tracebackhide__ = not self.debug_mode # прячем из стека pytest
282 # Проверка имени
283 name = self._process_name(name)
285 base_path = self.snapshot_dir / f"{name}.schema.json"
286 if not self.ci_cd_mode:
287 schema_path = base_path
288 else:
289 schema_path = self.snapshot_dir / "ci.cd" / f"{name}.schema.json"
290 self.used_schemas.add(schema_path.name)
292 # --- состояние ДО проверки ---
293 schema_exists_before = base_path.exists()
295 def make_schema(current_data: dict | list, type_data: Literal["json", "schema"]) -> dict:
296 if type_data == "schema":
297 return dict(current_data)
298 elif type_data == "json":
299 self.conv.clear_data()
300 self.conv.add_json(current_data)
301 return self._finalize_generated_schema(self.conv.run())
302 else:
303 raise ValueError("Not correct type argument")
305 # --- когда схемы ещё нет ---
306 if not schema_exists_before:
307 if not self.update_mode and not self.reset_mode and not self.ci_cd_mode:
308 raise pytest.fail.Exception(
309 f"Schema `{name}` not found."
310 "Run the test with the --schema-update option to create it."
311 )
312 elif not self.update_actions.get("add"):
313 raise pytest.fail.Exception(
314 f"Schema `{name}` not found and adding new schemas is disabled."
315 )
317 current_schema = make_schema(current_data, type_data)
319 with open(schema_path, "w", encoding="utf-8") as f:
320 json.dump(current_schema, f, indent=2, ensure_ascii=False)
322 self.logger.info(f"New schema `{name}` has been created.")
323 GLOBAL_STATS.add_created(schema_path.name) # статистика «создана»
324 return name, None
325 else:
326 with open(base_path, "r", encoding="utf-8") as f:
327 existing_schema = json.load(f)
329 # --- схема уже была: сравнение и валидация --------------------------------
330 schema_updated = False
332 def merge_schemas(
333 old: dict, new: dict | list, type_data: Literal["json", "schema"]
334 ) -> dict:
335 self.conv.clear_data()
336 self.conv.add_schema(old)
337 if type_data == "schema":
338 self.conv.add_schema(dict(new))
339 elif type_data == "json":
340 self.conv.add_json(new)
341 else:
342 raise ValueError("Not correct type argument")
343 result = self.conv.run()
344 if type_data == "json":
345 result = self._finalize_generated_schema(result)
346 return result
348 if (
349 type_data == "json" or existing_schema != current_data
350 ): # есть отличия или могут быть
351 if (
352 self.update_mode or self.ci_cd_mode or self.reset_mode
353 ) and self.update_actions.get("update"):
354 # обновляем файл
355 if self.reset_mode and not self.update_mode and not self.ci_cd_mode:
356 current_schema = make_schema(current_data, type_data)
358 if existing_schema != current_schema:
359 differences = self.differ.compare(
360 dict(existing_schema), current_schema
361 ).render()
362 GLOBAL_STATS.add_updated(schema_path.name, differences)
364 with open(schema_path, "w", encoding="utf-8") as f:
365 json.dump(current_schema, f, indent=2, ensure_ascii=False)
366 self.logger.warning(f"Schema `{name}` reseted.\n\n{differences}")
367 elif self.update_mode or self.ci_cd_mode and not self.reset_mode:
368 merged_schema = merge_schemas(existing_schema, current_data, type_data)
370 if existing_schema != merged_schema:
371 differences = self.differ.compare(
372 dict(existing_schema), merged_schema
373 ).render()
374 GLOBAL_STATS.add_updated(schema_path.name, differences)
376 with open(schema_path, "w", encoding="utf-8") as f:
377 json.dump(merged_schema, f, indent=2, ensure_ascii=False)
379 self.logger.warning(f"Schema `{name}` updated.\n\n{differences}")
380 else: # both update_mode and reset_mode are True
381 raise ValueError(
382 "update_mode, ci_cd_mode and reset_mode"
383 " cannot be True at the same time."
384 )
385 schema_updated = True
386 elif data is not None:
387 merged_schema = merge_schemas(existing_schema, current_data, type_data)
389 differences = ""
390 if existing_schema != merged_schema:
391 differences = self.differ.compare(
392 dict(existing_schema), merged_schema
393 ).render()
394 GLOBAL_STATS.add_uncommitted(schema_path.name, differences)
396 # только валидируем по старой схеме
397 try:
398 self._validate_instance(instance=data, schema=existing_schema)
399 except ValidationError as e:
400 pytest.fail(
401 f"\n\n{differences}\n\nValidation error in `{name}`: {e.message}"
402 )
403 elif data is not None and type_data == "schema":
404 # схемы совпали – всё равно валидируем на случай формальных ошибок
405 try:
406 self._validate_instance(instance=data, schema=existing_schema)
407 except ValidationError as e:
408 merged_schema = merge_schemas(existing_schema, current_data, type_data)
410 differences = ""
411 if existing_schema != merged_schema:
412 differences = self.differ.compare(
413 dict(existing_schema), merged_schema
414 ).render()
415 pytest.fail(f"\n\n{differences}\n\nValidation error in `{name}`: {e.message}")
417 return name, schema_updated