#!/usr/bin/env python3 """Generate and serve a review page for eval results. Reads the workspace directory, discovers runs (directories with outputs/), embeds all output data into a self-contained HTML page, and serves it via a tiny HTTP server. Feedback auto-saves to feedback.json in the workspace. Usage: python generate_review.py [--port PORT] [--skill-name NAME] python generate_review.py --previous-feedback /path/to/old/feedback.json No dependencies beyond the Python stdlib are required. """ import argparse import base64 import json import mimetypes import os import re import signal import subprocess import sys import time import webbrowser from functools import partial from http.server import HTTPServer, BaseHTTPRequestHandler from pathlib import Path # Files to exclude from output listings METADATA_FILES = {"transcript.md", "user_notes.md", "metrics.json"} # Extensions we render as inline text TEXT_EXTENSIONS = { ".txt", ".md", ".json", ".csv", ".py", ".js", ".ts", ".tsx", ".jsx", ".yaml", ".yml", ".xml", ".html", ".css", ".sh", ".rb", ".go", ".rs", ".java", ".c", ".cpp", ".h", ".hpp", ".sql", ".r", ".toml", } # Extensions we render as inline images IMAGE_EXTENSIONS = {".png", ".jpg", ".jpeg", ".gif", ".svg", ".webp"} # MIME type overrides for common types MIME_OVERRIDES = { ".svg": "image/svg+xml", ".xlsx": "application/vnd.openxmlformats-officedocument.spreadsheetml.sheet", ".docx": "application/vnd.openxmlformats-officedocument.wordprocessingml.document", ".pptx": "application/vnd.openxmlformats-officedocument.presentationml.presentation", } def get_mime_type(path: Path) -> str: ext = path.suffix.lower() if ext in MIME_OVERRIDES: return MIME_OVERRIDES[ext] mime, _ = mimetypes.guess_type(str(path)) return mime or "application/octet-stream" def find_runs(workspace: Path) -> list[dict]: """Recursively find directories that contain an outputs/ subdirectory.""" runs: list[dict] = [] _find_runs_recursive(workspace, workspace, runs) runs.sort(key=lambda r: (r.get("eval_id", float("inf")), r["id"])) return runs def _find_runs_recursive(root: Path, current: Path, runs: list[dict]) -> None: if not current.is_dir(): return outputs_dir = current / "outputs" if outputs_dir.is_dir(): run = build_run(root, current) if run: runs.append(run) return skip = {"node_modules", ".git", "__pycache__", "skill", "inputs"} for child in sorted(current.iterdir()): if child.is_dir() and child.name not in skip: _find_runs_recursive(root, child, runs) def build_run(root: Path, run_dir: Path) -> dict | None: """Build a run dict with prompt, outputs, and grading data.""" prompt = "" eval_id = None # Try eval_metadata.json for candidate in [run_dir / "eval_metadata.json", run_dir.parent / "eval_metadata.json"]: if candidate.exists(): try: metadata = json.loads(candidate.read_text()) prompt = metadata.get("prompt", "") eval_id = metadata.get("eval_id") except (json.JSONDecodeError, OSError): pass if prompt: break # Fall back to transcript.md if not prompt: for candidate in [run_dir / "transcript.md", run_dir / "outputs" / "transcript.md"]: if candidate.exists(): try: text = candidate.read_text() match = re.search(r"## Eval Prompt\n\n([\s\S]*?)(?=\n##|$)", text) if match: prompt = match.group(1).strip() except OSError: pass if prompt: break if not prompt: prompt = "(No prompt found)" run_id = str(run_dir.relative_to(root)).replace("/", "-").replace("\\", "-") # Collect output files outputs_dir = run_dir / "outputs" output_files: list[dict] = [] if outputs_dir.is_dir(): for f in sorted(outputs_dir.iterdir()): if f.is_file() and f.name not in METADATA_FILES: output_files.append(embed_file(f)) # Load grading if present grading = None for candidate in [run_dir / "grading.json", run_dir.parent / "grading.json"]: if candidate.exists(): try: grading = json.loads(candidate.read_text()) except (json.JSONDecodeError, OSError): pass if grading: break return { "id": run_id, "prompt": prompt, "eval_id": eval_id, "outputs": output_files, "grading": grading, } def embed_file(path: Path) -> dict: """Read a file and return an embedded representation.""" ext = path.suffix.lower() mime = get_mime_type(path) if ext in TEXT_EXTENSIONS: try: content = path.read_text(errors="replace") except OSError: content = "(Error reading file)" return { "name": path.name, "type": "text", "content": content, } elif ext in IMAGE_EXTENSIONS: try: raw = path.read_bytes() b64 = base64.b64encode(raw).decode("ascii") except OSError: return {"name": path.name, "type": "error", "content": "(Error reading file)"} return { "name": path.name, "type": "image", "mime": mime, "data_uri": f"data:{mime};base64,{b64}", } elif ext == ".pdf": try: raw = path.read_bytes() b64 = base64.b64encode(raw).decode("ascii") except OSError: return {"name": path.name, "type": "error", "content": "(Error reading file)"} return { "name": path.name, "type": "pdf", "data_uri": f"data:{mime};base64,{b64}", } elif ext == ".xlsx": try: raw = path.read_bytes() b64 = base64.b64encode(raw).decode("ascii") except OSError: return {"name": path.name, "type": "error", "content": "(Error reading file)"} return { "name": path.name, "type": "xlsx", "data_b64": b64, } else: # Binary / unknown — base64 download link try: raw = path.read_bytes() b64 = base64.b64encode(raw).decode("ascii") except OSError: return {"name": path.name, "type": "error", "content": "(Error reading file)"} return { "name": path.name, "type": "binary", "mime": mime, "data_uri": f"data:{mime};base64,{b64}", } def load_previous_iteration(workspace: Path) -> dict[str, dict]: """Load previous iteration's feedback and outputs. Returns a map of run_id -> {"feedback": str, "outputs": list[dict]}. """ result: dict[str, dict] = {} # Load feedback feedback_map: dict[str, str] = {} feedback_path = workspace / "feedback.json" if feedback_path.exists(): try: data = json.loads(feedback_path.read_text()) feedback_map = { r["run_id"]: r["feedback"] for r in data.get("reviews", []) if r.get("feedback", "").strip() } except (json.JSONDecodeError, OSError, KeyError): pass # Load runs (to get outputs) prev_runs = find_runs(workspace) for run in prev_runs: result[run["id"]] = { "feedback": feedback_map.get(run["id"], ""), "outputs": run.get("outputs", []), } # Also add feedback for run_ids that had feedback but no matching run for run_id, fb in feedback_map.items(): if run_id not in result: result[run_id] = {"feedback": fb, "outputs": []} return result def generate_html( runs: list[dict], skill_name: str, previous: dict[str, dict] | None = None, benchmark: dict | None = None, ) -> str: """Generate the complete standalone HTML page with embedded data.""" template_path = Path(__file__).parent / "viewer.html" template = template_path.read_text() # Build previous_feedback and previous_outputs maps for the template previous_feedback: dict[str, str] = {} previous_outputs: dict[str, list[dict]] = {} if previous: for run_id, data in previous.items(): if data.get("feedback"): previous_feedback[run_id] = data["feedback"] if data.get("outputs"): previous_outputs[run_id] = data["outputs"] embedded = { "skill_name": skill_name, "runs": runs, "previous_feedback": previous_feedback, "previous_outputs": previous_outputs, } if benchmark: embedded["benchmark"] = benchmark data_json = json.dumps(embedded) return template.replace("/*__EMBEDDED_DATA__*/", f"const EMBEDDED_DATA = {data_json};") # --------------------------------------------------------------------------- # HTTP server (stdlib only, zero dependencies) # --------------------------------------------------------------------------- def _kill_port(port: int) -> None: """Kill any process listening on the given port.""" try: result = subprocess.run( ["lsof", "-ti", f":{port}"], capture_output=True, text=True, timeout=5, ) for pid_str in result.stdout.strip().split("\n"): if pid_str.strip(): try: os.kill(int(pid_str.strip()), signal.SIGTERM) except (ProcessLookupError, ValueError): pass if result.stdout.strip(): time.sleep(0.5) except subprocess.TimeoutExpired: pass except FileNotFoundError: print("Note: lsof not found, cannot check if port is in use", file=sys.stderr) class ReviewHandler(BaseHTTPRequestHandler): """Serves the review HTML and handles feedback saves. Regenerates the HTML on each page load so that refreshing the browser picks up new eval outputs without restarting the server. """ def __init__( self, workspace: Path, skill_name: str, feedback_path: Path, previous: dict[str, dict], benchmark_path: Path | None, *args, **kwargs, ): self.workspace = workspace self.skill_name = skill_name self.feedback_path = feedback_path self.previous = previous self.benchmark_path = benchmark_path super().__init__(*args, **kwargs) def do_GET(self) -> None: if self.path == "/" or self.path == "/index.html": # Regenerate HTML on each request (re-scans workspace for new outputs) runs = find_runs(self.workspace) benchmark = None if self.benchmark_path and self.benchmark_path.exists(): try: benchmark = json.loads(self.benchmark_path.read_text()) except (json.JSONDecodeError, OSError): pass html = generate_html(runs, self.skill_name, self.previous, benchmark) content = html.encode("utf-8") self.send_response(200) self.send_header("Content-Type", "text/html; charset=utf-8") self.send_header("Content-Length", str(len(content))) self.end_headers() self.wfile.write(content) elif self.path == "/api/feedback": data = b"{}" if self.feedback_path.exists(): data = self.feedback_path.read_bytes() self.send_response(200) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", str(len(data))) self.end_headers() self.wfile.write(data) else: self.send_error(404) def do_POST(self) -> None: if self.path == "/api/feedback": length = int(self.headers.get("Content-Length", 0)) body = self.rfile.read(length) try: data = json.loads(body) if not isinstance(data, dict) or "reviews" not in data: raise ValueError("Expected JSON object with 'reviews' key") self.feedback_path.write_text(json.dumps(data, indent=2) + "\n") resp = b'{"ok":true}' self.send_response(200) except (json.JSONDecodeError, OSError, ValueError) as e: resp = json.dumps({"error": str(e)}).encode() self.send_response(500) self.send_header("Content-Type", "application/json") self.send_header("Content-Length", str(len(resp))) self.end_headers() self.wfile.write(resp) else: self.send_error(404) def log_message(self, format: str, *args: object) -> None: # Suppress request logging to keep terminal clean pass def main() -> None: parser = argparse.ArgumentParser(description="Generate and serve eval review") parser.add_argument("workspace", type=Path, help="Path to workspace directory") parser.add_argument("--port", "-p", type=int, default=3117, help="Server port (default: 3117)") parser.add_argument("--skill-name", "-n", type=str, default=None, help="Skill name for header") parser.add_argument( "--previous-workspace", type=Path, default=None, help="Path to previous iteration's workspace (shows old outputs and feedback as context)", ) parser.add_argument( "--benchmark", type=Path, default=None, help="Path to benchmark.json to show in the Benchmark tab", ) parser.add_argument( "--static", "-s", type=Path, default=None, help="Write standalone HTML to this path instead of starting a server", ) args = parser.parse_args() workspace = args.workspace.resolve() if not workspace.is_dir(): print(f"Error: {workspace} is not a directory", file=sys.stderr) sys.exit(1) runs = find_runs(workspace) if not runs: print(f"No runs found in {workspace}", file=sys.stderr) sys.exit(1) skill_name = args.skill_name or workspace.name.replace("-workspace", "") feedback_path = workspace / "feedback.json" previous: dict[str, dict] = {} if args.previous_workspace: previous = load_previous_iteration(args.previous_workspace.resolve()) benchmark_path = args.benchmark.resolve() if args.benchmark else None benchmark = None if benchmark_path and benchmark_path.exists(): try: benchmark = json.loads(benchmark_path.read_text()) except (json.JSONDecodeError, OSError): pass if args.static: html = generate_html(runs, skill_name, previous, benchmark) args.static.parent.mkdir(parents=True, exist_ok=True) args.static.write_text(html) print(f"\n Static viewer written to: {args.static}\n") sys.exit(0) # Kill any existing process on the target port port = args.port _kill_port(port) handler = partial(ReviewHandler, workspace, skill_name, feedback_path, previous, benchmark_path) try: server = HTTPServer(("127.0.0.1", port), handler) except OSError: # Port still in use after kill attempt — find a free one server = HTTPServer(("127.0.0.1", 0), handler) port = server.server_address[1] url = f"http://localhost:{port}" print(f"\n Eval Viewer") print(f" ─────────────────────────────────") print(f" URL: {url}") print(f" Workspace: {workspace}") print(f" Feedback: {feedback_path}") if previous: print(f" Previous: {args.previous_workspace} ({len(previous)} runs)") if benchmark_path: print(f" Benchmark: {benchmark_path}") print(f"\n Press Ctrl+C to stop.\n") webbrowser.open(url) try: server.serve_forever() except KeyboardInterrupt: print("\nStopped.") server.server_close() if __name__ == "__main__": main()