Coverage for genschema/pipeline.py: 86%

207 statements  

« prev     ^ index     » next       coverage.py v7.13.4, created at 2026-03-14 22:23 +0000

1import json 

2import logging 

3import re 

4from typing import Literal, Optional 

5 

6from .comparators import TypeComparator 

7from .comparators.template import Comparator, ProcessingContext, Resource, ToDelete 

8from .pseudo_arrays import PseudoArrayHandlerBase 

9 

10logging.basicConfig(level=logging.ERROR) 

11logger = logging.getLogger(__name__) 

12 

13 

class Converter:
    def __init__(
        self,
        pseudo_handler: Optional[PseudoArrayHandlerBase] = None,
        base_of: Literal["anyOf", "oneOf", "allOf"] = "anyOf",
        core_comparator: Optional[TypeComparator] = None,
    ):
        """
        Converter of JSON + JSON Schema structures into a JSON Schema.

        :param pseudo_handler: Handler for pseudo-arrays (large dicts whose
            values share one pattern and whose keys act as identifiers).
        :type pseudo_handler: Optional[PseudoArrayHandlerBase]

        :param base_of: Base operator used to combine schemas. There is no
            per-case logic for choosing a specific "Of" kind.
        :type base_of: Literal["anyOf", "oneOf", "allOf"]

        :param core_comparator: Base type comparator. It is kept separate
            because ``type`` is the only field without which the Converter
            cannot build a structure.
        :type core_comparator: TypeComparator
        """
        # Counter used to assign unique ids to registered resources.
        self._id = 0
        self._base_of = base_of
        self._pseudo_handler = pseudo_handler
        self._core_comparator = core_comparator or TypeComparator()
        self._schemas: list[Resource] = []
        self._jsons: list[Resource] = []
        self._comparators: list[Comparator] = []

44 

45 def add_schema(self, s: dict | str) -> None: 

46 if isinstance(s, str): 

47 with open(s, "r") as f: 

48 s = json.loads(f.read()) 

49 

50 self._schemas.append(Resource(str(self._id), "schema", s)) 

51 self._id += 1 

52 

53 def add_json(self, j: dict | list | str) -> None: 

54 if isinstance(j, str): 

55 with open(j, "r") as f: 

56 j = json.loads(f.read()) 

57 

58 self._jsons.append(Resource(str(self._id), "json", j)) 

59 self._id += 1 

60 

61 def clear_data(self) -> None: 

62 self._id = 0 

63 self._jsons = [] 

64 self._schemas = [] 

65 

66 def register(self, c: Comparator) -> None: 

67 if isinstance(c, TypeComparator): 

68 raise UserWarning( 

69 "A TypeComparator-like comparator must be provided during initialization " 

70 "using the core_comparator attribute." 

71 ) 

72 self._comparators.append(c) 

73 

74 # ---------------- utils ---------------- 

75 

76 def _collect_prop_names(self, schemas: list[Resource], jsons: list[Resource]) -> list[str]: 

77 names = set() 

78 for s in schemas: 

79 c = s.content 

80 if isinstance(c, dict) and isinstance(c.get("properties"), dict): 

81 names.update(c["properties"].keys()) 

82 for j in jsons: 

83 if isinstance(j.content, dict): 

84 names.update(j.content.keys()) 

85 return sorted(names) 

86 

87 def _gather_property_candidates( 

88 self, schemas: list[Resource], jsons: list[Resource], prop: str 

89 ) -> tuple[list[Resource], list[Resource]]: 

90 s_out, j_out = [], [] 

91 

92 for s in schemas: 

93 c = s.content 

94 if isinstance(c, dict) and prop in c.get("properties", {}): 

95 s_out.append(Resource(f"{s.id}/{prop}", "schema", c["properties"][prop])) 

96 

97 for j in jsons: 

98 if isinstance(j.content, dict) and prop in j.content: 

99 j_out.append(Resource(f"{j.id}/{prop}", "json", j.content[prop])) 

100 

101 return s_out, j_out 

102 

103 def _keys_matched_by_pattern(self, pattern: str, keys: list[str]) -> set[str]: 

104 try: 

105 regex = re.compile(pattern) 

106 except re.error: 

107 return set() 

108 return {key for key in keys if regex.fullmatch(key)} 

109 

    def _split_array_ctx(
        self, ctx: ProcessingContext
    ) -> tuple[ProcessingContext, ProcessingContext]:
        """Split *ctx* into an object-shaped context and an items-shaped context.

        JSON lists contribute their elements to the items side. Dicts that the
        pseudo-array handler recognizes are flattened into items, their keys
        replaced by positional indexes. Schemas of type "array" contribute
        their "items" subschema; "object" schemas move to the items side only
        when they look like pseudo-arrays (via "properties" keys or
        "patternProperties" patterns that cover all observed json keys).
        Everything else stays on the object side unchanged.

        :return: ``(object_ctx, items_ctx)``, both carrying ``ctx.sealed``.
        """
        obj_jsons = []
        item_jsons = []

        for j in ctx.jsons:
            c = j.content
            if isinstance(c, list):
                # A real JSON array: every element is an item candidate.
                for i, el in enumerate(c):
                    item_jsons.append(Resource(f"{j.id}/{i}", "json", el))
            elif isinstance(c, dict):
                keys = self._collect_prop_names([], [j])
                is_pseudo_array = False
                if self._pseudo_handler:
                    is_pseudo_array, _ = self._pseudo_handler.is_pseudo_array(keys, ctx)
                if is_pseudo_array:
                    # Numeric keys sort by value; non-numeric keys all map to
                    # -1, so they sink to the front and keep their relative
                    # order (sorted() is stable).
                    sorted_keys = sorted(keys, key=lambda k: int(k) if k.isdigit() else -1)
                    for i, k in enumerate(sorted_keys):
                        item_jsons.append(Resource(f"{j.id}/{i}", "json", c[k]))
                else:
                    obj_jsons.append(j)
            else:
                # Scalars stay on the object path untouched.
                obj_jsons.append(j)

        obj_schemas = []
        item_schemas = []

        for s in ctx.schemas:
            c = s.content
            if isinstance(c, dict):
                t = c.get("type")
                if t == "array" and "items" in c:
                    item_schemas.append(Resource(f"{s.id}/items", "schema", c["items"]))
                elif t == "object" and "properties" in c:
                    keys = sorted(c["properties"].keys())
                    is_pseudo_array = False
                    if self._pseudo_handler:
                        is_pseudo_array, _ = self._pseudo_handler.is_pseudo_array(keys, ctx)
                    if is_pseudo_array:
                        # Same ordering rule as for json pseudo-arrays above.
                        sorted_keys = sorted(keys, key=lambda k: int(k) if k.isdigit() else -1)
                        for i, k in enumerate(sorted_keys):
                            item_schemas.append(
                                Resource(f"{s.id}/{i}", "schema", c["properties"][k])
                            )
                    else:
                        obj_schemas.append(s)
                elif t == "object" and isinstance(c.get("patternProperties"), dict):
                    pattern_props = c["patternProperties"]
                    # Without a pseudo-array handler the schema cannot be
                    # reinterpreted as items.
                    if not self._pseudo_handler:
                        obj_schemas.append(s)
                        continue

                    # NOTE(review): keys are collected from ALL jsons in ctx,
                    # not only those paired with this particular schema —
                    # confirm this is intended when several branches coexist.
                    keys = self._collect_prop_names([], ctx.jsons)
                    is_pseudo_array, _ = self._pseudo_handler.is_pseudo_array(keys, ctx)

                    # If a branch is not pseudo-array, keep schema on object path.
                    if not is_pseudo_array or not keys:
                        obj_schemas.append(s)
                        continue

                    matched_patterns: list[str] = []
                    covered_keys: set[str] = set()
                    for pattern in pattern_props:
                        matched_keys = self._keys_matched_by_pattern(pattern, keys)
                        if not matched_keys:
                            continue
                        matched_patterns.append(pattern)
                        covered_keys.update(matched_keys)

                    # Branch contains non-pattern keys: do not map patternProperties as items.
                    has_non_pattern_keys = any(key not in covered_keys for key in keys)
                    if has_non_pattern_keys or not matched_patterns:
                        obj_schemas.append(s)
                        continue

                    # Every observed key is covered: each matching pattern's
                    # subschema becomes an item candidate.
                    for index, pattern in enumerate(matched_patterns):
                        item_schemas.append(
                            Resource(
                                f"{s.id}/patternProperties/{index}",
                                "schema",
                                pattern_props[pattern],
                            )
                        )
                else:
                    obj_schemas.append(s)
            else:
                obj_schemas.append(s)

        return (
            ProcessingContext(obj_schemas, obj_jsons, ctx.sealed),
            ProcessingContext(item_schemas, item_jsons, ctx.sealed),
        )

203 

204 def _filter_ctx_by_ids(self, ctx: ProcessingContext, ids: set) -> ProcessingContext: 

205 if not ids: 

206 return ctx 

207 schemas = [s for s in ctx.schemas if s.id in ids] 

208 jsons = [j for j in ctx.jsons if j.id in ids] 

209 return ProcessingContext(schemas, jsons, ctx.sealed) 

210 

211 # ---------------- core ---------------- 

212 

213 def _run_level(self, ctx: ProcessingContext, env: str, prev: dict) -> dict: 

214 logger.debug("Entering _run_level: env=%s, prev_result=%s", env, prev) 

215 node = dict(prev) 

216 

217 def use_comp(comp: Comparator) -> bool: 

218 if not comp.can_process(ctx, env, node): 

219 return False 

220 

221 g, alts = comp.process(ctx, env, node) 

222 if g: 

223 node.update(g) 

224 if alts: 

225 node.setdefault(self._base_of, []).extend(alts) 

226 return True 

227 

228 # Вызов базового компаратора 

229 use_comp(self._core_comparator) 

230 

231 # Определение является ли объект псевдомассивом 

232 if node.get("type") == "object": 

233 props = self._collect_prop_names(ctx.schemas, ctx.jsons) 

234 if self._pseudo_handler: 

235 is_pseudo_array, pattern = self._pseudo_handler.is_pseudo_array(props, ctx) 

236 node["isPseudoArray"] = is_pseudo_array 

237 else: 

238 # node["isPseudoArray"] = False 

239 is_pseudo_array = False 

240 

241 # Вызов остальных компараторов 

242 for comp in self._comparators: 

243 use_comp(comp) 

244 

245 # Удаление атрибутов помеченных на удаление 

246 to_delete_keys = [] 

247 for key, element in node.items(): 

248 if isinstance(element, ToDelete): 

249 to_delete_keys.append(key) 

250 for key in to_delete_keys: 

251 del node[key] 

252 

253 # если есть Of — обработаем каждую альтернативу через _run_level 

254 if self._base_of in node: 

255 new_of = [] 

256 for idx, alt in enumerate(node[self._base_of]): 

257 alt_ids = set(alt.get("j2sElementTrigger", [])) 

258 alt_ctx = self._filter_ctx_by_ids(ctx, alt_ids) if alt_ids else ctx 

259 processed_alt = self._run_level(alt_ctx, env + f"/{self._base_of}/{idx}", alt) 

260 new_of.append(processed_alt) 

261 node[self._base_of] = new_of 

262 logger.debug( 

263 "Exiting _run_level (%s handled): env=%s, node=%s", self._base_of, env, node 

264 ) 

265 return node 

266 

267 # recursion based on type 

268 if node.get("type") == "object": 

269 if is_pseudo_array: 

270 node = self._run_pseudo_array(ctx, env, node, str(pattern)) 

271 else: 

272 node = self._run_object(ctx, env, node) 

273 elif node.get("type") == "array": 

274 node = self._run_array(ctx, env, node) 

275 

276 logger.debug("Exiting _run_level: env=%s, node=%s", env, node) 

277 return node 

278 

279 # ---------------- object ---------------- 

280 

281 def _run_object(self, ctx: ProcessingContext, env: str, node: dict) -> dict: 

282 node = dict(node) 

283 node.setdefault("properties", {}) 

284 

285 props = self._collect_prop_names(ctx.schemas, ctx.jsons) 

286 for name in props: 

287 s, j = self._gather_property_candidates(ctx.schemas, ctx.jsons, name) 

288 sub_ctx = ProcessingContext(s, j, ctx.sealed) 

289 node["properties"][name] = self._run_level( 

290 sub_ctx, f"{env}/properties/{name}", node["properties"].get(name, {}) 

291 ) 

292 

293 if not node["properties"]: 

294 node.pop("properties", None) 

295 

296 return node 

297 

298 # ---------------- pseudo array ---------------- 

299 

300 def _run_pseudo_array(self, ctx: ProcessingContext, env: str, node: dict, pattern: str) -> dict: 

301 node = dict(node) 

302 node.setdefault("patternProperties", {}) 

303 _, items_ctx = self._split_array_ctx(ctx) 

304 node["patternProperties"][pattern] = self._run_level( 

305 items_ctx, f"{env}/patternProperties/{pattern}", {} 

306 ) 

307 if not node["patternProperties"]: 

308 node.pop("patternProperties", None) 

309 return node 

310 

311 # ---------------- array ---------------- 

312 

313 def _run_array(self, ctx: ProcessingContext, env: str, node: dict) -> dict: 

314 node = dict(node) 

315 node.setdefault("items", {}) 

316 

317 _, items_ctx = self._split_array_ctx(ctx) 

318 node["items"] = self._run_level(items_ctx, f"{env}/items", node.get("items", {})) 

319 

320 return node 

321 

322 # ---------------- entry ---------------- 

323 

324 def run(self) -> dict: 

325 ctx = ProcessingContext(self._schemas, self._jsons, sealed=False) 

326 return self._run_level(ctx, "/", {})