Coverage for genschema / comparators / required.py: 97%

30 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-14 22:23 +0000

1import logging 

2 

3from .template import Comparator, ComparatorResult, ProcessingContext 

4 

5logger = logging.getLogger(__name__) 

6 

7 

8class RequiredComparator(Comparator): 

9 """ 

10 Компаратор для определения обязательных полей. 

11 Устанавливает "required" на основе наличия ключей в JSON на текущем уровне. 

12 """ 

13 

14 def can_process(self, ctx: ProcessingContext, env: str, node: dict) -> bool: 

15 # обрабатываем только объекты 

16 return ( 

17 (node.get("type") == "object" and not node.get("isPseudoArray", False)) 

18 or node.get("type") is None 

19 or not ctx.jsons 

20 ) 

21 

22 def process(self, ctx: ProcessingContext, env: str, node: dict) -> ComparatorResult: 

23 required_sets: list[set[str]] = [] 

24 

25 # Если есть хотя бы один JSON, который не является объектом, 

26 # мы не можем корректно определить обязательные ключи. 

27 if ctx.jsons and any(not isinstance(j.content, dict) for j in ctx.jsons): 

28 return None, None 

29 

30 # ---------- из json ---------- 

31 objects = [j.content for j in ctx.jsons if isinstance(j.content, dict)] 

32 if objects: 

33 keys: set[str] = set() 

34 for obj in objects: 

35 keys.update(obj.keys()) 

36 

37 required_from_json = {k for k in keys if all(k in obj for obj in objects)} 

38 required_sets.append(required_from_json) 

39 

40 # ---------- из схем ---------- 

41 for schema in ctx.schemas: 

42 content = schema.content 

43 if not isinstance(content, dict): 

44 continue 

45 req = content.get("required") 

46 if isinstance(req, list): 

47 required_sets.append(set(req)) 

48 

49 if not required_sets: 

50 return None, None 

51 

52 # ---------- минимальное пересечение ---------- 

53 required = sorted(set.intersection(*required_sets)) 

54 

55 if required: 

56 return {"required": required}, None 

57 return None, None