Coverage for genschema / comparators / required.py: 97%

30 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-25 09:44 +0000

1import logging 

2 

3from .template import Comparator, ComparatorResult, ProcessingContext 

4 

5logger = logging.getLogger(__name__) 

6 

7 

8class RequiredComparator(Comparator): 

9 """ 

10 Компаратор для определения обязательных полей. 

11 Устанавливает "required" на основе наличия ключей в JSON на текущем уровне. 

12 """ 

13 

14 def can_process(self, ctx: ProcessingContext, env: str, node: dict) -> bool: 

15 # обрабатываем только объекты 

16 return node.get("type") == "object" and not node.get("isPseudoArray", False) 

17 

18 def process(self, ctx: ProcessingContext, env: str, node: dict) -> ComparatorResult: 

19 required_sets: list[set[str]] = [] 

20 

21 # Если есть хотя бы один JSON, который не является объектом, 

22 # мы не можем корректно определить обязательные ключи. 

23 if ctx.jsons and any(not isinstance(j.content, dict) for j in ctx.jsons): 

24 return None, None 

25 

26 # ---------- из json ---------- 

27 objects = [j.content for j in ctx.jsons if isinstance(j.content, dict)] 

28 if objects: 

29 keys: set[str] = set() 

30 for obj in objects: 

31 keys.update(obj.keys()) 

32 

33 required_from_json = {k for k in keys if all(k in obj for obj in objects)} 

34 required_sets.append(required_from_json) 

35 

36 # ---------- из схем ---------- 

37 for schema in ctx.schemas: 

38 content = schema.content 

39 if not isinstance(content, dict): 

40 continue 

41 req = content.get("required") 

42 if isinstance(req, list): 

43 required_sets.append(set(req)) 

44 

45 if not required_sets: 

46 return None, None 

47 

48 # ---------- минимальное пересечение ---------- 

49 required = sorted(set.intersection(*required_sets)) 

50 

51 if required: 

52 return {"required": required}, None 

53 return None, None