Coverage for genschema / comparators / format.py: 90%

48 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-14 22:23 +0000

1import re 

2from collections import defaultdict 

3from functools import lru_cache 

4from typing import Any, Optional 

5 

6from .template import Comparator, ComparatorResult, ProcessingContext 

7 

8 

9class FormatDetector: 

10 """Глобальный детектор форматов. Расширяем — просто добавляем в _registry.""" 

11 

12 _registry = { 

13 "string": { 

14 re.compile(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$"): "email", 

15 re.compile( 

16 r"^[0-9a-f]{8}-[0-9a-f]{4}-[1-5][0-9a-f]{3}-[89ab][0-9a-f]{3}-[0-9a-f]{12}$", 

17 re.I, 

18 ): "uuid", 

19 re.compile(r"^\d{4}-\d{2}-\d{2}$"): "date", 

20 re.compile( 

21 r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(\.\d+)?(Z|[+-]\d{2}:\d{2})?$" 

22 ): "date-time", 

23 re.compile(r"^https?://[^\s/$.?#].[^\s]*$", re.I): "uri", 

24 re.compile( 

25 r"^(?:(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}" r"(?:25[0-5]|2[0-4]\d|[01]?\d\d?)$" 

26 ): "ipv4", 

27 } 

28 } 

29 

30 @classmethod 

31 @lru_cache(maxsize=512) 

32 def detect(cls, value: Any, type_hint: str = "string") -> Optional[str]: 

33 patterns = cls._registry.get(type_hint, {}) 

34 for pattern, name in patterns.items(): 

35 if pattern.fullmatch(str(value)): 

36 return name 

37 return None 

38 

39 

40class FormatComparator(Comparator): 

41 name = "format" 

42 

43 def can_process(self, ctx: ProcessingContext, env: str, prev_result: dict) -> bool: 

44 # Обрабатываем только если на текущем уровне уже есть type: "string" 

45 return prev_result.get("type") == "string" 

46 

47 def process(self, ctx: ProcessingContext, env: str, prev_result: dict) -> ComparatorResult: 

48 

49 # Базовые триггеры из предыдущих компараторов (обычно из TypeComparator) 

50 base_triggers = set(prev_result.get("j2sElementTrigger", [])) 

51 

52 # Собираем все возможные форматы и их источники 

53 format_to_ids: dict[str | None, set[str]] = defaultdict(set) 

54 format_to_ids[None].update(base_triggers) 

55 

56 # 1. Форматы, явно указанные в схемах 

57 for s in ctx.schemas: 

58 if isinstance(s.content, dict) and s.content.get("type") == "string": 

59 fmt = s.content.get("format") 

60 format_to_ids[fmt].add(s.id) 

61 if fmt is not None: 

62 format_to_ids[None].discard(s.id) 

63 

64 # 2. Форматы, выведенные из значений JSON 

65 for j in ctx.jsons: 

66 if isinstance(j.content, str): 

67 fmt = FormatDetector.detect(j.content) 

68 format_to_ids[fmt].add(j.id) 

69 if fmt is not None: 

70 format_to_ids[None].discard(j.id) 

71 

72 # Формируем варианты 

73 variants: list[dict] = [] 

74 for fmt, ids in format_to_ids.items(): 

75 if not ids: 

76 continue 

77 variant = {"type": "string", "j2sElementTrigger": sorted(ids)} 

78 if fmt is not None: 

79 variant["format"] = fmt 

80 variants.append(variant) 

81 

82 # Результат 

83 if len(variants) == 1: 

84 return variants[0], None 

85 if len(variants) > 1: 

86 return None, variants 

87 

88 # Если ничего нового не нашли — оставляем как есть 

89 return None, None