Coverage for human_requests/autotest_report.py: 84%

412 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-05-28 00:39 +0000

1from __future__ import annotations 

2 

3import inspect 

4import io 

5import linecache 

6import os 

7import sys 

8import traceback 

9from dataclasses import dataclass 

10from json import JSONDecodeError 

11from pathlib import Path 

12from types import FrameType, TracebackType 

13from typing import Any, Callable, NoReturn, Sequence 

14 

15from _pytest._code.code import ReprExceptionInfo, ReprFileLocation, ReprTracebackNative 

16from rich import box 

17from rich.console import Console 

18from rich.panel import Panel 

19from rich.syntax import Syntax 

20from rich.table import Table 

21from rich.text import Text 

22 

# Any callable that can be the subject of a report in this module: it is the
# annotation used for API methods, hooks and params providers alike.
AutotestFunction = Callable[..., Any]

24 

25 

@dataclass(frozen=True)
class AutotestCrashData:
    """Immutable result of rendering a crash report.

    Produced by ``_build_autotest_crash_data`` and consumed by ``AutotestCrash``.
    """

    # Report title; AutotestCrash also uses it as the exception message.
    summary_message: str
    # Caller-supplied detail text, or the whitespace-normalised ``str(error)``.
    detail_message: str
    # Fully rendered multi-line report (panel plus source section).
    report: str
    # Display path of the selected source frame, or "<source unavailable>".
    source_path: str
    # Line number of the selected source frame, or 0 when no frame was found.
    source_lineno: int

33 

34 

@dataclass(frozen=True)
class AutotestSourceLocation:
    """A position inside a source file, optionally with its enclosing block."""

    # File the location refers to.
    filename: str
    # 1-based line of interest (the line that is highlighted in an excerpt).
    lineno: int
    # First and last line of the enclosing source block as reported by
    # ``inspect.getsourcelines``; both ends are treated as inclusive.
    start_lineno: int | None = None
    end_lineno: int | None = None
    # Column span on ``lineno`` taken from the traceback summary; used to draw
    # the ``^`` pointer under the offending expression.
    colno: int | None = None
    end_colno: int | None = None

43 

44 

@dataclass(frozen=True)
class AutotestPanelRow:
    """One ``label / value`` line of the summary panel built by ``_render_panel``."""

    # Text shown in the left (label) column.
    label: str
    # Text shown in the right (value) column.
    value: str
    # Style strings handed to ``_render_styled_text``; ``None`` means unstyled.
    label_style: str | None = None
    value_style: str | None = None
    # ``(substring, style)`` pairs: every occurrence of the substring inside
    # ``value`` is additionally styled with the given style.
    value_highlights: tuple[tuple[str, str], ...] = ()

52 

53 

class AutotestCrash(RuntimeError):
    """Base error that carries an already rendered autotest crash report.

    The exception message is the crash summary; the other fields of the
    supplied ``AutotestCrashData`` are copied onto the instance.
    """

    def __init__(self, data: AutotestCrashData) -> None:
        """Store every field of ``data`` on the exception instance."""
        summary = data.summary_message
        super().__init__(summary)
        self.summary_message = summary
        self.detail_message = data.detail_message
        self.report = data.report
        self.source_path = data.source_path
        self.source_lineno = data.source_lineno

    def to_longrepr(self) -> ReprExceptionInfo:
        """Return a pytest long representation built from the stored report.

        The rendered report becomes the (native) traceback text, and the
        stored path, line number and summary become the crash location.
        """
        report_traceback = ReprTracebackNative([self.report + "\n"])
        crash_location = ReprFileLocation(
            path=self.source_path,
            lineno=self.source_lineno,
            message=self.summary_message,
        )
        return ReprExceptionInfo(
            reprtraceback=report_traceback,
            reprcrash=crash_location,
        )

72 

73 

class AutotestMethodCrash(AutotestCrash):
    """Crash type raised by ``raise_autotest_method_crash`` and
    ``raise_autotest_method_output_error``."""

    pass

76 

77 

class AutotestHookCrash(AutotestCrash):
    """Crash type raised by ``raise_autotest_hook_crash``."""

    pass

80 

81 

class AutotestParamsCrash(AutotestCrash):
    """Crash type raised by ``raise_autotest_params_crash``."""

    pass

84 

85 

def build_autotest_method_crash_report(
    *,
    api: object,
    func: AutotestFunction,
    error: BaseException,
    context_lines: int = 3,
    trace_limit: int = 3,
    truncation_context_lines: int = 3,
) -> str:
    """Render the "API method crashed" report for ``func`` and return its text.

    Args:
        api: API object; its type's top-level module determines the code root.
        func: The method that crashed; its ``__qualname__`` (or ``repr``) is
            shown as the report subject.
        error: The exception that was raised.
        context_lines: Lines of source shown on each side of the failing line.
        trace_limit: Maximum number of source frames rendered.
        truncation_context_lines: Lines kept at the top of a long source block.

    Returns:
        The rendered report text.
    """
    method_name = getattr(func, "__qualname__", repr(func))
    crash_data = _build_autotest_crash_data(
        api=api,
        title="API method crashed",
        subject_label="Method",
        subject_value=method_name,
        error=error,
        context_lines=context_lines,
        trace_limit=trace_limit,
        truncation_context_lines=truncation_context_lines,
    )
    return crash_data.report

105 

106 

def build_autotest_method_output_error_report(
    *,
    api: object,
    func: AutotestFunction,
    error: BaseException,
    context_lines: int = 3,
    trace_limit: int = 3,
    truncation_context_lines: int = 3,
) -> str:
    """Render the report for a method whose output was rejected.

    The title depends on the error type (see
    ``_format_autotest_method_output_error_title``). ``func`` is also used as
    the source anchor, and only the ``path:line`` location is printed, not a
    source excerpt.

    Returns:
        The rendered report text.
    """
    report_title = _format_autotest_method_output_error_title(error)
    method_name = getattr(func, "__qualname__", repr(func))
    crash_data = _build_autotest_crash_data(
        api=api,
        title=report_title,
        subject_label="Method",
        subject_value=method_name,
        error=error,
        context_lines=context_lines,
        trace_limit=trace_limit,
        truncation_context_lines=truncation_context_lines,
        source_func=func,
        render_source_excerpt=False,
    )
    return crash_data.report

128 

129 

def build_autotest_hook_crash_report(
    *,
    api: object,
    hook: AutotestFunction,
    error: BaseException,
    context_lines: int = 3,
    trace_limit: int = 3,
    truncation_context_lines: int = 3,
) -> str:
    """Render the "Autotest hook crashed" report for ``hook`` and return its text.

    The hook's ``__qualname__`` (or ``repr`` when it has none) is shown as the
    report subject.
    """
    hook_name = getattr(hook, "__qualname__", repr(hook))
    crash_data = _build_autotest_crash_data(
        api=api,
        title="Autotest hook crashed",
        subject_label="Hook",
        subject_value=hook_name,
        error=error,
        context_lines=context_lines,
        trace_limit=trace_limit,
        truncation_context_lines=truncation_context_lines,
    )
    return crash_data.report

149 

150 

def build_autotest_params_crash_report(
    *,
    api: object,
    params_provider: AutotestFunction,
    error: BaseException,
    context_lines: int = 3,
    trace_limit: int = 3,
    truncation_context_lines: int = 3,
) -> str:
    """Render the "Autotest params preparation crashed" report text.

    The provider's ``__qualname__`` (or ``repr`` when it has none) is shown as
    the report subject.
    """
    crash_data = _build_autotest_crash_data(
        api=api,
        title="Autotest params preparation crashed",
        subject_label="Params",
        subject_value=getattr(params_provider, "__qualname__", repr(params_provider)),
        error=error,
        context_lines=context_lines,
        trace_limit=trace_limit,
        truncation_context_lines=truncation_context_lines,
    )
    return crash_data.report

171 

172 

def build_autotest_case_results_report(
    records: Sequence[tuple[str, str]],
) -> str:
    """Render a heading plus a table of ``(label, status)`` case results.

    The heading lists the non-zero counts of ``passed``, ``failed`` and
    ``skipped`` records, in that order. With no records, a placeholder line
    replaces the table.

    Args:
        records: ``(test label, status)`` pairs in display order.

    Returns:
        The heading, a blank line, and the rendered table (with ANSI styling).
    """
    tallies = [
        (outcome, sum(1 for _label, status in records if status == outcome))
        for outcome in ("passed", "failed", "skipped")
    ]
    summary = ", ".join(f"{count} {outcome}" for outcome, count in tallies if count)
    heading = "Autotest case results"
    if summary:
        heading = f"{heading}: {summary}"

    if not records:
        return f"{heading}\n\n<no autotest cases recorded>"

    results_table = Table(
        header_style="bold cyan",
        box=box.SIMPLE_HEAVY,
        expand=False,
        pad_edge=False,
    )
    results_table.add_column("Test", overflow="fold")
    results_table.add_column("Status", no_wrap=True)
    for label, status in records:
        status_cell = Text(status, style=_status_style_for_case(status))
        results_table.add_row(label, status_cell)

    # Render into memory; the width is borrowed from a default console.
    output = io.StringIO()
    Console(
        file=output,
        force_terminal=True,
        color_system="standard",
        legacy_windows=False,
        width=Console().width,
    ).print(results_table)
    table_text = output.getvalue().rstrip("\r\n")
    return f"{heading}\n\n{table_text}"

220 

221 

def raise_autotest_method_crash(
    *,
    api: object,
    func: AutotestFunction,
    error: BaseException,
    source_func: AutotestFunction | None = None,
    detail_message: str | None = None,
    trace_limit: int = 3,
    truncation_context_lines: int = 3,
) -> NoReturn:
    """Build the "API method crashed" report and raise it as ``AutotestMethodCrash``.

    ``truncation_context_lines`` is used both as the excerpt context size and
    as the truncation context size.

    Raises:
        AutotestMethodCrash: Always.
    """
    method_name = getattr(func, "__qualname__", repr(func))
    crash_data = _build_autotest_crash_data(
        api=api,
        title="API method crashed",
        subject_label="Method",
        subject_value=method_name,
        error=error,
        context_lines=truncation_context_lines,
        source_func=source_func,
        detail_message=detail_message,
        trace_limit=trace_limit,
        truncation_context_lines=truncation_context_lines,
    )
    raise AutotestMethodCrash(crash_data)

246 

247 

def raise_autotest_method_output_error(
    *,
    api: object,
    func: AutotestFunction,
    error: BaseException,
    source_func: AutotestFunction | None = None,
    detail_message: str | None = None,
    trace_limit: int = 3,
    truncation_context_lines: int = 3,
) -> NoReturn:
    """Raise ``AutotestMethodCrash`` for a method whose output was rejected.

    The source anchor is ``source_func`` when given, otherwise ``func``. No
    source excerpt is rendered, only the ``path:line`` location.
    ``truncation_context_lines`` also serves as the excerpt context size.

    Raises:
        AutotestMethodCrash: Always.
    """
    report_title = _format_autotest_method_output_error_title(error)
    method_name = getattr(func, "__qualname__", repr(func))
    crash_data = _build_autotest_crash_data(
        api=api,
        title=report_title,
        subject_label="Method",
        subject_value=method_name,
        error=error,
        context_lines=truncation_context_lines,
        source_func=source_func or func,
        detail_message=detail_message,
        trace_limit=trace_limit,
        truncation_context_lines=truncation_context_lines,
        render_source_excerpt=False,
    )
    raise AutotestMethodCrash(crash_data)

273 

274 

def raise_autotest_hook_crash(
    *,
    api: object,
    hook: AutotestFunction,
    error: BaseException,
    summary_message: str = "Autotest hook crashed",
    subject_label: str = "Hook",
    subject_value: str | None = None,
    source_func: AutotestFunction | None = None,
    detail_message: str | None = None,
    trace_limit: int = 3,
    truncation_context_lines: int = 3,
) -> NoReturn:
    """Build a hook crash report and raise it as ``AutotestHookCrash``.

    The subject shown in the report is ``subject_value`` when it is truthy,
    otherwise the hook's ``__qualname__`` (or its ``repr``).
    ``truncation_context_lines`` also serves as the excerpt context size.

    Raises:
        AutotestHookCrash: Always.
    """
    if subject_value:
        shown_subject = str(subject_value)
    else:
        shown_subject = str(getattr(hook, "__qualname__", repr(hook)))

    crash_data = _build_autotest_crash_data(
        api=api,
        title=summary_message,
        subject_label=subject_label,
        subject_value=shown_subject,
        error=error,
        context_lines=truncation_context_lines,
        source_func=source_func,
        detail_message=detail_message,
        trace_limit=trace_limit,
        truncation_context_lines=truncation_context_lines,
    )
    raise AutotestHookCrash(crash_data)

303 

304 

def raise_autotest_params_crash(
    *,
    api: object,
    params_provider: AutotestFunction,
    error: BaseException,
    summary_message: str = "Autotest params preparation crashed",
    subject_label: str = "Params",
    subject_value: str | None = None,
    source_func: AutotestFunction | None = None,
    detail_message: str | None = None,
    trace_limit: int = 3,
    truncation_context_lines: int = 3,
) -> NoReturn:
    """Build a params-preparation crash report and raise ``AutotestParamsCrash``.

    The subject shown is ``subject_value`` when truthy, otherwise the
    provider's ``__qualname__`` (or its ``repr``). The source anchor is
    ``source_func`` when given, otherwise ``params_provider``.
    ``truncation_context_lines`` also serves as the excerpt context size.

    Raises:
        AutotestParamsCrash: Always.
    """
    if subject_value:
        shown_subject = str(subject_value)
    else:
        shown_subject = str(
            getattr(params_provider, "__qualname__", repr(params_provider))
        )

    crash_data = _build_autotest_crash_data(
        api=api,
        title=summary_message,
        subject_label=subject_label,
        subject_value=shown_subject,
        error=error,
        context_lines=truncation_context_lines,
        source_func=source_func or params_provider,
        detail_message=detail_message,
        trace_limit=trace_limit,
        truncation_context_lines=truncation_context_lines,
    )
    raise AutotestParamsCrash(crash_data)

335 

336 

def _build_autotest_crash_data(
    *,
    api: object,
    title: str,
    subject_label: str,
    subject_value: str,
    error: BaseException,
    context_lines: int,
    trace_limit: int,
    truncation_context_lines: int,
    source_func: AutotestFunction | None = None,
    detail_message: str | None = None,
    render_source_excerpt: bool = True,
) -> AutotestCrashData:
    """Assemble the summary panel and source section of a crash report.

    Args:
        api: API object; its type's top-level module determines the code root.
        title: Panel title; also stored as ``summary_message``.
        subject_label: Label of the first panel row (e.g. "Method").
        subject_value: Value of the first panel row.
        error: Exception being reported; its traceback supplies the frames.
        context_lines: Lines of source shown on each side of a failing line.
        trace_limit: Maximum number of source frames rendered; values <= 0
            render no source frames at all.
        truncation_context_lines: Lines kept at the top of a long source block.
        source_func: Optional callable used to pick the primary frame, and as a
            fallback location when the traceback yields no frame.
        detail_message: Optional text overriding the formatted error message.
        render_source_excerpt: When false, only ``path:line`` is printed.

    Returns:
        The rendered report together with its summary, detail and location.
    """
    code_root = _resolve_code_root(api)
    trace_frames = _traceback_source_locations(error.__traceback__, code_root)
    frame = _select_source_frame(trace_frames, code_root, source_func=source_func)
    if frame is None and source_func is not None:
        # No usable traceback frame: fall back to where the callable is defined.
        frame = _source_location_from_callable(source_func)

    if frame is not None:
        # The primary frame is rendered first, so take it out of the chain.
        trace_frames = _drop_trace_source_frame(
            trace_frames,
            filename=frame.filename,
            lineno=frame.lineno,
        )

    # Computed once so the panel row and the returned detail_message always agree.
    message = detail_message or _format_error_message(error)

    rows = [
        AutotestPanelRow(
            label=subject_label,
            value=subject_value,
            label_style="bold cyan",
            value_style="green",
        ),
        AutotestPanelRow(
            label="Error",
            value=error.__class__.__name__,
            label_style="bold cyan",
            value_style="yellow",
        ),
        AutotestPanelRow(
            label="Message",
            value=message,
            label_style="bold cyan",
            value_style="white",
            value_highlights=(("human_requests.abstraction.Output", "bold magenta"),),
        ),
    ]

    lines = _render_panel(title, rows)
    lines.append("")
    if frame is None:
        source_path = "<source unavailable>"
        source_lineno = 0
        lines.append("Source:")
        lines.append(source_path)
    else:
        source_path = _format_source_path(frame.filename, code_root)
        source_lineno = frame.lineno
        if render_source_excerpt:
            # NOTE(review): the slice keeps the *last* ``trace_limit`` entries,
            # so with a long chain the primary frame (first entry) is the first
            # one dropped - confirm this is the intended priority.
            if trace_limit > 0:
                rendered_frames = [frame, *trace_frames][-trace_limit:]
            else:
                rendered_frames = []
            lines.extend(
                _render_source_chain(
                    rendered_frames,
                    code_root=code_root,
                    context_lines=context_lines,
                    truncation_context_lines=truncation_context_lines,
                )
            )
        else:
            lines.append("Source:")
            lines.append(f"{source_path}:{source_lineno}")

    return AutotestCrashData(
        summary_message=title,
        detail_message=message,
        report="\n".join(lines),
        source_path=source_path,
        source_lineno=source_lineno,
    )

428 

429 

430def _resolve_code_root(api: object) -> Path | None: 

431 module_name = getattr(type(api), "__module__", "") 

432 if not module_name: 

433 return None 

434 

435 top_level_name = module_name.split(".", 1)[0] 

436 module = sys.modules.get(top_level_name) 

437 if module is None: 

438 return None 

439 

440 module_path = getattr(module, "__path__", None) 

441 if module_path is not None: 

442 first_location = next(iter(module_path), None) 

443 if first_location is None: 

444 return None 

445 return Path(first_location).resolve() 

446 

447 module_file = getattr(module, "__file__", None) 

448 if module_file is None: 

449 return None 

450 return Path(module_file).resolve() 

451 

452 

453def _select_source_frame( 

454 frames: list[AutotestSourceLocation], 

455 code_root: Path | None, 

456 *, 

457 source_func: AutotestFunction | None = None, 

458) -> AutotestSourceLocation | None: 

459 if not frames: 

460 return None 

461 

462 if source_func is not None: 

463 source_location = _source_location_from_callable(source_func) 

464 if source_location is not None: 

465 for frame in frames: 

466 if _frame_matches_location(frame, source_location): 

467 return frame 

468 

469 if code_root is None: 

470 return frames[-1] 

471 

472 selected = None 

473 for frame in frames: 

474 if _is_within_root(frame.filename, code_root): 

475 selected = frame 

476 

477 if selected is not None: 

478 return selected 

479 return frames[-1] 

480 

481 

482def _traceback_source_locations( 

483 tb: TracebackType | None, 

484 code_root: Path | None, 

485) -> list[AutotestSourceLocation]: 

486 if tb is None: 

487 return [] 

488 

489 summaries = list(traceback.extract_tb(tb)) 

490 walk_frames = list(traceback.walk_tb(tb)) 

491 if not summaries or not walk_frames: 

492 return [] 

493 

494 locations: list[AutotestSourceLocation] = [] 

495 for summary, (frame, lineno) in zip(summaries, walk_frames): 

496 if code_root is not None and not _is_within_root(summary.filename, code_root): 

497 continue 

498 

499 location = _source_location_from_traceback_frame( 

500 frame, 

501 lineno, 

502 colno=summary.colno, 

503 end_colno=summary.end_colno, 

504 ) 

505 if location is None: 

506 assert summary.lineno is not None 

507 location = AutotestSourceLocation( 

508 filename=summary.filename, 

509 lineno=summary.lineno, 

510 colno=summary.colno, 

511 end_colno=summary.end_colno, 

512 ) 

513 locations.append(location) 

514 

515 return locations 

516 

517 

518def _drop_trace_source_frame( 

519 frames: list[AutotestSourceLocation], 

520 *, 

521 filename: str, 

522 lineno: int, 

523) -> list[AutotestSourceLocation]: 

524 resolved_filename = Path(filename).resolve() 

525 filtered: list[AutotestSourceLocation] = [] 

526 skipped = False 

527 

528 for frame in frames: 

529 try: 

530 frame_filename = Path(frame.filename).resolve() 

531 except OSError: 

532 frame_filename = Path(frame.filename) 

533 

534 if not skipped and frame_filename == resolved_filename and frame.lineno == lineno: 

535 skipped = True 

536 continue 

537 filtered.append(frame) 

538 

539 return filtered 

540 

541 

542def _frame_matches_location( 

543 frame: AutotestSourceLocation, location: AutotestSourceLocation 

544) -> bool: 

545 try: 

546 frame_path = Path(frame.filename).resolve() 

547 location_path = Path(location.filename).resolve() 

548 except OSError: 

549 return False 

550 

551 if frame_path != location_path: 

552 return False 

553 

554 if location.start_lineno is not None and frame.lineno < location.start_lineno: 

555 return False 

556 if location.end_lineno is not None and frame.lineno > location.end_lineno: 

557 return False 

558 return True 

559 

560 

561def _source_location_from_traceback_frame( 

562 frame: FrameType, 

563 lineno: int, 

564 *, 

565 colno: int | None = None, 

566 end_colno: int | None = None, 

567) -> AutotestSourceLocation | None: 

568 try: 

569 source_file = inspect.getsourcefile(frame) or inspect.getfile(frame) 

570 source_lines, start_lineno = inspect.getsourcelines(frame) 

571 except (OSError, TypeError): 

572 return None 

573 

574 if not source_file: 

575 return None 

576 

577 end_lineno = start_lineno + len(source_lines) - 1 

578 return AutotestSourceLocation( 

579 filename=str(Path(source_file).resolve()), 

580 lineno=lineno, 

581 start_lineno=start_lineno, 

582 end_lineno=end_lineno, 

583 colno=colno, 

584 end_colno=end_colno, 

585 ) 

586 

587 

588def _source_location_from_callable(func: AutotestFunction) -> AutotestSourceLocation | None: 

589 try: 

590 source_file = inspect.getsourcefile(func) or inspect.getfile(func) 

591 source_lines, start_lineno = inspect.getsourcelines(func) 

592 except (OSError, TypeError): 

593 return None 

594 

595 if not source_file: 

596 return None 

597 

598 end_lineno = start_lineno + len(source_lines) - 1 

599 focus_offset = 0 

600 for index in range(len(source_lines) - 1, -1, -1): 

601 if source_lines[index].strip(): 

602 focus_offset = index 

603 break 

604 

605 return AutotestSourceLocation( 

606 filename=str(Path(source_file).resolve()), 

607 lineno=start_lineno + focus_offset, 

608 start_lineno=start_lineno, 

609 end_lineno=end_lineno, 

610 ) 

611 

612 

613def _is_within_root(filename: str, code_root: Path) -> bool: 

614 if filename.startswith("<") and filename.endswith(">"): 

615 return False 

616 

617 try: 

618 return Path(filename).resolve().is_relative_to(code_root) 

619 except OSError: 

620 return False 

621 

622 

623def _format_error_message(error: BaseException) -> str: 

624 message = " ".join(str(error).split()) 

625 return message or "<no message>" 

626 

627 

628def _format_autotest_method_output_error_title(error: BaseException) -> str: 

629 if isinstance(error, JSONDecodeError): 

630 return "API method returned invalid JSON" 

631 return "API method returned invalid output" 

632 

633 

634def _format_source_path(filename: str, code_root: Path | None) -> str: 

635 if filename.startswith("<") and filename.endswith(">"): 

636 return filename 

637 

638 path = Path(filename).resolve() 

639 

640 cwd = Path.cwd().resolve() 

641 try: 

642 return path.relative_to(cwd).as_posix() 

643 except ValueError: 

644 pass 

645 

646 if code_root is not None: 

647 base = code_root.parent 

648 try: 

649 return path.relative_to(base).as_posix() 

650 except ValueError: 

651 pass 

652 

653 return path.as_posix() 

654 

655 

656def _render_source_excerpt( 

657 frame: Any, 

658 *, 

659 context_lines: int, 

660 truncation_context_lines: int, 

661) -> list[str]: 

662 if frame.filename.startswith("<") and frame.filename.endswith(">"): 

663 return ["<source unavailable>"] 

664 

665 path = Path(frame.filename).resolve() 

666 lines = linecache.getlines(str(path)) 

667 if not lines: 

668 return ["<source unavailable>"] 

669 

670 error_line_index = max(frame.lineno - 1, 0) 

671 start_bound = 0 

672 end_bound = len(lines) 

673 

674 source_start_lineno = getattr(frame, "start_lineno", None) 

675 source_end_lineno = getattr(frame, "end_lineno", None) 

676 if isinstance(source_start_lineno, int): 

677 start_bound = max(source_start_lineno - 1, 0) 

678 if isinstance(source_end_lineno, int): 

679 end_bound = min(source_end_lineno, len(lines)) 

680 

681 common_indent = _common_leading_whitespace_prefix(lines, start=start_bound, end=end_bound) 

682 common_indent_len = len(common_indent) 

683 head_start = start_bound 

684 head_end = min(start_bound + max(truncation_context_lines, 0) + 1, end_bound) 

685 tail_start = max(error_line_index - context_lines, start_bound) 

686 tail_end = min(error_line_index + context_lines + 1, end_bound) 

687 

688 line_no_width = len(str(end_bound)) 

689 

690 if head_end >= tail_start: 

691 rendered = _render_source_excerpt_range( 

692 lines, 

693 start=head_start, 

694 end=tail_end, 

695 line_no_width=line_no_width, 

696 error_lineno=frame.lineno, 

697 colno=getattr(frame, "colno", None), 

698 end_colno=getattr(frame, "end_colno", None), 

699 common_indent=common_indent, 

700 common_indent_len=common_indent_len, 

701 ) 

702 if tail_end < end_bound: 

703 hidden_lines = end_bound - tail_end 

704 if hidden_lines > 0: 

705 rendered.append(_render_truncated_notice(line_no_width, hidden_lines)) 

706 return rendered 

707 

708 rendered = _render_source_excerpt_range( 

709 lines, 

710 start=head_start, 

711 end=head_end, 

712 line_no_width=line_no_width, 

713 error_lineno=frame.lineno, 

714 colno=getattr(frame, "colno", None), 

715 end_colno=getattr(frame, "end_colno", None), 

716 common_indent=common_indent, 

717 common_indent_len=common_indent_len, 

718 ) 

719 if not rendered: 

720 return ["<source unavailable>"] 

721 

722 hidden_lines = tail_start - head_end 

723 if hidden_lines > 0: 

724 rendered.append(_render_truncated_notice(line_no_width, hidden_lines)) 

725 tail_rendered = _render_source_excerpt_range( 

726 lines, 

727 start=tail_start, 

728 end=tail_end, 

729 line_no_width=line_no_width, 

730 error_lineno=frame.lineno, 

731 colno=getattr(frame, "colno", None), 

732 end_colno=getattr(frame, "end_colno", None), 

733 common_indent=common_indent, 

734 common_indent_len=common_indent_len, 

735 ) 

736 if tail_rendered: 

737 rendered.extend(tail_rendered) 

738 if tail_end < end_bound: 

739 hidden_lines = end_bound - tail_end 

740 if hidden_lines > 0: 

741 rendered.append(_render_truncated_notice(line_no_width, hidden_lines)) 

742 return rendered 

743 

744 

745def _render_source_excerpt_range( 

746 lines: list[str], 

747 *, 

748 start: int, 

749 end: int, 

750 line_no_width: int, 

751 error_lineno: int, 

752 colno: int | None, 

753 end_colno: int | None, 

754 common_indent: str, 

755 common_indent_len: int, 

756) -> list[str]: 

757 if start >= end: 

758 return [] 

759 

760 while start < end and not lines[start].strip(): 

761 start += 1 

762 while end > start and not lines[end - 1].strip(): 

763 end -= 1 

764 

765 if start >= end: 

766 return [] 

767 

768 rendered: list[str] = [] 

769 for index in range(start, end): 

770 line_no = index + 1 

771 source_line = lines[index].rstrip("\r\n") 

772 if common_indent and source_line.startswith(common_indent): 

773 source_line = source_line[common_indent_len:] 

774 rendered_source = _highlight_python_source_line(source_line) 

775 rendered.append(f"{line_no:>{line_no_width}}{rendered_source}") 

776 

777 if line_no != error_lineno: 

778 continue 

779 

780 if not isinstance(colno, int): 

781 continue 

782 

783 display_colno = max(colno - common_indent_len, 0) 

784 pointer_width = 1 

785 if isinstance(end_colno, int) and end_colno > colno: 

786 display_end_colno = max(end_colno - common_indent_len, display_colno + 1) 

787 pointer_width = display_end_colno - display_colno 

788 

789 pointer = " " * display_colno + "^" * max(pointer_width, 1) 

790 rendered.append(f"{' ' * line_no_width}{pointer}") 

791 

792 return rendered 

793 

794 

795def _common_leading_whitespace_prefix(lines: list[str], *, start: int, end: int) -> str: 

796 prefixes: list[str] = [] 

797 for index in range(start, end): 

798 source_line = lines[index].rstrip("\r\n") 

799 if not source_line.strip(): 

800 continue 

801 

802 whitespace_end = 0 

803 while whitespace_end < len(source_line) and source_line[whitespace_end] in " \t": 

804 whitespace_end += 1 

805 prefixes.append(source_line[:whitespace_end]) 

806 

807 if not prefixes: 

808 return "" 

809 

810 return os.path.commonprefix(prefixes) 

811 

812 

def _render_truncated_notice(line_no_width: int, skipped_lines: int) -> str:
    """Build the styled "… skipped N line(s)" marker, padded past the line-number gutter."""
    noun = "lines" if skipped_lines != 1 else "line"
    notice = _render_styled_text(
        f"… skipped {skipped_lines} {noun}",
        style="bold bright_black",
    )
    return " " * line_no_width + notice

819 

820 

821def _status_style_for_case(status: str) -> str: 

822 if status == "passed": 

823 return "green" 

824 if status == "failed": 

825 return "red" 

826 if status == "skipped": 

827 return "yellow" 

828 return "white" 

829 

830 

831def _render_source_chain( 

832 frames: list[AutotestSourceLocation], 

833 *, 

834 code_root: Path | None, 

835 context_lines: int, 

836 truncation_context_lines: int, 

837) -> list[str]: 

838 if not frames: 

839 return [] 

840 

841 lines: list[str] = [] 

842 for index, frame in enumerate(frames): 

843 if index > 0: 

844 lines.append("") 

845 lines.append("Source:") 

846 source_path = _format_source_path(frame.filename, code_root) 

847 lines.append(f"{source_path}:{frame.lineno}") 

848 lines.extend( 

849 _render_source_excerpt( 

850 frame, 

851 context_lines=context_lines, 

852 truncation_context_lines=truncation_context_lines, 

853 ) 

854 ) 

855 return lines 

856 

857 

858def _highlight_python_source_line(source_line: str) -> str: 

859 if not source_line.strip(): 

860 return source_line 

861 

862 buffer = io.StringIO() 

863 console = Console( 

864 file=buffer, 

865 force_terminal=True, 

866 color_system="standard", 

867 legacy_windows=False, 

868 width=max(len(source_line) + 4, 24), 

869 ) 

870 console.print( 

871 Syntax( 

872 source_line, 

873 "python", 

874 theme="monokai", 

875 background_color="default", 

876 line_numbers=False, 

877 word_wrap=False, 

878 indent_guides=False, 

879 ), 

880 end="", 

881 ) 

882 return buffer.getvalue().rstrip("\r\n") 

883 

884 

def _render_panel(title: str, rows: list[AutotestPanelRow]) -> list[str]:
    """Render ``rows`` as a titled two-column panel and return its lines.

    Labels never wrap; values fold when they exceed the available width.
    Each cell is styled via ``_render_styled_text`` and parsed back from ANSI.
    """
    grid = Table.grid(padding=(0, 2))
    grid.add_column(no_wrap=True)
    grid.add_column(overflow="fold")

    for row in rows:
        label_cell = Text.from_ansi(_render_styled_text(row.label, style=row.label_style))
        value_ansi = _render_styled_text(
            row.value,
            style=row.value_style,
            highlights=row.value_highlights,
        )
        grid.add_row(label_cell, Text.from_ansi(value_ansi))

    # Render into memory; the width is borrowed from a default console.
    sink = io.StringIO()
    renderer = Console(
        file=sink,
        force_terminal=True,
        color_system="standard",
        legacy_windows=False,
        width=Console().width,
    )
    renderer.print(Panel(grid, title=title, expand=False))
    panel_text = sink.getvalue().rstrip("\r\n")
    return panel_text.splitlines()

912 

913 

914def _render_styled_text( 

915 text: str, 

916 *, 

917 style: str | None = None, 

918 highlights: tuple[tuple[str, str], ...] = (), 

919) -> str: 

920 if style is None and not highlights: 

921 return text 

922 

923 if style is None: 

924 rich_text = Text(text) 

925 else: 

926 rich_text = Text(text, style=style) 

927 for needle, highlight_style in highlights: 

928 start = 0 

929 while True: 

930 index = rich_text.plain.find(needle, start) 

931 if index == -1: 

932 break 

933 rich_text.stylize(highlight_style, index, index + len(needle)) 

934 start = index + len(needle) 

935 

936 buffer = io.StringIO() 

937 console = Console( 

938 file=buffer, 

939 force_terminal=True, 

940 color_system="standard", 

941 legacy_windows=False, 

942 width=max(len(text) + 4, 24), 

943 ) 

944 console.print(rich_text, end="") 

945 return buffer.getvalue().rstrip("\r\n")