Coverage for human_requests/network_analyzer/anomaly_sniffer.py: 20%
177 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-05-28 00:39 +0000
1from __future__ import annotations
3import asyncio
4import re
5from collections import defaultdict
6from dataclasses import dataclass
7from enum import Enum, auto
8from typing import Callable, Dict, Iterable, List, Optional, Pattern, Set, Union
9from urllib.parse import urlsplit, urlunsplit
11from playwright.async_api import BrowserContext, Request, Response
# --- WAIT API -----------------------------------------------------------------


class WaitSource(Enum):
    """Selects which anomaly log(s) a ``WaitHeader`` condition is checked against."""

    REQUEST = auto()  # only headers recorded from requests
    RESPONSE = auto()  # only headers recorded from responses
    ALL = auto()  # request and response headers combined
@dataclass(frozen=True)
class WaitHeader:
    """One condition for ``HeaderAnomalySniffer.wait``.

    The condition holds once every name in ``headers`` has been recorded as an
    anomaly (with at least one value, on at least one URL) in the log selected
    by ``source``.

    Attributes:
        source: which log to search - requests, responses, or both.
        headers: header names to wait for, matched case-insensitively (they are
            lower-cased on construction). A single string is accepted as
            shorthand for a one-element list.

    Raises:
        ValueError: if ``headers`` is missing or empty.
    """

    source: WaitSource = WaitSource.ALL  # requests / responses / both
    headers: Optional[List[str]] = None  # header names (case-insensitive)

    def __post_init__(self) -> None:
        names = self.headers
        # A bare string would otherwise be iterated character by character,
        # yielding a condition on one-letter "headers" that practically never
        # matches, so wait() would just time out.
        if isinstance(names, str):
            names = [names]
        if not names:
            raise ValueError("WaitHeader.headers must be non-empty")
        # frozen=True forbids normal assignment, hence object.__setattr__.
        object.__setattr__(self, "headers", [h.lower() for h in names])
# --- SNIFFER ------------------------------------------------------------------

# Accepted forms of the sniffer's ``url_filter`` argument: a predicate over the
# normalised URL, a regex source string, a compiled regex, or None (no filter).
UrlFilter = Union[Callable[[str], bool], str, Pattern[str], None]
class HeaderAnomalySniffer:
    """Collect NON-standard request/response headers across a whole BrowserContext.

    Usage::

        sniffer = HeaderAnomalySniffer()
        await sniffer.start(ctx)
        # ... actions you want to "listen" to ...
        result = await sniffer.complete()

    Result shape::

        {
            "request":  {url: {header: [values...]}},
            "response": {url: {header: [values...]}},
        }

    A header counts as non-standard when its lower-cased name is neither in the
    whitelist for its direction (``_REQ_STD`` / ``_RESP_STD`` plus the
    ``extra_*_allow`` arguments) nor starts with an allowed prefix
    (``_STD_PREFIXES`` plus ``allowed_prefixes``). URLs are normalised with
    ``url_key`` and only recorded when they pass ``url_filter``. Values are
    de-duplicated per header and returned sorted.
    """

    # Base whitelists (lower-case).
    _REQ_STD: Set[str] = {
        "accept",
        "accept-encoding",
        "accept-language",
        "cache-control",
        "connection",
        "content-length",
        "content-type",
        "cookie",
        "host",
        "origin",
        "pragma",
        "referer",
        "upgrade-insecure-requests",
        "user-agent",
        "sec-ch-ua",
        "sec-ch-ua-mobile",
        "sec-ch-ua-platform",
        "sec-fetch-dest",
        "sec-fetch-mode",
        "sec-fetch-site",
        "sec-fetch-user",
        "x-requested-with",
        "purpose",  # prefetch hint
    }
    _RESP_STD: Set[str] = {
        "accept-ch",
        "accept-ranges",
        "age",
        "alt-svc",
        "cache-control",
        "content-disposition",
        "content-encoding",
        "content-language",
        "content-length",
        "content-security-policy",
        "content-type",
        "date",
        "etag",
        "expect-ct",
        "expires",
        "last-modified",
        "link",
        "pragma",
        "server",
        "set-cookie",
        "strict-transport-security",
        "transfer-encoding",
        "vary",
        "via",
        "x-content-type-options",
        "x-frame-options",
        "x-xss-protection",
        "report-to",
        "nel",
        "permissions-policy",
        "cross-origin-opener-policy",
        "cross-origin-embedder-policy",
        "cross-origin-resource-policy",
        "referrer-policy",
        "location",
        "connection",
    }
    # Name prefixes always treated as standard (CORS / Sec-* families).
    _STD_PREFIXES = ("sec-", "access-control-")

    def __init__(
        self,
        *,
        extra_request_allow: Iterable[str] = (),
        extra_response_allow: Iterable[str] = (),
        allowed_prefixes: Iterable[str] = (),
        include_subresources: bool = True,
        url_key: Optional[Callable[[str], str]] = None,
        url_filter: UrlFilter = None,
    ) -> None:
        """Configure the sniffer; nothing is subscribed until ``start()``.

        Args:
            extra_request_allow: extra request header names to treat as
                standard (case-insensitive).
            extra_response_allow: extra response header names to treat as
                standard (case-insensitive).
            allowed_prefixes: extra header-name prefixes to treat as standard
                (case-insensitive), in addition to ``_STD_PREFIXES``.
            include_subresources: when False, only requests (and responses to
                requests) that are navigations of resource type "document" in
                a frame without a parent frame are inspected.
            url_key: maps a raw URL to the key used both for filtering and in
                the results. Default: drop the fragment and any trailing "/"
                from the path (an empty path becomes "/").
            url_filter: a predicate over the normalised URL, a regex string or
                a compiled regex (``search`` semantics), or None for no filter.
        """
        self._req_allow = set(self._REQ_STD) | {h.lower() for h in extra_request_allow}
        self._resp_allow = set(self._RESP_STD) | {h.lower() for h in extra_response_allow}
        # Header names are lower-cased before the startswith() check, so the
        # prefixes must be lower-cased too or mixed-case ones never match.
        self._allowed_pref = tuple(self._STD_PREFIXES) + tuple(
            p.lower() for p in allowed_prefixes
        )
        self._include_sub = include_subresources

        self._url_key: Callable[[str], str] = url_key or self._default_url_key
        self._url_filter_fn: Optional[Callable[[str], bool]] = self._compile_url_filter(
            url_filter
        )

        self._ctx: Optional[BrowserContext] = None
        self._started = False

        # Results: url -> header -> set(values)
        self._req_map: Dict[str, Dict[str, Set[str]]] = defaultdict(lambda: defaultdict(set))
        self._resp_map: Dict[str, Dict[str, Set[str]]] = defaultdict(lambda: defaultdict(set))

        # Registered listeners, kept so complete() can unsubscribe them.
        self._req_cb: Optional[Callable[[Request], None]] = None
        self._resp_cb: Optional[Callable[[Response], None]] = None

        # In-flight handler tasks.
        self._tasks: Set[asyncio.Task] = set()
        self._lock = asyncio.Lock()
        # Notified whenever a new (header, value) anomaly is recorded.
        self._wait_cond = asyncio.Condition(self._lock)

    # ---------- API ----------

    async def start(self, ctx: BrowserContext) -> None:
        """Subscribe to ``request``/``response`` events of *ctx*.

        Raises:
            RuntimeError: if the sniffer is already started.
        """
        if self._started:
            raise RuntimeError("sniffer already started")
        self._ctx = ctx

        def on_req(req: Request) -> None:
            if not self._started:
                return
            if not self._include_sub and not self._is_top_level_document(req):
                return
            self._spawn(self._handle_request(req))

        def on_resp(resp: Response) -> None:
            if not self._started:
                return
            if not self._include_sub and not self._is_top_level_document(resp.request):
                return
            self._spawn(self._handle_response(resp))

        # Keep references to the exact callables so complete() can remove them.
        self._req_cb = on_req
        self._resp_cb = on_resp

        ctx.on("request", on_req)
        ctx.on("response", on_resp)
        self._started = True

    async def complete(self) -> Dict[str, Dict[str, Dict[str, list[str]]]]:
        """Stop listening, drain in-flight handlers and return the snapshot.

        Handler failures (e.g. ``all_headers()`` raising for a closed target)
        are ignored so the data already collected is still returned.

        Raises:
            RuntimeError: if the sniffer is not started.
        """
        if not self._started or self._ctx is None:
            raise RuntimeError("sniffer not started")
        self._started = False

        # Official unsubscription.
        if self._req_cb is not None:
            self._ctx.remove_listener("request", self._req_cb)
            self._req_cb = None
        if self._resp_cb is not None:
            self._ctx.remove_listener("response", self._resp_cb)
            self._resp_cb = None

        pending = list(self._tasks)
        self._tasks.clear()
        if pending:
            # Best-effort: one failed handler must not discard everything else.
            await asyncio.gather(*pending, return_exceptions=True)

        return self._snapshot()

    async def wait(
        self, *, tasks: List[WaitHeader], timeout_ms: int = 30000
    ) -> Dict[str, Dict[str, Dict[str, list[str]]]]:
        """Wait until every ``WaitHeader`` condition in *tasks* is met.

        Only anomalies that are already recorded count, i.e. entries that
        passed ``url_filter`` and the non-standard check.

        Args:
            tasks: conditions. For each ``WaitHeader`` all of its headers must
                have appeared (at least one value, on at least one URL) in the
                log selected by its ``source`` (REQUEST / RESPONSE / ALL).
            timeout_ms: overall timeout in milliseconds.

        Returns:
            The current snapshot (same shape as ``complete()``). The sniffer
            keeps running.

        Raises:
            RuntimeError: if the sniffer is not started.
            TimeoutError: if the conditions are not met before the timeout.
        """
        if not self._started:
            raise RuntimeError("sniffer not started")
        loop = asyncio.get_running_loop()
        deadline = loop.time() + (timeout_ms / 1000.0)

        async with self._wait_cond:
            # Fast path: everything is already there.
            if self._wait_satisfied(tasks):
                return self._snapshot()

            while True:
                remaining = deadline - loop.time()
                if remaining <= 0:
                    raise TimeoutError("wait: timeout")
                try:
                    await asyncio.wait_for(self._wait_cond.wait(), timeout=remaining)
                except asyncio.TimeoutError:
                    # Before Python 3.11 asyncio.TimeoutError is not the builtin
                    # TimeoutError; normalise to what this method documents.
                    raise TimeoutError("wait: timeout") from None
                if self._wait_satisfied(tasks):
                    return self._snapshot()

    # ---------- internals ----------

    @staticmethod
    def _default_url_key(u: str) -> str:
        """Drop the fragment and trailing "/" from the path (empty path -> "/")."""
        us = urlsplit(u)
        path = us.path.rstrip("/") or "/"
        return urlunsplit(us._replace(path=path, fragment=""))

    @staticmethod
    def _compile_url_filter(url_filter: UrlFilter) -> Optional[Callable[[str], bool]]:
        """Turn a ``UrlFilter`` value into a predicate (or None for no filter)."""
        if url_filter is None:
            return None
        if callable(url_filter):
            return url_filter
        # Regex source string or an already compiled pattern.
        pattern: Pattern[str] = (
            re.compile(url_filter) if isinstance(url_filter, str) else url_filter
        )

        def _url_filter(s: str) -> bool:
            return bool(pattern.search(s))

        return _url_filter

    @staticmethod
    def _is_top_level_document(req: Request) -> bool:
        """True for a navigation "document" request whose frame has no parent."""
        try:
            return (
                req.is_navigation_request()
                and req.resource_type == "document"
                and req.frame.parent_frame is None
            )
        except Exception:
            # Any failure while inspecting the request means "skip it".
            return False

    def _spawn(self, coro) -> None:
        """Run *coro* as a tracked task so complete() can await it."""
        task = asyncio.create_task(coro)
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    def _url_allowed(self, url: str) -> bool:
        return self._url_filter_fn is None or bool(self._url_filter_fn(url))

    async def _handle_request(self, req: Request) -> None:
        url = self._url_key(req.url)
        if not self._url_allowed(url):
            return
        # NOTE(review): Playwright documents Request.headers as omitting some
        # security-related headers (e.g. cookies), while responses use
        # all_headers(); confirm which set is intended here.
        headers: Dict[str, str] = getattr(req, "headers", {}) or {}
        await self._record(self._req_map, url, headers, self._is_unknown_req)

    async def _handle_response(self, resp: Response) -> None:
        url = self._url_key(resp.url)
        if not self._url_allowed(url):
            return
        headers: Dict[str, str] = await resp.all_headers()
        await self._record(self._resp_map, url, headers, self._is_unknown_resp)

    async def _record(
        self,
        target: Dict[str, Dict[str, Set[str]]],
        url: str,
        headers: Dict[str, str],
        is_unknown: Callable[[str], bool],
    ) -> None:
        """Store non-standard *headers* for *url* and wake up wait() callers."""
        unknown = {k.lower(): v for k, v in headers.items() if is_unknown(k)}
        if not unknown:
            return
        async with self._lock:
            per_url = target[url]
            for name, value in unknown.items():
                values = per_url[name]
                if value not in values:
                    values.add(value)
                    # Only genuinely new (header, value) pairs notify waiters.
                    self._wait_cond.notify_all()

    # ---------- utils ----------

    def _is_unknown_req(self, name: str) -> bool:
        n = name.lower()
        return n not in self._req_allow and not n.startswith(self._allowed_pref)

    def _is_unknown_resp(self, name: str) -> bool:
        n = name.lower()
        return n not in self._resp_allow and not n.startswith(self._allowed_pref)

    @staticmethod
    def _header_union(url_map: Dict[str, Dict[str, Set[str]]]) -> Set[str]:
        """All header names recorded in *url_map*, across every URL."""
        out: Set[str] = set()
        for hmap in url_map.values():
            out.update(hmap.keys())
        return out

    def _union_req_headers(self) -> Set[str]:
        return self._header_union(self._req_map)

    def _union_resp_headers(self) -> Set[str]:
        return self._header_union(self._resp_map)

    def _wait_satisfied(self, tasks: List[WaitHeader]) -> bool:
        """True when every condition's headers are all present in its source."""
        req_union = self._union_req_headers()
        resp_union = self._union_resp_headers()
        for t in tasks:
            need = set(t.headers or [])
            if t.source is WaitSource.REQUEST:
                have = req_union
            elif t.source is WaitSource.RESPONSE:
                have = resp_union
            else:
                have = req_union | resp_union
            if not need.issubset(have):
                return False
        return True

    @staticmethod
    def _freeze(url_map: Dict[str, Dict[str, Set[str]]]) -> Dict[str, Dict[str, list[str]]]:
        """Plain-dict copy of *url_map* with each value set sorted into a list."""
        return {
            url: {h: sorted(vals) for h, vals in hmap.items()}
            for url, hmap in url_map.items()
        }

    def _snapshot(self) -> Dict[str, Dict[str, Dict[str, list[str]]]]:
        return {
            "request": self._freeze(self._req_map),
            "response": self._freeze(self._resp_map),
        }