Coverage for human_requests/abstraction/proxy_manager.py: 38%

47 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-13 21:41 +0000

1from __future__ import annotations 

2 

3from dataclasses import dataclass 

4from typing import Dict, Optional 

5from urllib.parse import quote, urlsplit, urlunsplit 

6 

7ProxyInput = str | Dict[str, str] | None 

8 

9 

@dataclass(frozen=True)
class ParsedProxy:
    """
    Unified proxy representation:
    - scheme: http | https | socks5 | socks5h | ...
    - host: example.com (IPv6 literals are stored without brackets)
    - port: 8080 (or None)
    - username/password: decoded (not percent-encoded) values; may be None
    """

    scheme: str
    host: str
    port: Optional[int]
    username: Optional[str]
    password: Optional[str]

    @classmethod
    def from_any(cls, value: "ProxyInput") -> Optional["ParsedProxy"]:
        """
        Build a ParsedProxy from any supported proxy specification.

        Supports:
        - URL string: http://user:pass@host:port, socks5://host:1080, ...
        - dict: {"server": "...", "username": "...", "password": "..."}
        If credentials conflict: URL takes precedence over dict fields.

        Credentials embedded in the URL are percent-decoded, so
        "http://user:p%40ss@host" yields the password "p@ss". Credentials
        supplied through the dict fields are taken verbatim.

        Returns None when the value is empty or when the server URL lacks a
        scheme or a host.
        """
        # Local import: the module-level import list does not include unquote.
        from urllib.parse import unquote

        if not value:
            return None

        dict_user: Optional[str] = None
        dict_pass: Optional[str] = None

        if isinstance(value, dict):
            server = value.get("server") or ""
            dict_user = value.get("username")
            dict_pass = value.get("password")
        else:
            server = value

        parts = urlsplit(server)
        # Require at least a scheme and a host.
        if not parts.scheme or not parts.hostname:
            return None

        # urlsplit() returns userinfo still percent-encoded; decode it once
        # here so for_curl() does not encode it a second time and
        # for_playwright() hands over the real credentials.
        url_user = unquote(parts.username) if parts.username else None
        url_pass = unquote(parts.password) if parts.password else None

        return cls(
            parts.scheme,
            parts.hostname,
            parts.port,
            url_user or dict_user,
            url_pass or dict_pass,
        )

    def _netloc(self) -> str:
        """Return "host[:port]", wrapping a bare IPv6 literal in brackets."""
        host = self.host
        # urlsplit().hostname strips the brackets of an IPv6 literal; they
        # must be restored or the colons would be read as a port separator.
        if ":" in host and not host.startswith("["):
            host = f"[{host}]"
        return host + (f":{self.port}" if self.port else "")

    def for_playwright(self) -> Dict[str, str]:
        """
        Converts ParsedProxy → playwright/patchright/camoufox launch proxy dict:
        {"server": "scheme://host[:port]", "username": "...", "password": "..."}

        The "username" and "password" keys are only present when the
        corresponding value is set and non-empty.
        """
        out: Dict[str, str] = {"server": f"{self.scheme}://{self._netloc()}"}
        if self.username:
            out["username"] = self.username
        if self.password:
            out["password"] = self.password
        return out

    def for_curl(self) -> str:
        """
        Converts ParsedProxy → a single proxy URL string with userinfo:
        scheme://user:pass@host[:port]

        Username and password are percent-encoded with no safe characters.
        The password is only emitted when a username is present.
        """
        auth = ""
        if self.username:
            auth = quote(self.username, safe="")
            if self.password:
                auth += ":" + quote(self.password, safe="")
            auth += "@"

        return urlunsplit((self.scheme, auth + self._netloc(), "", "", ""))