-
+
{timeAgo(session.started_at)}
diff --git a/src/core/__tests__/control-plane-protocol.test.ts b/src/core/__tests__/control-plane-protocol.test.ts
new file mode 100644
index 0000000..64dcd39
--- /dev/null
+++ b/src/core/__tests__/control-plane-protocol.test.ts
@@ -0,0 +1,150 @@
+import Database from "better-sqlite3";
+import { test } from "node:test";
+import assert from "node:assert/strict";
+import { SCHEMA } from "../db-schema";
+import {
+ applyControlPlaneEvent,
+ parseControlPlaneProtocolText,
+ resolveControlPlaneGate,
+} from "../control-plane-protocol";
+
+function makeDb(): Database.Database {
+ const db = new Database(":memory:");
+ db.exec(SCHEMA);
+ return db;
+}
+
+test("parseControlPlaneProtocolText extracts stage markers and hides protocol text", () => {
+ const parsed = parseControlPlaneProtocolText(
+ 'before\n[DEVLOG_STAGE] {"stage":"3/7","desc":"running tests"}\nafter',
+ );
+
+ assert.equal(parsed.text, "before\nafter");
+ assert.deepEqual(parsed.events, [
+ {
+ type: "stage",
+ current_stage: "3/7 · running tests",
+ },
+ ]);
+});
+
+test("parseControlPlaneProtocolText extracts gate markers with normalized options", () => {
+ const parsed = parseControlPlaneProtocolText(
+ '[DEVLOG_GATE] {"question":"Approve plan?","options":["Approve","Revise"],"stage":"2/4 · review"}',
+ );
+
+ assert.equal(parsed.text, "");
+ assert.deepEqual(parsed.events, [
+ {
+ type: "gate",
+ question: "Approve plan?",
+ options: ["Approve", "Revise"],
+ stage: "2/4 · review",
+ },
+ ]);
+});
+
+test("parseControlPlaneProtocolText keeps malformed markers as visible output", () => {
+ const text = "[DEVLOG_STAGE] not-json";
+ const parsed = parseControlPlaneProtocolText(text);
+
+ assert.equal(parsed.text, text);
+ assert.deepEqual(parsed.events, []);
+});
+
+test("applyControlPlaneEvent updates session and linked task state", () => {
+ const db = makeDb();
+ db.prepare("INSERT INTO tasks (id, project_id, title) VALUES ('task-1', 'test', 'Task')").run();
+ db.prepare(
+ "INSERT INTO sessions (id, project_id, task_id, status) VALUES ('session-1', 'test', 'task-1', 'running')",
+ ).run();
+
+ applyControlPlaneEvent(db, "session-1", {
+ type: "stage",
+ current_stage: "1/3 · plan",
+ });
+ const stageSession = db
+ .prepare("SELECT current_stage, gate_status FROM sessions WHERE id = 'session-1'")
+ .get() as { current_stage: string | null; gate_status: string | null };
+ const stageTask = db
+ .prepare("SELECT current_stage, gate_status FROM tasks WHERE id = 'task-1'")
+ .get() as { current_stage: string | null; gate_status: string | null };
+
+ assert.equal(stageSession.current_stage, "1/3 · plan");
+ assert.equal(stageTask.current_stage, "1/3 · plan");
+ assert.equal(stageSession.gate_status, null);
+ assert.equal(stageTask.gate_status, null);
+
+ const applied = applyControlPlaneEvent(
+ db,
+ "session-1",
+ {
+ type: "gate",
+ question: "Ship it?",
+ options: ["Yes", "No"],
+ },
+ {
+ now: () => new Date("2026-06-08T08:00:00.000Z"),
+ createId: () => "gate-test",
+ },
+ );
+
+ assert.equal(applied?.taskId, "task-1");
+ assert.deepEqual(applied?.gateStatus, {
+ id: "gate-test",
+ question: "Ship it?",
+ options: ["Yes", "No"],
+ created_at: "2026-06-08T08:00:00.000Z",
+ stage: "1/3 · plan",
+ });
+
+ const gateSession = db
+ .prepare("SELECT current_stage, gate_status FROM sessions WHERE id = 'session-1'")
+ .get() as { current_stage: string | null; gate_status: string | null };
+ const gateTask = db
+ .prepare("SELECT current_stage, gate_status FROM tasks WHERE id = 'task-1'")
+ .get() as { current_stage: string | null; gate_status: string | null };
+
+ assert.equal(gateSession.current_stage, "1/3 · plan");
+ assert.equal(gateTask.current_stage, "1/3 · plan");
+ assert.equal(gateSession.gate_status, JSON.stringify(applied?.gateStatus));
+ assert.equal(gateTask.gate_status, JSON.stringify(applied?.gateStatus));
+});
+
+test("resolveControlPlaneGate clears gate state without clearing current stage", () => {
+ const db = makeDb();
+ db.prepare("INSERT INTO tasks (id, project_id, title) VALUES ('task-1', 'test', 'Task')").run();
+ db.prepare(
+ "INSERT INTO sessions (id, project_id, task_id, status) VALUES ('session-1', 'test', 'task-1', 'paused')",
+ ).run();
+ const applied = applyControlPlaneEvent(
+ db,
+ "session-1",
+ {
+ type: "gate",
+ question: "Continue?",
+ options: ["Continue"],
+ stage: "2/3 · approval",
+ },
+ {
+ now: () => new Date("2026-06-08T08:00:00.000Z"),
+ createId: () => "gate-test",
+ },
+ );
+ assert.ok(applied?.gateStatus);
+
+ const resolved = resolveControlPlaneGate(db, "session-1");
+
+ assert.deepEqual(resolved?.gateStatus, applied.gateStatus);
+ const session = db
+ .prepare("SELECT current_stage, gate_status FROM sessions WHERE id = 'session-1'")
+ .get() as { current_stage: string | null; gate_status: string | null };
+ const task = db
+ .prepare("SELECT current_stage, gate_status FROM tasks WHERE id = 'task-1'")
+ .get() as { current_stage: string | null; gate_status: string | null };
+
+ assert.equal(session.current_stage, "2/3 · approval");
+ assert.equal(task.current_stage, "2/3 · approval");
+ assert.equal(session.gate_status, null);
+ assert.equal(task.gate_status, null);
+});
diff --git a/src/core/__tests__/control-plane-state.test.ts b/src/core/__tests__/control-plane-state.test.ts
new file mode 100644
index 0000000..560a08c
--- /dev/null
+++ b/src/core/__tests__/control-plane-state.test.ts
@@ -0,0 +1,30 @@
+import { test } from "node:test";
+import assert from "node:assert/strict";
+import { parseGateStatus } from "../control-plane-state";
+
+test("parseGateStatus returns null for missing or malformed gate state", () => {
+ assert.equal(parseGateStatus(null), null);
+ assert.equal(parseGateStatus(""), null);
+ assert.equal(parseGateStatus("not-json"), null);
+ assert.equal(parseGateStatus(JSON.stringify({ id: "gate-1" })), null);
+});
+
+test("parseGateStatus normalizes a persisted gate status", () => {
+ const gate = parseGateStatus(
+ JSON.stringify({
+ id: "gate-1",
+ question: "Approve?",
+ options: [" Yes ", "", 42, "No"],
+ created_at: "2026-06-08T08:00:00.000Z",
+ stage: "2/4 · review",
+ }),
+ );
+
+ assert.deepEqual(gate, {
+ id: "gate-1",
+ question: "Approve?",
+ options: ["Yes", "No"],
+ created_at: "2026-06-08T08:00:00.000Z",
+ stage: "2/4 · review",
+ });
+});
diff --git a/src/core/__tests__/db-schema.test.ts b/src/core/__tests__/db-schema.test.ts
index 8e8382b..abb14a0 100644
--- a/src/core/__tests__/db-schema.test.ts
+++ b/src/core/__tests__/db-schema.test.ts
@@ -2,7 +2,7 @@ import Database from "better-sqlite3";
import { test } from "node:test";
import assert from "node:assert/strict";
import { makeTestDb, insertTask } from "./test-helpers";
-import { migrateTasksV2 } from "../db";
+import { migrateControlPlaneColumns, migrateTasksV2, recoverOrphanedSessions } from "../db";
test("tasks.status accepts new in_queue and fail values", () => {
const db = makeTestDb();
@@ -31,6 +31,18 @@ test("tasks new columns exist with sensible defaults", () => {
assert.equal(row.fail_reason, null);
});
+test("tasks store control-plane stage and gate defaults", () => {
+ const db = makeTestDb();
+ const id = insertTask(db, {});
+ const row = db.prepare("SELECT current_stage, gate_status FROM tasks WHERE id = ?").get(id) as {
+ current_stage: string | null;
+ gate_status: string | null;
+ };
+
+ assert.equal(row.current_stage, null);
+ assert.equal(row.gate_status, null);
+});
+
test("sessions store agent execution and runtime auth defaults", () => {
const db = makeTestDb();
const sessionId = "session-agent-defaults";
@@ -72,6 +84,27 @@ test("sessions store agent execution and runtime auth defaults", () => {
assert.equal(row.agent_max_tokens, 8192);
});
+test("sessions store control-plane stage and gate defaults", () => {
+ const db = makeTestDb();
+ const sessionId = "session-control-plane-defaults";
+
+ db.prepare("INSERT INTO sessions (id, project_id, status) VALUES (?, ?, ?)").run(
+ sessionId,
+ "test",
+ "running",
+ );
+
+ const row = db
+ .prepare("SELECT current_stage, gate_status FROM sessions WHERE id = ?")
+ .get(sessionId) as {
+ current_stage: string | null;
+ gate_status: string | null;
+ };
+
+ assert.equal(row.current_stage, null);
+ assert.equal(row.gate_status, null);
+});
+
test("migrateTasksV2 preserves data and adds new columns on legacy DB", () => {
const db = new Database(":memory:");
// Simulate the OLD schema (pre-1.1)
@@ -97,12 +130,14 @@ test("migrateTasksV2 preserves data and adds new columns on legacy DB", () => {
migrateTasksV2(db);
// Data preserved
- const row = db.prepare("SELECT id, title, status, blocked_by, sandbox_iterations, fail_reason FROM tasks WHERE id = 'legacy-1'").get() as any;
+ const row = db.prepare("SELECT id, title, status, blocked_by, sandbox_iterations, fail_reason, current_stage, gate_status FROM tasks WHERE id = 'legacy-1'").get() as any;
assert.equal(row.title, "pre-existing");
assert.equal(row.status, "in_progress");
assert.equal(row.blocked_by, null);
assert.equal(row.sandbox_iterations, 0);
assert.equal(row.fail_reason, null);
+ assert.equal(row.current_stage, null);
+ assert.equal(row.gate_status, null);
// New status values now allowed
db.prepare("INSERT INTO tasks (id, title, status) VALUES ('new-1', 't', 'in_queue')").run();
@@ -118,3 +153,65 @@ test("migrateTasksV2 is idempotent", () => {
const row = db.prepare("SELECT status FROM tasks WHERE id = ?").get(id) as any;
assert.equal(row.status, "fail");
});
+
+test("migrateControlPlaneColumns adds task and session gate columns on legacy DB", () => {
+ const db = new Database(":memory:");
+ db.exec(`
+ CREATE TABLE tasks (
+ id TEXT PRIMARY KEY,
+ title TEXT NOT NULL,
+ status TEXT NOT NULL DEFAULT 'todo'
+ );
+ CREATE TABLE sessions (
+ id TEXT PRIMARY KEY,
+ status TEXT NOT NULL DEFAULT 'running'
+ );
+ INSERT INTO tasks (id, title, status) VALUES ('legacy-task', 'legacy task', 'todo');
+ INSERT INTO sessions (id, status) VALUES ('legacy-session', 'running');
+ `);
+
+ migrateControlPlaneColumns(db);
+ migrateControlPlaneColumns(db);
+
+ const task = db
+ .prepare("SELECT current_stage, gate_status FROM tasks WHERE id = 'legacy-task'")
+ .get() as { current_stage: string | null; gate_status: string | null };
+ const session = db
+ .prepare("SELECT current_stage, gate_status FROM sessions WHERE id = 'legacy-session'")
+ .get() as { current_stage: string | null; gate_status: string | null };
+
+ assert.equal(task.current_stage, null);
+ assert.equal(task.gate_status, null);
+ assert.equal(session.current_stage, null);
+ assert.equal(session.gate_status, null);
+});
+
+test("recoverOrphanedSessions preserves paused gated sessions", () => {
+ const db = makeTestDb();
+ const gateStatus = JSON.stringify({
+ id: "gate-1",
+ question: "Continue?",
+ options: ["Continue"],
+ created_at: "2026-06-08T08:00:00.000Z",
+ stage: "2/3 approval",
+ });
+ db.prepare(
+ "INSERT INTO sessions (id, project_id, status, pid, gate_status) VALUES ('gated', 'test', 'paused', 99999999, ?)",
+ ).run(gateStatus);
+ db.prepare(
+ "INSERT INTO sessions (id, project_id, status, pid) VALUES ('orphaned', 'test', 'running', 99999999)",
+ ).run();
+
+ recoverOrphanedSessions(db);
+
+ const gated = db
+ .prepare("SELECT status, gate_status FROM sessions WHERE id = 'gated'")
+ .get() as { status: string; gate_status: string | null };
+ const orphaned = db
+ .prepare("SELECT status FROM sessions WHERE id = 'orphaned'")
+ .get() as { status: string };
+
+ assert.equal(gated.status, "paused");
+ assert.equal(gated.gate_status, gateStatus);
+ assert.equal(orphaned.status, "failed");
+});
diff --git a/src/core/__tests__/quality-gates.test.ts b/src/core/__tests__/quality-gates.test.ts
index 5317406..1fad26e 100644
--- a/src/core/__tests__/quality-gates.test.ts
+++ b/src/core/__tests__/quality-gates.test.ts
@@ -156,3 +156,89 @@ test("sessions POST narrows arbitrary JSON before destructuring", () => {
"sessions POST should not destructure arbitrary JSON via a blind type cast",
);
});
+
+test("sessions PATCH resolves gates through a dedicated action", () => {
+ const source = readRepoFile("src/app/api/sessions/[id]/route.ts");
+
+ assert.match(
+ source,
+ /action\?:[\s\S]*"resolve_gate"/,
+ "sessions PATCH action type should include resolve_gate",
+ );
+ assert.match(
+ source,
+ /case "resolve_gate":[\s\S]*processManager\.resolveGate/,
+ "resolve_gate should call processManager.resolveGate instead of sendMessage",
+ );
+ assert.doesNotMatch(
+ source,
+ /case "resolve_gate":[\s\S]*processManager\.sendMessage[\s\S]*break;/,
+ "resolve_gate must stay isolated from normal queued send messages",
+ );
+});
+
+test("process manager resolves persisted gates without the normal send queue", () => {
+ const source = readRepoFile("src/core/process-manager.ts");
+
+ assert.match(
+ source,
+ /resolveGate[\s\S]*resolveControlPlaneGate/,
+ "resolveGate should clear persisted gate state through the control-plane helper",
+ );
+ assert.match(
+ source,
+ /resolveGate[\s\S]*ensureProcess/,
+ "resolveGate should recreate a process when memory was lost after restart",
+ );
+ assert.doesNotMatch(
+ source,
+ /resolveGate[\s\S]*messageQueues\.set/,
+ "resolveGate should not enqueue approval replies behind normal messages",
+ );
+});
+
+test("kanban task surfaces render control-plane state", () => {
+ const card = readRepoFile("src/components/kanban/task-card.tsx");
+ const dialog = readRepoFile("src/components/kanban/task-detail-dialog.tsx");
+
+ assert.match(card, /parseGateStatus/, "task cards should parse persisted gate state");
+ assert.match(card, /needs-input/, "task cards should show a needs-input badge");
+ assert.match(card, /current_stage/, "task cards should render the current stage");
+
+ assert.match(dialog, /parseGateStatus/, "task detail should parse persisted gate state");
+ assert.match(dialog, /action:\s*"resolve_gate"/, "task detail replies should call resolve_gate");
+ assert.match(dialog, /gateStatus\.options\.map/, "task detail should render gate option buttons");
+});
+
+test("session surfaces render control-plane state", () => {
+ const indicator = readRepoFile("src/components/sessions/process-indicator.tsx");
+ const card = readRepoFile("src/components/sessions/session-card.tsx");
+ const detail = readRepoFile("src/app/sessions/[id]/page.tsx");
+
+ assert.match(indicator, /currentStage/, "process indicator should accept current stage text");
+ assert.match(indicator, /needs-input/, "process indicator should render needs-input");
+ assert.match(card, /parseGateStatus/, "session cards should parse gate status");
+ assert.match(card, /current_stage/, "session cards should pass current stage");
+ assert.match(detail, /gate_status/, "session detail should pass gate state to the header indicator");
+});
+
+test("task and session lists refresh on control-plane stream events", () => {
+ const tasks = readRepoFile("src/hooks/use-tasks.ts");
+ const sessions = readRepoFile("src/hooks/use-sessions.ts");
+
+ for (const [name, source] of [
+ ["use-tasks", tasks],
+ ["use-sessions", sessions],
+ ] as const) {
+ assert.match(
+ source,
+ /new EventSource\("\/api\/devlog\/stream"\)/,
+ `${name} should subscribe to the global DevLog stream`,
+ );
+ assert.match(
+ source,
+ /control_plane_stage[\s\S]*control_plane_gate[\s\S]*control_plane_gate_resolved/,
+ `${name} should react to all control-plane event types`,
+ );
+ }
+});
diff --git a/src/core/control-plane-protocol.ts b/src/core/control-plane-protocol.ts
new file mode 100644
index 0000000..d292ef9
--- /dev/null
+++ b/src/core/control-plane-protocol.ts
@@ -0,0 +1,232 @@
+import type Database from "better-sqlite3";
+import { randomUUID } from "crypto";
+import type { GateStatus } from "./types-dashboard";
+
+export type ControlPlaneProtocolEvent =
+ | { type: "stage"; current_stage: string }
+ | { type: "gate"; question: string; options: string[]; stage?: string };
+
+export interface ParsedControlPlaneText {
+ text: string;
+ events: ControlPlaneProtocolEvent[];
+}
+
+export interface AppliedControlPlaneEvent {
+ sessionId: string;
+ taskId: string | null;
+ currentStage: string | null;
+ gateStatus: GateStatus | null;
+}
+
+export interface ResolvedControlPlaneGate {
+ sessionId: string;
+ taskId: string | null;
+ currentStage: string | null;
+ gateStatus: GateStatus;
+}
+
+const MARKER_PATTERN = /^\s*\[DEVLOG_(STAGE|GATE)\]\s+(.+?)\s*$/;
+
+export function parseControlPlaneProtocolText(
+ text: string,
+): ParsedControlPlaneText {
+ const visibleLines: string[] = [];
+ const events: ControlPlaneProtocolEvent[] = [];
+
+ for (const line of text.split(/\r?\n/)) {
+ const marker = MARKER_PATTERN.exec(line);
+ if (!marker) {
+ visibleLines.push(line);
+ continue;
+ }
+
+ const event = parseProtocolMarker(marker[1], marker[2]);
+ if (!event) {
+ visibleLines.push(line);
+ continue;
+ }
+
+ events.push(event);
+ }
+
+ return {
+ text: trimProtocolLineGaps(visibleLines.join("\n")),
+ events,
+ };
+}
+
+export function applyControlPlaneEvent(
+ db: Database.Database,
+ sessionId: string,
+ event: ControlPlaneProtocolEvent,
+ opts: {
+ now?: () => Date;
+ createId?: () => string;
+ } = {},
+): AppliedControlPlaneEvent | null {
+ const session = db
+ .prepare("SELECT task_id, current_stage FROM sessions WHERE id = ? LIMIT 1")
+ .get(sessionId) as
+ | { task_id: string | null; current_stage: string | null }
+ | undefined;
+
+ if (!session) return null;
+
+ if (event.type === "stage") {
+ db.prepare("UPDATE sessions SET current_stage = ? WHERE id = ?").run(
+ event.current_stage,
+ sessionId,
+ );
+ if (session.task_id) {
+ db.prepare(
+ "UPDATE tasks SET current_stage = ?, updated_at = datetime('now') WHERE id = ?",
+ ).run(event.current_stage, session.task_id);
+ }
+ return {
+ sessionId,
+ taskId: session.task_id,
+ currentStage: event.current_stage,
+ gateStatus: null,
+ };
+ }
+
+ const currentStage = event.stage ?? session.current_stage ?? null;
+ const gateStatus: GateStatus = {
+ id: opts.createId?.() ?? `gate_${randomUUID()}`,
+ question: event.question,
+ options: event.options,
+ created_at: (opts.now?.() ?? new Date()).toISOString(),
+ stage: currentStage,
+ };
+ const serializedGate = JSON.stringify(gateStatus);
+
+ db.prepare(
+ "UPDATE sessions SET current_stage = ?, gate_status = ? WHERE id = ?",
+ ).run(currentStage, serializedGate, sessionId);
+ if (session.task_id) {
+ db.prepare(
+ "UPDATE tasks SET current_stage = ?, gate_status = ?, updated_at = datetime('now') WHERE id = ?",
+ ).run(currentStage, serializedGate, session.task_id);
+ }
+
+ return {
+ sessionId,
+ taskId: session.task_id,
+ currentStage,
+ gateStatus,
+ };
+}
+
+export function resolveControlPlaneGate(
+ db: Database.Database,
+ sessionId: string,
+): ResolvedControlPlaneGate | null {
+ const session = db
+ .prepare("SELECT task_id, current_stage, gate_status FROM sessions WHERE id = ? LIMIT 1")
+ .get(sessionId) as
+ | { task_id: string | null; current_stage: string | null; gate_status: string | null }
+ | undefined;
+
+ if (!session?.gate_status) return null;
+ const gateStatus = parseGateStatus(session.gate_status);
+ if (!gateStatus) return null;
+
+ db.prepare("UPDATE sessions SET gate_status = NULL WHERE id = ?").run(sessionId);
+ if (session.task_id) {
+ db.prepare(
+ "UPDATE tasks SET gate_status = NULL, updated_at = datetime('now') WHERE id = ?",
+ ).run(session.task_id);
+ }
+
+ return {
+ sessionId,
+ taskId: session.task_id,
+ currentStage: session.current_stage,
+ gateStatus,
+ };
+}
+
+function parseProtocolMarker(
+ markerType: string,
+ rawJson: string,
+): ControlPlaneProtocolEvent | null {
+ let payload: unknown;
+ try {
+ payload = JSON.parse(rawJson);
+ } catch {
+ return null;
+ }
+ if (!isRecord(payload)) return null;
+
+ if (markerType === "STAGE") {
+ const stage = readNonEmptyString(payload.current_stage) ?? readNonEmptyString(payload.stage);
+ if (!stage) return null;
+ const desc = readNonEmptyString(payload.desc) ?? readNonEmptyString(payload.description);
+ return {
+ type: "stage",
+ current_stage: desc ? `${stage} · ${desc}` : stage,
+ };
+ }
+
+ if (markerType === "GATE") {
+ const question = readNonEmptyString(payload.question);
+ if (!question) return null;
+ const stage = readStageDisplay(payload);
+ return {
+ type: "gate",
+ question,
+ options: readOptions(payload.options),
+ ...(stage ? { stage } : {}),
+ };
+ }
+
+ return null;
+}
+
+function readStageDisplay(payload: Record): string | null {
+ const stage = readNonEmptyString(payload.current_stage) ?? readNonEmptyString(payload.stage);
+ if (!stage) return null;
+ const desc = readNonEmptyString(payload.desc) ?? readNonEmptyString(payload.description);
+ return desc ? `${stage} · ${desc}` : stage;
+}
+
+function readOptions(value: unknown): string[] {
+ if (!Array.isArray(value)) return [];
+ return value
+ .map((option) => (typeof option === "string" ? option.trim() : ""))
+ .filter(Boolean);
+}
+
+function readNonEmptyString(value: unknown): string | null {
+ return typeof value === "string" && value.trim() ? value.trim() : null;
+}
+
+function isRecord(value: unknown): value is Record {
+ return Boolean(value) && typeof value === "object" && !Array.isArray(value);
+}
+
+function trimProtocolLineGaps(text: string): string {
+ return text.replace(/\n{3,}/g, "\n\n").replace(/^\n+|\n+$/g, "");
+}
+
+function parseGateStatus(value: string): GateStatus | null {
+ let parsed: unknown;
+ try {
+ parsed = JSON.parse(value);
+ } catch {
+ return null;
+ }
+ if (!isRecord(parsed)) return null;
+
+ const id = readNonEmptyString(parsed.id);
+ const question = readNonEmptyString(parsed.question);
+ if (!id || !question) return null;
+
+ return {
+ id,
+ question,
+ options: readOptions(parsed.options),
+ created_at: readNonEmptyString(parsed.created_at) ?? new Date(0).toISOString(),
+ stage: readNonEmptyString(parsed.stage),
+ };
+}
diff --git a/src/core/control-plane-state.ts b/src/core/control-plane-state.ts
new file mode 100644
index 0000000..f01fa50
--- /dev/null
+++ b/src/core/control-plane-state.ts
@@ -0,0 +1,39 @@
+import type { GateStatus } from "./types-dashboard";
+
+export function parseGateStatus(
+ value: string | null | undefined,
+): GateStatus | null {
+ if (!value) return null;
+
+ let parsed: unknown;
+ try {
+ parsed = JSON.parse(value);
+ } catch {
+ return null;
+ }
+ if (!isRecord(parsed)) return null;
+
+ const id = readString(parsed.id);
+ const question = readString(parsed.question);
+ if (!id || !question) return null;
+
+ return {
+ id,
+ question,
+ options: Array.isArray(parsed.options)
+ ? parsed.options
+ .map((option) => (typeof option === "string" ? option.trim() : ""))
+ .filter(Boolean)
+ : [],
+ created_at: readString(parsed.created_at) ?? "",
+ stage: readString(parsed.stage),
+ };
+}
+
+function readString(value: unknown): string | null {
+ return typeof value === "string" && value.trim() ? value.trim() : null;
+}
+
+function isRecord(value: unknown): value is Record {
+ return Boolean(value) && typeof value === "object" && !Array.isArray(value);
+}
diff --git a/src/core/db-pool.ts b/src/core/db-pool.ts
index a47040e..81d8c88 100644
--- a/src/core/db-pool.ts
+++ b/src/core/db-pool.ts
@@ -4,7 +4,7 @@ import { dirname, join } from "node:path";
import { homedir } from "node:os";
import { REGISTRY_SCHEMA } from "./db-schema-registry";
import { SCHEMA } from "./db-schema";
-import { migrateTasksV2 } from "./db";
+import { migrateControlPlaneColumns, migrateTasksV2 } from "./db";
export interface DbPool {
getRegistry(): Database.Database;
@@ -49,6 +49,7 @@ export function createDbPool(opts: DbPoolOptions = {}): DbPool {
db.pragma("foreign_keys = ON");
db.exec(SCHEMA);
migrateTasksV2(db);
+ migrateControlPlaneColumns(db);
projectDbs.set(projectId, db);
return db;
}
diff --git a/src/core/db-schema.ts b/src/core/db-schema.ts
index a8ec057..367b6ad 100644
--- a/src/core/db-schema.ts
+++ b/src/core/db-schema.ts
@@ -31,6 +31,8 @@ CREATE TABLE IF NOT EXISTS tasks (
blocked_by TEXT,
sandbox_iterations INTEGER NOT NULL DEFAULT 0,
fail_reason TEXT,
+ current_stage TEXT,
+ gate_status TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now')),
completed_at TEXT
@@ -58,6 +60,8 @@ CREATE TABLE IF NOT EXISTS sessions (
agent_api_version TEXT NOT NULL DEFAULT '',
agent_base_url TEXT NOT NULL DEFAULT '${DEFAULT_API_BASE_URL}',
agent_max_tokens INTEGER NOT NULL DEFAULT ${DEFAULT_API_MAX_TOKENS},
+ current_stage TEXT,
+ gate_status TEXT,
prompt TEXT,
exit_code INTEGER,
log_path TEXT,
diff --git a/src/core/db.ts b/src/core/db.ts
index 76b54ad..8df8394 100644
--- a/src/core/db.ts
+++ b/src/core/db.ts
@@ -37,6 +37,7 @@ export function getDb(): Database.Database {
_db.pragma("foreign_keys = ON");
_db.exec(SCHEMA);
migrateTasksV2(_db);
+ migrateControlPlaneColumns(_db);
// Migrate: add claude_session_id column if missing
try {
@@ -70,6 +71,8 @@ export function getDb(): Database.Database {
agent_api_version TEXT NOT NULL DEFAULT '',
agent_base_url TEXT NOT NULL DEFAULT '${DEFAULT_API_BASE_URL}',
agent_max_tokens INTEGER NOT NULL DEFAULT ${DEFAULT_API_MAX_TOKENS},
+ current_stage TEXT,
+ gate_status TEXT,
prompt TEXT,
exit_code INTEGER, log_path TEXT,
started_at TEXT NOT NULL DEFAULT (datetime('now')),
@@ -173,6 +176,7 @@ export function getDb(): Database.Database {
} catch {
// Column already exists
}
+ migrateControlPlaneColumns(_db);
// Migrate: update tasks CHECK constraint to include 'review' and 'blocked'
try {
@@ -191,6 +195,8 @@ export function getDb(): Database.Database {
session_id TEXT,
sort_order INTEGER NOT NULL DEFAULT 0,
prompt TEXT,
+ current_stage TEXT,
+ gate_status TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now')),
completed_at TEXT
@@ -226,6 +232,12 @@ export function migrateTasksV2(db: Database.Database): void {
if (!has("fail_reason")) {
db.exec("ALTER TABLE tasks ADD COLUMN fail_reason TEXT");
}
+ if (!has("current_stage")) {
+ db.exec("ALTER TABLE tasks ADD COLUMN current_stage TEXT");
+ }
+ if (!has("gate_status")) {
+ db.exec("ALTER TABLE tasks ADD COLUMN gate_status TEXT");
+ }
// Status CHECK widening: SQLite cannot ALTER CHECK; recreate the table only if old CHECK is detected.
const stmt = db.prepare("SELECT sql FROM sqlite_master WHERE type='table' AND name='tasks'").get() as { sql: string } | undefined;
@@ -245,12 +257,14 @@ export function migrateTasksV2(db: Database.Database): void {
blocked_by TEXT,
sandbox_iterations INTEGER NOT NULL DEFAULT 0,
fail_reason TEXT,
+ current_stage TEXT,
+ gate_status TEXT,
created_at TEXT NOT NULL DEFAULT (datetime('now')),
updated_at TEXT NOT NULL DEFAULT (datetime('now')),
completed_at TEXT
);
- INSERT INTO tasks_new (id, project_id, title, description, status, priority, worktree_name, session_id, sort_order, prompt, blocked_by, sandbox_iterations, fail_reason, created_at, updated_at, completed_at)
- SELECT id, project_id, title, description, status, priority, worktree_name, session_id, sort_order, prompt, NULL, 0, NULL, created_at, updated_at, completed_at
+ INSERT INTO tasks_new (id, project_id, title, description, status, priority, worktree_name, session_id, sort_order, prompt, blocked_by, sandbox_iterations, fail_reason, current_stage, gate_status, created_at, updated_at, completed_at)
+ SELECT id, project_id, title, description, status, priority, worktree_name, session_id, sort_order, prompt, blocked_by, sandbox_iterations, fail_reason, current_stage, gate_status, created_at, updated_at, completed_at
FROM tasks;
DROP TABLE tasks;
ALTER TABLE tasks_new RENAME TO tasks;
@@ -261,10 +275,31 @@ export function migrateTasksV2(db: Database.Database): void {
}
}
-function recoverOrphanedSessions(db: Database.Database): void {
+export function migrateControlPlaneColumns(db: Database.Database): void {
+ for (const table of ["tasks", "sessions"]) {
+ const cols = db.prepare(`PRAGMA table_info(${table})`).all() as Array<{ name: string }>;
+ const has = (name: string) => cols.some((col) => col.name === name);
+
+ if (!has("current_stage")) {
+ db.exec(`ALTER TABLE ${table} ADD COLUMN current_stage TEXT`);
+ }
+ if (!has("gate_status")) {
+ db.exec(`ALTER TABLE ${table} ADD COLUMN gate_status TEXT`);
+ }
+ }
+}
+
+export function recoverOrphanedSessions(db: Database.Database): void {
const orphaned = db
- .prepare("SELECT id, pid FROM sessions WHERE status IN ('running', 'idle', 'paused', 'pending')")
- .all() as { id: string; pid: number | null }[];
+ .prepare(
+ "SELECT id, pid, status, gate_status FROM sessions WHERE status IN ('running', 'idle', 'paused', 'pending')",
+ )
+ .all() as {
+ id: string;
+ pid: number | null;
+ status: string;
+ gate_status: string | null;
+ }[];
for (const session of orphaned) {
let alive = false;
@@ -278,6 +313,9 @@ function recoverOrphanedSessions(db: Database.Database): void {
}
if (!alive) {
+ if (session.status === "paused" && session.gate_status) {
+ continue;
+ }
db.prepare(
"UPDATE sessions SET status = 'failed', ended_at = datetime('now') WHERE id = ?"
).run(session.id);
diff --git a/src/core/process-manager.ts b/src/core/process-manager.ts
index 69bc443..60a342e 100644
--- a/src/core/process-manager.ts
+++ b/src/core/process-manager.ts
@@ -9,6 +9,13 @@ import {
type SystemLogLevel,
type ToolCall,
} from "./stream-manager";
+import {
+ applyControlPlaneEvent,
+ parseControlPlaneProtocolText,
+ resolveControlPlaneGate,
+ type ControlPlaneProtocolEvent,
+ type ResolvedControlPlaneGate,
+} from "./control-plane-protocol";
import {
markSessionFailedAndReleaseLinkedTask,
onSessionExit,
@@ -1218,6 +1225,104 @@ class ProcessManager {
});
}
+ resolveGate(
+ sessionId: string,
+ response: string,
+ ): { ok: true } | { ok: false; error: string } {
+ const trimmed = response.trim();
+ if (!trimmed) {
+ return { ok: false, error: "response is required" };
+ }
+
+ const db = getDb();
+ const resolved = resolveControlPlaneGate(db, sessionId);
+ if (!resolved) {
+ return { ok: false, error: "no pending gate" };
+ }
+
+ const insertResult = db.prepare(
+ "INSERT INTO session_messages (session_id, role, content) VALUES (?, 'user', ?)",
+ ).run(sessionId, trimmed);
+ const messageId = Number(insertResult.lastInsertRowid);
+
+ streamManager.emit(sessionId, {
+ type: "message",
+ id: messageId,
+ role: "user",
+ content: trimmed,
+ });
+
+ let sp = this.sessions.get(sessionId);
+ if (!sp || sp.proc.killed) {
+ sp = this.ensureProcess(sessionId) ?? undefined;
+ }
+
+ if (sp && !sp.proc.killed) {
+ const requestId = sp.pendingPermission?.requestId ?? resolved.gateStatus.id;
+ sp.pendingPermission = null;
+ if (sp.paused) {
+ sp.proc.kill("SIGCONT");
+ sp.paused = false;
+ }
+ sp.isProcessing = true;
+ this.writeGateResponse(sp, trimmed);
+ streamManager.emit(sessionId, {
+ type: "permission_resolved",
+ request_id: requestId,
+ approved: true,
+ });
+ }
+
+ db.prepare(
+ "UPDATE sessions SET status = 'running' WHERE id = ? AND status = 'paused'",
+ ).run(sessionId);
+
+ const resolvedEvent = {
+ type: "control_plane_gate_resolved" as const,
+ session_id: sessionId,
+ task_id: resolved.taskId,
+ gate_id: resolved.gateStatus.id,
+ response: trimmed,
+ };
+ streamManager.emit(sessionId, resolvedEvent);
+ streamManager.emit("global", resolvedEvent);
+ streamManager.emit(sessionId, { type: "status", status: "running" });
+ streamManager.emit("global", { type: "status", status: "running" });
+
+ return { ok: true };
+ }
+
+ private writeGateResponse(sp: SessionProcess, response: string): void {
+ if (!sp.proc.stdin || sp.proc.stdin.destroyed || sp.proc.stdin.writableEnded) {
+ this.emitSystemLog(
+ "warning",
+ sp.sessionId,
+ "Gate resolved, but agent stdin is no longer writable.",
+ );
+ return;
+ }
+
+ try {
+ if (sp.inputProtocol === "claude-stream-json") {
+ const inputMsg = JSON.stringify({
+ type: "user",
+ message: { role: "user", content: response },
+ session_id: sp.claudeSessionId ?? "default",
+ parent_tool_use_id: null,
+ });
+ sp.proc.stdin.write(inputMsg + "\n");
+ return;
+ }
+ sp.proc.stdin.write(response + "\n");
+ } catch (err) {
+ this.emitSystemLog(
+ "warning",
+ sp.sessionId,
+ `Failed to write gate response: ${err instanceof Error ? err.message : String(err)}`,
+ );
+ }
+ }
+
private handleGenericOutputChunk(
sessionId: string,
sp: SessionProcess,
@@ -1533,11 +1638,105 @@ class ProcessManager {
text: string,
): void {
if (!text) return;
- sp.textBuffer += text;
+ const parsed = parseControlPlaneProtocolText(text);
+ for (const event of parsed.events) {
+ this.handleControlPlaneEvent(sessionId, event);
+ }
+
+ if (!parsed.text) return;
+ sp.textBuffer += parsed.text;
streamManager.emit(sessionId, {
type: "text_delta",
- text,
+ text: parsed.text,
+ });
+ }
+
+ private handleControlPlaneEvent(
+ sessionId: string,
+ event: ControlPlaneProtocolEvent,
+ ): void {
+ let applied: ReturnType = null;
+ try {
+ applied = applyControlPlaneEvent(getDb(), sessionId, event);
+ } catch (error) {
+ this.emitSystemLog(
+ "warning",
+ sessionId,
+ `Ignored DevLog control marker: ${error instanceof Error ? error.message : String(error)}`,
+ );
+ return;
+ }
+ if (!applied) return;
+
+ if (event.type === "stage") {
+ const streamEvent = {
+ type: "control_plane_stage" as const,
+ session_id: sessionId,
+ task_id: applied.taskId,
+ current_stage: event.current_stage,
+ };
+ streamManager.emit(sessionId, streamEvent);
+ streamManager.emit("global", streamEvent);
+ return;
+ }
+
+ if (applied.gateStatus) {
+ const streamEvent = {
+ type: "control_plane_gate" as const,
+ session_id: sessionId,
+ task_id: applied.taskId,
+ current_stage: applied.currentStage,
+ gate_status: applied.gateStatus,
+ };
+ streamManager.emit(sessionId, streamEvent);
+ streamManager.emit("global", streamEvent);
+ this.pauseForControlPlaneGate(sessionId, {
+ sessionId,
+ taskId: applied.taskId,
+ currentStage: applied.currentStage,
+ gateStatus: applied.gateStatus,
+ });
+ }
+ }
+
+ private pauseForControlPlaneGate(
+ sessionId: string,
+ gate: ResolvedControlPlaneGate,
+ ): void {
+ const toolInput = {
+ question: gate.gateStatus.question,
+ options: gate.gateStatus.options,
+ stage: gate.gateStatus.stage ?? null,
+ };
+ const sp = this.sessions.get(sessionId);
+ if (sp && !sp.proc.killed) {
+ sp.pendingPermission = {
+ requestId: gate.gateStatus.id,
+ toolName: "AskHuman",
+ toolInput,
+ };
+ sp.proc.kill("SIGSTOP");
+ sp.paused = true;
+ sp.isProcessing = false;
+ }
+
+ const db = getDb();
+ db.prepare(
+ "UPDATE sessions SET status = 'paused' WHERE id = ? AND status NOT IN ('completed', 'failed', 'killed')",
+ ).run(sessionId);
+ streamManager.emit(sessionId, {
+ type: "permission_request",
+ tool_name: "AskHuman",
+ tool_input: toolInput,
+ request_id: gate.gateStatus.id,
});
+ streamManager.emit(sessionId, { type: "status", status: "paused" });
+ streamManager.emit("global", { type: "status", status: "paused" });
+ this.emitSystemLog(
+ "info",
+ sessionId,
+ `Session ${sessionId.slice(0, 8)} paused for human gate`,
+ );
}
/**
@@ -1616,11 +1815,7 @@ class ProcessManager {
for (const block of msg.content) {
if (block.type === "text" && block.text) {
- sp.textBuffer += block.text;
- streamManager.emit(sessionId, {
- type: "text_delta",
- text: block.text,
- });
+ this.emitTextDelta(sessionId, sp, block.text);
} else if (block.type === "tool_use" && block.name) {
const toolCall: ToolCall = {
name: block.name,
@@ -1641,11 +1836,7 @@ class ProcessManager {
if (type === "content_block_delta") {
const delta = event.delta as { type?: string; text?: string } | undefined;
if (delta?.type === "text_delta" && delta.text) {
- sp.textBuffer += delta.text;
- streamManager.emit(sessionId, {
- type: "text_delta",
- text: delta.text,
- });
+ this.emitTextDelta(sessionId, sp, delta.text);
}
return;
}
diff --git a/src/core/stream-manager.ts b/src/core/stream-manager.ts
index 617254b..727b6a5 100644
--- a/src/core/stream-manager.ts
+++ b/src/core/stream-manager.ts
@@ -1,3 +1,5 @@
+import type { GateStatus } from "./types-dashboard";
+
export interface ToolCall {
name: string;
input: Record;
@@ -32,7 +34,27 @@ export type ChatStreamEvent =
| { type: "permission_request"; tool_name: string; tool_input: Record; request_id: string }
| { type: "permission_resolved"; request_id: string; approved: boolean }
| { type: "message_queued"; content: string; position: number }
- | { type: "queue_drained"; remaining: number };
+ | { type: "queue_drained"; remaining: number }
+ | {
+ type: "control_plane_stage";
+ session_id: string;
+ task_id: string | null;
+ current_stage: string;
+ }
+ | {
+ type: "control_plane_gate";
+ session_id: string;
+ task_id: string | null;
+ current_stage: string | null;
+ gate_status: GateStatus;
+ }
+ | {
+ type: "control_plane_gate_resolved";
+ session_id: string;
+ task_id: string | null;
+ gate_id: string;
+ response: string;
+ };
export function createSystemLogEvent({
level,
diff --git a/src/core/types-dashboard.ts b/src/core/types-dashboard.ts
index 3a2ec1d..4d3a4c4 100644
--- a/src/core/types-dashboard.ts
+++ b/src/core/types-dashboard.ts
@@ -25,6 +25,8 @@ export interface Task {
blocked_by?: string | null; // JSON array of task ids when status === 'in_queue'
sandbox_iterations?: number;
fail_reason?: string | null;
+ current_stage?: string | null;
+ gate_status?: string | null; // JSON-encoded GateStatus when a workflow is awaiting input
}
export type SessionStatus =
@@ -62,6 +64,8 @@ export interface Session {
agent_api_version: string;
agent_base_url: string;
agent_max_tokens: number;
+ current_stage?: string | null;
+ gate_status?: string | null;
prompt: string | null;
exit_code: number | null;
log_path: string | null;
@@ -69,6 +73,14 @@ export interface Session {
ended_at: string | null;
}
+export interface GateStatus {
+ id: string;
+ question: string;
+ options: string[];
+ created_at: string;
+ stage?: string | null;
+}
+
/** A structured tool call from Claude's response */
export interface ToolCall {
name: string;
diff --git a/src/hooks/use-sessions.ts b/src/hooks/use-sessions.ts
index dddb1f4..7cfe2a0 100644
--- a/src/hooks/use-sessions.ts
+++ b/src/hooks/use-sessions.ts
@@ -3,6 +3,15 @@
import { useState, useEffect, useCallback } from "react";
import type { Session } from "@/core/types-dashboard";
import type { SessionRuntimeAuthInput } from "@/core/session-runtime-auth";
+import type { ChatStreamEvent } from "@/core/stream-manager";
+
+function shouldRefreshSessionsForEvent(event: ChatStreamEvent): boolean {
+ return (
+ event.type === "control_plane_stage" ||
+ event.type === "control_plane_gate" ||
+ event.type === "control_plane_gate_resolved"
+ );
+}
export function useSessions() {
const [sessions, setSessions] = useState([]);
@@ -25,6 +34,25 @@ export function useSessions() {
return () => clearInterval(interval);
}, [fetchSessions]);
+ useEffect(() => {
+ const source = new EventSource("/api/devlog/stream");
+ source.onmessage = (message) => {
+ let event: ChatStreamEvent;
+ try {
+ event = JSON.parse(message.data) as ChatStreamEvent;
+ } catch {
+ return;
+ }
+ if (shouldRefreshSessionsForEvent(event)) {
+ fetchSessions();
+ }
+ };
+
+ return () => {
+ source.close();
+ };
+ }, [fetchSessions]);
+
const launchSession = async (data: {
task_id?: string;
worktree_name?: string;
diff --git a/src/hooks/use-tasks.ts b/src/hooks/use-tasks.ts
index b367cf8..d775716 100644
--- a/src/hooks/use-tasks.ts
+++ b/src/hooks/use-tasks.ts
@@ -3,6 +3,15 @@
import { useState, useEffect, useCallback } from "react";
import type { Task, TaskStatus, TaskPriority, Session } from "@/core/types-dashboard";
import type { SessionRuntimeAuthInput } from "@/core/session-runtime-auth";
+import type { ChatStreamEvent } from "@/core/stream-manager";
+
+function shouldRefreshTasksForEvent(event: ChatStreamEvent): boolean {
+ return (
+ event.type === "control_plane_stage" ||
+ event.type === "control_plane_gate" ||
+ event.type === "control_plane_gate_resolved"
+ );
+}
export function useTasks() {
const [tasks, setTasks] = useState([]);
@@ -25,6 +34,25 @@ export function useTasks() {
return () => clearInterval(interval);
}, [fetchTasks]);
+ useEffect(() => {
+ const source = new EventSource("/api/devlog/stream");
+ source.onmessage = (message) => {
+ let event: ChatStreamEvent;
+ try {
+ event = JSON.parse(message.data) as ChatStreamEvent;
+ } catch {
+ return;
+ }
+ if (shouldRefreshTasksForEvent(event)) {
+ fetchTasks();
+ }
+ };
+
+ return () => {
+ source.close();
+ };
+ }, [fetchTasks]);
+
const createTask = async (data: {
title: string;
description?: string;