from __future__ import annotations
import inspect
import io
import linecache
import os
import sys
import traceback
from dataclasses import dataclass
from json import JSONDecodeError
from pathlib import Path
from types import FrameType, TracebackType
from typing import Any, Callable, NoReturn, Sequence
from _pytest._code.code import ReprExceptionInfo, ReprFileLocation, ReprTracebackNative
from rich import box
from rich.console import Console
from rich.panel import Panel
from rich.syntax import Syntax
from rich.table import Table
from rich.text import Text
[docs]
AutotestFunction = Callable[..., Any]
@dataclass(frozen=True)
[docs]
class AutotestCrashData:
@dataclass(frozen=True)
[docs]
class AutotestSourceLocation:
[docs]
start_lineno: int | None = None
[docs]
end_lineno: int | None = None
[docs]
colno: int | None = None
[docs]
end_colno: int | None = None
@dataclass(frozen=True)
[docs]
class AutotestPanelRow:
[docs]
label_style: str | None = None
[docs]
value_style: str | None = None
[docs]
value_highlights: tuple[tuple[str, str], ...] = ()
[docs]
class AutotestCrash(RuntimeError):
def __init__(self, data: AutotestCrashData) -> None:
super().__init__(data.summary_message)
[docs]
self.summary_message = data.summary_message
[docs]
self.detail_message = data.detail_message
[docs]
self.report = data.report
[docs]
self.source_path = data.source_path
[docs]
self.source_lineno = data.source_lineno
[docs]
def to_longrepr(self) -> ReprExceptionInfo:
return ReprExceptionInfo(
reprtraceback=ReprTracebackNative([self.report + "\n"]),
reprcrash=ReprFileLocation(
path=self.source_path,
lineno=self.source_lineno,
message=self.summary_message,
),
)
[docs]
class AutotestMethodCrash(AutotestCrash):
pass
[docs]
class AutotestHookCrash(AutotestCrash):
pass
[docs]
class AutotestParamsCrash(AutotestCrash):
pass
[docs]
def build_autotest_method_crash_report(
*,
api: object,
func: AutotestFunction,
error: BaseException,
context_lines: int = 3,
trace_limit: int = 3,
truncation_context_lines: int = 3,
) -> str:
return _build_autotest_crash_data(
api=api,
title="API method crashed",
subject_label="Method",
subject_value=getattr(func, "__qualname__", repr(func)),
error=error,
context_lines=context_lines,
trace_limit=trace_limit,
truncation_context_lines=truncation_context_lines,
).report
[docs]
def build_autotest_method_output_error_report(
*,
api: object,
func: AutotestFunction,
error: BaseException,
context_lines: int = 3,
trace_limit: int = 3,
truncation_context_lines: int = 3,
) -> str:
return _build_autotest_crash_data(
api=api,
title=_format_autotest_method_output_error_title(error),
subject_label="Method",
subject_value=getattr(func, "__qualname__", repr(func)),
error=error,
context_lines=context_lines,
trace_limit=trace_limit,
truncation_context_lines=truncation_context_lines,
source_func=func,
render_source_excerpt=False,
).report
[docs]
def build_autotest_hook_crash_report(
*,
api: object,
hook: AutotestFunction,
error: BaseException,
context_lines: int = 3,
trace_limit: int = 3,
truncation_context_lines: int = 3,
) -> str:
return _build_autotest_crash_data(
api=api,
title="Autotest hook crashed",
subject_label="Hook",
subject_value=getattr(hook, "__qualname__", repr(hook)),
error=error,
context_lines=context_lines,
trace_limit=trace_limit,
truncation_context_lines=truncation_context_lines,
).report
[docs]
def build_autotest_params_crash_report(
*,
api: object,
params_provider: AutotestFunction,
error: BaseException,
context_lines: int = 3,
trace_limit: int = 3,
truncation_context_lines: int = 3,
) -> str:
provider_name = getattr(params_provider, "__qualname__", repr(params_provider))
return _build_autotest_crash_data(
api=api,
title="Autotest params preparation crashed",
subject_label="Params",
subject_value=provider_name,
error=error,
context_lines=context_lines,
trace_limit=trace_limit,
truncation_context_lines=truncation_context_lines,
).report
[docs]
def build_autotest_case_results_report(
records: Sequence[tuple[str, str]],
) -> str:
passed = sum(1 for _label, status in records if status == "passed")
failed = sum(1 for _label, status in records if status == "failed")
skipped = sum(1 for _label, status in records if status == "skipped")
heading = "Autotest case results"
details: list[str] = []
if passed:
details.append(f"{passed} passed")
if failed:
details.append(f"{failed} failed")
if skipped:
details.append(f"{skipped} skipped")
if details:
heading = f"{heading}: {', '.join(details)}"
if not records:
return f"{heading}\n\n<no autotest cases recorded>"
table = Table(
header_style="bold cyan",
box=box.SIMPLE_HEAVY,
expand=False,
pad_edge=False,
)
table.add_column("Test", overflow="fold")
table.add_column("Status", no_wrap=True)
for label, status in records:
table.add_row(
label,
Text(status, style=_status_style_for_case(status)),
)
buffer = io.StringIO()
console = Console(
file=buffer,
force_terminal=True,
color_system="standard",
legacy_windows=False,
width=Console().width,
)
console.print(table)
rendered = buffer.getvalue().rstrip("\r\n")
return f"{heading}\n\n{rendered}"
[docs]
def raise_autotest_method_crash(
*,
api: object,
func: AutotestFunction,
error: BaseException,
source_func: AutotestFunction | None = None,
detail_message: str | None = None,
trace_limit: int = 3,
truncation_context_lines: int = 3,
) -> NoReturn:
raise AutotestMethodCrash(
_build_autotest_crash_data(
api=api,
title="API method crashed",
subject_label="Method",
subject_value=getattr(func, "__qualname__", repr(func)),
error=error,
context_lines=truncation_context_lines,
source_func=source_func,
detail_message=detail_message,
trace_limit=trace_limit,
truncation_context_lines=truncation_context_lines,
)
)
[docs]
def raise_autotest_method_output_error(
*,
api: object,
func: AutotestFunction,
error: BaseException,
source_func: AutotestFunction | None = None,
detail_message: str | None = None,
trace_limit: int = 3,
truncation_context_lines: int = 3,
) -> NoReturn:
raise AutotestMethodCrash(
_build_autotest_crash_data(
api=api,
title=_format_autotest_method_output_error_title(error),
subject_label="Method",
subject_value=getattr(func, "__qualname__", repr(func)),
error=error,
context_lines=truncation_context_lines,
source_func=source_func or func,
detail_message=detail_message,
trace_limit=trace_limit,
truncation_context_lines=truncation_context_lines,
render_source_excerpt=False,
)
)
[docs]
def raise_autotest_hook_crash(
*,
api: object,
hook: AutotestFunction,
error: BaseException,
summary_message: str = "Autotest hook crashed",
subject_label: str = "Hook",
subject_value: str | None = None,
source_func: AutotestFunction | None = None,
detail_message: str | None = None,
trace_limit: int = 3,
truncation_context_lines: int = 3,
) -> NoReturn:
resolved_subject_value = str(subject_value or getattr(hook, "__qualname__", repr(hook)))
raise AutotestHookCrash(
_build_autotest_crash_data(
api=api,
title=summary_message,
subject_label=subject_label,
subject_value=resolved_subject_value,
error=error,
context_lines=truncation_context_lines,
source_func=source_func,
detail_message=detail_message,
trace_limit=trace_limit,
truncation_context_lines=truncation_context_lines,
)
)
[docs]
def raise_autotest_params_crash(
*,
api: object,
params_provider: AutotestFunction,
error: BaseException,
summary_message: str = "Autotest params preparation crashed",
subject_label: str = "Params",
subject_value: str | None = None,
source_func: AutotestFunction | None = None,
detail_message: str | None = None,
trace_limit: int = 3,
truncation_context_lines: int = 3,
) -> NoReturn:
provider_name = str(
subject_value or getattr(params_provider, "__qualname__", repr(params_provider))
)
raise AutotestParamsCrash(
_build_autotest_crash_data(
api=api,
title=summary_message,
subject_label=subject_label,
subject_value=provider_name,
error=error,
context_lines=truncation_context_lines,
source_func=source_func or params_provider,
detail_message=detail_message,
trace_limit=trace_limit,
truncation_context_lines=truncation_context_lines,
)
)
def _build_autotest_crash_data(
*,
api: object,
title: str,
subject_label: str,
subject_value: str,
error: BaseException,
context_lines: int,
trace_limit: int,
truncation_context_lines: int,
source_func: AutotestFunction | None = None,
detail_message: str | None = None,
render_source_excerpt: bool = True,
) -> AutotestCrashData:
code_root = _resolve_code_root(api)
trace_frames = _traceback_source_locations(error.__traceback__, code_root)
frame = _select_source_frame(
trace_frames,
code_root,
source_func=source_func,
)
if frame is None and source_func is not None:
source_frame = _source_location_from_callable(source_func)
if source_frame is not None:
frame = source_frame
if frame is not None:
trace_frames = _drop_trace_source_frame(
trace_frames,
filename=frame.filename,
lineno=frame.lineno,
)
rows = [
AutotestPanelRow(
label=subject_label,
value=subject_value,
label_style="bold cyan",
value_style="green",
),
AutotestPanelRow(
label="Error",
value=error.__class__.__name__,
label_style="bold cyan",
value_style="yellow",
),
AutotestPanelRow(
label="Message",
value=detail_message or _format_error_message(error),
label_style="bold cyan",
value_style="white",
value_highlights=(("human_requests.abstraction.Output", "bold magenta"),),
),
]
lines = _render_panel(title, rows)
lines.append("")
if frame is None:
lines.append("Source:")
lines.append("<source unavailable>")
source_path = "<source unavailable>"
source_lineno = 0
else:
source_path = _format_source_path(frame.filename, code_root)
source_lineno = frame.lineno
if render_source_excerpt:
rendered_frames = [frame]
rendered_frames.extend(trace_frames)
if trace_limit > 0:
rendered_frames = rendered_frames[-trace_limit:]
else:
rendered_frames = []
lines.extend(
_render_source_chain(
rendered_frames,
code_root=code_root,
context_lines=context_lines,
truncation_context_lines=truncation_context_lines,
)
)
else:
lines.append("Source:")
lines.append(f"{source_path}:{source_lineno}")
return AutotestCrashData(
summary_message=title,
detail_message=detail_message or _format_error_message(error),
report="\n".join(lines),
source_path=source_path,
source_lineno=source_lineno,
)
def _resolve_code_root(api: object) -> Path | None:
module_name = getattr(type(api), "__module__", "")
if not module_name:
return None
top_level_name = module_name.split(".", 1)[0]
module = sys.modules.get(top_level_name)
if module is None:
return None
module_path = getattr(module, "__path__", None)
if module_path is not None:
first_location = next(iter(module_path), None)
if first_location is None:
return None
return Path(first_location).resolve()
module_file = getattr(module, "__file__", None)
if module_file is None:
return None
return Path(module_file).resolve()
def _select_source_frame(
frames: list[AutotestSourceLocation],
code_root: Path | None,
*,
source_func: AutotestFunction | None = None,
) -> AutotestSourceLocation | None:
if not frames:
return None
if source_func is not None:
source_location = _source_location_from_callable(source_func)
if source_location is not None:
for frame in frames:
if _frame_matches_location(frame, source_location):
return frame
if code_root is None:
return frames[-1]
selected = None
for frame in frames:
if _is_within_root(frame.filename, code_root):
selected = frame
if selected is not None:
return selected
return frames[-1]
def _traceback_source_locations(
tb: TracebackType | None,
code_root: Path | None,
) -> list[AutotestSourceLocation]:
if tb is None:
return []
summaries = list(traceback.extract_tb(tb))
walk_frames = list(traceback.walk_tb(tb))
if not summaries or not walk_frames:
return []
locations: list[AutotestSourceLocation] = []
for summary, (frame, lineno) in zip(summaries, walk_frames):
if code_root is not None and not _is_within_root(summary.filename, code_root):
continue
location = _source_location_from_traceback_frame(
frame,
lineno,
colno=summary.colno,
end_colno=summary.end_colno,
)
if location is None:
assert summary.lineno is not None
location = AutotestSourceLocation(
filename=summary.filename,
lineno=summary.lineno,
colno=summary.colno,
end_colno=summary.end_colno,
)
locations.append(location)
return locations
def _drop_trace_source_frame(
frames: list[AutotestSourceLocation],
*,
filename: str,
lineno: int,
) -> list[AutotestSourceLocation]:
resolved_filename = Path(filename).resolve()
filtered: list[AutotestSourceLocation] = []
skipped = False
for frame in frames:
try:
frame_filename = Path(frame.filename).resolve()
except OSError:
frame_filename = Path(frame.filename)
if not skipped and frame_filename == resolved_filename and frame.lineno == lineno:
skipped = True
continue
filtered.append(frame)
return filtered
def _frame_matches_location(
frame: AutotestSourceLocation, location: AutotestSourceLocation
) -> bool:
try:
frame_path = Path(frame.filename).resolve()
location_path = Path(location.filename).resolve()
except OSError:
return False
if frame_path != location_path:
return False
if location.start_lineno is not None and frame.lineno < location.start_lineno:
return False
if location.end_lineno is not None and frame.lineno > location.end_lineno:
return False
return True
def _source_location_from_traceback_frame(
frame: FrameType,
lineno: int,
*,
colno: int | None = None,
end_colno: int | None = None,
) -> AutotestSourceLocation | None:
try:
source_file = inspect.getsourcefile(frame) or inspect.getfile(frame)
source_lines, start_lineno = inspect.getsourcelines(frame)
except (OSError, TypeError):
return None
if not source_file:
return None
end_lineno = start_lineno + len(source_lines) - 1
return AutotestSourceLocation(
filename=str(Path(source_file).resolve()),
lineno=lineno,
start_lineno=start_lineno,
end_lineno=end_lineno,
colno=colno,
end_colno=end_colno,
)
def _source_location_from_callable(func: AutotestFunction) -> AutotestSourceLocation | None:
try:
source_file = inspect.getsourcefile(func) or inspect.getfile(func)
source_lines, start_lineno = inspect.getsourcelines(func)
except (OSError, TypeError):
return None
if not source_file:
return None
end_lineno = start_lineno + len(source_lines) - 1
focus_offset = 0
for index in range(len(source_lines) - 1, -1, -1):
if source_lines[index].strip():
focus_offset = index
break
return AutotestSourceLocation(
filename=str(Path(source_file).resolve()),
lineno=start_lineno + focus_offset,
start_lineno=start_lineno,
end_lineno=end_lineno,
)
def _is_within_root(filename: str, code_root: Path) -> bool:
if filename.startswith("<") and filename.endswith(">"):
return False
try:
return Path(filename).resolve().is_relative_to(code_root)
except OSError:
return False
def _format_error_message(error: BaseException) -> str:
message = " ".join(str(error).split())
return message or "<no message>"
def _format_autotest_method_output_error_title(error: BaseException) -> str:
if isinstance(error, JSONDecodeError):
return "API method returned invalid JSON"
return "API method returned invalid output"
def _format_source_path(filename: str, code_root: Path | None) -> str:
if filename.startswith("<") and filename.endswith(">"):
return filename
path = Path(filename).resolve()
cwd = Path.cwd().resolve()
try:
return path.relative_to(cwd).as_posix()
except ValueError:
pass
if code_root is not None:
base = code_root.parent
try:
return path.relative_to(base).as_posix()
except ValueError:
pass
return path.as_posix()
def _render_source_excerpt(
frame: Any,
*,
context_lines: int,
truncation_context_lines: int,
) -> list[str]:
if frame.filename.startswith("<") and frame.filename.endswith(">"):
return ["<source unavailable>"]
path = Path(frame.filename).resolve()
lines = linecache.getlines(str(path))
if not lines:
return ["<source unavailable>"]
error_line_index = max(frame.lineno - 1, 0)
start_bound = 0
end_bound = len(lines)
source_start_lineno = getattr(frame, "start_lineno", None)
source_end_lineno = getattr(frame, "end_lineno", None)
if isinstance(source_start_lineno, int):
start_bound = max(source_start_lineno - 1, 0)
if isinstance(source_end_lineno, int):
end_bound = min(source_end_lineno, len(lines))
common_indent = _common_leading_whitespace_prefix(lines, start=start_bound, end=end_bound)
common_indent_len = len(common_indent)
head_start = start_bound
head_end = min(start_bound + max(truncation_context_lines, 0) + 1, end_bound)
tail_start = max(error_line_index - context_lines, start_bound)
tail_end = min(error_line_index + context_lines + 1, end_bound)
line_no_width = len(str(end_bound))
if head_end >= tail_start:
rendered = _render_source_excerpt_range(
lines,
start=head_start,
end=tail_end,
line_no_width=line_no_width,
error_lineno=frame.lineno,
colno=getattr(frame, "colno", None),
end_colno=getattr(frame, "end_colno", None),
common_indent=common_indent,
common_indent_len=common_indent_len,
)
if tail_end < end_bound:
hidden_lines = end_bound - tail_end
if hidden_lines > 0:
rendered.append(_render_truncated_notice(line_no_width, hidden_lines))
return rendered
rendered = _render_source_excerpt_range(
lines,
start=head_start,
end=head_end,
line_no_width=line_no_width,
error_lineno=frame.lineno,
colno=getattr(frame, "colno", None),
end_colno=getattr(frame, "end_colno", None),
common_indent=common_indent,
common_indent_len=common_indent_len,
)
if not rendered:
return ["<source unavailable>"]
hidden_lines = tail_start - head_end
if hidden_lines > 0:
rendered.append(_render_truncated_notice(line_no_width, hidden_lines))
tail_rendered = _render_source_excerpt_range(
lines,
start=tail_start,
end=tail_end,
line_no_width=line_no_width,
error_lineno=frame.lineno,
colno=getattr(frame, "colno", None),
end_colno=getattr(frame, "end_colno", None),
common_indent=common_indent,
common_indent_len=common_indent_len,
)
if tail_rendered:
rendered.extend(tail_rendered)
if tail_end < end_bound:
hidden_lines = end_bound - tail_end
if hidden_lines > 0:
rendered.append(_render_truncated_notice(line_no_width, hidden_lines))
return rendered
def _render_source_excerpt_range(
lines: list[str],
*,
start: int,
end: int,
line_no_width: int,
error_lineno: int,
colno: int | None,
end_colno: int | None,
common_indent: str,
common_indent_len: int,
) -> list[str]:
if start >= end:
return []
while start < end and not lines[start].strip():
start += 1
while end > start and not lines[end - 1].strip():
end -= 1
if start >= end:
return []
rendered: list[str] = []
for index in range(start, end):
line_no = index + 1
source_line = lines[index].rstrip("\r\n")
if common_indent and source_line.startswith(common_indent):
source_line = source_line[common_indent_len:]
rendered_source = _highlight_python_source_line(source_line)
rendered.append(f"{line_no:>{line_no_width}} │ {rendered_source}")
if line_no != error_lineno:
continue
if not isinstance(colno, int):
continue
display_colno = max(colno - common_indent_len, 0)
pointer_width = 1
if isinstance(end_colno, int) and end_colno > colno:
display_end_colno = max(end_colno - common_indent_len, display_colno + 1)
pointer_width = display_end_colno - display_colno
pointer = " " * display_colno + "^" * max(pointer_width, 1)
rendered.append(f"{' ' * line_no_width} │ {pointer}")
return rendered
def _common_leading_whitespace_prefix(lines: list[str], *, start: int, end: int) -> str:
prefixes: list[str] = []
for index in range(start, end):
source_line = lines[index].rstrip("\r\n")
if not source_line.strip():
continue
whitespace_end = 0
while whitespace_end < len(source_line) and source_line[whitespace_end] in " \t":
whitespace_end += 1
prefixes.append(source_line[:whitespace_end])
if not prefixes:
return ""
return os.path.commonprefix(prefixes)
def _render_truncated_notice(line_no_width: int, skipped_lines: int) -> str:
suffix = "line" if skipped_lines == 1 else "lines"
return f"{' ' * line_no_width} │ " + _render_styled_text(
f"… skipped {skipped_lines} {suffix} …",
style="bold bright_black",
)
def _status_style_for_case(status: str) -> str:
if status == "passed":
return "green"
if status == "failed":
return "red"
if status == "skipped":
return "yellow"
return "white"
def _render_source_chain(
frames: list[AutotestSourceLocation],
*,
code_root: Path | None,
context_lines: int,
truncation_context_lines: int,
) -> list[str]:
if not frames:
return []
lines: list[str] = []
for index, frame in enumerate(frames):
if index > 0:
lines.append("")
lines.append("Source:")
source_path = _format_source_path(frame.filename, code_root)
lines.append(f"{source_path}:{frame.lineno}")
lines.extend(
_render_source_excerpt(
frame,
context_lines=context_lines,
truncation_context_lines=truncation_context_lines,
)
)
return lines
def _highlight_python_source_line(source_line: str) -> str:
if not source_line.strip():
return source_line
buffer = io.StringIO()
console = Console(
file=buffer,
force_terminal=True,
color_system="standard",
legacy_windows=False,
width=max(len(source_line) + 4, 24),
)
console.print(
Syntax(
source_line,
"python",
theme="monokai",
background_color="default",
line_numbers=False,
word_wrap=False,
indent_guides=False,
),
end="",
)
return buffer.getvalue().rstrip("\r\n")
def _render_panel(title: str, rows: list[AutotestPanelRow]) -> list[str]:
table = Table.grid(padding=(0, 2))
table.add_column(no_wrap=True)
table.add_column(overflow="fold")
for row in rows:
table.add_row(
Text.from_ansi(_render_styled_text(row.label, style=row.label_style)),
Text.from_ansi(
_render_styled_text(
row.value,
style=row.value_style,
highlights=row.value_highlights,
)
),
)
buffer = io.StringIO()
console = Console(
file=buffer,
force_terminal=True,
color_system="standard",
legacy_windows=False,
width=Console().width,
)
console.print(Panel(table, title=title, expand=False))
return buffer.getvalue().rstrip("\r\n").splitlines()
def _render_styled_text(
text: str,
*,
style: str | None = None,
highlights: tuple[tuple[str, str], ...] = (),
) -> str:
if style is None and not highlights:
return text
if style is None:
rich_text = Text(text)
else:
rich_text = Text(text, style=style)
for needle, highlight_style in highlights:
start = 0
while True:
index = rich_text.plain.find(needle, start)
if index == -1:
break
rich_text.stylize(highlight_style, index, index + len(needle))
start = index + len(needle)
buffer = io.StringIO()
console = Console(
file=buffer,
force_terminal=True,
color_system="standard",
legacy_windows=False,
width=max(len(text) + 4, 24),
)
console.print(rich_text, end="")
return buffer.getvalue().rstrip("\r\n")