diff --git a/docs/superpowers/specs/2026-06-08-devlog-control-plane-stdout-protocol.md b/docs/superpowers/specs/2026-06-08-devlog-control-plane-stdout-protocol.md new file mode 100644 index 0000000..4e39c33 --- /dev/null +++ b/docs/superpowers/specs/2026-06-08-devlog-control-plane-stdout-protocol.md @@ -0,0 +1,35 @@ +# DevLog Control Plane Stdout Protocol + +DevLog agents can report workflow progress and human approval gates by writing one JSON marker per stdout line. The markers are engine-neutral and do not require the agent to call DevLog APIs. + +## Stage Marker + +```text +[DEVLOG_STAGE] {"stage":"3/7","desc":"running tests"} +``` + +Fields: + +- `stage`: required string. Short stage label such as `3/7`, `Phase 2`, or `Review`. +- `desc`: optional string. Human-readable current activity. +- `current_stage`: optional string alternative when the agent already has the full display text. + +DevLog stores the latest stage as `current_stage`. When both `stage` and `desc` are present, the display value is `stage · desc`. + +## Gate Marker + +```text +[DEVLOG_GATE] {"question":"Approve the migration plan?","options":["Approve","Revise"],"stage":"2/4","desc":"plan review"} +``` + +Fields: + +- `question`: required string. The human decision prompt. +- `options`: optional string array. Suggested responses for button rendering. +- `stage` / `desc` / `current_stage`: optional stage context for the gate. + +DevLog stores the gate as JSON in `gate_status` with an internal `id`, `question`, `options`, `created_at`, and `stage`. + +## Compatibility + +Agents that do not emit these markers continue to work with the existing lifecycle status display. Malformed markers are left as ordinary visible output instead of failing the session. diff --git a/src/app/api/sessions/[id]/route.ts b/src/app/api/sessions/[id]/route.ts index cea7baf..6105e7e 100644 --- a/src/app/api/sessions/[id]/route.ts +++ b/src/app/api/sessions/[id]/route.ts @@ -26,9 +26,10 @@ export async function PATCH( ) { const { id } = await params; const body = await req.json(); - const { action, message, approved, reason } = body as { - action?: "send" | "kill" | "pause" | "end" | "respond_permission"; + const { action, message, response, approved, reason } = body as { + action?: "send" | "kill" | "pause" | "end" | "respond_permission" | "resolve_gate"; message?: string; + response?: string; approved?: boolean; reason?: string; }; @@ -60,6 +61,18 @@ export async function PATCH( processManager.respondToPermission(id, approved, reason); break; + case "resolve_gate": { + const gateResponse = response ?? message; + if (!gateResponse?.trim()) { + return NextResponse.json({ error: "response is required" }, { status: 400 }); + } + const result = processManager.resolveGate(id, gateResponse.trim()); + if (!result.ok) { + return NextResponse.json({ error: result.error }, { status: 409 }); + } + break; + } + case "kill": processManager.kill(id); break; diff --git a/src/app/sessions/[id]/page.tsx b/src/app/sessions/[id]/page.tsx index 70e9688..bd49163 100644 --- a/src/app/sessions/[id]/page.tsx +++ b/src/app/sessions/[id]/page.tsx @@ -11,6 +11,7 @@ import { SessionVcc } from "@/components/sessions/session-vcc"; import { ProcessIndicator } from "@/components/sessions/process-indicator"; import { Button } from "@/components/ui/button"; import { Badge } from "@/components/ui/badge"; +import { parseGateStatus } from "@/core/control-plane-state"; import { ArrowLeft, Square, @@ -66,6 +67,7 @@ export default function SessionDetailPage() { } const isActive = isInteractiveSessionStatus(session.status); + const gateStatus = parseGateStatus(session.gate_status); const isEnded = session.status === "completed" || session.status === "failed" || @@ -88,7 +90,11 @@ export default function SessionDetailPage() { {session.prompt?.slice(0, 120) ?? session.id}
- + AI task run diff --git a/src/components/kanban/board.tsx b/src/components/kanban/board.tsx index fdf5d36..c701c8d 100644 --- a/src/components/kanban/board.tsx +++ b/src/components/kanban/board.tsx @@ -20,7 +20,7 @@ import { Skeleton } from "@/components/ui/skeleton"; const COLUMNS = getTaskBoardColumns(); export function KanbanBoard() { - const { loading, tasksByStatus, createTask, updateTask, deleteTask, reorder, executeTask } = useTasks(); + const { loading, tasksByStatus, createTask, updateTask, deleteTask, reorder, executeTask, refresh } = useTasks(); const taskSessions = useTaskSessions(); const { runtimePayload, byokReady, loaded, settingsReady } = useAgentSettings(); const router = useRouter(); @@ -185,6 +185,7 @@ export function KanbanBoard() { onOpenChange={setDetailOpen} onUpdate={handleUpdateTask} onLaunchSession={handleLaunchSession} + onRefresh={refresh} />
); diff --git a/src/components/kanban/task-card.tsx b/src/components/kanban/task-card.tsx index 29cd648..1704afd 100644 --- a/src/components/kanban/task-card.tsx +++ b/src/components/kanban/task-card.tsx @@ -5,9 +5,10 @@ import { Draggable } from "@hello-pangea/dnd"; import { Badge } from "@/components/ui/badge"; import { Button } from "@/components/ui/button"; import { Card } from "@/components/ui/card"; -import { Trash2, GitBranch, Terminal, ChevronRight, Pause, Play } from "lucide-react"; +import { Trash2, GitBranch, Terminal, ChevronRight, Pause, Play, AlertCircle, Milestone } from "lucide-react"; import { canExecuteTaskFromCard, canPauseTaskFromCard } from "./task-card-actions"; import { isActiveSessionStatus } from "@/core/task-readiness"; +import { parseGateStatus } from "@/core/control-plane-state"; import type { Task, Session } from "@/core/types-dashboard"; import { cn } from "@/core/dashboard-utils"; import dayjs from "dayjs"; @@ -47,6 +48,8 @@ export function TaskCard({ task, index, session, onDelete, onClick, onExecute, o const isLive = isActiveSessionStatus(session?.status); const canExecute = !!onExecute && canExecuteTaskFromCard(task, session); const canPause = !!onPause && canPauseTaskFromCard(task, session); + const gateStatus = parseGateStatus(task.gate_status ?? session?.gate_status); + const currentStage = task.current_stage ?? session?.current_stage ?? null; return ( @@ -113,7 +116,22 @@ export function TaskCard({ task, index, session, onDelete, onClick, onExecute, o {task.description}

)} + {currentStage && ( +
+ + {currentStage} +
+ )}
+ {gateStatus && ( + + + needs-input + + )} Promise<{ id: string } | null>; + onRefresh?: () => Promise; } export function TaskDetailDialog({ @@ -83,6 +86,7 @@ export function TaskDetailDialog({ onOpenChange, onUpdate, onLaunchSession, + onRefresh, }: TaskDetailDialogProps) { const router = useRouter(); const [editing, setEditing] = useState(false); @@ -96,6 +100,9 @@ export function TaskDetailDialog({ const [selectedWorktree, setSelectedWorktree] = useState(""); const [codingAgentId, setCodingAgentId] = useState(DEFAULT_CODING_AGENT_ID); const [agentTeamId, setAgentTeamId] = useState(DEFAULT_AGENT_TEAM_ID); + const [gateResponse, setGateResponse] = useState(""); + const [resolvingGate, setResolvingGate] = useState(false); + const [resolvedGateId, setResolvedGateId] = useState(null); const { settings, runtimePayload, byokReady, loaded, settingsReady } = useAgentSettings(); @@ -107,6 +114,9 @@ export function TaskDetailDialog({ setPriority(task.priority); setPrompt(task.prompt ?? ""); setEditing(false); + setGateResponse(""); + setResolvingGate(false); + setResolvedGateId(null); } }, [task]); @@ -180,6 +190,30 @@ export function TaskDetailDialog({ return new Date(dateStr).toLocaleString(); }; const canLaunchSession = isTaskExecutableStatus(task.status); + const parsedGateStatus = parseGateStatus(task.gate_status); + const gateStatus = + parsedGateStatus?.id === resolvedGateId ? null : parsedGateStatus; + const currentStage = task.current_stage ?? gateStatus?.stage ?? null; + + const handleResolveGate = async (response: string) => { + const trimmed = response.trim(); + if (!task.session_id || !gateStatus || !trimmed) return; + + setResolvingGate(true); + try { + const res = await fetch(`/api/sessions/${task.session_id}`, { + method: "PATCH", + headers: { "Content-Type": "application/json" }, + body: JSON.stringify({ action: "resolve_gate", response: trimmed }), + }); + if (!res.ok) return; + setResolvedGateId(gateStatus.id); + setGateResponse(""); + await onRefresh?.(); + } finally { + setResolvingGate(false); + } + }; return ( @@ -330,6 +364,66 @@ export function TaskDetailDialog({
)} + {currentStage && ( +
+ {currentStage} +
+ )} + + {gateStatus && ( +
+
+ +
+ + needs-input + +

+ {gateStatus.question} +

+
+
+ {gateStatus.options.length > 0 ? ( +
+ {gateStatus.options.map((option) => ( + + ))} +
+ ) : ( +
+ setGateResponse(event.target.value)} + disabled={resolvingGate || !task.session_id} + className="h-8 text-xs" + /> + +
+ )} +
+ )} + {/* Metadata */}
{task.worktree_name && ( diff --git a/src/components/sessions/process-indicator.tsx b/src/components/sessions/process-indicator.tsx index ca06c02..305d977 100644 --- a/src/components/sessions/process-indicator.tsx +++ b/src/components/sessions/process-indicator.tsx @@ -1,7 +1,9 @@ "use client"; +import { AlertCircle, Milestone } from "lucide-react"; +import { Badge } from "@/components/ui/badge"; import { cn } from "@/core/dashboard-utils"; -import type { SessionStatus } from "@/core/types-dashboard"; +import type { GateStatus, SessionStatus } from "@/core/types-dashboard"; const STATUS_CONFIG: Record = { pending: { color: "bg-yellow-500", label: "Pending", pulse: true }, @@ -15,28 +17,52 @@ const STATUS_CONFIG: Record - - {config.pulse && ( +
+
+ + {config.pulse && ( + + )} + + {config.label} + {gateStatus && ( + + + needs-input + )} - - - {config.label} +
+ {currentStage && ( +
+ + {currentStage} +
+ )}
); } diff --git a/src/components/sessions/session-card.tsx b/src/components/sessions/session-card.tsx index d8c7b0e..ea2a9d1 100644 --- a/src/components/sessions/session-card.tsx +++ b/src/components/sessions/session-card.tsx @@ -13,6 +13,7 @@ import { MessageSquare, Clock, } from "lucide-react"; +import { parseGateStatus } from "@/core/control-plane-state"; import { isActiveSessionStatus } from "@/core/task-readiness"; import type { Session } from "@/core/types-dashboard"; @@ -40,13 +41,18 @@ export function SessionCard({ onDelete, }: SessionCardProps) { const isActive = isActiveSessionStatus(session.status); + const gateStatus = parseGateStatus(session.gate_status); return ( {/* Top row: status + time */}
- +
{timeAgo(session.started_at)} diff --git a/src/core/__tests__/control-plane-protocol.test.ts b/src/core/__tests__/control-plane-protocol.test.ts new file mode 100644 index 0000000..64dcd39 --- /dev/null +++ b/src/core/__tests__/control-plane-protocol.test.ts @@ -0,0 +1,150 @@ +import Database from "better-sqlite3"; +import { test } from "node:test"; +import assert from "node:assert/strict"; +import { SCHEMA } from "../db-schema"; +import { + applyControlPlaneEvent, + parseControlPlaneProtocolText, + resolveControlPlaneGate, +} from "../control-plane-protocol"; + +function makeDb(): Database.Database { + const db = new Database(":memory:"); + db.exec(SCHEMA); + return db; +} + +test("parseControlPlaneProtocolText extracts stage markers and hides protocol text", () => { + const parsed = parseControlPlaneProtocolText( + 'before\n[DEVLOG_STAGE] {"stage":"3/7","desc":"running tests"}\nafter', + ); + + assert.equal(parsed.text, "before\nafter"); + assert.deepEqual(parsed.events, [ + { + type: "stage", + current_stage: "3/7 · running tests", + }, + ]); +}); + +test("parseControlPlaneProtocolText extracts gate markers with normalized options", () => { + const parsed = parseControlPlaneProtocolText( + '[DEVLOG_GATE] {"question":"Approve plan?","options":["Approve","Revise"],"stage":"2/4 · review"}', + ); + + assert.equal(parsed.text, ""); + assert.deepEqual(parsed.events, [ + { + type: "gate", + question: "Approve plan?", + options: ["Approve", "Revise"], + stage: "2/4 · review", + }, + ]); +}); + +test("parseControlPlaneProtocolText keeps malformed markers as visible output", () => { + const text = "[DEVLOG_STAGE] not-json"; + const parsed = parseControlPlaneProtocolText(text); + + assert.equal(parsed.text, text); + assert.deepEqual(parsed.events, []); +}); + +test("applyControlPlaneEvent updates session and linked task state", () => { + const db = makeDb(); + db.prepare("INSERT INTO tasks (id, project_id, title) VALUES ('task-1', 'test', 'Task')").run(); + db.prepare( + "INSERT INTO sessions (id, project_id, task_id, status) VALUES ('session-1', 'test', 'task-1', 'running')", + ).run(); + + applyControlPlaneEvent(db, "session-1", { + type: "stage", + current_stage: "1/3 · plan", + }); + const stageSession = db + .prepare("SELECT current_stage, gate_status FROM sessions WHERE id = 'session-1'") + .get() as { current_stage: string | null; gate_status: string | null }; + const stageTask = db + .prepare("SELECT current_stage, gate_status FROM tasks WHERE id = 'task-1'") + .get() as { current_stage: string | null; gate_status: string | null }; + + assert.equal(stageSession.current_stage, "1/3 · plan"); + assert.equal(stageTask.current_stage, "1/3 · plan"); + assert.equal(stageSession.gate_status, null); + assert.equal(stageTask.gate_status, null); + + const applied = applyControlPlaneEvent( + db, + "session-1", + { + type: "gate", + question: "Ship it?", + options: ["Yes", "No"], + }, + { + now: () => new Date("2026-06-08T08:00:00.000Z"), + createId: () => "gate-test", + }, + ); + + assert.equal(applied?.taskId, "task-1"); + assert.deepEqual(applied?.gateStatus, { + id: "gate-test", + question: "Ship it?", + options: ["Yes", "No"], + created_at: "2026-06-08T08:00:00.000Z", + stage: "1/3 · plan", + }); + + const gateSession = db + .prepare("SELECT current_stage, gate_status FROM sessions WHERE id = 'session-1'") + .get() as { current_stage: string | null; gate_status: string | null }; + const gateTask = db + .prepare("SELECT current_stage, gate_status FROM tasks WHERE id = 'task-1'") + .get() as { current_stage: string | null; gate_status: string | null }; + + assert.equal(gateSession.current_stage, "1/3 · plan"); + assert.equal(gateTask.current_stage, "1/3 · plan"); + assert.equal(gateSession.gate_status, JSON.stringify(applied?.gateStatus)); + assert.equal(gateTask.gate_status, JSON.stringify(applied?.gateStatus)); +}); + +test("resolveControlPlaneGate clears gate state without clearing current stage", () => { + const db = makeDb(); + db.prepare("INSERT INTO tasks (id, project_id, title) VALUES ('task-1', 'test', 'Task')").run(); + db.prepare( + "INSERT INTO sessions (id, project_id, task_id, status) VALUES ('session-1', 'test', 'task-1', 'paused')", + ).run(); + const applied = applyControlPlaneEvent( + db, + "session-1", + { + type: "gate", + question: "Continue?", + options: ["Continue"], + stage: "2/3 · approval", + }, + { + now: () => new Date("2026-06-08T08:00:00.000Z"), + createId: () => "gate-test", + }, + ); + assert.ok(applied?.gateStatus); + + const resolved = resolveControlPlaneGate(db, "session-1"); + + assert.deepEqual(resolved?.gateStatus, applied.gateStatus); + const session = db + .prepare("SELECT current_stage, gate_status FROM sessions WHERE id = 'session-1'") + .get() as { current_stage: string | null; gate_status: string | null }; + const task = db + .prepare("SELECT current_stage, gate_status FROM tasks WHERE id = 'task-1'") + .get() as { current_stage: string | null; gate_status: string | null }; + + assert.equal(session.current_stage, "2/3 · approval"); + assert.equal(task.current_stage, "2/3 · approval"); + assert.equal(session.gate_status, null); + assert.equal(task.gate_status, null); +}); diff --git a/src/core/__tests__/control-plane-state.test.ts b/src/core/__tests__/control-plane-state.test.ts new file mode 100644 index 0000000..560a08c --- /dev/null +++ b/src/core/__tests__/control-plane-state.test.ts @@ -0,0 +1,30 @@ +import { test } from "node:test"; +import assert from "node:assert/strict"; +import { parseGateStatus } from "../control-plane-state"; + +test("parseGateStatus returns null for missing or malformed gate state", () => { + assert.equal(parseGateStatus(null), null); + assert.equal(parseGateStatus(""), null); + assert.equal(parseGateStatus("not-json"), null); + assert.equal(parseGateStatus(JSON.stringify({ id: "gate-1" })), null); +}); + +test("parseGateStatus normalizes a persisted gate status", () => { + const gate = parseGateStatus( + JSON.stringify({ + id: "gate-1", + question: "Approve?", + options: [" Yes ", "", 42, "No"], + created_at: "2026-06-08T08:00:00.000Z", + stage: "2/4 · review", + }), + ); + + assert.deepEqual(gate, { + id: "gate-1", + question: "Approve?", + options: ["Yes", "No"], + created_at: "2026-06-08T08:00:00.000Z", + stage: "2/4 · review", + }); +}); diff --git a/src/core/__tests__/db-schema.test.ts b/src/core/__tests__/db-schema.test.ts index 8e8382b..abb14a0 100644 --- a/src/core/__tests__/db-schema.test.ts +++ b/src/core/__tests__/db-schema.test.ts @@ -2,7 +2,7 @@ import Database from "better-sqlite3"; import { test } from "node:test"; import assert from "node:assert/strict"; import { makeTestDb, insertTask } from "./test-helpers"; -import { migrateTasksV2 } from "../db"; +import { migrateControlPlaneColumns, migrateTasksV2, recoverOrphanedSessions } from "../db"; test("tasks.status accepts new in_queue and fail values", () => { const db = makeTestDb(); @@ -31,6 +31,18 @@ test("tasks new columns exist with sensible defaults", () => { assert.equal(row.fail_reason, null); }); +test("tasks store control-plane stage and gate defaults", () => { + const db = makeTestDb(); + const id = insertTask(db, {}); + const row = db.prepare("SELECT current_stage, gate_status FROM tasks WHERE id = ?").get(id) as { + current_stage: string | null; + gate_status: string | null; + }; + + assert.equal(row.current_stage, null); + assert.equal(row.gate_status, null); +}); + test("sessions store agent execution and runtime auth defaults", () => { const db = makeTestDb(); const sessionId = "session-agent-defaults"; @@ -72,6 +84,27 @@ test("sessions store agent execution and runtime auth defaults", () => { assert.equal(row.agent_max_tokens, 8192); }); +test("sessions store control-plane stage and gate defaults", () => { + const db = makeTestDb(); + const sessionId = "session-control-plane-defaults"; + + db.prepare("INSERT INTO sessions (id, project_id, status) VALUES (?, ?, ?)").run( + sessionId, + "test", + "running", + ); + + const row = db + .prepare("SELECT current_stage, gate_status FROM sessions WHERE id = ?") + .get(sessionId) as { + current_stage: string | null; + gate_status: string | null; + }; + + assert.equal(row.current_stage, null); + assert.equal(row.gate_status, null); +}); + test("migrateTasksV2 preserves data and adds new columns on legacy DB", () => { const db = new Database(":memory:"); // Simulate the OLD schema (pre-1.1) @@ -97,12 +130,14 @@ test("migrateTasksV2 preserves data and adds new columns on legacy DB", () => { migrateTasksV2(db); // Data preserved - const row = db.prepare("SELECT id, title, status, blocked_by, sandbox_iterations, fail_reason FROM tasks WHERE id = 'legacy-1'").get() as any; + const row = db.prepare("SELECT id, title, status, blocked_by, sandbox_iterations, fail_reason, current_stage, gate_status FROM tasks WHERE id = 'legacy-1'").get() as any; assert.equal(row.title, "pre-existing"); assert.equal(row.status, "in_progress"); assert.equal(row.blocked_by, null); assert.equal(row.sandbox_iterations, 0); assert.equal(row.fail_reason, null); + assert.equal(row.current_stage, null); + assert.equal(row.gate_status, null); // New status values now allowed db.prepare("INSERT INTO tasks (id, title, status) VALUES ('new-1', 't', 'in_queue')").run(); @@ -118,3 +153,65 @@ test("migrateTasksV2 is idempotent", () => { const row = db.prepare("SELECT status FROM tasks WHERE id = ?").get(id) as any; assert.equal(row.status, "fail"); }); + +test("migrateControlPlaneColumns adds task and session gate columns on legacy DB", () => { + const db = new Database(":memory:"); + db.exec(` + CREATE TABLE tasks ( + id TEXT PRIMARY KEY, + title TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'todo' + ); + CREATE TABLE sessions ( + id TEXT PRIMARY KEY, + status TEXT NOT NULL DEFAULT 'running' + ); + INSERT INTO tasks (id, title, status) VALUES ('legacy-task', 'legacy task', 'todo'); + INSERT INTO sessions (id, status) VALUES ('legacy-session', 'running'); + `); + + migrateControlPlaneColumns(db); + migrateControlPlaneColumns(db); + + const task = db + .prepare("SELECT current_stage, gate_status FROM tasks WHERE id = 'legacy-task'") + .get() as { current_stage: string | null; gate_status: string | null }; + const session = db + .prepare("SELECT current_stage, gate_status FROM sessions WHERE id = 'legacy-session'") + .get() as { current_stage: string | null; gate_status: string | null }; + + assert.equal(task.current_stage, null); + assert.equal(task.gate_status, null); + assert.equal(session.current_stage, null); + assert.equal(session.gate_status, null); +}); + +test("recoverOrphanedSessions preserves paused gated sessions", () => { + const db = makeTestDb(); + const gateStatus = JSON.stringify({ + id: "gate-1", + question: "Continue?", + options: ["Continue"], + created_at: "2026-06-08T08:00:00.000Z", + stage: "2/3 approval", + }); + db.prepare( + "INSERT INTO sessions (id, project_id, status, pid, gate_status) VALUES ('gated', 'test', 'paused', 99999999, ?)", + ).run(gateStatus); + db.prepare( + "INSERT INTO sessions (id, project_id, status, pid) VALUES ('orphaned', 'test', 'running', 99999999)", + ).run(); + + recoverOrphanedSessions(db); + + const gated = db + .prepare("SELECT status, gate_status FROM sessions WHERE id = 'gated'") + .get() as { status: string; gate_status: string | null }; + const orphaned = db + .prepare("SELECT status FROM sessions WHERE id = 'orphaned'") + .get() as { status: string }; + + assert.equal(gated.status, "paused"); + assert.equal(gated.gate_status, gateStatus); + assert.equal(orphaned.status, "failed"); +}); diff --git a/src/core/__tests__/quality-gates.test.ts b/src/core/__tests__/quality-gates.test.ts index 5317406..1fad26e 100644 --- a/src/core/__tests__/quality-gates.test.ts +++ b/src/core/__tests__/quality-gates.test.ts @@ -156,3 +156,89 @@ test("sessions POST narrows arbitrary JSON before destructuring", () => { "sessions POST should not destructure arbitrary JSON via a blind type cast", ); }); + +test("sessions PATCH resolves gates through a dedicated action", () => { + const source = readRepoFile("src/app/api/sessions/[id]/route.ts"); + + assert.match( + source, + /action\?:[\s\S]*"resolve_gate"/, + "sessions PATCH action type should include resolve_gate", + ); + assert.match( + source, + /case "resolve_gate":[\s\S]*processManager\.resolveGate/, + "resolve_gate should call processManager.resolveGate instead of sendMessage", + ); + assert.doesNotMatch( + source, + /case "resolve_gate":[\s\S]*processManager\.sendMessage[\s\S]*break;/, + "resolve_gate must stay isolated from normal queued send messages", + ); +}); + +test("process manager resolves persisted gates without the normal send queue", () => { + const source = readRepoFile("src/core/process-manager.ts"); + + assert.match( + source, + /resolveGate[\s\S]*resolveControlPlaneGate/, + "resolveGate should clear persisted gate state through the control-plane helper", + ); + assert.match( + source, + /resolveGate[\s\S]*ensureProcess/, + "resolveGate should recreate a process when memory was lost after restart", + ); + assert.doesNotMatch( + source, + /resolveGate[\s\S]*messageQueues\.set/, + "resolveGate should not enqueue approval replies behind normal messages", + ); +}); + +test("kanban task surfaces render control-plane state", () => { + const card = readRepoFile("src/components/kanban/task-card.tsx"); + const dialog = readRepoFile("src/components/kanban/task-detail-dialog.tsx"); + + assert.match(card, /parseGateStatus/, "task cards should parse persisted gate state"); + assert.match(card, /needs-input/, "task cards should show a needs-input badge"); + assert.match(card, /current_stage/, "task cards should render the current stage"); + + assert.match(dialog, /parseGateStatus/, "task detail should parse persisted gate state"); + assert.match(dialog, /action:\s*"resolve_gate"/, "task detail replies should call resolve_gate"); + assert.match(dialog, /gateStatus\.options\.map/, "task detail should render gate option buttons"); +}); + +test("session surfaces render control-plane state", () => { + const indicator = readRepoFile("src/components/sessions/process-indicator.tsx"); + const card = readRepoFile("src/components/sessions/session-card.tsx"); + const detail = readRepoFile("src/app/sessions/[id]/page.tsx"); + + assert.match(indicator, /currentStage/, "process indicator should accept current stage text"); + assert.match(indicator, /needs-input/, "process indicator should render needs-input"); + assert.match(card, /parseGateStatus/, "session cards should parse gate status"); + assert.match(card, /current_stage/, "session cards should pass current stage"); + assert.match(detail, /gate_status/, "session detail should pass gate state to the header indicator"); +}); + +test("task and session lists refresh on control-plane stream events", () => { + const tasks = readRepoFile("src/hooks/use-tasks.ts"); + const sessions = readRepoFile("src/hooks/use-sessions.ts"); + + for (const [name, source] of [ + ["use-tasks", tasks], + ["use-sessions", sessions], + ] as const) { + assert.match( + source, + /new EventSource\("\/api\/devlog\/stream"\)/, + `${name} should subscribe to the global DevLog stream`, + ); + assert.match( + source, + /control_plane_stage[\s\S]*control_plane_gate[\s\S]*control_plane_gate_resolved/, + `${name} should react to all control-plane event types`, + ); + } +}); diff --git a/src/core/control-plane-protocol.ts b/src/core/control-plane-protocol.ts new file mode 100644 index 0000000..d292ef9 --- /dev/null +++ b/src/core/control-plane-protocol.ts @@ -0,0 +1,232 @@ +import type Database from "better-sqlite3"; +import { randomUUID } from "crypto"; +import type { GateStatus } from "./types-dashboard"; + +export type ControlPlaneProtocolEvent = + | { type: "stage"; current_stage: string } + | { type: "gate"; question: string; options: string[]; stage?: string }; + +export interface ParsedControlPlaneText { + text: string; + events: ControlPlaneProtocolEvent[]; +} + +export interface AppliedControlPlaneEvent { + sessionId: string; + taskId: string | null; + currentStage: string | null; + gateStatus: GateStatus | null; +} + +export interface ResolvedControlPlaneGate { + sessionId: string; + taskId: string | null; + currentStage: string | null; + gateStatus: GateStatus; +} + +const MARKER_PATTERN = /^\s*\[DEVLOG_(STAGE|GATE)\]\s+(.+?)\s*$/; + +export function parseControlPlaneProtocolText( + text: string, +): ParsedControlPlaneText { + const visibleLines: string[] = []; + const events: ControlPlaneProtocolEvent[] = []; + + for (const line of text.split(/\r?\n/)) { + const marker = MARKER_PATTERN.exec(line); + if (!marker) { + visibleLines.push(line); + continue; + } + + const event = parseProtocolMarker(marker[1], marker[2]); + if (!event) { + visibleLines.push(line); + continue; + } + + events.push(event); + } + + return { + text: trimProtocolLineGaps(visibleLines.join("\n")), + events, + }; +} + +export function applyControlPlaneEvent( + db: Database.Database, + sessionId: string, + event: ControlPlaneProtocolEvent, + opts: { + now?: () => Date; + createId?: () => string; + } = {}, +): AppliedControlPlaneEvent | null { + const session = db + .prepare("SELECT task_id, current_stage FROM sessions WHERE id = ? LIMIT 1") + .get(sessionId) as + | { task_id: string | null; current_stage: string | null } + | undefined; + + if (!session) return null; + + if (event.type === "stage") { + db.prepare("UPDATE sessions SET current_stage = ? WHERE id = ?").run( + event.current_stage, + sessionId, + ); + if (session.task_id) { + db.prepare( + "UPDATE tasks SET current_stage = ?, updated_at = datetime('now') WHERE id = ?", + ).run(event.current_stage, session.task_id); + } + return { + sessionId, + taskId: session.task_id, + currentStage: event.current_stage, + gateStatus: null, + }; + } + + const currentStage = event.stage ?? session.current_stage ?? null; + const gateStatus: GateStatus = { + id: opts.createId?.() ?? `gate_${randomUUID()}`, + question: event.question, + options: event.options, + created_at: (opts.now?.() ?? new Date()).toISOString(), + stage: currentStage, + }; + const serializedGate = JSON.stringify(gateStatus); + + db.prepare( + "UPDATE sessions SET current_stage = ?, gate_status = ? WHERE id = ?", + ).run(currentStage, serializedGate, sessionId); + if (session.task_id) { + db.prepare( + "UPDATE tasks SET current_stage = ?, gate_status = ?, updated_at = datetime('now') WHERE id = ?", + ).run(currentStage, serializedGate, session.task_id); + } + + return { + sessionId, + taskId: session.task_id, + currentStage, + gateStatus, + }; +} + +export function resolveControlPlaneGate( + db: Database.Database, + sessionId: string, +): ResolvedControlPlaneGate | null { + const session = db + .prepare("SELECT task_id, current_stage, gate_status FROM sessions WHERE id = ? LIMIT 1") + .get(sessionId) as + | { task_id: string | null; current_stage: string | null; gate_status: string | null } + | undefined; + + if (!session?.gate_status) return null; + const gateStatus = parseGateStatus(session.gate_status); + if (!gateStatus) return null; + + db.prepare("UPDATE sessions SET gate_status = NULL WHERE id = ?").run(sessionId); + if (session.task_id) { + db.prepare( + "UPDATE tasks SET gate_status = NULL, updated_at = datetime('now') WHERE id = ?", + ).run(session.task_id); + } + + return { + sessionId, + taskId: session.task_id, + currentStage: session.current_stage, + gateStatus, + }; +} + +function parseProtocolMarker( + markerType: string, + rawJson: string, +): ControlPlaneProtocolEvent | null { + let payload: unknown; + try { + payload = JSON.parse(rawJson); + } catch { + return null; + } + if (!isRecord(payload)) return null; + + if (markerType === "STAGE") { + const stage = readNonEmptyString(payload.current_stage) ?? readNonEmptyString(payload.stage); + if (!stage) return null; + const desc = readNonEmptyString(payload.desc) ?? readNonEmptyString(payload.description); + return { + type: "stage", + current_stage: desc ? `${stage} · ${desc}` : stage, + }; + } + + if (markerType === "GATE") { + const question = readNonEmptyString(payload.question); + if (!question) return null; + const stage = readStageDisplay(payload); + return { + type: "gate", + question, + options: readOptions(payload.options), + ...(stage ? { stage } : {}), + }; + } + + return null; +} + +function readStageDisplay(payload: Record): string | null { + const stage = readNonEmptyString(payload.current_stage) ?? readNonEmptyString(payload.stage); + if (!stage) return null; + const desc = readNonEmptyString(payload.desc) ?? readNonEmptyString(payload.description); + return desc ? `${stage} · ${desc}` : stage; +} + +function readOptions(value: unknown): string[] { + if (!Array.isArray(value)) return []; + return value + .map((option) => (typeof option === "string" ? option.trim() : "")) + .filter(Boolean); +} + +function readNonEmptyString(value: unknown): string | null { + return typeof value === "string" && value.trim() ? value.trim() : null; +} + +function isRecord(value: unknown): value is Record { + return Boolean(value) && typeof value === "object" && !Array.isArray(value); +} + +function trimProtocolLineGaps(text: string): string { + return text.replace(/\n{3,}/g, "\n\n").replace(/^\n+|\n+$/g, ""); +} + +function parseGateStatus(value: string): GateStatus | null { + let parsed: unknown; + try { + parsed = JSON.parse(value); + } catch { + return null; + } + if (!isRecord(parsed)) return null; + + const id = readNonEmptyString(parsed.id); + const question = readNonEmptyString(parsed.question); + if (!id || !question) return null; + + return { + id, + question, + options: readOptions(parsed.options), + created_at: readNonEmptyString(parsed.created_at) ?? new Date(0).toISOString(), + stage: readNonEmptyString(parsed.stage), + }; +} diff --git a/src/core/control-plane-state.ts b/src/core/control-plane-state.ts new file mode 100644 index 0000000..f01fa50 --- /dev/null +++ b/src/core/control-plane-state.ts @@ -0,0 +1,39 @@ +import type { GateStatus } from "./types-dashboard"; + +export function parseGateStatus( + value: string | null | undefined, +): GateStatus | null { + if (!value) return null; + + let parsed: unknown; + try { + parsed = JSON.parse(value); + } catch { + return null; + } + if (!isRecord(parsed)) return null; + + const id = readString(parsed.id); + const question = readString(parsed.question); + if (!id || !question) return null; + + return { + id, + question, + options: Array.isArray(parsed.options) + ? parsed.options + .map((option) => (typeof option === "string" ? option.trim() : "")) + .filter(Boolean) + : [], + created_at: readString(parsed.created_at) ?? "", + stage: readString(parsed.stage), + }; +} + +function readString(value: unknown): string | null { + return typeof value === "string" && value.trim() ? value.trim() : null; +} + +function isRecord(value: unknown): value is Record { + return Boolean(value) && typeof value === "object" && !Array.isArray(value); +} diff --git a/src/core/db-pool.ts b/src/core/db-pool.ts index a47040e..81d8c88 100644 --- a/src/core/db-pool.ts +++ b/src/core/db-pool.ts @@ -4,7 +4,7 @@ import { dirname, join } from "node:path"; import { homedir } from "node:os"; import { REGISTRY_SCHEMA } from "./db-schema-registry"; import { SCHEMA } from "./db-schema"; -import { migrateTasksV2 } from "./db"; +import { migrateControlPlaneColumns, migrateTasksV2 } from "./db"; export interface DbPool { getRegistry(): Database.Database; @@ -49,6 +49,7 @@ export function createDbPool(opts: DbPoolOptions = {}): DbPool { db.pragma("foreign_keys = ON"); db.exec(SCHEMA); migrateTasksV2(db); + migrateControlPlaneColumns(db); projectDbs.set(projectId, db); return db; } diff --git a/src/core/db-schema.ts b/src/core/db-schema.ts index a8ec057..367b6ad 100644 --- a/src/core/db-schema.ts +++ b/src/core/db-schema.ts @@ -31,6 +31,8 @@ CREATE TABLE IF NOT EXISTS tasks ( blocked_by TEXT, sandbox_iterations INTEGER NOT NULL DEFAULT 0, fail_reason TEXT, + current_stage TEXT, + gate_status TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')), updated_at TEXT NOT NULL DEFAULT (datetime('now')), completed_at TEXT @@ -58,6 +60,8 @@ CREATE TABLE IF NOT EXISTS sessions ( agent_api_version TEXT NOT NULL DEFAULT '', agent_base_url TEXT NOT NULL DEFAULT '${DEFAULT_API_BASE_URL}', agent_max_tokens INTEGER NOT NULL DEFAULT ${DEFAULT_API_MAX_TOKENS}, + current_stage TEXT, + gate_status TEXT, prompt TEXT, exit_code INTEGER, log_path TEXT, diff --git a/src/core/db.ts b/src/core/db.ts index 76b54ad..8df8394 100644 --- a/src/core/db.ts +++ b/src/core/db.ts @@ -37,6 +37,7 @@ export function getDb(): Database.Database { _db.pragma("foreign_keys = ON"); _db.exec(SCHEMA); migrateTasksV2(_db); + migrateControlPlaneColumns(_db); // Migrate: add claude_session_id column if missing try { @@ -70,6 +71,8 @@ export function getDb(): Database.Database { agent_api_version TEXT NOT NULL DEFAULT '', agent_base_url TEXT NOT NULL DEFAULT '${DEFAULT_API_BASE_URL}', agent_max_tokens INTEGER NOT NULL DEFAULT ${DEFAULT_API_MAX_TOKENS}, + current_stage TEXT, + gate_status TEXT, prompt TEXT, exit_code INTEGER, log_path TEXT, started_at TEXT NOT NULL DEFAULT (datetime('now')), @@ -173,6 +176,7 @@ export function getDb(): Database.Database { } catch { // Column already exists } + migrateControlPlaneColumns(_db); // Migrate: update tasks CHECK constraint to include 'review' and 'blocked' try { @@ -191,6 +195,8 @@ export function getDb(): Database.Database { session_id TEXT, sort_order INTEGER NOT NULL DEFAULT 0, prompt TEXT, + current_stage TEXT, + gate_status TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')), updated_at TEXT NOT NULL DEFAULT (datetime('now')), completed_at TEXT @@ -226,6 +232,12 @@ export function migrateTasksV2(db: Database.Database): void { if (!has("fail_reason")) { db.exec("ALTER TABLE tasks ADD COLUMN fail_reason TEXT"); } + if (!has("current_stage")) { + db.exec("ALTER TABLE tasks ADD COLUMN current_stage TEXT"); + } + if (!has("gate_status")) { + db.exec("ALTER TABLE tasks ADD COLUMN gate_status TEXT"); + } // Status CHECK widening: SQLite cannot ALTER CHECK; recreate the table only if old CHECK is detected. const stmt = db.prepare("SELECT sql FROM sqlite_master WHERE type='table' AND name='tasks'").get() as { sql: string } | undefined; @@ -245,12 +257,14 @@ export function migrateTasksV2(db: Database.Database): void { blocked_by TEXT, sandbox_iterations INTEGER NOT NULL DEFAULT 0, fail_reason TEXT, + current_stage TEXT, + gate_status TEXT, created_at TEXT NOT NULL DEFAULT (datetime('now')), updated_at TEXT NOT NULL DEFAULT (datetime('now')), completed_at TEXT ); - INSERT INTO tasks_new (id, project_id, title, description, status, priority, worktree_name, session_id, sort_order, prompt, blocked_by, sandbox_iterations, fail_reason, created_at, updated_at, completed_at) - SELECT id, project_id, title, description, status, priority, worktree_name, session_id, sort_order, prompt, NULL, 0, NULL, created_at, updated_at, completed_at + INSERT INTO tasks_new (id, project_id, title, description, status, priority, worktree_name, session_id, sort_order, prompt, blocked_by, sandbox_iterations, fail_reason, current_stage, gate_status, created_at, updated_at, completed_at) + SELECT id, project_id, title, description, status, priority, worktree_name, session_id, sort_order, prompt, blocked_by, sandbox_iterations, fail_reason, current_stage, gate_status, created_at, updated_at, completed_at FROM tasks; DROP TABLE tasks; ALTER TABLE tasks_new RENAME TO tasks; @@ -261,10 +275,31 @@ export function migrateTasksV2(db: Database.Database): void { } } -function recoverOrphanedSessions(db: Database.Database): void { +export function migrateControlPlaneColumns(db: Database.Database): void { + for (const table of ["tasks", "sessions"]) { + const cols = db.prepare(`PRAGMA table_info(${table})`).all() as Array<{ name: string }>; + const has = (name: string) => cols.some((col) => col.name === name); + + if (!has("current_stage")) { + db.exec(`ALTER TABLE ${table} ADD COLUMN current_stage TEXT`); + } + if (!has("gate_status")) { + db.exec(`ALTER TABLE ${table} ADD COLUMN gate_status TEXT`); + } + } +} + +export function recoverOrphanedSessions(db: Database.Database): void { const orphaned = db - .prepare("SELECT id, pid FROM sessions WHERE status IN ('running', 'idle', 'paused', 'pending')") - .all() as { id: string; pid: number | null }[]; + .prepare( + "SELECT id, pid, status, gate_status FROM sessions WHERE status IN ('running', 'idle', 'paused', 'pending')", + ) + .all() as { + id: string; + pid: number | null; + status: string; + gate_status: string | null; + }[]; for (const session of orphaned) { let alive = false; @@ -278,6 +313,9 @@ function recoverOrphanedSessions(db: Database.Database): void { } if (!alive) { + if (session.status === "paused" && session.gate_status) { + continue; + } db.prepare( "UPDATE sessions SET status = 'failed', ended_at = datetime('now') WHERE id = ?" ).run(session.id); diff --git a/src/core/process-manager.ts b/src/core/process-manager.ts index 69bc443..60a342e 100644 --- a/src/core/process-manager.ts +++ b/src/core/process-manager.ts @@ -9,6 +9,13 @@ import { type SystemLogLevel, type ToolCall, } from "./stream-manager"; +import { + applyControlPlaneEvent, + parseControlPlaneProtocolText, + resolveControlPlaneGate, + type ControlPlaneProtocolEvent, + type ResolvedControlPlaneGate, +} from "./control-plane-protocol"; import { markSessionFailedAndReleaseLinkedTask, onSessionExit, @@ -1218,6 +1225,104 @@ class ProcessManager { }); } + resolveGate( + sessionId: string, + response: string, + ): { ok: true } | { ok: false; error: string } { + const trimmed = response.trim(); + if (!trimmed) { + return { ok: false, error: "response is required" }; + } + + const db = getDb(); + const resolved = resolveControlPlaneGate(db, sessionId); + if (!resolved) { + return { ok: false, error: "no pending gate" }; + } + + const insertResult = db.prepare( + "INSERT INTO session_messages (session_id, role, content) VALUES (?, 'user', ?)", + ).run(sessionId, trimmed); + const messageId = Number(insertResult.lastInsertRowid); + + streamManager.emit(sessionId, { + type: "message", + id: messageId, + role: "user", + content: trimmed, + }); + + let sp = this.sessions.get(sessionId); + if (!sp || sp.proc.killed) { + sp = this.ensureProcess(sessionId) ?? undefined; + } + + if (sp && !sp.proc.killed) { + const requestId = sp.pendingPermission?.requestId ?? resolved.gateStatus.id; + sp.pendingPermission = null; + if (sp.paused) { + sp.proc.kill("SIGCONT"); + sp.paused = false; + } + sp.isProcessing = true; + this.writeGateResponse(sp, trimmed); + streamManager.emit(sessionId, { + type: "permission_resolved", + request_id: requestId, + approved: true, + }); + } + + db.prepare( + "UPDATE sessions SET status = 'running' WHERE id = ? AND status = 'paused'", + ).run(sessionId); + + const resolvedEvent = { + type: "control_plane_gate_resolved" as const, + session_id: sessionId, + task_id: resolved.taskId, + gate_id: resolved.gateStatus.id, + response: trimmed, + }; + streamManager.emit(sessionId, resolvedEvent); + streamManager.emit("global", resolvedEvent); + streamManager.emit(sessionId, { type: "status", status: "running" }); + streamManager.emit("global", { type: "status", status: "running" }); + + return { ok: true }; + } + + private writeGateResponse(sp: SessionProcess, response: string): void { + if (!sp.proc.stdin || sp.proc.stdin.destroyed || sp.proc.stdin.writableEnded) { + this.emitSystemLog( + "warning", + sp.sessionId, + "Gate resolved, but agent stdin is no longer writable.", + ); + return; + } + + try { + if (sp.inputProtocol === "claude-stream-json") { + const inputMsg = JSON.stringify({ + type: "user", + message: { role: "user", content: response }, + session_id: sp.claudeSessionId ?? "default", + parent_tool_use_id: null, + }); + sp.proc.stdin.write(inputMsg + "\n"); + return; + } + sp.proc.stdin.write(response + "\n"); + } catch (err) { + this.emitSystemLog( + "warning", + sp.sessionId, + `Failed to write gate response: ${err instanceof Error ? err.message : String(err)}`, + ); + } + } + private handleGenericOutputChunk( sessionId: string, sp: SessionProcess, @@ -1533,11 +1638,105 @@ class ProcessManager { text: string, ): void { if (!text) return; - sp.textBuffer += text; + const parsed = parseControlPlaneProtocolText(text); + for (const event of parsed.events) { + this.handleControlPlaneEvent(sessionId, event); + } + + if (!parsed.text) return; + sp.textBuffer += parsed.text; streamManager.emit(sessionId, { type: "text_delta", - text, + text: parsed.text, + }); + } + + private handleControlPlaneEvent( + sessionId: string, + event: ControlPlaneProtocolEvent, + ): void { + let applied: ReturnType = null; + try { + applied = applyControlPlaneEvent(getDb(), sessionId, event); + } catch (error) { + this.emitSystemLog( + "warning", + sessionId, + `Ignored DevLog control marker: ${error instanceof Error ? error.message : String(error)}`, + ); + return; + } + if (!applied) return; + + if (event.type === "stage") { + const streamEvent = { + type: "control_plane_stage" as const, + session_id: sessionId, + task_id: applied.taskId, + current_stage: event.current_stage, + }; + streamManager.emit(sessionId, streamEvent); + streamManager.emit("global", streamEvent); + return; + } + + if (applied.gateStatus) { + const streamEvent = { + type: "control_plane_gate" as const, + session_id: sessionId, + task_id: applied.taskId, + current_stage: applied.currentStage, + gate_status: applied.gateStatus, + }; + streamManager.emit(sessionId, streamEvent); + streamManager.emit("global", streamEvent); + this.pauseForControlPlaneGate(sessionId, { + sessionId, + taskId: applied.taskId, + currentStage: applied.currentStage, + gateStatus: applied.gateStatus, + }); + } + } + + private pauseForControlPlaneGate( + sessionId: string, + gate: ResolvedControlPlaneGate, + ): void { + const toolInput = { + question: gate.gateStatus.question, + options: gate.gateStatus.options, + stage: gate.gateStatus.stage ?? null, + }; + const sp = this.sessions.get(sessionId); + if (sp && !sp.proc.killed) { + sp.pendingPermission = { + requestId: gate.gateStatus.id, + toolName: "AskHuman", + toolInput, + }; + sp.proc.kill("SIGSTOP"); + sp.paused = true; + sp.isProcessing = false; + } + + const db = getDb(); + db.prepare( + "UPDATE sessions SET status = 'paused' WHERE id = ? AND status NOT IN ('completed', 'failed', 'killed')", + ).run(sessionId); + streamManager.emit(sessionId, { + type: "permission_request", + tool_name: "AskHuman", + tool_input: toolInput, + request_id: gate.gateStatus.id, }); + streamManager.emit(sessionId, { type: "status", status: "paused" }); + streamManager.emit("global", { type: "status", status: "paused" }); + this.emitSystemLog( + "info", + sessionId, + `Session ${sessionId.slice(0, 8)} paused for human gate`, + ); } /** @@ -1616,11 +1815,7 @@ class ProcessManager { for (const block of msg.content) { if (block.type === "text" && block.text) { - sp.textBuffer += block.text; - streamManager.emit(sessionId, { - type: "text_delta", - text: block.text, - }); + this.emitTextDelta(sessionId, sp, block.text); } else if (block.type === "tool_use" && block.name) { const toolCall: ToolCall = { name: block.name, @@ -1641,11 +1836,7 @@ class ProcessManager { if (type === "content_block_delta") { const delta = event.delta as { type?: string; text?: string } | undefined; if (delta?.type === "text_delta" && delta.text) { - sp.textBuffer += delta.text; - streamManager.emit(sessionId, { - type: "text_delta", - text: delta.text, - }); + this.emitTextDelta(sessionId, sp, delta.text); } return; } diff --git a/src/core/stream-manager.ts b/src/core/stream-manager.ts index 617254b..727b6a5 100644 --- a/src/core/stream-manager.ts +++ b/src/core/stream-manager.ts @@ -1,3 +1,5 @@ +import type { GateStatus } from "./types-dashboard"; + export interface ToolCall { name: string; input: Record; @@ -32,7 +34,27 @@ export type ChatStreamEvent = | { type: "permission_request"; tool_name: string; tool_input: Record; request_id: string } | { type: "permission_resolved"; request_id: string; approved: boolean } | { type: "message_queued"; content: string; position: number } - | { type: "queue_drained"; remaining: number }; + | { type: "queue_drained"; remaining: number } + | { + type: "control_plane_stage"; + session_id: string; + task_id: string | null; + current_stage: string; + } + | { + type: "control_plane_gate"; + session_id: string; + task_id: string | null; + current_stage: string | null; + gate_status: GateStatus; + } + | { + type: "control_plane_gate_resolved"; + session_id: string; + task_id: string | null; + gate_id: string; + response: string; + }; export function createSystemLogEvent({ level, diff --git a/src/core/types-dashboard.ts b/src/core/types-dashboard.ts index 3a2ec1d..4d3a4c4 100644 --- a/src/core/types-dashboard.ts +++ b/src/core/types-dashboard.ts @@ -25,6 +25,8 @@ export interface Task { blocked_by?: string | null; // JSON array of task ids when status === 'in_queue' sandbox_iterations?: number; fail_reason?: string | null; + current_stage?: string | null; + gate_status?: string | null; // JSON-encoded GateStatus when a workflow is awaiting input } export type SessionStatus = @@ -62,6 +64,8 @@ export interface Session { agent_api_version: string; agent_base_url: string; agent_max_tokens: number; + current_stage?: string | null; + gate_status?: string | null; prompt: string | null; exit_code: number | null; log_path: string | null; @@ -69,6 +73,14 @@ export interface Session { ended_at: string | null; } +export interface GateStatus { + id: string; + question: string; + options: string[]; + created_at: string; + stage?: string | null; +} + /** A structured tool call from Claude's response */ export interface ToolCall { name: string; diff --git a/src/hooks/use-sessions.ts b/src/hooks/use-sessions.ts index dddb1f4..7cfe2a0 100644 --- a/src/hooks/use-sessions.ts +++ b/src/hooks/use-sessions.ts @@ -3,6 +3,15 @@ import { useState, useEffect, useCallback } from "react"; import type { Session } from "@/core/types-dashboard"; import type { SessionRuntimeAuthInput } from "@/core/session-runtime-auth"; +import type { ChatStreamEvent } from "@/core/stream-manager"; + +function shouldRefreshSessionsForEvent(event: ChatStreamEvent): boolean { + return ( + event.type === "control_plane_stage" || + event.type === "control_plane_gate" || + event.type === "control_plane_gate_resolved" + ); +} export function useSessions() { const [sessions, setSessions] = useState([]); @@ -25,6 +34,25 @@ export function useSessions() { return () => clearInterval(interval); }, [fetchSessions]); + useEffect(() => { + const source = new EventSource("/api/devlog/stream"); + source.onmessage = (message) => { + let event: ChatStreamEvent; + try { + event = JSON.parse(message.data) as ChatStreamEvent; + } catch { + return; + } + if (shouldRefreshSessionsForEvent(event)) { + fetchSessions(); + } + }; + + return () => { + source.close(); + }; + }, [fetchSessions]); + const launchSession = async (data: { task_id?: string; worktree_name?: string; diff --git a/src/hooks/use-tasks.ts b/src/hooks/use-tasks.ts index b367cf8..d775716 100644 --- a/src/hooks/use-tasks.ts +++ b/src/hooks/use-tasks.ts @@ -3,6 +3,15 @@ import { useState, useEffect, useCallback } from "react"; import type { Task, TaskStatus, TaskPriority, Session } from "@/core/types-dashboard"; import type { SessionRuntimeAuthInput } from "@/core/session-runtime-auth"; +import type { ChatStreamEvent } from "@/core/stream-manager"; + +function shouldRefreshTasksForEvent(event: ChatStreamEvent): boolean { + return ( + event.type === "control_plane_stage" || + event.type === "control_plane_gate" || + event.type === "control_plane_gate_resolved" + ); +} export function useTasks() { const [tasks, setTasks] = useState([]); @@ -25,6 +34,25 @@ export function useTasks() { return () => clearInterval(interval); }, [fetchTasks]); + useEffect(() => { + const source = new EventSource("/api/devlog/stream"); + source.onmessage = (message) => { + let event: ChatStreamEvent; + try { + event = JSON.parse(message.data) as ChatStreamEvent; + } catch { + return; + } + if (shouldRefreshTasksForEvent(event)) { + fetchTasks(); + } + }; + + return () => { + source.close(); + }; + }, [fetchTasks]); + const createTask = async (data: { title: string; description?: string;