Source code for pytest_jsonschema_snapshot.core

"""
Core logic of the plugin.
"""

import json
import logging
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Literal, Optional

import pathvalidate

if TYPE_CHECKING:
    from jsonschema_diff import JsonSchemaDiff

import pytest
from genschema import Converter, PseudoArrayHandler
from genschema.comparators import (
    DeleteElement,
    FormatComparator,
    RequiredComparator,
    SchemaVersionComparator,
)
from jsonschema import FormatChecker, ValidationError, validate

from .stats import GLOBAL_STATS
from .tools import NameMaker


class SchemaShot:
    def __init__(
        self,
        root_dir: Path,
        differ: "JsonSchemaDiff",
        callable_regex: str = "{class_method=.}",
        format_mode: str = "on",
        update_mode: bool = False,
        reset_mode: bool = False,
        update_actions: Optional[dict[str, bool]] = {},
        save_original: bool = False,
        debug_mode: bool = False,
        snapshot_dir_name: str = "__snapshots__",
    ):
        """
        Initializes SchemaShot.

        Args:
            root_dir: Project root directory
            update_mode: Update mode (--schema-update)
            snapshot_dir_name: Name of the directory for snapshots
        """
        self.root_dir: Path = root_dir
        self.differ: "JsonSchemaDiff" = differ
        self.callable_regex: str = callable_regex
        self.format_mode: str = format_mode
        # self.examples_limit: int = examples_limit
        self.update_mode: bool = update_mode
        self.reset_mode: bool = reset_mode
        self.update_actions: dict[str, bool] = dict(update_actions or {})
        self.save_original: bool = save_original
        self.debug_mode: bool = debug_mode
        self.snapshot_dir: Path = root_dir / snapshot_dir_name
        self.used_schemas: set[str] = set()

        self.conv = Converter(
            pseudo_handler=PseudoArrayHandler(),
            base_of="anyOf",
        )
        self.conv.register(FormatComparator())
        self.conv.register(RequiredComparator())
        # self.conv.register(EmptyComparator())
        self.conv.register(SchemaVersionComparator())
        self.conv.register(DeleteElement())
        self.conv.register(DeleteElement("isPseudoArray"))
        self.logger = logging.getLogger(__name__)
        # Send log output to stderr
        handler = logging.StreamHandler()
        handler.setFormatter(logging.Formatter("%(levelname)s %(name)s: %(message)s"))
        self.logger.addHandler(handler)
        # Raise the level so that INFO/DEBUG records pass through the handler
        self.logger.setLevel(logging.INFO)

        # Create the snapshot directory if it does not exist
        if not self.snapshot_dir.exists():
            self.snapshot_dir.mkdir(parents=True)

    def _process_name(self, name: str | int | Callable | list[str | int | Callable]) -> str:
        """
        1. Converts callable to string
        2. Checks for validity

        Returns:
            str

        Raises:
            ValueError
        """
        __tracebackhide__ = not self.debug_mode  # hide this frame from the pytest traceback

        def process_name_part(part: str | int | Callable) -> str:
            if callable(part):
                return NameMaker.format(part, self.callable_regex)
            else:
                return str(part)

        if isinstance(name, (list, tuple)):
            name = ".".join([process_name_part(part) for part in name])
        else:
            name = process_name_part(name)

        if not isinstance(name, str) or not name:
            raise ValueError("Schema name must be a non-empty string")

        try:
            # "auto" picks the validation rules for the current OS
            pathvalidate.validate_filename(
                name, platform="auto"
            )  # allow_reserved=False by default
        except pathvalidate.ValidationError as e:
            raise ValueError(f"Invalid schema name: {e}") from None

        return name

    def _save_process_original(self, real_name: str, status: Optional[bool], data: dict) -> None:
        json_name = f"{real_name}.json"
        json_path = self.snapshot_dir / json_name
        if self.save_original:
            available_to_create = not json_path.exists() or status is None
            available_to_update = status is True
            if (available_to_create and self.update_actions.get("add")) or (
                available_to_update and self.update_actions.get("update")
            ):
                with open(json_path, "w", encoding="utf-8") as f:
                    json.dump(data, f, indent=2, ensure_ascii=False)
                if available_to_create:
                    GLOBAL_STATS.add_created(json_name)
                elif available_to_update:
                    GLOBAL_STATS.add_updated(json_name)
                else:
                    raise ValueError(f"Unexpected status: {status}")
        elif json_path.exists() and self.update_actions.get("delete"):
            # remove the previously saved original
            json_path.unlink()
            GLOBAL_STATS.add_deleted(json_name)
    def assert_json_match(
        self,
        data: dict,
        name: str | int | Callable | list[str | int | Callable],
    ) -> Optional[bool]:
        """
        Takes JSON data, converts it to a schema and then compares.

        Returns:
            True – the schema has been updated,
            False – the schema has not changed,
            None – a new schema has been created.
        """
        real_name = self._process_name(name)
        real_name, status = self._base_match(data, data, "json", real_name)
        if self.update_mode or self.reset_mode:
            self._save_process_original(real_name=real_name, status=status, data=data)
        return status
    def assert_schema_match(
        self,
        schema: dict[str, Any],
        name: str | int | Callable | list[str | int | Callable],
        *,
        data: Optional[dict] = None,
    ) -> Optional[bool]:
        """
        Accepts a JSON schema directly and compares it immediately.

        Returns:
            True – the schema has been updated,
            False – the schema has not changed,
            None – a new schema has been created.
        """
        real_name = self._process_name(name)
        real_name, status = self._base_match(data, schema, "schema", real_name)
        if self.update_mode and data is not None:
            self._save_process_original(real_name=real_name, status=status, data=data)
        return status
    def _base_match(
        self,
        data: Optional[dict],
        current_data: dict,
        type_data: Literal["json", "schema"],
        name: str,
    ) -> tuple[str, Optional[bool]]:
        """
        Checks if data matches the JSON schema, creates/updates it if needed,
        and writes statistics to GLOBAL_STATS.

        Returns:
            True – the schema has been updated,
            False – the schema has not changed,
            None – a new schema has been created.
        """
        __tracebackhide__ = not self.debug_mode  # hide this frame from the pytest traceback

        # Validate the name
        name = self._process_name(name)
        schema_path = self.snapshot_dir / f"{name}.schema.json"
        self.used_schemas.add(schema_path.name)

        # --- state BEFORE the check ---
        schema_exists_before = schema_path.exists()

        def make_schema(current_data: dict | list, type_data: Literal["json", "schema"]) -> dict:
            if type_data == "schema":
                return dict(current_data)
            elif type_data == "json":
                self.conv.clear_data()
                self.conv.add_json(current_data)
                return self.conv.run()
            else:
                raise ValueError("Not correct type argument")

        # --- the schema does not exist yet ---
        if not schema_exists_before:
            if not self.update_mode and not self.reset_mode:
                raise pytest.fail.Exception(
                    f"Schema `{name}` not found. "
                    "Run the test with the --schema-update option to create it."
                )
            elif not self.update_actions.get("add"):
                raise pytest.fail.Exception(
                    f"Schema `{name}` not found and adding new schemas is disabled."
                )

            current_schema = make_schema(current_data, type_data)
            with open(schema_path, "w", encoding="utf-8") as f:
                json.dump(current_schema, f, indent=2, ensure_ascii=False)
            self.logger.info(f"New schema `{name}` has been created.")
            GLOBAL_STATS.add_created(schema_path.name)  # record as "created"
            return name, None
        else:
            with open(schema_path, "r", encoding="utf-8") as f:
                existing_schema = json.load(f)

        # --- the schema already existed: compare and validate ---
        schema_updated = False

        def merge_schemas(
            old: dict, new: dict | list, type_data: Literal["json", "schema"]
        ) -> dict:
            self.conv.clear_data()
            self.conv.add_schema(old)
            if type_data == "schema":
                self.conv.add_schema(dict(new))
            elif type_data == "json":
                self.conv.add_json(new)
            else:
                raise ValueError("Not correct type argument")
            result = self.conv.run()
            return result

        if (
            type_data == "json" or existing_schema != current_data
        ):  # there are differences, or there may be
            if (self.update_mode or self.reset_mode) and self.update_actions.get("update"):
                # update the file
                if self.reset_mode and not self.update_mode:
                    current_schema = make_schema(current_data, type_data)
                    differences = self.differ.compare(
                        dict(existing_schema), current_schema
                    ).render()
                    diff_count = self.differ.property.calc_diff()
                    if any(diff_count[key] > 0 for key in diff_count if key != "UNKNOWN"):
                        GLOBAL_STATS.add_updated(schema_path.name, differences)
                        with open(schema_path, "w", encoding="utf-8") as f:
                            json.dump(current_schema, f, indent=2, ensure_ascii=False)
                        self.logger.warning(f"Schema `{name}` reset.\n\n{differences}")
                elif self.update_mode and not self.reset_mode:
                    merged_schema = merge_schemas(existing_schema, current_data, type_data)
                    differences = self.differ.compare(
                        dict(existing_schema), merged_schema
                    ).render()
                    diff_count = self.differ.property.calc_diff()
                    if any(diff_count[key] > 0 for key in diff_count if key != "UNKNOWN"):
                        GLOBAL_STATS.add_updated(schema_path.name, differences)
                        with open(schema_path, "w", encoding="utf-8") as f:
                            json.dump(merged_schema, f, indent=2, ensure_ascii=False)
                        self.logger.warning(f"Schema `{name}` updated.\n\n{differences}")
                else:
                    # both update_mode and reset_mode are True
                    raise ValueError(
                        "Both update_mode and reset_mode cannot be True at the same time."
                    )
                schema_updated = True
            elif data is not None:
                merged_schema = merge_schemas(existing_schema, current_data, type_data)
                differences = self.differ.compare(dict(existing_schema), merged_schema).render()
                GLOBAL_STATS.add_uncommitted(schema_path.name, differences)
                # only validate against the old schema
                try:
                    validate(
                        instance=data,
                        schema=existing_schema,
                        format_checker=FormatChecker(),
                    )
                except ValidationError as e:
                    pytest.fail(
                        f"\n\n{differences}\n\nValidation error in `{name}`: {e.message}"
                    )
        elif data is not None and type_data == "schema":
            # the schemas matched – validate anyway, in case of formal errors
            try:
                validate(
                    instance=data,
                    schema=existing_schema,
                    format_checker=FormatChecker(),
                )
            except ValidationError as e:
                merged_schema = merge_schemas(existing_schema, current_data, type_data)
                differences = self.differ.compare(dict(existing_schema), merged_schema).render()
                pytest.fail(f"\n\n{differences}\n\nValidation error in `{name}`: {e.message}")

        return name, schema_updated
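
Example usage: a minimal sketch of how a test might drive this class, assuming the plugin exposes a SchemaShot instance through a pytest fixture. The fixture name `schemashot`, the test name, and the sample payload are illustrative assumptions, not part of this module; only the assert_json_match API shown above is taken from the source.

def test_user_payload(schemashot):
    # Hypothetical fixture and payload for illustration.
    data = {"id": 1, "name": "Alice", "email": "alice@example.com"}
    # A list name is joined with "." by _process_name, so the snapshot file becomes
    # __snapshots__/test_user_payload.user.schema.json.
    # First run with --schema-update creates the schema (returns None); later runs
    # validate `data` against the stored schema (returns False) or, with
    # --schema-update, merge newly observed fields into it (returns True).
    status = schemashot.assert_json_match(data, ["test_user_payload", "user"])
    assert status in (None, False, True)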