Coverage for pytest_jsonschema_snapshot / core.py: 42%
158 statements
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-04 20:33 +0000
« prev ^ index » next coverage.py v7.13.3, created at 2026-02-04 20:33 +0000
1"""
2Core logic of the plugin.
3"""
5import json
6import logging
7from pathlib import Path
8from typing import TYPE_CHECKING, Any, Callable, Literal, Optional
10import pathvalidate
12if TYPE_CHECKING:
13 from jsonschema_diff import JsonSchemaDiff
15import pytest
16from genschema import Converter, PseudoArrayHandler
17from genschema.comparators import (
18 DeleteElement,
19 FormatComparator,
20 RequiredComparator,
21 SchemaVersionComparator,
22)
23from jsonschema import FormatChecker, ValidationError, validate
25from .stats import GLOBAL_STATS
26from .tools import NameMaker
29class SchemaShot:
30 def __init__(
31 self,
32 root_dir: Path,
33 differ: "JsonSchemaDiff",
34 callable_regex: str = "{class_method=.}",
35 format_mode: str = "on",
36 update_mode: bool = False,
37 reset_mode: bool = False,
38 update_actions: Optional[dict[str, bool]] = {},
39 save_original: bool = False,
40 debug_mode: bool = False,
41 snapshot_dir_name: str = "__snapshots__",
42 ):
43 """
44 Initializes SchemaShot.
46 Args:
47 root_dir: Project root directory
48 update_mode: Update mode (--schema-update)
49 snapshot_dir_name: Name of the directory for snapshots
50 """
51 self.root_dir: Path = root_dir
52 self.differ: "JsonSchemaDiff" = differ
53 self.callable_regex: str = callable_regex
54 self.format_mode: str = format_mode
55 # self.examples_limit: int = examples_limit
56 self.update_mode: bool = update_mode
57 self.reset_mode: bool = reset_mode
58 self.update_actions: dict[str, bool] = dict(update_actions or {})
59 self.save_original: bool = save_original
60 self.debug_mode: bool = debug_mode
61 self.snapshot_dir: Path = root_dir / snapshot_dir_name
62 self.used_schemas: set[str] = set()
64 self.conv = Converter(
65 pseudo_handler=PseudoArrayHandler(),
66 base_of="anyOf",
67 )
68 self.conv.register(FormatComparator())
69 self.conv.register(RequiredComparator())
70 # self.conv.register(EmptyComparator())
71 self.conv.register(SchemaVersionComparator())
72 self.conv.register(DeleteElement())
73 self.conv.register(DeleteElement("isPseudoArray"))
75 self.logger = logging.getLogger(__name__)
76 # добавляем вывод в stderr
77 handler = logging.StreamHandler()
78 handler.setFormatter(logging.Formatter("%(levelname)s %(name)s: %(message)s"))
79 self.logger.addHandler(handler)
80 # и поднимаем уровень, чтобы INFO/DEBUG прошли через handler
81 self.logger.setLevel(logging.INFO)
83 # Создаем директорию для снэпшотов, если её нет
84 if not self.snapshot_dir.exists():
85 self.snapshot_dir.mkdir(parents=True)
87 def _process_name(self, name: str | int | Callable | list[str | int | Callable]) -> str:
88 """
89 1. Converts callable to string
90 2. Checks for validity
92 Returns:
93 str
94 Raises:
95 ValueError
96 """
98 __tracebackhide__ = not self.debug_mode # прячем из стека pytest
100 def process_name_part(part: str | int | Callable) -> str:
101 if callable(part):
102 return NameMaker.format(part, self.callable_regex)
103 else:
104 return str(part)
106 if isinstance(name, (list, tuple)):
107 name = ".".join([process_name_part(part) for part in name])
108 else:
109 name = process_name_part(name)
111 if not isinstance(name, str) or not name:
112 raise ValueError("Schema name must be a non-empty string")
114 try:
115 # auto подберёт правила под текущую ОС
116 pathvalidate.validate_filename(
117 name, platform="auto"
118 ) # allow_reserved=False по умолчанию
119 except pathvalidate.ValidationError as e:
120 raise ValueError(f"Invalid schema name: {e}") from None
122 return name
124 def _save_process_original(self, real_name: str, status: Optional[bool], data: dict) -> None:
125 json_name = f"{real_name}.json"
126 json_path = self.snapshot_dir / json_name
128 if self.save_original:
129 available_to_create = not json_path.exists() or status is None
130 available_to_update = status is True
132 if (available_to_create and self.update_actions.get("add")) or (
133 available_to_update and self.update_actions.get("update")
134 ):
135 with open(json_path, "w", encoding="utf-8") as f:
136 json.dump(data, f, indent=2, ensure_ascii=False)
138 if available_to_create:
139 GLOBAL_STATS.add_created(json_name)
140 elif available_to_update:
141 GLOBAL_STATS.add_updated(json_name)
142 else:
143 raise ValueError(f"Unexpected status: {status}")
144 elif json_path.exists() and self.update_actions.get("delete"):
145 # удаляем
146 json_path.unlink()
147 GLOBAL_STATS.add_deleted(json_name)
149 def assert_json_match(
150 self,
151 data: dict,
152 name: str | int | Callable | list[str | int | Callable],
153 ) -> Optional[bool]:
154 """
155 Asserts for JSON, converts it to schema and then compares.
157 Returns:
158 True – the schema has been updated,
159 False – the schema has not changed,
160 None – a new schema has been created.
161 """
163 real_name = self._process_name(name)
165 real_name, status = self._base_match(data, data, "json", real_name)
167 if self.update_mode or self.reset_mode:
168 self._save_process_original(real_name=real_name, status=status, data=data)
170 return status
172 def assert_schema_match(
173 self,
174 schema: dict[str, Any],
175 name: str | int | Callable | list[str | int | Callable],
176 *,
177 data: Optional[dict] = None,
178 ) -> Optional[bool]:
179 """
180 Accepts a JSON-schema directly and compares it immediately.
182 Returns:
183 True – the schema has been updated,
184 False – the schema has not changed,
185 None – a new schema has been created.
186 """
188 real_name = self._process_name(name)
190 real_name, status = self._base_match(data, schema, "schema", real_name)
192 if self.update_mode and data is not None:
193 self._save_process_original(real_name=real_name, status=status, data=data)
195 return status
197 def _base_match(
198 self,
199 data: Optional[dict],
200 current_data: dict,
201 type_data: Literal["json", "schema"],
202 name: str,
203 ) -> tuple[str, Optional[bool]]:
204 """
205 Checks if data matches the JSON schema, creates/updates it if needed,
206 and writes statistics to GLOBAL_STATS.
208 Returns:
209 True – the schema has been updated,
210 False – the schema has not changed,
211 None – a new schema has been created.
212 """
213 __tracebackhide__ = not self.debug_mode # прячем из стека pytest
215 # Проверка имени
216 name = self._process_name(name)
218 schema_path = self.snapshot_dir / f"{name}.schema.json"
219 self.used_schemas.add(schema_path.name)
221 # --- состояние ДО проверки ---
222 schema_exists_before = schema_path.exists()
224 def make_schema(current_data: dict | list, type_data: Literal["json", "schema"]) -> dict:
225 if type_data == "schema":
226 return dict(current_data)
227 elif type_data == "json":
228 self.conv.clear_data()
229 self.conv.add_json(current_data)
230 return self.conv.run()
231 else:
232 raise ValueError("Not correct type argument")
234 # --- когда схемы ещё нет ---
235 if not schema_exists_before:
236 if not self.update_mode and not self.reset_mode:
237 raise pytest.fail.Exception(
238 f"Schema `{name}` not found."
239 "Run the test with the --schema-update option to create it."
240 )
241 elif not self.update_actions.get("add"):
242 raise pytest.fail.Exception(
243 f"Schema `{name}` not found and adding new schemas is disabled."
244 )
246 current_schema = make_schema(current_data, type_data)
248 with open(schema_path, "w", encoding="utf-8") as f:
249 json.dump(current_schema, f, indent=2, ensure_ascii=False)
251 self.logger.info(f"New schema `{name}` has been created.")
252 GLOBAL_STATS.add_created(schema_path.name) # статистика «создана»
253 return name, None
254 else:
255 with open(schema_path, "r", encoding="utf-8") as f:
256 existing_schema = json.load(f)
258 # --- схема уже была: сравнение и валидация --------------------------------
259 schema_updated = False
261 def merge_schemas(
262 old: dict, new: dict | list, type_data: Literal["json", "schema"]
263 ) -> dict:
264 self.conv.clear_data()
265 self.conv.add_schema(old)
266 if type_data == "schema":
267 self.conv.add_schema(dict(new))
268 elif type_data == "json":
269 self.conv.add_json(new)
270 else:
271 raise ValueError("Not correct type argument")
272 result = self.conv.run()
273 return result
275 if (
276 type_data == "json" or existing_schema != current_data
277 ): # есть отличия или могут быть
278 if (self.update_mode or self.reset_mode) and self.update_actions.get("update"):
279 # обновляем файл
280 if self.reset_mode and not self.update_mode:
281 current_schema = make_schema(current_data, type_data)
283 differences = self.differ.compare(
284 dict(existing_schema), current_schema
285 ).render()
286 diff_count = self.differ.property.calc_diff()
287 if any(diff_count[key] > 0 for key in diff_count if key != "UNKNOWN"):
288 GLOBAL_STATS.add_updated(schema_path.name, differences)
290 with open(schema_path, "w", encoding="utf-8") as f:
291 json.dump(current_schema, f, indent=2, ensure_ascii=False)
292 self.logger.warning(f"Schema `{name}` reseted.\n\n{differences}")
293 elif self.update_mode and not self.reset_mode:
294 merged_schema = merge_schemas(existing_schema, current_data, type_data)
296 differences = self.differ.compare(
297 dict(existing_schema), merged_schema
298 ).render()
299 diff_count = self.differ.property.calc_diff()
300 if any(diff_count[key] > 0 for key in diff_count if key != "UNKNOWN"):
301 GLOBAL_STATS.add_updated(schema_path.name, differences)
303 with open(schema_path, "w", encoding="utf-8") as f:
304 json.dump(merged_schema, f, indent=2, ensure_ascii=False)
306 self.logger.warning(f"Schema `{name}` updated.\n\n{differences}")
307 else: # both update_mode and reset_mode are True
308 raise ValueError(
309 "Both update_mode and reset_mode cannot be True at the same time."
310 )
311 schema_updated = True
312 elif data is not None:
313 merged_schema = merge_schemas(existing_schema, current_data, type_data)
315 differences = self.differ.compare(dict(existing_schema), merged_schema).render()
316 GLOBAL_STATS.add_uncommitted(schema_path.name, differences)
318 # только валидируем по старой схеме
319 try:
320 validate(
321 instance=data,
322 schema=existing_schema,
323 format_checker=FormatChecker(),
324 )
325 except ValidationError as e:
326 pytest.fail(
327 f"\n\n{differences}\n\nValidation error in `{name}`: {e.message}"
328 )
329 elif data is not None and type_data == "schema":
330 # схемы совпали – всё равно валидируем на случай формальных ошибок
331 try:
332 validate(
333 instance=data,
334 schema=existing_schema,
335 format_checker=FormatChecker(),
336 )
337 except ValidationError as e:
338 merged_schema = merge_schemas(existing_schema, current_data, type_data)
340 differences = self.differ.compare(dict(existing_schema), merged_schema).render()
341 pytest.fail(f"\n\n{differences}\n\nValidation error in `{name}`: {e.message}")
343 return name, schema_updated