Coverage for human_requests/abstraction/output.py: 76%

122 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-05-28 00:39 +0000

1from __future__ import annotations 

2 

3import json 

4from dataclasses import dataclass, field 

5from io import BytesIO 

6from time import time 

7from typing import Any, Literal 

8 

9from PIL import Image 

10from playwright.async_api import Response as PWResponse 

11 

12from .http import URL 

13from .json_debug import loads_json_debug 

14 

15 

def _coerce_bytes(raw: bytes | bytearray | memoryview | str) -> bytes:
    """Convert any supported body representation into ``bytes``.

    ``bytes`` input is handed back unchanged, ``bytearray`` and
    ``memoryview`` are copied into a new ``bytes`` object, and anything else
    is treated as text and encoded as UTF-8 with the ``"replace"`` error
    handler.
    """
    if isinstance(raw, bytes):
        payload = raw
    elif isinstance(raw, bytearray):
        payload = bytes(raw)
    elif isinstance(raw, memoryview):
        payload = raw.tobytes()
    else:
        # Remaining supported type is ``str``; unencodable code points
        # (e.g. lone surrogates) become "?" instead of raising.
        payload = raw.encode("utf-8", "replace")
    return payload

24 

25 

def _normalize_headers(headers: dict[str, Any] | None) -> dict[str, str]:
    """Return a new header mapping with lower-cased names and string values.

    ``None`` (or an empty mapping) yields an empty dict. Header names are
    passed through ``str(...).lower()``; a ``None`` value becomes the empty
    string and every other value is converted with ``str``. When two names
    collide after lower-casing, the later value wins.
    """
    source = headers or {}
    return {
        str(name).lower(): ("" if val is None else str(val))
        for name, val in source.items()
    }

31 

32 

def _coerce_url(url: URL | str | None) -> URL | None:
    """Return *url* as a ``URL`` instance, or ``None`` when no URL was given.

    ``None`` and existing ``URL`` objects are returned as-is; any other value
    is stringified and wrapped via ``URL(full_url=...)``.
    """
    # ``None`` is tested first so the ``URL`` check only runs for real values.
    if url is None or isinstance(url, URL):
        return url
    return URL(full_url=str(url))

39 

40 

def _decode_text(raw: bytes, headers: dict[str, str]) -> str:
    """Decode a response body to text using the ``content-type`` charset.

    Args:
        raw: Body bytes to decode.
        headers: Header mapping whose keys are already lower-cased; only the
            ``"content-type"`` entry is consulted.

    Returns:
        The decoded text. Undecodable bytes are replaced with U+FFFD. UTF-8
        is used when the header is missing, carries no ``charset`` parameter,
        carries an empty one, or names a codec Python cannot use for text.
    """
    fallback = "utf-8"
    charset = fallback
    content_type = headers.get("content-type", "")

    # Walk the ";"-separated segments and take the first ``charset``
    # parameter. The name is matched case-insensitively and surrounding
    # whitespace/quotes are dropped, so ``Charset="UTF-8"`` works too.
    for segment in content_type.split(";"):
        key, sep, value = segment.partition("=")
        if sep and key.strip().lower() == "charset":
            charset = value.strip().strip("\"'").strip() or fallback
            break

    try:
        return raw.decode(charset, errors="replace")
    except (LookupError, ValueError):
        # LookupError: unknown or non-text codec label sent by the server.
        # ValueError: malformed label or a codec that ignores "replace".
        # A bad label must not make the body unreadable, so fall back.
        return raw.decode(fallback, errors="replace")

47 

48 

@dataclass
class Output:
    """Response body bytes together with the metadata that describes them.

    Instances are normally built through one of the ``from_*`` constructors.
    ``__post_init__`` normalizes ``raw`` to ``bytes``, lower-cases header
    names and casts ``status_code`` to ``int``.

    Attributes that are not defined on ``Output`` itself are looked up on
    ``request`` and then on ``page`` (see ``__getattr__``).
    """

    # Response body; excluded from the generated repr.
    raw: bytes = field(repr=False)
    # Header mapping; names are lower-cased and values stringified on init.
    headers: dict[str, str] = field(default_factory=dict)
    url: URL | None = None
    status_code: int | None = None
    status_text: str | None = None
    redirected: bool | None = None
    type: Literal["basic", "cors", "error", "opaque", "opaqueredirect"] | None = None
    # presumably seconds, like ``end_time`` -- TODO confirm with producers.
    duration: float | None = None
    # ``time.time()`` timestamp; ``seconds_ago`` subtracts it from ``time()``.
    end_time: float | None = None
    # Objects consulted by ``__getattr__`` for otherwise-missing attributes.
    request: Any | None = None
    page: Any | None = None

    def __post_init__(self) -> None:
        """Normalize the body, headers and status code after construction."""
        self.raw = _coerce_bytes(self.raw)
        self.headers = _normalize_headers(self.headers)
        if self.status_code is not None:
            self.status_code = int(self.status_code)

    @classmethod
    def from_fetch_response(cls, response: Any) -> "Output":
        """Build an Output by copying same-named attributes from *response*.

        Every attribute is read with ``getattr`` and a default, so a response
        object that lacks some of them still produces an Output (missing
        metadata becomes ``None``, a missing body becomes ``b""``).
        """
        return cls(
            raw=getattr(response, "raw", b""),
            headers=getattr(response, "headers", {}) or {},
            url=_coerce_url(getattr(response, "url", None)),
            status_code=getattr(response, "status_code", None),
            status_text=getattr(response, "status_text", None),
            redirected=getattr(response, "redirected", None),
            type=getattr(response, "type", None),
            duration=getattr(response, "duration", None),
            end_time=getattr(response, "end_time", None),
            request=getattr(response, "request", None),
            page=getattr(response, "page", None),
        )

    @classmethod
    def from_raw(
        cls,
        raw: bytes | bytearray | memoryview | str,
        *,
        url: URL | str | None = None,
        headers: dict[str, Any] | None = None,
        status_code: int | None = None,
        status_text: str | None = None,
        redirected: bool | None = None,
        response_type: Literal["basic", "cors", "error", "opaque", "opaqueredirect"] | None = None,
        duration: float | None = None,
        end_time: float | None = None,
        request: Any | None = None,
        page: Any | None = None,
    ) -> "Output":
        """Build an Output from an in-memory body and optional metadata.

        ``response_type`` is stored in the ``type`` field. When ``end_time``
        is omitted the current ``time()`` is recorded instead.
        """
        return cls(
            raw=_coerce_bytes(raw),
            headers=headers or {},
            url=_coerce_url(url),
            status_code=status_code,
            status_text=status_text,
            redirected=redirected,
            type=response_type,
            duration=duration,
            end_time=end_time if end_time is not None else time(),
            request=request,
            page=page,
        )

    @classmethod
    async def from_playwright_response(
        cls,
        response: "PWResponse",
        *,
        page: Any | None = None,
        json_override: Any | None = None,
        text_override: str | bytes | bytearray | memoryview | None = None,
    ) -> "Output":
        """Build an Output from a Playwright response.

        json_override and text_override let callers replace the response body
        with data obtained outside the Playwright response object while keeping
        response metadata intact. When an override is used, the
        ``content-type`` header is rewritten to match the substituted body.

        Raises:
            ValueError: If both overrides are supplied.
        """
        if json_override is not None and text_override is not None:
            raise ValueError("json_override and text_override are mutually exclusive")

        headers = await response.all_headers()
        if json_override is not None:
            raw = json.dumps(json_override, ensure_ascii=False).encode("utf-8")
            # Copy before editing so the mapping returned by Playwright is
            # left untouched.
            headers = dict(headers)
            headers["content-type"] = "application/json; charset=utf-8"
        elif text_override is not None:
            raw = _coerce_bytes(text_override)
            headers = dict(headers)
            headers["content-type"] = "text/plain; charset=utf-8"
        else:
            raw = await response.body()

        return cls(
            raw=raw,
            headers=headers,
            url=_coerce_url(getattr(response, "url", None)),
            status_code=getattr(response, "status", None),
            status_text=getattr(response, "status_text", None),
            redirected=None,
            type=getattr(response, "type", None),
            # No timing is measured on this path.
            duration=0.0,
            end_time=time(),
            request=getattr(response, "request", None),
            page=page,
        )

    @property
    def status(self) -> int | None:
        """Alias for ``status_code``."""
        return self.status_code

    @property
    def response_type(self) -> str | None:
        """Alias for ``type``."""
        return self.type

    @property
    def text(self) -> str:
        """Body decoded to ``str`` by ``_decode_text`` using the headers."""
        return _decode_text(self.raw, self.headers)

    def body(self) -> bytes:
        """Return the body bytes."""
        return self.raw

    def json(self) -> Any:
        """Parse ``text`` with ``loads_json_debug`` and return the result."""
        return loads_json_debug(self.text)

    def image(self) -> Any:
        """Open the body as a PIL image and load its pixel data.

        ``load()`` is called before returning so the image no longer depends
        on the temporary ``BytesIO`` buffer.
        """
        image = Image.open(BytesIO(self.raw))
        image.load()
        return image

    def all_headers(self) -> dict[str, str]:
        """Return a shallow copy of the header mapping."""
        return dict(self.headers)

    def header_value(self, name: str) -> str | None:
        """Return the value of header *name* (case-insensitive) or ``None``."""
        return self.headers.get(name.lower())

    def header_values(self, name: str) -> list[str]:
        """Return the header's value as a list with zero or one element."""
        value = self.header_value(name)
        if value is None:
            return []
        return [value]

    def headers_array(self) -> list[dict[str, str]]:
        """Return the headers as a list of ``{"name": ..., "value": ...}``."""
        return [{"name": key, "value": value} for key, value in self.headers.items()]

    def seconds_ago(self) -> float:
        """Return seconds elapsed since ``end_time`` (0.0 when it is unset)."""
        if self.end_time is None:
            return 0.0
        return time() - self.end_time

    async def render(
        self,
        retry: int = 2,
        timeout: float | None = None,
        wait_until: Literal["commit", "load", "domcontentloaded", "networkidle"] = "commit",
        referer: str | None = None,
    ) -> Any:
        """Open a new page in ``self.page``'s context and render this output.

        The new page's ``goto_render`` is awaited with this Output and the
        given options, and the page is returned to the caller, who then owns
        it.

        Raises:
            RuntimeError: If this Output has no ``page``.
            Exception: Whatever ``goto_render`` raises; the new page is
                closed before the error propagates.
        """
        if self.page is None:
            raise RuntimeError("render() requires a page")
        page = await self.page.context.new_page()
        try:
            await page.goto_render(
                self,
                wait_until=wait_until,
                referer=referer,
                timeout=timeout,
                retry=retry,
            )
        except BaseException:
            # The caller never receives a reference to this page on failure
            # (or cancellation), so close it here to avoid leaking it.
            await page.close()
            raise
        return page

    def __bytes__(self) -> bytes:
        """Return the body bytes (``bytes(output)``)."""
        return self.raw

    def __len__(self) -> int:
        """Return the body length in bytes."""
        return len(self.raw)

    def __getattr__(self, name: str) -> Any:
        """Resolve attributes missing on Output via ``request``, then ``page``.

        Only called after normal lookup fails. ``request`` and ``page`` have
        class-level defaults, so reading them here cannot re-enter this
        method even on a partially initialized instance.

        Dunder names are never forwarded, so protocol probes such as
        ``hasattr(output, "__html__")`` describe Output itself rather than
        the wrapped request or page.

        Raises:
            AttributeError: If neither source provides *name*.
        """
        if not (name.startswith("__") and name.endswith("__")):
            missing = object()
            for source in (self.request, self.page):
                if source is None:
                    continue
                value = getattr(source, name, missing)
                if value is not missing:
                    return value
        raise AttributeError(
            f"{type(self).__name__!r} object has no attribute {name!r}"
        )