Skip to content

Commit ec9f6fa

Browse files
authored
Add Hugging Face dataset export flow for competition submissions (#459)
* Add HF dataset export logic and admin commands * Add huggingface-hub and datasets dependencies * Add tests for HF export and admin API endpoint * apply review fixes, req hardening and in memory buffer addition * switch to standalone sql file * switch to tempfile, hf xet support is not available over inmemorybuffer --------- Co-authored-by: Sinatras <SinatrasC@users.noreply.github.com>
1 parent 5f29b3a commit ec9f6fa

9 files changed

Lines changed: 871 additions & 0 deletions

File tree

pyproject.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@ dependencies = [
2323
"fastapi[all]",
2424
"uvicorn",
2525
"jinja2",
26+
"huggingface-hub>=0.20",
27+
"pyarrow>=14.0",
2628
]
2729

2830
[project.optional-dependencies]
@@ -37,6 +39,9 @@ dev = [
3739
[tool.setuptools.packages.find]
3840
where = ["src"]
3941

42+
[tool.setuptools.package-data]
43+
libkernelbot = ["sql/*.sql"]
44+
4045
[tool.coverage.run]
4146
omit = ["src/libkernelbot/run_eval.py", "src/libkernelbot/launchers/*.py"]
4247
relative_files = true

src/kernelbot/api/main.py

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -675,6 +675,53 @@ async def admin_update_problems(
675675
}
676676

677677

678+
@app.post("/admin/export-hf")
async def admin_export_hf(
    payload: dict,
    _: Annotated[None, Depends(require_admin)],
    db_context=Depends(get_db),
) -> dict:
    """Export competition submissions to a Hugging Face dataset as parquet.

    Payload:
        leaderboard_ids: list[int] - IDs of leaderboards to export
        filename: str - parquet filename in the repo (e.g. "nvidia_nvfp4_submissions.parquet")
        private: bool - if true, upload to private live repo; if false, upload to public repo (default: true)
    """
    from libkernelbot.hf_export import export_to_hf

    leaderboard_ids = payload.get("leaderboard_ids")
    filename = payload.get("filename")
    private = payload.get("private", True)

    # Both shape problems (not a list / empty, or non-int elements) report the
    # same 400, so validate them together.
    ids_valid = (
        isinstance(leaderboard_ids, list)
        and bool(leaderboard_ids)
        and all(isinstance(lb_id, int) for lb_id in leaderboard_ids)
    )
    if not ids_valid:
        raise HTTPException(
            status_code=400, detail="leaderboard_ids must be a non-empty list of integers"
        )
    if not (isinstance(filename, str) and filename.endswith(".parquet")):
        raise HTTPException(status_code=400, detail="filename must end with .parquet")
    if not env.HF_TOKEN:
        raise HTTPException(status_code=500, detail="HF_TOKEN not configured")

    # Private exports target the live (staging) dataset; public ones the release repo.
    repo_id = env.HF_PRIVATE_DATASET if private else env.HF_PUBLIC_DATASET

    try:
        with db_context as db:
            summary = export_to_hf(
                db=db,
                leaderboard_ids=leaderboard_ids,
                repo_id=repo_id,
                filename=filename,
                token=env.HF_TOKEN,
                private=private,
            )
            return {"status": "ok", **summary}
    except ValueError as e:
        # Domain-level refusals (e.g. public export of a still-active board) -> 400.
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Export failed: {e}") from e
723+
724+
678725
@app.get("/leaderboards")
679726
async def get_leaderboards(db_context=Depends(get_db)):
680727
"""An endpoint that returns all leaderboards.

src/kernelbot/cogs/admin_cog.py

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,13 @@ def __init__(self, bot: "ClusterBot"):
123123
name="set-forum-ids", description="Sets forum IDs"
124124
)(self.set_forum_ids)
125125

126+
self.export_to_hf = bot.admin_group.command(
127+
name="export-hf", description="Export competition data to Hugging Face dataset"
128+
)(self.export_to_hf)
129+
126130
self._scheduled_cleanup_temp_users.start()
131+
if env.HF_TOKEN:
132+
self._scheduled_hf_export.start()
127133

128134
# --------------------------------------------------------------------------
129135
# | HELPER FUNCTIONS |
@@ -873,6 +879,106 @@ async def _scheduled_cleanup_temp_users(self):
873879
db.cleanup_temp_users()
874880
logger.info("Temporary users cleanup completed")
875881

882+
@tasks.loop(hours=24)
async def _scheduled_hf_export(self):
    """Daily export of active competition submissions to private HF dataset.

    Once a competition expires, it drops out of the scheduled export set. If
    there are still results settling after the deadline, a manual export is
    needed once the queue drains. Currently public HF dataset releases are
    handled manually.
    """
    from libkernelbot.hf_export import export_to_hf, get_active_competition_leaderboards

    try:
        with self.bot.leaderboard_db as db:
            all_boards = db.get_leaderboards()
            active_boards = get_active_competition_leaderboards(
                all_boards,
                now=datetime.now(timezone.utc),
            )

            if not active_boards:
                logger.info("HF export: no active competitions, skipping")
                return

            summary = export_to_hf(
                db=db,
                leaderboard_ids=[board["id"] for board in active_boards],
                repo_id=env.HF_PRIVATE_DATASET,
                filename="active_submissions.parquet",
                token=env.HF_TOKEN,
                private=True,
            )
            logger.info("Scheduled HF export complete: %s", summary)
    except Exception:
        # Never let a failed export kill the daily loop; log and retry tomorrow.
        logger.exception("Scheduled HF export failed")
917+
918+
@_scheduled_hf_export.before_loop
async def _before_hf_export(self):
    """Delay the first scheduled HF export until the bot is fully connected."""
    await self.bot.wait_until_ready()
921+
922+
@discord.app_commands.describe(
    leaderboard_name="Name of the competition to export",
    filename="Parquet filename (default: <leaderboard_name>.parquet)",
    private="Upload to private repo (default: true)",
)
@discord.app_commands.autocomplete(leaderboard_name=leaderboard_name_autocomplete)
@with_error_handling
async def export_to_hf(
    self,
    interaction: discord.Interaction,
    leaderboard_name: str,
    filename: Optional[str] = None,
    private: bool = True,
):
    """Admin slash command: export one leaderboard's submissions to a HF dataset.

    Defaults the parquet filename to ``<leaderboard_name>.parquet`` and the
    target repo to the private live dataset; all responses are ephemeral.
    """
    from libkernelbot.hf_export import export_to_hf as do_export

    is_admin = await self.admin_check(interaction)
    if not is_admin:
        await send_discord_message(
            interaction,
            "You need to have Admin permissions to run this command",
            ephemeral=True,
        )
        return

    if not env.HF_TOKEN:
        await send_discord_message(interaction, "HF_TOKEN not configured.", ephemeral=True)
        return

    # Uploads can take a while; defer so the interaction doesn't time out.
    await interaction.response.defer(ephemeral=True)

    if filename is None:
        filename = f"{leaderboard_name}.parquet"
    if not filename.endswith(".parquet"):
        filename += ".parquet"

    repo_id = env.HF_PRIVATE_DATASET if private else env.HF_PUBLIC_DATASET

    try:
        with self.bot.leaderboard_db as db:
            lb_id = db.get_leaderboard_id(leaderboard_name)
            result = do_export(
                db=db,
                leaderboard_ids=[lb_id],
                repo_id=repo_id,
                filename=filename,
                token=env.HF_TOKEN,
                private=private,
            )
        # Fix: the success message previously rendered a literal "(unknown)"
        # instead of the uploaded file's path in the repo.
        await send_discord_message(
            interaction,
            f"Exported {result['rows']} rows to `{repo_id}/{filename}`.",
            ephemeral=True,
        )
    except ValueError as e:
        # Expected domain errors (unknown leaderboard, active public export).
        await send_discord_message(interaction, str(e), ephemeral=True)
    except Exception as e:
        logger.error("HF export failed: %s", e, exc_info=True)
        await send_discord_message(interaction, f"Export failed: {e}", ephemeral=True)
981+
876982
####################################################################################################################
877983
# MIGRATION COMMANDS --- TO BE DELETED LATER
878984
####################################################################################################################

src/kernelbot/env.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717
env.DISCORD_DEBUG_CLUSTER_STAGING_ID = os.getenv("DISCORD_DEBUG_CLUSTER_STAGING_ID")
1818

1919
env.ADMIN_TOKEN = os.getenv("ADMIN_TOKEN")
20+
# Hugging Face export configuration. HF_TOKEN gates the export features
# (scheduled loop and admin commands skip work when it is unset); the
# dataset repo names fall back to the GPUMODE defaults.
env.HF_TOKEN = os.getenv("HF_TOKEN")
env.HF_PRIVATE_DATASET = os.getenv("HF_PRIVATE_DATASET", "GPUMODE/kernelbot-data-live")
env.HF_PUBLIC_DATASET = os.getenv("HF_PUBLIC_DATASET", "GPUMODE/kernelbot-data")
2023

2124
# Only required to run the CLI against this instance
2225
# setting these is required only to run the CLI against local instance

src/libkernelbot/hf_export.py

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
"""Export competition submissions to Hugging Face datasets as parquet files."""
2+
3+
import io
import tempfile
from datetime import datetime, timedelta, timezone
from importlib.resources import files

import pyarrow as pa
import pyarrow.parquet as pq
from huggingface_hub import HfApi

from libkernelbot.leaderboard_db import LeaderboardDB
from libkernelbot.utils import setup_logging
14+
15+
logger = setup_logging(__name__)
16+
HF_EXPORT_ROWS_SQL = files("libkernelbot").joinpath("sql/get_hf_export_rows.sql").read_text(
17+
encoding="utf-8"
18+
)
19+
20+
# Explicit schema matching GPUMODE/kernelbot-data nvidia_nvfp4_submissions.parquet
21+
SUBMISSIONS_SCHEMA = pa.schema([
22+
("submission_id", pa.int64()),
23+
("leaderboard_id", pa.int64()),
24+
("problem_name", pa.large_string()),
25+
("user_id", pa.large_string()),
26+
("user_name", pa.large_string()),
27+
("code_id", pa.int64()),
28+
("file_name", pa.large_string()),
29+
("submission_time", pa.timestamp("us", tz="UTC")),
30+
("status", pa.large_string()),
31+
("score", pa.float64()),
32+
("passed", pa.bool_()),
33+
("mode", pa.large_string()),
34+
("runner", pa.large_string()),
35+
("code", pa.large_string()),
36+
])
37+
38+
39+
def _normalize_deadline(deadline: datetime) -> datetime:
40+
"""Ensure deadlines are timezone-aware before comparing them."""
41+
if deadline.tzinfo is None:
42+
return deadline.replace(tzinfo=timezone.utc)
43+
return deadline
44+
45+
46+
MAX_COMPETITION_HORIZON_DAYS = 365
47+
48+
49+
def get_active_competition_leaderboards(
    leaderboards: list[dict],
    *,
    now: datetime | None = None,
) -> list[dict]:
    """Return leaderboards that belong to real, active competitions.

    Filters out:
    - Expired leaderboards (deadline <= now)
    - Dev leaderboards (name ending with "-dev")
    - Permanent/practice leaderboards (deadline > 1 year from now, e.g. year 2100)

    Args:
        leaderboards: Leaderboard dicts with at least "name" and "deadline" keys.
        now: Reference time; defaults to the current UTC time.
    """
    if now is None:
        now = datetime.now(timezone.utc)

    # Anything further out than the horizon is treated as a permanent/practice
    # board rather than a real competition. (timedelta now comes from the
    # module-level datetime import instead of a mid-function import.)
    horizon = now + timedelta(days=MAX_COMPETITION_HORIZON_DAYS)

    return [
        leaderboard
        for leaderboard in leaderboards
        if now < _normalize_deadline(leaderboard["deadline"]) < horizon
        and not leaderboard["name"].endswith("-dev")
    ]
74+
75+
76+
def ensure_public_export_allowed(
    db: LeaderboardDB,
    leaderboard_ids: list[int],
    *,
    now: datetime | None = None,
) -> None:
    """Block public exports while any selected leaderboard is still active.

    Raises:
        ValueError: if any selected leaderboard's deadline is still in the future.
    """
    reference = datetime.now(timezone.utc) if now is None else now
    wanted = set(leaderboard_ids)

    still_active = sorted(
        board["name"]
        for board in db.get_leaderboards()
        if board["id"] in wanted and _normalize_deadline(board["deadline"]) > reference
    )

    if still_active:
        raise ValueError(
            "Cannot export active leaderboards to the public dataset: "
            + ", ".join(still_active)
        )
101+
102+
103+
def get_hf_export_rows(db: LeaderboardDB, leaderboard_ids: list[int]) -> list[dict]:
    """Fetch deduplicated submissions for export.

    Returns one dict per submission keyed by the export column names, or an
    empty list when no leaderboard IDs are given.
    """
    if not leaderboard_ids:
        return []

    db.cursor.execute(HF_EXPORT_ROWS_SQL, (leaderboard_ids,))

    # Column order must match the SELECT list in sql/get_hf_export_rows.sql.
    column_names = (
        "submission_id", "leaderboard_id", "problem_name", "user_id", "user_name",
        "code_id", "file_name", "submission_time", "status", "score", "passed",
        "mode", "runner", "code",
    )
    return [dict(zip(column_names, record, strict=True)) for record in db.cursor.fetchall()]
116+
117+
118+
def rows_to_parquet_bytes(rows: list[dict]) -> bytes:
    """Convert a list of row dicts to parquet bytes using the canonical schema.

    Fix: normalization previously mutated the caller's dicts in place
    (stringified user_id, blanked user_name, floated score); it now operates
    on shallow copies so *rows* is left untouched.

    Args:
        rows: Row dicts keyed by the SUBMISSIONS_SCHEMA field names.

    Returns:
        Snappy-compressed parquet file contents; a schema-only (zero-row)
        parquet file when *rows* is empty.
    """
    if not rows:
        # Preserve the full column schema even for an empty export.
        table = pa.table({field.name: pa.array([], type=field.type) for field in SUBMISSIONS_SCHEMA})
    else:
        normalized = []
        for row in rows:
            row = dict(row)  # shallow copy: don't mutate the caller's data
            if row.get("user_id") is not None:
                row["user_id"] = str(row["user_id"])
            if row.get("user_name") is None:
                row["user_name"] = ""
            if row.get("score") is not None:
                row["score"] = float(row["score"])
            normalized.append(row)
        table = pa.Table.from_pylist(normalized, schema=SUBMISSIONS_SCHEMA)

    buf = io.BytesIO()
    pq.write_table(table, buf, compression="snappy")
    return buf.getvalue()
135+
136+
137+
def export_to_hf(
    db: LeaderboardDB,
    leaderboard_ids: list[int],
    repo_id: str,
    filename: str,
    token: str,
    private: bool = True,
) -> dict:
    """Export deduplicated submissions to a HF dataset repo as a parquet file.

    Args:
        db: Open leaderboard database handle.
        leaderboard_ids: Leaderboards whose submissions should be exported.
        repo_id: Target Hugging Face dataset repo (e.g. "org/name").
        filename: Path of the parquet file inside the repo.
        token: Hugging Face API token.
        private: Whether the target repo is (created as) private.

    Returns:
        Summary dict with "rows", "repo_id" and "filename".

    Raises:
        ValueError: on a public export while a selected leaderboard is active.
    """
    if not private:
        # Public releases must wait until every selected competition has ended.
        ensure_public_export_allowed(db, leaderboard_ids)

    api = HfApi(token=token)
    api.create_repo(repo_id, repo_type="dataset", private=private, exist_ok=True)

    exported_rows = get_hf_export_rows(db, leaderboard_ids)
    payload = rows_to_parquet_bytes(exported_rows)

    # Upload from a real temp file: HF's xet-backed upload does not support
    # in-memory buffers.
    with tempfile.NamedTemporaryFile(suffix=".parquet") as tmp:
        tmp.write(payload)
        tmp.flush()
        api.upload_file(
            path_or_fileobj=tmp.name,
            path_in_repo=filename,
            repo_id=repo_id,
            repo_type="dataset",
        )

    logger.info("Exported %d rows to %s/%s", len(exported_rows), repo_id, filename)
    return {"rows": len(exported_rows), "repo_id": repo_id, "filename": filename}
169+
170+
171+
def publish_to_public_repo(
    db: LeaderboardDB,
    leaderboard_ids: list[int],
    public_repo_id: str,
    filename: str,
    token: str,
) -> dict:
    """Export final competition data to the public dataset repo.

    Thin convenience wrapper around :func:`export_to_hf` with ``private=False``,
    so the active-leaderboard guard applies.
    """
    return export_to_hf(
        db=db,
        leaderboard_ids=leaderboard_ids,
        repo_id=public_repo_id,
        filename=filename,
        token=token,
        private=False,
    )

0 commit comments

Comments
 (0)