Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 48 additions & 0 deletions apps/code/src/main/services/agent/interrupt-reason.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
import { describe, expect, it } from "vitest";
import { applyInterruptReasonToCancelledResponse } from "./interrupt-reason";

describe("applyInterruptReasonToCancelledResponse", () => {
it("injects the reason into a cancelled response that has none", () => {
const message = {
jsonrpc: "2.0",
id: 7,
result: { stopReason: "cancelled" },
};

applyInterruptReasonToCancelledResponse(message, "connection_lost");

expect(
(message.result as { _meta?: { interruptReason?: string } })._meta
?.interruptReason,
).toBe("connection_lost");
});

it("does not overwrite a reason the agent already provided", () => {
const message = {
result: {
stopReason: "cancelled",
_meta: { interruptReason: "moving_to_worktree" },
},
};

applyInterruptReasonToCancelledResponse(message, "connection_lost");

expect(message.result._meta.interruptReason).toBe("moving_to_worktree");
});

it("ignores non-cancelled responses", () => {
const message = { result: { stopReason: "end_turn" } };

applyInterruptReasonToCancelledResponse(message, "connection_lost");

expect((message.result as { _meta?: unknown })._meta).toBeUndefined();
});

it("no-ops when there is no reason to apply", () => {
const message = { result: { stopReason: "cancelled" } };

applyInterruptReasonToCancelledResponse(message, undefined);

expect((message.result as { _meta?: unknown })._meta).toBeUndefined();
});
});
26 changes: 26 additions & 0 deletions apps/code/src/main/services/agent/interrupt-reason.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
/**
* Ensure a cancelled prompt response carries the interrupt reason we passed to
* `session/cancel`. The agent can finalize a cancelled turn — especially when
* its upstream request was aborted by a network failure — without echoing that
* reason back, which would make the UI mislabel the turn as "interrupted by
* user" instead of, say, "connection lost". Mutates the message in place.
*/
export function applyInterruptReasonToCancelledResponse(
message: unknown,
interruptReason: string | undefined,
): void {
if (!interruptReason) return;
if (typeof message !== "object" || message === null) return;

const result = (message as { result?: unknown }).result;
if (typeof result !== "object" || result === null) return;

const typedResult = result as {
stopReason?: string;
_meta?: { interruptReason?: string };
};
if (typedResult.stopReason !== "cancelled") return;
if (typedResult._meta?.interruptReason) return;

typedResult._meta = { ...typedResult._meta, interruptReason };
}
1 change: 1 addition & 0 deletions apps/code/src/main/services/agent/schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ export const cancelSessionInput = z.object({
export const interruptReasonSchema = z.enum([
"user_request",
"moving_to_worktree",
"connection_lost",
]);
export type InterruptReason = z.infer<typeof interruptReasonSchema>;

Expand Down
103 changes: 103 additions & 0 deletions apps/code/src/main/services/agent/service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,109 @@ describe("AgentService", () => {
vi.restoreAllMocks();
});

describe("prompt watchdog", () => {
beforeEach(() => {
vi.useFakeTimers();
});

afterEach(() => {
vi.useRealTimers();
});

function injectPromptSession(
svc: AgentService,
taskRunId: string,
clientSideConnection: Record<string, unknown>,
) {
const sessions = (svc as unknown as { sessions: Map<string, unknown> })
.sessions;
sessions.set(taskRunId, {
taskRunId,
taskId: `task-${taskRunId}`,
repoPath: "/mock/repo",
agent: { cleanup: vi.fn().mockResolvedValue(undefined) },
clientSideConnection,
channel: `ch-${taskRunId}`,
createdAt: Date.now(),
lastActivityAt: Date.now(),
lastStreamActivityAt: Date.now(),
config: { sessionId: "acp-1", adapter: "claude" },
promptPending: false,
inFlightMcpToolCalls: new Map(),
mcpToolApprovals: {},
toolInstallations: {},
});
}

it("rejects and cancels a prompt whose stream stalls past the watchdog window", async () => {
const cancel = vi.fn().mockResolvedValue(undefined);
injectPromptSession(service, "run-1", {
// Never resolves — simulates a stream that hangs mid-turn.
prompt: vi.fn(() => new Promise(() => {})),
cancel,
});

const promptPromise = service.prompt("run-1", [
{ type: "text", text: "hello" },
]);
const assertion = expect(promptPromise).rejects.toThrow(
/stall|activity|stuck|timed out|unresponsive/i,
);

// No ACP traffic arrives; advance past the watchdog deadline.
await vi.advanceTimersByTimeAsync(16 * 60 * 1000);
await assertion;

// The hung turn is torn down rather than left dangling, and the session
// is no longer wedged in a pending state.
expect(cancel).toHaveBeenCalled();
expect(service.getSession("run-1")?.promptPending).toBe(false);
});

it("does not trip the watchdog when the prompt completes promptly", async () => {
injectPromptSession(service, "run-1", {
prompt: vi.fn().mockResolvedValue({ stopReason: "end_turn" }),
cancel: vi.fn(),
});

const result = await service.prompt("run-1", [
{ type: "text", text: "hello" },
]);

expect(result.stopReason).toBe("end_turn");
});

it("resets the watchdog when stream activity arrives", async () => {
let resolvePrompt: (value: { stopReason: string }) => void = () => {};
injectPromptSession(service, "run-1", {
prompt: vi.fn(
() =>
new Promise<{ stopReason: string }>((res) => {
resolvePrompt = res;
}),
),
cancel: vi.fn(),
});

const promptPromise = service.prompt("run-1", [
{ type: "text", text: "hi" },
]);

// Activity arrives just before the deadline, resetting the watchdog.
await vi.advanceTimersByTimeAsync(14 * 60 * 1000);
const session = service.getSession("run-1");
if (session) session.lastStreamActivityAt = Date.now();

// Another long-but-active stretch must not trip thanks to the reset.
await vi.advanceTimersByTimeAsync(14 * 60 * 1000);
resolvePrompt({ stopReason: "end_turn" });

await expect(promptPromise).resolves.toEqual(
expect.objectContaining({ stopReason: "end_turn" }),
);
});
});

describe("MCP servers", () => {
it("marks desktop sessions as local even though they have a taskRunId", async () => {
await service.startSession({
Expand Down
71 changes: 66 additions & 5 deletions apps/code/src/main/services/agent/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ import { loadSessionEnvOverrides } from "../session-env/loader";
import type { SleepService } from "../sleep/service";
import type { AgentAuthAdapter, McpToolInstallations } from "./auth-adapter";
import { discoverExternalPlugins } from "./discover-plugins";
import { applyInterruptReasonToCancelledResponse } from "./interrupt-reason";
import {
AgentServiceEvent,
type AgentServiceEvents,
Expand Down Expand Up @@ -238,6 +239,8 @@ interface ManagedSession {
channel: string;
createdAt: number;
lastActivityAt: number;
/** Last time any ACP stream traffic was seen; drives the prompt watchdog. */
lastStreamActivityAt: number;
config: SessionConfig;
interruptReason?: InterruptReason;
promptPending: boolean;
Expand Down Expand Up @@ -283,6 +286,13 @@ interface PendingPermission {
@injectable()
export class AgentService extends TypedEventEmitter<AgentServiceEvents> {
private static readonly IDLE_TIMEOUT_MS = 15 * 60 * 1000;
// Backstop for a prompt whose stream stalls (e.g. a flaky network leaves the
// turn hanging mid-flight). If no ACP traffic is seen for this long the
// prompt is rejected so the renderer surfaces an error and can recover,
// instead of `promptPending` wedging the session forever. Generous so it
// never trips on a legitimately long, quiet tool run.
private static readonly PROMPT_STALL_TIMEOUT_MS = 15 * 60 * 1000;
private static readonly PROMPT_STALL_CHECK_MS = 30 * 1000;

private sessions = new Map<string, ManagedSession>();
private pendingPermissions = new Map<string, PendingPermission>();
Expand Down Expand Up @@ -844,6 +854,7 @@ When creating pull requests, add the following footer at the end of the PR descr
channel,
createdAt: Date.now(),
lastActivityAt: Date.now(),
lastStreamActivityAt: Date.now(),
config,
promptPending: false,
configOptions,
Expand Down Expand Up @@ -919,20 +930,54 @@ When creating pull requests, add the following footer at the end of the PR descr
}

session.lastActivityAt = Date.now();
session.lastStreamActivityAt = Date.now();
session.promptPending = true;
// A fresh turn starts clean — don't carry a prior cancel's reason forward
// (it would mislabel a later cancellation of this new turn).
session.interruptReason = undefined;
this.recordActivity(sessionId);
this.sleepService.acquire(sessionId);

let watchdog: ReturnType<typeof setInterval> | null = null;
try {
const result = await session.clientSideConnection.prompt({
const promptPromise = session.clientSideConnection.prompt({
sessionId: getAgentSessionId(session),
prompt: finalPrompt,
});

// Race the turn against a stream-stall watchdog. The watchdog resets
// whenever ACP traffic updates `lastStreamActivityAt` (see onAcpMessage).
const stallPromise = new Promise<never>((_, reject) => {
watchdog = setInterval(() => {
const idleMs = Date.now() - session.lastStreamActivityAt;
if (idleMs < AgentService.PROMPT_STALL_TIMEOUT_MS) return;

if (watchdog) {
clearInterval(watchdog);
watchdog = null;
}
log.warn("Prompt watchdog tripped — stream stalled, cancelling", {
sessionId,
idleMs,
});
// Tear down the hung turn so it isn't left dangling.
this.cancelPrompt(sessionId).catch(() => {});
reject(
new Error(
"The agent stopped responding (no activity for too long). " +
"Check your connection and try again.",
),
);
}, AgentService.PROMPT_STALL_CHECK_MS);
});

const result = await Promise.race([promptPromise, stallPromise]);
return {
stopReason: result.stopReason,
_meta: result._meta as PromptOutput["_meta"],
};
} finally {
if (watchdog) clearInterval(watchdog);
session.promptPending = false;
session.lastActivityAt = Date.now();
this.recordActivity(sessionId);
Expand Down Expand Up @@ -973,14 +1018,17 @@ When creating pull requests, add the following footer at the end of the PR descr

try {
this.cancelInFlightMcpToolCalls(session);
await session.clientSideConnection.cancel({
sessionId: getAgentSessionId(session),
_meta: reason ? { interruptReason: reason } : undefined,
});
// Record the reason before awaiting the cancel so the stream tap can
// stamp it onto the agent's cancelled response, even if that response
// races ahead of this call returning.
if (reason) {
session.interruptReason = reason;
log.info("Session interrupted", { sessionId, reason });
}
await session.clientSideConnection.cancel({
sessionId: getAgentSessionId(session),
_meta: reason ? { interruptReason: reason } : undefined,
});
return true;
} catch (err) {
log.error("Failed to cancel prompt", { sessionId, err });
Expand Down Expand Up @@ -1263,6 +1311,19 @@ For git operations while detached:
};

const onAcpMessage = (message: unknown) => {
// Every ACP message (either direction) is a heartbeat for the prompt
// watchdog — seeing traffic means the stream is still progressing.
const session = this.sessions.get(taskRunId);
if (session) session.lastStreamActivityAt = Date.now();

// If we cancelled this turn with a specific reason but the agent's
// cancelled response didn't echo it (common when the cancel races a
// network abort), inject it so the UI labels the turn correctly.
applyInterruptReasonToCancelledResponse(
message,
session?.interruptReason,
);

const acpMessage: AcpMessage = {
type: "acp_message",
ts: Date.now(),
Expand Down
Loading