diff --git a/benchmarks/olmocr/bench/runners/run_extend.py b/benchmarks/olmocr/bench/runners/run_extend.py new file mode 100644 index 0000000..2a5b57e --- /dev/null +++ b/benchmarks/olmocr/bench/runners/run_extend.py @@ -0,0 +1,193 @@ +""" +olmOCR-bench runner for Extend.ai's /parse_runs endpoint — max-accuracy config. + +Per-page contract (matches run_reducto / run_gemini_pro_31): + run_extend(pdf_path, page_num=1) -> markdown string for that one page. + +Design: + * Extract one PDF page locally (pymupdf) so the upload payload stays small. + * Use `engine="parse_performance"` (Extend's high-accuracy engine). + * Turn on agentic processing for text + tables, and chart extraction for + figures, with `page_rotation_enabled` for sideways scans. + * **Walk chunk.blocks[] instead of joining chunk.content** — gives us: + - structural filtering of headers/footers/page numbers (block.type + match) rather than a soft prompt instruction the agentic pass ignores; + - real LaTeX from formula blocks via `block.details.latex`, wrapped + in `\\(...\\)` so olmOCR's math tests can match it. + + This replaces the earlier prompt-based approach, which left + `` tags in the markdown and never produced LaTeX. +""" + +import os +import tempfile + +import pymupdf + +from src.commons_extend import extend_client, record_usage + + +# Block types that olmOCR's `absent` tests expect to NOT be in the output. +# Filtered structurally before we serialize anything. +SKIP_BLOCK_TYPES = {"header", "footer", "page_number"} + + +# Config — only documented Extend.ai parameters (verified against +# docs.extend.ai/product/parsing/configuration-options): +# +# target=markdown best for LLM-style text scoring (olmOCR uses +# substring/fuzzy matches against markdown gold). +# engine=parse_performance high-accuracy engine (vs parse_light); required +# for cellBlocksEnabled, advancedChartExtraction, +# formattingDetection, etc. +# chunking_strategy=page we upload one page at a time, so a single +# page-chunk is all we need. +# text.agentic.enabled=true VLM second-pass on text blocks; ON because +# olmOCR includes degraded scans + handwriting. +# (custom_instructions removed — undocumented, +# doesn't filter content, gave us nothing.) +# text.signature_detection_enabled=true cheap helper for old-scans subset. +# tables.enabled+agentic table re-parsing for messy/multi-page tables; +# markdown target_format matches our scorer. +# formulas.enabled=true *** THIS IS THE BIG ONE ***. Without it, +# equations come back as plain `text` blocks and +# we never see `details.latex`. Set to true so +# formula blocks appear and we can emit LaTeX. +# figures.enabled+adv-chart charts -> structured data; cheap insurance. +# page_rotation_enabled=true handles sideways scans (old_scans subset). +PARSE_CONFIG = { + "target": "markdown", + "engine": "parse_performance", + "chunking_strategy": {"type": "page"}, + "block_options": { + "text": { + "signature_detection_enabled": True, + "agentic": {"enabled": True}, + }, + "tables": { + "enabled": True, + "target_format": "markdown", + "table_header_continuation_enabled": True, + "agentic": {"enabled": True}, + }, + "formulas": {"enabled": True}, + "figures": { + "enabled": True, + "advanced_chart_extraction_enabled": True, + }, + }, + "advanced_options": { + "page_rotation_enabled": True, + }, +} + + +def _extract_page_to_tempfile(pdf_path: str, page_num: int) -> str: + """Extract a single 1-indexed page into a new temp PDF file. Returns path.""" + src = pymupdf.open(pdf_path) + try: + if page_num < 1 or page_num > src.page_count: + raise ValueError( + f"page_num {page_num} out of range for {pdf_path} (n_pages={src.page_count})" + ) + out = pymupdf.open() + try: + out.insert_pdf(src, from_page=page_num - 1, to_page=page_num - 1) + fd, tmp_path = tempfile.mkstemp(suffix=".pdf", prefix="extend_pg_") + os.close(fd) + out.save(tmp_path) + return tmp_path + finally: + out.close() + finally: + src.close() + + +def _block_type(block) -> str: + """Return the block.type as a plain string ('text' / 'formula' / ...). + + SDK exposes type as an enum-or-string union (UNKNOWN sentinel possible), + so normalize via .value when available. + """ + t = getattr(block, "type", None) + if t is None: + return "" + return getattr(t, "value", str(t)) + + +def _render_block(block) -> str: + """Serialize a single block to markdown text. + + Formula blocks: emit `\\( latex \\)` from `block.details.latex` so olmOCR's + math tests can substring-match the LaTeX. Falls back to `block.content` + if `details.latex` is missing. + All other (kept) types: use `block.content` as-is — Extend already + formatted it per the parse config (markdown tables, etc.). + """ + btype = _block_type(block) + + if btype == "formula": + details = getattr(block, "details", None) + latex = getattr(details, "latex", None) if details is not None else None + if latex: + latex = latex.strip() + return f"\\( {latex} \\)" + # fall through to content if no latex available + + return getattr(block, "content", None) or "" + + +def _serialize_run(run) -> str: + """Walk chunks -> blocks, skip header/footer/page_number, return markdown.""" + output = getattr(run, "output", None) + chunks = getattr(output, "chunks", None) if output is not None else None + if not chunks: + return "" + + pieces: list[str] = [] + for chunk in chunks: + blocks = getattr(chunk, "blocks", None) or [] + for block in blocks: + if _block_type(block) in SKIP_BLOCK_TYPES: + continue + piece = _render_block(block) + if piece and piece.strip(): + pieces.append(piece.strip()) + return "\n\n".join(pieces).strip() + + +def run_extend( + pdf_path: str, + page_num: int = 1, + timeout: float = 600.0, +) -> str: + """Parse one PDF page through Extend.ai and return the markdown content.""" + single_page_path = _extract_page_to_tempfile(pdf_path, page_num) + try: + with open(single_page_path, "rb") as fh: + upload = extend_client.files.upload(file=fh) + + run = extend_client.parse_runs.create_and_poll( + file={"id": upload.id}, + config=PARSE_CONFIG, + ) + + record_usage("parse", getattr(run, "metrics", None)) + + status = getattr(run.status, "value", str(run.status)) + if status != "PROCESSED": + err = ( + getattr(run, "failure_message", None) + or getattr(run, "failure_reason", None) + ) + raise RuntimeError(f"Extend parse failed: status={status} error={err}") + + text = _serialize_run(run) + if not text or text.strip().lower() in ("null", "none", "n/a"): + return "" + return text + finally: + try: + os.unlink(single_page_path) + except OSError: + pass diff --git a/benchmarks/olmocr/olmocr_bench_extend.py b/benchmarks/olmocr/olmocr_bench_extend.py new file mode 100644 index 0000000..1ca1d1e --- /dev/null +++ b/benchmarks/olmocr/olmocr_bench_extend.py @@ -0,0 +1,232 @@ +""" +OlmOCR Benchmark for Extend.ai /parse_runs. + +Mirrors olmocr_bench_reducto.py — same dataset, same per-page contract, just +swaps Reducto for Extend so candidate outputs land under data_dir/extend/. + +Usage: + uv run -m benchmarks.olmocr.olmocr_bench_extend + uv run -m benchmarks.olmocr.olmocr_bench_extend --sample + uv run -m benchmarks.olmocr.olmocr_bench_extend --skip-generation + uv run -m benchmarks.olmocr.olmocr_bench_extend --generate-only +""" + +import argparse +import asyncio +import json +import os +import sys +from pathlib import Path + +from dotenv import load_dotenv +from huggingface_hub import hf_hub_download +from tqdm.asyncio import tqdm_asyncio + +load_dotenv() + +PROJECT_ROOT = Path(__file__).resolve().parent.parent.parent +SAMPLE_DATA_DIR = Path(__file__).resolve().parent / "bench" / "sample_data" +FULL_DATA_DIR = Path(__file__).resolve().parent / "bench" / "full_data" +RESULTS_DIR = PROJECT_ROOT / "results" +USAGE_OUTPUT = RESULTS_DIR / "olmocr_extend_v2_usage.json" +CANDIDATE_NAME = "extend_v2" +RATE_LIMIT = 8 +MAX_RETRIES = 3 + +HF_REPO = "allenai/olmOCR-bench" +SPLITS = [ + "arxiv_math", + "headers_footers", + "long_tiny_text", + "multi_column", + "old_scans", + "old_scans_math", + "table_tests", +] + +sys.path.insert(0, str(PROJECT_ROOT)) +sys.path.insert(0, str(Path(__file__).resolve().parent.parent)) + + +class RateLimiter: + def __init__(self, rate: int): + self.rate = rate + self.tokens = rate + self.last_refill = 0.0 + self._lock = asyncio.Lock() + + async def acquire(self): + while True: + async with self._lock: + now = asyncio.get_running_loop().time() + elapsed = now - self.last_refill + self.tokens = min(self.rate, self.tokens + elapsed * self.rate) + self.last_refill = now + if self.tokens >= 1: + self.tokens -= 1 + return + await asyncio.sleep(1 / self.rate) + + +def download_full_dataset(): + data_dir = FULL_DATA_DIR + pdf_dir = data_dir / "pdfs" + all_pdfs = set() + for split in SPLITS: + jsonl_dest = data_dir / f"{split}.jsonl" + if jsonl_dest.exists(): + with open(jsonl_dest) as f: + tests = [json.loads(l) for l in f if l.strip()] + else: + print(f" Downloading {split}.jsonl...") + src = hf_hub_download( + HF_REPO, f"bench_data/{split}.jsonl", repo_type="dataset" + ) + with open(src) as f: + tests = [json.loads(l) for l in f if l.strip()] + data_dir.mkdir(parents=True, exist_ok=True) + with open(jsonl_dest, "w") as f: + for t in tests: + f.write(json.dumps(t) + "\n") + print(f" {split}: {len(tests)} tests") + for t in tests: + all_pdfs.add(t["pdf"]) + + print(f"\n Total unique PDFs to download: {len(all_pdfs)}") + downloaded = 0 + skipped = 0 + for pdf_rel in sorted(all_pdfs): + local_path = pdf_dir / pdf_rel + if local_path.exists(): + skipped += 1 + continue + local_path.parent.mkdir(parents=True, exist_ok=True) + try: + src = hf_hub_download( + HF_REPO, f"bench_data/pdfs/{pdf_rel}", repo_type="dataset" + ) + os.symlink(src, str(local_path)) + downloaded += 1 + except Exception as e: + print(f" Failed to download {pdf_rel}: {e}") + print(f" PDFs: {downloaded} downloaded, {skipped} already existed") + return data_dir + + +async def process_page(pdf_path, page_num, output_path, rate_limiter): + from olmocr.bench.runners.run_extend import run_extend + + for attempt in range(MAX_RETRIES): + await rate_limiter.acquire() + try: + result = await asyncio.to_thread(run_extend, pdf_path, page_num) + os.makedirs(os.path.dirname(output_path), exist_ok=True) + with open(output_path, "w", encoding="utf-8") as f: + f.write(result) + return True + except Exception as e: + if attempt < MAX_RETRIES - 1: + await asyncio.sleep(2**attempt) + else: + print( + f"Failed after {MAX_RETRIES} attempts: {pdf_path} page {page_num}: {e}" + ) + return False + + +async def generate_outputs(data_dir: Path): + pdf_folder = data_dir / "pdfs" + output_folder = data_dir / CANDIDATE_NAME + + pdf_pages = set() + for jsonl_file in data_dir.glob("*.jsonl"): + with open(jsonl_file) as f: + for line in f: + line = line.strip() + if not line: + continue + t = json.loads(line) + pdf_pages.add((t["pdf"], t["page"])) + + print(f"Found {len(pdf_pages)} unique (pdf, page) pairs to process") + + rate_limiter = RateLimiter(RATE_LIMIT) + tasks = [] + for pdf_rel, page in sorted(pdf_pages): + pdf_path = str(pdf_folder / pdf_rel) + if not os.path.exists(pdf_path): + continue + base_name = os.path.splitext(os.path.basename(pdf_rel))[0] + parent_dir = os.path.dirname(pdf_rel) + md_filename = f"{base_name}_pg{page}_repeat1.md" + if parent_dir: + out_path = str(output_folder / parent_dir / md_filename) + else: + out_path = str(output_folder / md_filename) + if os.path.exists(out_path): + continue + tasks.append(process_page(pdf_path, page, out_path, rate_limiter)) + + if not tasks: + print("All outputs already exist, skipping generation.") + return True + print(f"Processing {len(tasks)} pages...") + results = await tqdm_asyncio.gather( + *tasks, desc=f"Generating {CANDIDATE_NAME} outputs" + ) + num_success = sum(1 for r in results if r) + num_failed = len(results) - num_success + print(f"Done: {num_success} succeeded, {num_failed} failed") + + try: + from src.commons_extend import write_usage_snapshot + + write_usage_snapshot(USAGE_OUTPUT) + print(f"Usage written to {USAGE_OUTPUT}") + except Exception as e: + print(f" (usage snapshot failed: {e})") + + return num_failed == 0 + + +def run_evaluation(data_dir: Path): + from olmocr.bench.benchmark import main as bench_main + + sys.argv = [ + "benchmark", + "--dir", + str(data_dir), + "--candidate", + CANDIDATE_NAME, + "--force", + ] + bench_main() + + +async def main(): + parser = argparse.ArgumentParser( + description=f"Run OlmOCR benchmark with {CANDIDATE_NAME}" + ) + parser.add_argument("--sample", action="store_true") + parser.add_argument("--skip-generation", action="store_true") + parser.add_argument("--generate-only", action="store_true") + args = parser.parse_args() + + if args.sample: + data_dir = SAMPLE_DATA_DIR + print("=== Using sample data ===") + else: + print("=== Downloading full olmOCR-bench dataset from HuggingFace ===") + data_dir = download_full_dataset() + + if not args.skip_generation: + print(f"\n=== Generating {CANDIDATE_NAME} outputs ===") + await generate_outputs(data_dir) + + if not args.generate_only: + print("\n=== Running OlmOCR Benchmark Evaluation ===") + run_evaluation(data_dir) + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/src/commons_extend.py b/src/commons_extend.py new file mode 100644 index 0000000..c1131ff --- /dev/null +++ b/src/commons_extend.py @@ -0,0 +1,80 @@ +""" +Extend.ai client + light usage tracking. + +Mirrors src/commons_reducto.py so the olmOCR bench candidate looks the same +as the existing Reducto integration. Keeps the SDK construction in one place +and exposes a thread-safe page-count tally for the usage snapshot file. +""" + +import copy +import json +import os +import threading +from pathlib import Path + +from dotenv import load_dotenv +from extend_ai import Extend + +load_dotenv() + + +if (EXTEND_API_KEY := os.getenv("EXTEND_API_KEY", None)) is None: + raise ValueError( + "EXTEND_API_KEY is not set in environment variables — get it from https://www.extend.ai/" + ) + + +extend_client = Extend(token=EXTEND_API_KEY) + + +_usage_lock = threading.Lock() +_usage_state: dict = { + "calls": 0, + "pages": 0, + "by_endpoint": {}, +} + + +def record_usage(endpoint: str, metrics_obj) -> None: + """Tally a single API response's usage. Safe to call from many threads. + + Extend's ParseRun returns a `metrics` field with `pageCount` (camelCase via + SDK attribute access) and processing time. We only track pages here. + """ + if metrics_obj is None: + return + pages = ( + getattr(metrics_obj, "page_count", None) + or getattr(metrics_obj, "pageCount", None) + or 0 + ) + with _usage_lock: + _usage_state["calls"] += 1 + _usage_state["pages"] += pages + bucket = _usage_state["by_endpoint"].setdefault( + endpoint, {"calls": 0, "pages": 0} + ) + bucket["calls"] += 1 + bucket["pages"] += pages + + +def get_usage_snapshot() -> dict: + with _usage_lock: + return copy.deepcopy(_usage_state) + + +def reset_usage() -> None: + with _usage_lock: + _usage_state["calls"] = 0 + _usage_state["pages"] = 0 + _usage_state["by_endpoint"] = {} + + +def write_usage_snapshot(path) -> None: + """Atomically persist the current usage tally to a JSON file.""" + snap = get_usage_snapshot() + p = Path(path) + p.parent.mkdir(parents=True, exist_ok=True) + tmp = p.with_suffix(p.suffix + ".tmp") + tmp.write_text(json.dumps(snap, indent=2)) + tmp.replace(p)