Coverage for human_requests/network_analyzer/anomaly_sniffer.py: 20%

177 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-05-28 00:39 +0000

1from __future__ import annotations 

2 

3import asyncio 

4import re 

5from collections import defaultdict 

6from dataclasses import dataclass 

7from enum import Enum, auto 

8from typing import Callable, Dict, Iterable, List, Optional, Pattern, Set, Union 

9from urllib.parse import urlsplit, urlunsplit 

10 

11from playwright.async_api import BrowserContext, Request, Response 

12 

13# --- WAIT API ----------------------------------------------------------------- 

14 

15 

16class WaitSource(Enum): 

17 REQUEST = auto() 

18 RESPONSE = auto() 

19 ALL = auto() 

20 

21 

22@dataclass(frozen=True) 

23class WaitHeader: 

24 source: WaitSource = WaitSource.ALL # источник: запросы/ответы/оба 

25 headers: Optional[List[str]] = None # список имён заголовков (case-insensitive) 

26 

27 def __post_init__(self) -> None: 

28 if not self.headers: 

29 raise ValueError("WaitHeader.headers must be non-empty") 

30 object.__setattr__(self, "headers", [h.lower() for h in self.headers]) 

31 

32 

33# --- SNIFFER ------------------------------------------------------------------ 

34 

35UrlFilter = Optional[Union[Callable[[str], bool], str, Pattern[str]]] 

36 

37 

38class HeaderAnomalySniffer: 

39 """Собирает НЕстандартные заголовки запросов/ответов по всему BrowserContext. 

40 

41 Использование: 

42 sniffer = HeaderAnomalySniffer() 

43 await sniffer.start(ctx) 

44 # ... действия, которые нужно «послушать» ... 

45 result = await sniffer.complite() 

46 

47 Результат: 

48 { 

49 "request": { url: { header: [values...] } }, 

50 "response": { url: { header: [values...] } }, 

51 } 

52 """ 

53 

54 # базовый вайтлист (нижний регистр) 

55 _REQ_STD: Set[str] = { 

56 "accept", 

57 "accept-encoding", 

58 "accept-language", 

59 "cache-control", 

60 "connection", 

61 "content-length", 

62 "content-type", 

63 "cookie", 

64 "host", 

65 "origin", 

66 "pragma", 

67 "referer", 

68 "upgrade-insecure-requests", 

69 "user-agent", 

70 "sec-ch-ua", 

71 "sec-ch-ua-mobile", 

72 "sec-ch-ua-platform", 

73 "sec-fetch-dest", 

74 "sec-fetch-mode", 

75 "sec-fetch-site", 

76 "sec-fetch-user", 

77 "x-requested-with", 

78 "purpose", # prefetch hint 

79 } 

80 _RESP_STD: Set[str] = { 

81 "accept-ch", 

82 "accept-ranges", 

83 "age", 

84 "alt-svc", 

85 "cache-control", 

86 "content-disposition", 

87 "content-encoding", 

88 "content-language", 

89 "content-length", 

90 "content-security-policy", 

91 "content-type", 

92 "date", 

93 "etag", 

94 "expect-ct", 

95 "expires", 

96 "last-modified", 

97 "link", 

98 "pragma", 

99 "server", 

100 "set-cookie", 

101 "strict-transport-security", 

102 "transfer-encoding", 

103 "vary", 

104 "via", 

105 "x-content-type-options", 

106 "x-frame-options", 

107 "x-xss-protection", 

108 "report-to", 

109 "nel", 

110 "permissions-policy", 

111 "cross-origin-opener-policy", 

112 "cross-origin-embedder-policy", 

113 "cross-origin-resource-policy", 

114 "referrer-policy", 

115 "location", 

116 "connection", 

117 } 

118 _STD_PREFIXES = ("sec-", "access-control-") # CORS/CH префиксы считаем стандартными 

119 

120 def __init__( 

121 self, 

122 *, 

123 extra_request_allow: Iterable[str] = (), 

124 extra_response_allow: Iterable[str] = (), 

125 allowed_prefixes: Iterable[str] = (), 

126 include_subresources: bool = True, 

127 url_key: Optional[Callable[[str], str]] = None, 

128 url_filter: UrlFilter = None, 

129 ) -> None: 

130 self._req_allow = set(self._REQ_STD) | {h.lower() for h in extra_request_allow} 

131 self._resp_allow = set(self._RESP_STD) | {h.lower() for h in extra_response_allow} 

132 self._allowed_pref = tuple(self._STD_PREFIXES) + tuple(allowed_prefixes) 

133 self._include_sub = include_subresources 

134 

135 # нормализация URL по умолчанию: без фрагмента и без хвостового "/" 

136 if url_key: 

137 self._url_key = url_key 

138 else: 

139 

140 def _default_url_key(u: str) -> str: 

141 us = urlsplit(u) 

142 path = us.path.rstrip("/") or "/" 

143 return urlunsplit(us._replace(path=path, fragment="")) 

144 

145 self._url_key = _default_url_key 

146 

147 # фильтр URL: callable/regex/None 

148 self._url_filter_fn: Optional[Callable[[str], bool]] = None 

149 if url_filter is None: 

150 self._url_filter_fn = None 

151 elif callable(url_filter): 

152 self._url_filter_fn = url_filter 

153 else: 

154 # строка с регекспом или скомпилированный pattern 

155 pat: Pattern[str] = ( 

156 re.compile(url_filter) if isinstance(url_filter, str) else url_filter 

157 ) 

158 

159 def _url_filter(s: str, _p: Pattern[str] = pat) -> bool: 

160 return bool(_p.search(s)) 

161 

162 self._url_filter_fn = _url_filter 

163 

164 self._ctx: Optional[BrowserContext] = None 

165 self._started = False 

166 

167 # результаты: url -> header -> set(values) 

168 self._req_map: Dict[str, Dict[str, Set[str]]] = defaultdict(lambda: defaultdict(set)) 

169 self._resp_map: Dict[str, Dict[str, Set[str]]] = defaultdict(lambda: defaultdict(set)) 

170 

171 # ссылки на колбэки 

172 self._req_cb: Optional[Callable[[Request], None]] = None 

173 self._resp_cb: Optional[Callable[[Response], None]] = None 

174 

175 # пул задач 

176 self._tasks: Set[asyncio.Task] = set() 

177 self._lock = asyncio.Lock() 

178 self._wait_cond = asyncio.Condition( 

179 self._lock 

180 ) # уведомляем при каждом новом аномальном хедере 

181 

182 # ---------- API ---------- 

183 

184 async def start(self, ctx: BrowserContext) -> None: 

185 if self._started: 

186 raise RuntimeError("sniffer already started") 

187 self._ctx = ctx 

188 

189 def on_req(req: Request) -> None: 

190 if not self._started: 

191 return 

192 if not self._include_sub: 

193 try: 

194 if ( 

195 not req.is_navigation_request() 

196 or req.resource_type != "document" 

197 or req.frame.parent_frame is not None 

198 ): 

199 return 

200 except Exception: 

201 return 

202 t = asyncio.create_task(self._handle_request(req)) 

203 self._tasks.add(t) 

204 t.add_done_callback(self._tasks.discard) 

205 

206 def on_resp(resp: Response) -> None: 

207 if not self._started: 

208 return 

209 if not self._include_sub: 

210 rq = resp.request 

211 try: 

212 if ( 

213 not rq.is_navigation_request() 

214 or rq.resource_type != "document" 

215 or rq.frame.parent_frame is not None 

216 ): 

217 return 

218 except Exception: 

219 return 

220 t = asyncio.create_task(self._handle_response(resp)) 

221 self._tasks.add(t) 

222 t.add_done_callback(self._tasks.discard) 

223 

224 self._req_cb = None 

225 self._resp_cb = None 

226 

227 ctx.on("request", on_req) 

228 ctx.on("response", on_resp) 

229 self._started = True 

230 

231 async def complete(self) -> Dict[str, Dict[str, Dict[str, list[str]]]]: 

232 if not self._started or self._ctx is None: 

233 raise RuntimeError("sniffer not started") 

234 self._started = False 

235 

236 # официальная отписка 

237 if self._req_cb: 

238 self._ctx.remove_listener("request", self._req_cb) 

239 if self._resp_cb: 

240 self._ctx.remove_listener("response", self._resp_cb) 

241 

242 if self._tasks: 

243 await asyncio.gather(*list(self._tasks)) 

244 self._tasks.clear() 

245 

246 return self._snapshot() 

247 

248 async def wait( 

249 self, *, tasks: List[WaitHeader], timeout_ms: int = 30000 

250 ) -> Dict[str, Dict[str, Dict[str, list[str]]]]: 

251 """ 

252 Ждёт, пока в ЛОГЕ аномалий появятся все указанные заголовки (для каждого WaitHeader). 

253 Учитываются только записи, прошедшие url_filter и «нестандартность». 

254 

255 tasks: 

256 список условий. Для каждого WaitHeader все его headers должны встретиться 

257 хотя бы по одному значению, хотя бы на одном URL. 

258 source=REQUEST/RESPONSE/ALL ограничивает источник поиска. 

259 

260 timeout_ms: 

261 общий таймаут ожидания (мс). По таймауту — TimeoutError. 

262 

263 Возвращает: 

264 текущий снапшот (как у complete), НЕ останавливает сниффер. 

265 """ 

266 if not self._started: 

267 raise RuntimeError("sniffer not started") 

268 deadline = asyncio.get_running_loop().time() + (timeout_ms / 1000.0) 

269 

270 async with self._wait_cond: 

271 # быстрый путь — уже всё есть 

272 if self._wait_satisfied(tasks): 

273 return self._snapshot() 

274 

275 while True: 

276 remaining = deadline - asyncio.get_running_loop().time() 

277 if remaining <= 0: 

278 raise TimeoutError("wait: timeout") 

279 await asyncio.wait_for(self._wait_cond.wait(), timeout=remaining) 

280 if self._wait_satisfied(tasks): 

281 return self._snapshot() 

282 

283 # ---------- внутреннее ---------- 

284 

285 async def _handle_request(self, req: Request) -> None: 

286 url = self._url_key(req.url) 

287 if self._url_filter_fn and not self._url_filter_fn(url): 

288 return 

289 headers: Dict[str, str] = getattr(req, "headers", {}) or {} 

290 unknown = {k.lower(): v for k, v in headers.items() if self._is_unknown_req(k)} 

291 if not unknown: 

292 return 

293 async with self._lock: 

294 for h, val in unknown.items(): 

295 # добавляем значение и нотифицируем wait-ожидания 

296 before = len(self._req_map[url][h]) 

297 self._req_map[url][h].add(val) 

298 if len(self._req_map[url][h]) != before: 

299 self._wait_cond.notify_all() 

300 

301 async def _handle_response(self, resp: Response) -> None: 

302 url = self._url_key(resp.url) 

303 if self._url_filter_fn and not self._url_filter_fn(url): 

304 return 

305 headers: Dict[str, str] = await resp.all_headers() 

306 unknown = {k.lower(): v for k, v in headers.items() if self._is_unknown_resp(k)} 

307 if not unknown: 

308 return 

309 async with self._lock: 

310 for h, val in unknown.items(): 

311 before = len(self._resp_map[url][h]) 

312 self._resp_map[url][h].add(val) 

313 if len(self._resp_map[url][h]) != before: 

314 self._wait_cond.notify_all() 

315 

316 # ---------- utils ---------- 

317 

318 def _is_unknown_req(self, name: str) -> bool: 

319 n = name.lower() 

320 return n not in self._req_allow and not n.startswith(self._allowed_pref) 

321 

322 def _is_unknown_resp(self, name: str) -> bool: 

323 n = name.lower() 

324 return n not in self._resp_allow and not n.startswith(self._allowed_pref) 

325 

326 def _union_req_headers(self) -> Set[str]: 

327 out: Set[str] = set() 

328 for _, hmap in self._req_map.items(): 

329 out.update(hmap.keys()) 

330 return out 

331 

332 def _union_resp_headers(self) -> Set[str]: 

333 out: Set[str] = set() 

334 for _, hmap in self._resp_map.items(): 

335 out.update(hmap.keys()) 

336 return out 

337 

338 def _wait_satisfied(self, tasks: List[WaitHeader]) -> bool: 

339 req_union = self._union_req_headers() 

340 resp_union = self._union_resp_headers() 

341 for t in tasks: 

342 need = set(t.headers or []) 

343 if t.source is WaitSource.REQUEST: 

344 have = req_union 

345 elif t.source is WaitSource.RESPONSE: 

346 have = resp_union 

347 else: 

348 have = req_union | resp_union 

349 if not need.issubset(have): 

350 return False 

351 return True 

352 

353 def _snapshot(self) -> Dict[str, Dict[str, Dict[str, list[str]]]]: 

354 request_out = { 

355 url: {h: sorted(vals) for h, vals in hmap.items()} 

356 for url, hmap in self._req_map.items() 

357 } 

358 response_out = { 

359 url: {h: sorted(vals) for h, vals in hmap.items()} 

360 for url, hmap in self._resp_map.items() 

361 } 

362 return {"request": request_out, "response": response_out}