From eb3a55b4db0b2f7312fd7446cb05d263a4b25e7d Mon Sep 17 00:00:00 2001 From: Subham Kumar Das <35267544+lost-particles@users.noreply.github.com> Date: Sun, 31 May 2026 23:51:19 -0400 Subject: [PATCH] fix: handle poor and intermittent connectivity gracefully (#2163) Make the app resilient to offline/flaky networks instead of silently dropping messages or hanging: - Preserve typed input when a send fails; restore it to the composer - Re-enqueue local queued messages when a drain dispatch fails - Auto-recover cloud sessions on reconnect, not just on window focus - Bounded connectivity grace-wait before failing a send - Prompt watchdog so a stalled stream fails loudly instead of hanging - Auth-proxy aborts the connection on a mid-stream upstream failure so a truncated reply isn't treated as a clean end-of-stream - Don't present a network-interrupted turn as a clean finish: show an inline "Response may be incomplete. Failed due to network issue" footer with a Retry button (instead of "Interrupted by user") - Offline send/retry shows a "waiting to reconnect" indicator with timer, then gives up after 40s --- .../services/agent/interrupt-reason.test.ts | 48 +++ .../main/services/agent/interrupt-reason.ts | 26 ++ apps/code/src/main/services/agent/schemas.ts | 1 + .../src/main/services/agent/service.test.ts | 103 +++++ apps/code/src/main/services/agent/service.ts | 71 +++- .../main/services/auth-proxy/service.test.ts | 84 ++++ .../src/main/services/auth-proxy/service.ts | 14 +- apps/code/src/renderer/App.tsx | 3 + .../connectivity/connectivityRecovery.test.ts | 94 +++++ .../connectivity/connectivityRecovery.ts | 54 +++ .../message-editor/components/PromptInput.tsx | 4 +- .../tiptap/useDraftSync.test.tsx | 26 ++ .../message-editor/tiptap/useDraftSync.ts | 12 + .../message-editor/tiptap/useTiptapEditor.ts | 27 +- .../sessions/components/ConversationView.tsx | 55 ++- .../components/GeneratingIndicator.test.tsx | 82 ++++ .../components/GeneratingIndicator.tsx | 77 +++- .../sessions/components/SessionFooter.tsx | 7 + .../sessions/components/SessionView.tsx | 12 +- .../components/buildConversationItems.test.ts | 67 ++++ .../components/buildConversationItems.ts | 30 +- .../sessions/hooks/useSessionCallbacks.ts | 19 +- .../features/sessions/service/service.test.ts | 358 +++++++++++++++++- .../features/sessions/service/service.ts | 244 +++++++++++- .../features/sessions/stores/sessionStore.ts | 9 + apps/code/src/shared/errors.ts | 23 ++ 26 files changed, 1489 insertions(+), 61 deletions(-) create mode 100644 apps/code/src/main/services/agent/interrupt-reason.test.ts create mode 100644 apps/code/src/main/services/agent/interrupt-reason.ts create mode 100644 apps/code/src/main/services/auth-proxy/service.test.ts create mode 100644 apps/code/src/renderer/features/connectivity/connectivityRecovery.test.ts create mode 100644 apps/code/src/renderer/features/connectivity/connectivityRecovery.ts create mode 100644 apps/code/src/renderer/features/sessions/components/GeneratingIndicator.test.tsx diff --git a/apps/code/src/main/services/agent/interrupt-reason.test.ts b/apps/code/src/main/services/agent/interrupt-reason.test.ts new file mode 100644 index 0000000000..8e47ad7bf5 --- /dev/null +++ b/apps/code/src/main/services/agent/interrupt-reason.test.ts @@ -0,0 +1,48 @@ +import { describe, expect, it } from "vitest"; +import { applyInterruptReasonToCancelledResponse } from "./interrupt-reason"; + +describe("applyInterruptReasonToCancelledResponse", () => { + it("injects the reason into a cancelled response that has none", () => { + const message = { + jsonrpc: "2.0", + id: 7, + result: { stopReason: "cancelled" }, + }; + + applyInterruptReasonToCancelledResponse(message, "connection_lost"); + + expect( + (message.result as { _meta?: { interruptReason?: string } })._meta + ?.interruptReason, + ).toBe("connection_lost"); + }); + + it("does not overwrite a reason the agent already provided", () => { + const message = { + result: { + stopReason: "cancelled", + _meta: { interruptReason: "moving_to_worktree" }, + }, + }; + + applyInterruptReasonToCancelledResponse(message, "connection_lost"); + + expect(message.result._meta.interruptReason).toBe("moving_to_worktree"); + }); + + it("ignores non-cancelled responses", () => { + const message = { result: { stopReason: "end_turn" } }; + + applyInterruptReasonToCancelledResponse(message, "connection_lost"); + + expect((message.result as { _meta?: unknown })._meta).toBeUndefined(); + }); + + it("no-ops when there is no reason to apply", () => { + const message = { result: { stopReason: "cancelled" } }; + + applyInterruptReasonToCancelledResponse(message, undefined); + + expect((message.result as { _meta?: unknown })._meta).toBeUndefined(); + }); +}); diff --git a/apps/code/src/main/services/agent/interrupt-reason.ts b/apps/code/src/main/services/agent/interrupt-reason.ts new file mode 100644 index 0000000000..1d887acdb2 --- /dev/null +++ b/apps/code/src/main/services/agent/interrupt-reason.ts @@ -0,0 +1,26 @@ +/** + * Ensure a cancelled prompt response carries the interrupt reason we passed to + * `session/cancel`. The agent can finalize a cancelled turn — especially when + * its upstream request was aborted by a network failure — without echoing that + * reason back, which would make the UI mislabel the turn as "interrupted by + * user" instead of, say, "connection lost". Mutates the message in place. + */ +export function applyInterruptReasonToCancelledResponse( + message: unknown, + interruptReason: string | undefined, +): void { + if (!interruptReason) return; + if (typeof message !== "object" || message === null) return; + + const result = (message as { result?: unknown }).result; + if (typeof result !== "object" || result === null) return; + + const typedResult = result as { + stopReason?: string; + _meta?: { interruptReason?: string }; + }; + if (typedResult.stopReason !== "cancelled") return; + if (typedResult._meta?.interruptReason) return; + + typedResult._meta = { ...typedResult._meta, interruptReason }; +} diff --git a/apps/code/src/main/services/agent/schemas.ts b/apps/code/src/main/services/agent/schemas.ts index 410d77ea59..86097a5c97 100644 --- a/apps/code/src/main/services/agent/schemas.ts +++ b/apps/code/src/main/services/agent/schemas.ts @@ -150,6 +150,7 @@ export const cancelSessionInput = z.object({ export const interruptReasonSchema = z.enum([ "user_request", "moving_to_worktree", + "connection_lost", ]); export type InterruptReason = z.infer; diff --git a/apps/code/src/main/services/agent/service.test.ts b/apps/code/src/main/services/agent/service.test.ts index 5e277e6ad7..6a4241eebb 100644 --- a/apps/code/src/main/services/agent/service.test.ts +++ b/apps/code/src/main/services/agent/service.test.ts @@ -240,6 +240,109 @@ describe("AgentService", () => { vi.restoreAllMocks(); }); + describe("prompt watchdog", () => { + beforeEach(() => { + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + function injectPromptSession( + svc: AgentService, + taskRunId: string, + clientSideConnection: Record, + ) { + const sessions = (svc as unknown as { sessions: Map }) + .sessions; + sessions.set(taskRunId, { + taskRunId, + taskId: `task-${taskRunId}`, + repoPath: "/mock/repo", + agent: { cleanup: vi.fn().mockResolvedValue(undefined) }, + clientSideConnection, + channel: `ch-${taskRunId}`, + createdAt: Date.now(), + lastActivityAt: Date.now(), + lastStreamActivityAt: Date.now(), + config: { sessionId: "acp-1", adapter: "claude" }, + promptPending: false, + inFlightMcpToolCalls: new Map(), + mcpToolApprovals: {}, + toolInstallations: {}, + }); + } + + it("rejects and cancels a prompt whose stream stalls past the watchdog window", async () => { + const cancel = vi.fn().mockResolvedValue(undefined); + injectPromptSession(service, "run-1", { + // Never resolves — simulates a stream that hangs mid-turn. + prompt: vi.fn(() => new Promise(() => {})), + cancel, + }); + + const promptPromise = service.prompt("run-1", [ + { type: "text", text: "hello" }, + ]); + const assertion = expect(promptPromise).rejects.toThrow( + /stall|activity|stuck|timed out|unresponsive/i, + ); + + // No ACP traffic arrives; advance past the watchdog deadline. + await vi.advanceTimersByTimeAsync(16 * 60 * 1000); + await assertion; + + // The hung turn is torn down rather than left dangling, and the session + // is no longer wedged in a pending state. + expect(cancel).toHaveBeenCalled(); + expect(service.getSession("run-1")?.promptPending).toBe(false); + }); + + it("does not trip the watchdog when the prompt completes promptly", async () => { + injectPromptSession(service, "run-1", { + prompt: vi.fn().mockResolvedValue({ stopReason: "end_turn" }), + cancel: vi.fn(), + }); + + const result = await service.prompt("run-1", [ + { type: "text", text: "hello" }, + ]); + + expect(result.stopReason).toBe("end_turn"); + }); + + it("resets the watchdog when stream activity arrives", async () => { + let resolvePrompt: (value: { stopReason: string }) => void = () => {}; + injectPromptSession(service, "run-1", { + prompt: vi.fn( + () => + new Promise<{ stopReason: string }>((res) => { + resolvePrompt = res; + }), + ), + cancel: vi.fn(), + }); + + const promptPromise = service.prompt("run-1", [ + { type: "text", text: "hi" }, + ]); + + // Activity arrives just before the deadline, resetting the watchdog. + await vi.advanceTimersByTimeAsync(14 * 60 * 1000); + const session = service.getSession("run-1"); + if (session) session.lastStreamActivityAt = Date.now(); + + // Another long-but-active stretch must not trip thanks to the reset. + await vi.advanceTimersByTimeAsync(14 * 60 * 1000); + resolvePrompt({ stopReason: "end_turn" }); + + await expect(promptPromise).resolves.toEqual( + expect.objectContaining({ stopReason: "end_turn" }), + ); + }); + }); + describe("MCP servers", () => { it("marks desktop sessions as local even though they have a taskRunId", async () => { await service.startSession({ diff --git a/apps/code/src/main/services/agent/service.ts b/apps/code/src/main/services/agent/service.ts index d60b09b6cc..3fe5d5dfd2 100644 --- a/apps/code/src/main/services/agent/service.ts +++ b/apps/code/src/main/services/agent/service.ts @@ -59,6 +59,7 @@ import { loadSessionEnvOverrides } from "../session-env/loader"; import type { SleepService } from "../sleep/service"; import type { AgentAuthAdapter, McpToolInstallations } from "./auth-adapter"; import { discoverExternalPlugins } from "./discover-plugins"; +import { applyInterruptReasonToCancelledResponse } from "./interrupt-reason"; import { AgentServiceEvent, type AgentServiceEvents, @@ -238,6 +239,8 @@ interface ManagedSession { channel: string; createdAt: number; lastActivityAt: number; + /** Last time any ACP stream traffic was seen; drives the prompt watchdog. */ + lastStreamActivityAt: number; config: SessionConfig; interruptReason?: InterruptReason; promptPending: boolean; @@ -283,6 +286,13 @@ interface PendingPermission { @injectable() export class AgentService extends TypedEventEmitter { private static readonly IDLE_TIMEOUT_MS = 15 * 60 * 1000; + // Backstop for a prompt whose stream stalls (e.g. a flaky network leaves the + // turn hanging mid-flight). If no ACP traffic is seen for this long the + // prompt is rejected so the renderer surfaces an error and can recover, + // instead of `promptPending` wedging the session forever. Generous so it + // never trips on a legitimately long, quiet tool run. + private static readonly PROMPT_STALL_TIMEOUT_MS = 15 * 60 * 1000; + private static readonly PROMPT_STALL_CHECK_MS = 30 * 1000; private sessions = new Map(); private pendingPermissions = new Map(); @@ -844,6 +854,7 @@ When creating pull requests, add the following footer at the end of the PR descr channel, createdAt: Date.now(), lastActivityAt: Date.now(), + lastStreamActivityAt: Date.now(), config, promptPending: false, configOptions, @@ -919,20 +930,54 @@ When creating pull requests, add the following footer at the end of the PR descr } session.lastActivityAt = Date.now(); + session.lastStreamActivityAt = Date.now(); session.promptPending = true; + // A fresh turn starts clean — don't carry a prior cancel's reason forward + // (it would mislabel a later cancellation of this new turn). + session.interruptReason = undefined; this.recordActivity(sessionId); this.sleepService.acquire(sessionId); + let watchdog: ReturnType | null = null; try { - const result = await session.clientSideConnection.prompt({ + const promptPromise = session.clientSideConnection.prompt({ sessionId: getAgentSessionId(session), prompt: finalPrompt, }); + + // Race the turn against a stream-stall watchdog. The watchdog resets + // whenever ACP traffic updates `lastStreamActivityAt` (see onAcpMessage). + const stallPromise = new Promise((_, reject) => { + watchdog = setInterval(() => { + const idleMs = Date.now() - session.lastStreamActivityAt; + if (idleMs < AgentService.PROMPT_STALL_TIMEOUT_MS) return; + + if (watchdog) { + clearInterval(watchdog); + watchdog = null; + } + log.warn("Prompt watchdog tripped — stream stalled, cancelling", { + sessionId, + idleMs, + }); + // Tear down the hung turn so it isn't left dangling. + this.cancelPrompt(sessionId).catch(() => {}); + reject( + new Error( + "The agent stopped responding (no activity for too long). " + + "Check your connection and try again.", + ), + ); + }, AgentService.PROMPT_STALL_CHECK_MS); + }); + + const result = await Promise.race([promptPromise, stallPromise]); return { stopReason: result.stopReason, _meta: result._meta as PromptOutput["_meta"], }; } finally { + if (watchdog) clearInterval(watchdog); session.promptPending = false; session.lastActivityAt = Date.now(); this.recordActivity(sessionId); @@ -973,14 +1018,17 @@ When creating pull requests, add the following footer at the end of the PR descr try { this.cancelInFlightMcpToolCalls(session); - await session.clientSideConnection.cancel({ - sessionId: getAgentSessionId(session), - _meta: reason ? { interruptReason: reason } : undefined, - }); + // Record the reason before awaiting the cancel so the stream tap can + // stamp it onto the agent's cancelled response, even if that response + // races ahead of this call returning. if (reason) { session.interruptReason = reason; log.info("Session interrupted", { sessionId, reason }); } + await session.clientSideConnection.cancel({ + sessionId: getAgentSessionId(session), + _meta: reason ? { interruptReason: reason } : undefined, + }); return true; } catch (err) { log.error("Failed to cancel prompt", { sessionId, err }); @@ -1263,6 +1311,19 @@ For git operations while detached: }; const onAcpMessage = (message: unknown) => { + // Every ACP message (either direction) is a heartbeat for the prompt + // watchdog — seeing traffic means the stream is still progressing. + const session = this.sessions.get(taskRunId); + if (session) session.lastStreamActivityAt = Date.now(); + + // If we cancelled this turn with a specific reason but the agent's + // cancelled response didn't echo it (common when the cancel races a + // network abort), inject it so the UI labels the turn correctly. + applyInterruptReasonToCancelledResponse( + message, + session?.interruptReason, + ); + const acpMessage: AcpMessage = { type: "acp_message", ts: Date.now(), diff --git a/apps/code/src/main/services/auth-proxy/service.test.ts b/apps/code/src/main/services/auth-proxy/service.test.ts new file mode 100644 index 0000000000..2109b2cc48 --- /dev/null +++ b/apps/code/src/main/services/auth-proxy/service.test.ts @@ -0,0 +1,84 @@ +import type http from "node:http"; +import { describe, expect, it, vi } from "vitest"; +import type { AuthService } from "../auth/service"; +import { AuthProxyService } from "./service"; + +function createService(authenticatedFetch: ReturnType) { + const authService = { authenticatedFetch } as unknown as AuthService; + return new AuthProxyService(authService); +} + +function createMockResponse() { + const res = { + _sent: false, + writeHead: vi.fn(function writeHead(this: { _sent: boolean }) { + res._sent = true; + }), + write: vi.fn(() => true), + end: vi.fn(), + destroy: vi.fn(), + once: vi.fn(), + get headersSent() { + return res._sent; + }, + }; + return res as unknown as http.ServerResponse & typeof res; +} + +describe("AuthProxyService.forwardRequest", () => { + it("destroys the connection when the upstream stream fails mid-response", async () => { + let reads = 0; + const reader = { + read: vi.fn(async () => { + reads += 1; + if (reads === 1) + return { done: false, value: new Uint8Array([1, 2, 3]) }; + throw new Error("terminated"); + }), + }; + const authenticatedFetch = vi.fn().mockResolvedValue({ + status: 200, + headers: { forEach: () => {} }, + body: { getReader: () => reader }, + }); + const service = createService(authenticatedFetch); + const res = createMockResponse(); + + await ( + service as unknown as { + forwardRequest: ( + url: string, + options: RequestInit, + res: http.ServerResponse, + ) => Promise; + } + ).forwardRequest("https://gateway.example/v1/messages", {}, res); + + // Mid-stream failure (headers already sent) must abort the connection so + // the client sees a truncated/errored response, not a clean end-of-stream. + expect(res.destroy).toHaveBeenCalled(); + expect(res.end).not.toHaveBeenCalled(); + }); + + it("returns a 502 when the upstream fails before any response is sent", async () => { + const authenticatedFetch = vi + .fn() + .mockRejectedValue(new Error("fetch failed")); + const service = createService(authenticatedFetch); + const res = createMockResponse(); + + await ( + service as unknown as { + forwardRequest: ( + url: string, + options: RequestInit, + res: http.ServerResponse, + ) => Promise; + } + ).forwardRequest("https://gateway.example/v1/messages", {}, res); + + expect(res.writeHead).toHaveBeenCalledWith(502); + expect(res.end).toHaveBeenCalled(); + expect(res.destroy).not.toHaveBeenCalled(); + }); +}); diff --git a/apps/code/src/main/services/auth-proxy/service.ts b/apps/code/src/main/services/auth-proxy/service.ts index 3896996cb2..3415648705 100644 --- a/apps/code/src/main/services/auth-proxy/service.ts +++ b/apps/code/src/main/services/auth-proxy/service.ts @@ -201,9 +201,19 @@ export class AuthProxyService { await pump(); } catch (err) { log.error("Proxy forward error", { url, err }); - if (!res.headersSent) { - res.writeHead(502); + if (res.headersSent) { + // The response already started streaming, so we can't change the + // status. Abort the socket instead of ending it cleanly — a clean + // `res.end()` looks like a normal end-of-stream to the agent's SDK, + // which can then finalize a truncated reply as a successful turn. + // Destroying signals an incomplete/errored response so it retries or + // fails loudly instead. + res.destroy( + err instanceof Error ? err : new Error("Proxy stream error"), + ); + return; } + res.writeHead(502); res.end("Proxy error"); } } diff --git a/apps/code/src/renderer/App.tsx b/apps/code/src/renderer/App.tsx index a5748db25b..12c31bf71a 100644 --- a/apps/code/src/renderer/App.tsx +++ b/apps/code/src/renderer/App.tsx @@ -17,6 +17,7 @@ import { AddDirectoryDialog } from "@features/folder-picker/components/AddDirect import { OnboardingFlow } from "@features/onboarding/components/OnboardingFlow"; import { useOnboardingStore } from "@features/onboarding/stores/onboardingStore"; import { Flex, Spinner, Text } from "@radix-ui/themes"; +import { initializeConnectivityRecovery } from "@renderer/features/connectivity/connectivityRecovery"; import { initializeConnectivityToast } from "@renderer/features/connectivity/connectivityToast"; import { initializeConnectivityStore } from "@renderer/stores/connectivityStore"; import { useFocusStore } from "@renderer/stores/focusStore"; @@ -64,7 +65,9 @@ function App() { useEffect(() => { const disposeStore = initializeConnectivityStore(); const disposeToast = initializeConnectivityToast(); + const disposeRecovery = initializeConnectivityRecovery(); return () => { + disposeRecovery(); disposeToast(); disposeStore(); }; diff --git a/apps/code/src/renderer/features/connectivity/connectivityRecovery.test.ts b/apps/code/src/renderer/features/connectivity/connectivityRecovery.test.ts new file mode 100644 index 0000000000..ebee59da67 --- /dev/null +++ b/apps/code/src/renderer/features/connectivity/connectivityRecovery.test.ts @@ -0,0 +1,94 @@ +import { getSessionService } from "@features/sessions/service/service"; +import { useConnectivityStore } from "@stores/connectivityStore"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { initializeConnectivityRecovery } from "./connectivityRecovery"; + +vi.mock("@features/sessions/service/service", () => ({ + getSessionService: vi.fn(), +})); + +// The connectivity store constructs the tRPC client at import; stub it since +// these tests drive the store directly and never hit the network. +vi.mock("@renderer/trpc/client", () => ({ + trpcClient: {}, +})); + +describe("initializeConnectivityRecovery", () => { + let retry: ReturnType; + let markDropped: ReturnType; + let clearGiveup: ReturnType; + + beforeEach(() => { + vi.clearAllMocks(); + vi.useFakeTimers(); + useConnectivityStore.setState({ isOnline: true }); + retry = vi.fn(); + markDropped = vi.fn(); + clearGiveup = vi.fn(); + vi.mocked(getSessionService).mockReturnValue({ + retryUnhealthyCloudSessions: retry, + markInflightTurnsNetworkDropped: markDropped, + clearOfflineTurnGiveupTimers: clearGiveup, + } as unknown as ReturnType); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it("retries unhealthy cloud sessions after the connection is restored", () => { + const dispose = initializeConnectivityRecovery(); + + useConnectivityStore.getState().setOnline(false); + useConnectivityStore.getState().setOnline(true); + vi.advanceTimersByTime(2_000); + + expect(retry).toHaveBeenCalledTimes(1); + dispose(); + }); + + it("flags in-flight turns as network-dropped when the connection drops", () => { + const dispose = initializeConnectivityRecovery(); + + useConnectivityStore.getState().setOnline(false); + + expect(markDropped).toHaveBeenCalledTimes(1); + dispose(); + }); + + it("clears offline give-up timers immediately on reconnect so turns resume", () => { + const dispose = initializeConnectivityRecovery(); + + useConnectivityStore.getState().setOnline(false); + useConnectivityStore.getState().setOnline(true); + + // Cleared right away — not gated behind the reconnect debounce. + expect(clearGiveup).toHaveBeenCalledTimes(1); + dispose(); + }); + + it("does not retry when connectivity never dropped", () => { + const dispose = initializeConnectivityRecovery(); + + // Re-asserting the same online state is not a transition. + useConnectivityStore.getState().setOnline(true); + vi.advanceTimersByTime(2_000); + + expect(retry).not.toHaveBeenCalled(); + dispose(); + }); + + it("does not retry if the connection flaps back offline before stabilizing", () => { + const dispose = initializeConnectivityRecovery(); + + useConnectivityStore.getState().setOnline(false); + useConnectivityStore.getState().setOnline(true); + vi.advanceTimersByTime(500); + // Drops again before the debounce window elapses. + useConnectivityStore.getState().setOnline(false); + vi.advanceTimersByTime(2_000); + + expect(retry).not.toHaveBeenCalled(); + dispose(); + }); +}); diff --git a/apps/code/src/renderer/features/connectivity/connectivityRecovery.ts b/apps/code/src/renderer/features/connectivity/connectivityRecovery.ts new file mode 100644 index 0000000000..142c2a10c6 --- /dev/null +++ b/apps/code/src/renderer/features/connectivity/connectivityRecovery.ts @@ -0,0 +1,54 @@ +import { getSessionService } from "@features/sessions/service/service"; +import { useConnectivityStore } from "@stores/connectivityStore"; + +// Wait for the connection to look stable before kicking recovery so a flapping +// network doesn't trigger a retry storm. +const RECONNECT_DEBOUNCE_MS = 1_500; + +// When connectivity is restored, cloud sessions whose SSE stream exhausted its +// reconnect budget while offline are left in the `error` state with no stream +// to deliver `turn_complete`. `retryUnhealthyCloudSessions` re-establishes +// them. Previously this only ran on window focus; firing it on the +// offline->online transition lets a flaky-then-recovered connection self-heal +// without the user having to refocus the app. +export function initializeConnectivityRecovery() { + let pendingTimer: ReturnType | null = null; + + const clearPending = () => { + if (pendingTimer) { + clearTimeout(pendingTimer); + pendingTimer = null; + } + }; + + const unsubscribe = useConnectivityStore.subscribe( + (state) => state.isOnline, + (isOnline, wasOnline) => { + if (isOnline === wasOnline) return; + + if (!isOnline) { + // Dropped (or flapped) before stabilizing — cancel any pending retry. + clearPending(); + // Flag in-flight turns so a completion that arrives after the drop is + // treated as possibly-incomplete rather than a clean "finished". + getSessionService().markInflightTurnsNetworkDropped(); + return; + } + + clearPending(); + // Connection is back — let in-flight turns resume instead of being + // cancelled by their offline give-up timers. Done immediately (not + // debounced) so a quick blip never trips the give-up. + getSessionService().clearOfflineTurnGiveupTimers(); + pendingTimer = setTimeout(() => { + pendingTimer = null; + getSessionService().retryUnhealthyCloudSessions(); + }, RECONNECT_DEBOUNCE_MS); + }, + ); + + return () => { + clearPending(); + unsubscribe(); + }; +} diff --git a/apps/code/src/renderer/features/message-editor/components/PromptInput.tsx b/apps/code/src/renderer/features/message-editor/components/PromptInput.tsx index 4e54828720..bbb6147a1d 100644 --- a/apps/code/src/renderer/features/message-editor/components/PromptInput.tsx +++ b/apps/code/src/renderer/features/message-editor/components/PromptInput.tsx @@ -46,7 +46,9 @@ export interface PromptInputProps { getPromptHistory?: () => string[]; // callbacks onBeforeSubmit?: (text: string, clearEditor: () => void) => boolean; - onSubmit?: (text: string) => void; + // Returning (or resolving to) `false` signals the send failed, so the editor + // restores the submitted content instead of dropping it. + onSubmit?: (text: string) => unknown; onBashCommand?: (command: string) => void; onBashModeChange?: (isBashMode: boolean) => void; onCancel?: () => void; diff --git a/apps/code/src/renderer/features/message-editor/tiptap/useDraftSync.test.tsx b/apps/code/src/renderer/features/message-editor/tiptap/useDraftSync.test.tsx index 133365e525..0e9fc24586 100644 --- a/apps/code/src/renderer/features/message-editor/tiptap/useDraftSync.test.tsx +++ b/apps/code/src/renderer/features/message-editor/tiptap/useDraftSync.test.tsx @@ -21,6 +21,20 @@ function DraftAttachmentsProbe({ sessionId }: { sessionId: string }) { ); } +function RestoreContentProbe({ sessionId }: { sessionId: string }) { + const { restoreContent } = useDraftSync(null, sessionId); + return ( + + ); +} + describe("useDraftSync", () => { beforeEach(() => { vi.clearAllMocks(); @@ -60,4 +74,16 @@ describe("useDraftSync", () => { rerender(); expect(screen.getByText("empty")).toBeInTheDocument(); }); + + it("re-injects content as pending content when restoreContent is called", () => { + render(); + + act(() => { + screen.getByText("restore").click(); + }); + + expect(useDraftStore.getState().pendingContent["session-1"]).toEqual({ + segments: [{ type: "text", text: "lost message" }], + }); + }); }); diff --git a/apps/code/src/renderer/features/message-editor/tiptap/useDraftSync.ts b/apps/code/src/renderer/features/message-editor/tiptap/useDraftSync.ts index c9bc8a2ad4..f383420a36 100644 --- a/apps/code/src/renderer/features/message-editor/tiptap/useDraftSync.ts +++ b/apps/code/src/renderer/features/message-editor/tiptap/useDraftSync.ts @@ -222,10 +222,22 @@ export function useDraftSync( [], ); + // Re-inject previously-submitted content back into the editor, e.g. when a + // send fails on a flaky network so the user doesn't have to retype. Reuses + // the pendingContent path that already re-hydrates a mounted editor. + const restoreContent = useCallback( + (content: EditorContent) => { + if (isContentEmpty(content)) return; + draftActions.setPendingContent(sessionId, content); + }, + [sessionId, draftActions], + ); + return { saveDraft, clearDraft, getContent, + restoreContent, restoredAttachments, }; } diff --git a/apps/code/src/renderer/features/message-editor/tiptap/useTiptapEditor.ts b/apps/code/src/renderer/features/message-editor/tiptap/useTiptapEditor.ts index 7df2b75f70..13aa954da2 100644 --- a/apps/code/src/renderer/features/message-editor/tiptap/useTiptapEditor.ts +++ b/apps/code/src/renderer/features/message-editor/tiptap/useTiptapEditor.ts @@ -44,7 +44,10 @@ export interface UseTiptapEditorOptions { clearOnSubmit?: boolean; getPromptHistory?: () => string[]; onBeforeSubmit?: (text: string, clearEditor: () => void) => boolean; - onSubmit?: (text: string) => void; + // Returning (or resolving to) `false`, or rejecting, signals the send failed, + // so the editor restores the content the user just submitted. Any other value + // is treated as success. + onSubmit?: (text: string) => unknown; onBashCommand?: (command: string) => void; onBashModeChange?: (isBashMode: boolean) => void; onEmptyChange?: (isEmpty: boolean) => void; @@ -577,6 +580,18 @@ export function useTiptapEditor(options: UseTiptapEditorOptions) { draft.clearDraft(); }; + // Put the just-submitted content back when an async send fails (e.g. a + // flaky network drops the request). Skipped if the user already started a + // new message so we don't clobber it. + const restoreOnFailure = () => { + if (!clearOnSubmit) return; + if (!editor.isEmpty) return; + draft.restoreContent(content); + if (content.attachments?.length) { + setAttachments(content.attachments); + } + }; + if (enableBashMode && text.startsWith("!")) { // Bash mode requires immediate execution, can't be queued. // Intentionally bypasses onBeforeSubmit — bash commands run inline and @@ -596,8 +611,14 @@ export function useTiptapEditor(options: UseTiptapEditorOptions) { } } - // Normal prompts can be queued when loading - callbackRefs.current.onSubmit?.(serialized); + // Normal prompts can be queued when loading. The send may fail + // asynchronously; restore the content if it does. + const submitResult = callbackRefs.current.onSubmit?.(serialized); + Promise.resolve(submitResult) + .then((ok) => { + if (ok === false) restoreOnFailure(); + }) + .catch(() => restoreOnFailure()); } doClear(); diff --git a/apps/code/src/renderer/features/sessions/components/ConversationView.tsx b/apps/code/src/renderer/features/sessions/components/ConversationView.tsx index 4afb50fd67..b5738f49e2 100644 --- a/apps/code/src/renderer/features/sessions/components/ConversationView.tsx +++ b/apps/code/src/renderer/features/sessions/components/ConversationView.tsx @@ -11,7 +11,7 @@ import { } from "@features/sessions/stores/sessionStore"; import { useSettingsStore } from "@features/settings/stores/settingsStore"; import { SkillButtonActionMessage } from "@features/skill-buttons/components/SkillButtonActionMessage"; -import { ArrowDown, XCircle } from "@phosphor-icons/react"; +import { ArrowClockwise, ArrowDown, XCircle } from "@phosphor-icons/react"; import { WorkerPoolContextProvider } from "@pierre/diffs/react"; import WorkerUrl from "@pierre/diffs/worker/worker.js?worker&url"; import { Box, Button, Flex, Text } from "@radix-ui/themes"; @@ -59,6 +59,8 @@ interface ConversationViewProps { task?: Task; slackThreadUrl?: string; compact?: boolean; + /** Resend a prompt — used by the Retry on a network-failed turn. */ + onRetryPrompt?: (text: string) => void; } export function ConversationView({ @@ -70,6 +72,7 @@ export function ConversationView({ task, slackThreadUrl, compact = false, + onRetryPrompt, }: ConversationViewProps) { const listRef = useRef(null); const isAtBottomRef = useRef(true); @@ -78,6 +81,9 @@ export function ConversationView({ const showDebugLogs = debugLogsCloudRuns; const contextUsage = useContextUsage(events); + const session = useSessionForTask(taskId); + const connectionLostPromptIds = session?.connectionLostPromptIds; + const { items: conversationItems, lastTurnInfo, @@ -86,8 +92,9 @@ export function ConversationView({ () => buildConversationItems(events, isPromptPending, { showDebugLogs, + connectionLostPromptIds, }), - [events, isPromptPending, showDebugLogs], + [events, isPromptPending, showDebugLogs, connectionLostPromptIds], ); const firstUserMessageIdRef = useRef(undefined); @@ -111,7 +118,6 @@ export function ConversationView({ const pendingPermissionsCount = pendingPermissions.size; const queuedMessages = useQueuedMessagesForTask(taskId); const optimisticItems = useOptimisticItemsForTask(taskId); - const session = useSessionForTask(taskId); const pausedDurationMs = session?.pausedDurationMs ?? 0; const queuedItems = useMemo[]>( @@ -220,7 +226,16 @@ export function ConversationView({ /> ) : null; case "turn_cancelled": - return ; + return ( + onRetryPrompt(item.promptText as string) + : undefined + } + /> + ); case "user_shell_execute": return ; case "queued": @@ -240,7 +255,14 @@ export function ConversationView({ ); } }, - [repoPath, taskId, slackThreadUrl, firstUserMessageId, initialItemIds], + [ + repoPath, + taskId, + slackThreadUrl, + firstUserMessageId, + initialItemIds, + onRetryPrompt, + ], ); const getItemKey = useCallback((item: ConversationItem) => item.id, []); @@ -285,6 +307,7 @@ export function ConversationView({ task={task} isPromptPending={isPromptPending} promptStartedAt={promptStartedAt} + activitySignal={events.length} lastGenerationDuration={ lastTurnInfo?.isComplete ? Math.max(0, lastTurnInfo.durationMs - pausedDurationMs) @@ -337,21 +360,37 @@ const SessionUpdateRow = memo(function SessionUpdateRow({ const TurnCancelledView = memo(function TurnCancelledView({ interruptReason, + onRetry, }: { interruptReason?: string; + onRetry?: () => void; }) { + const isNetworkFailure = interruptReason === "connection_lost"; const message = interruptReason === "moving_to_worktree" ? "Paused while worktree is focused" - : "Interrupted by user"; + : isNetworkFailure + ? "Response may be incomplete. Failed due to network issue" + : "Interrupted by user"; return ( - - + + {message} + {isNetworkFailure && onRetry && ( + + )} ); diff --git a/apps/code/src/renderer/features/sessions/components/GeneratingIndicator.test.tsx b/apps/code/src/renderer/features/sessions/components/GeneratingIndicator.test.tsx new file mode 100644 index 0000000000..728f4747f6 --- /dev/null +++ b/apps/code/src/renderer/features/sessions/components/GeneratingIndicator.test.tsx @@ -0,0 +1,82 @@ +import { act, render, screen } from "@testing-library/react"; +import { afterEach, beforeEach, describe, expect, it, vi } from "vitest"; +import { GeneratingIndicator } from "./GeneratingIndicator"; + +describe("GeneratingIndicator slow-connection hint", () => { + beforeEach(() => { + vi.useFakeTimers(); + }); + + afterEach(() => { + vi.useRealTimers(); + }); + + it("surfaces a slow-connection hint after a long quiet stretch", () => { + render( + , + ); + + expect(screen.queryByText(/slow connection/i)).toBeNull(); + + act(() => { + vi.advanceTimersByTime(9000); + }); + + expect(screen.getByText(/slow connection/i)).toBeInTheDocument(); + }); + + it("clears the hint when stream activity resumes", () => { + const { rerender } = render( + , + ); + + act(() => { + vi.advanceTimersByTime(9000); + }); + expect(screen.getByText(/slow connection/i)).toBeInTheDocument(); + + // A new event arrives — the agent is making progress again. + rerender( + , + ); + act(() => { + vi.advanceTimersByTime(200); + }); + + expect(screen.queryByText(/slow connection/i)).toBeNull(); + }); + + it("never shows the hint when no activity signal is provided", () => { + render(); + + act(() => { + vi.advanceTimersByTime(60000); + }); + + expect(screen.queryByText(/slow connection/i)).toBeNull(); + }); + + it("shows a reconnecting state instead of the thinking activity when offline", () => { + render(); + + expect(screen.getByText(/waiting to reconnect/i)).toBeInTheDocument(); + }); + + it("shows the normal thinking activity when online", () => { + render(); + + expect(screen.queryByText(/waiting to reconnect/i)).toBeNull(); + }); +}); diff --git a/apps/code/src/renderer/features/sessions/components/GeneratingIndicator.tsx b/apps/code/src/renderer/features/sessions/components/GeneratingIndicator.tsx index fc72e73aab..26f65ac700 100644 --- a/apps/code/src/renderer/features/sessions/components/GeneratingIndicator.tsx +++ b/apps/code/src/renderer/features/sessions/components/GeneratingIndicator.tsx @@ -1,4 +1,4 @@ -import { Brain, Circle } from "@phosphor-icons/react"; +import { Brain, Circle, WifiSlash } from "@phosphor-icons/react"; import { Flex, Text } from "@radix-ui/themes"; import { useEffect, useRef, useState } from "react"; @@ -114,31 +114,65 @@ export function formatDuration(ms: number, fractionDigits = 2): string { return `${secs}.${fractionalValue.toString().padStart(fractionDigits, "0")}s`; } +// After this long with no stream activity we hint that the wait may be the +// network rather than the agent, addressing the "long lead time with no sign +// it's the user's internet" report. +const SLOW_HINT_MS = 8000; + interface GeneratingIndicatorProps { /** Timestamp (ms) when the prompt started. Only render this component while a prompt is pending. */ startedAt?: number | null; /** Accumulated time (ms) spent waiting for user input, subtracted from elapsed display. */ pausedDurationMs?: number; + /** + * A value that changes whenever stream activity arrives (e.g. the session's + * event count). When provided, a slow-connection hint appears if no activity + * is seen for `slowHintMs`, and clears as soon as it changes again. Omit to + * disable the hint entirely. + */ + activitySignal?: number; + /** How long to wait with no activity before hinting at a slow connection. */ + slowHintMs?: number; + /** + * When false, the turn can't make progress because the device is offline, so + * we show a "waiting to reconnect" state instead of the thinking activity. + */ + isOnline?: boolean; } export function GeneratingIndicator({ startedAt, pausedDurationMs, + activitySignal, + slowHintMs = SLOW_HINT_MS, + isOnline = true, }: GeneratingIndicatorProps) { const [elapsed, setElapsed] = useState(0); const [activity, setActivity] = useState(getRandomThinkingMessage); + const [isSlow, setIsSlow] = useState(false); const pausedRef = useRef(pausedDurationMs ?? 0); pausedRef.current = pausedDurationMs ?? 0; + // Timestamp of the most recent stream activity; resets the slow-hint timer. + const lastActivityRef = useRef(Date.now()); + // biome-ignore lint/correctness/useExhaustiveDependencies: intentionally keyed on activitySignal to mark each new activity. + useEffect(() => { + lastActivityRef.current = Date.now(); + setIsSlow(false); + }, [activitySignal]); + useEffect(() => { const startTime = startedAt ?? Date.now(); const interval = setInterval(() => { setElapsed(Math.max(0, Date.now() - startTime - pausedRef.current)); + if (activitySignal !== undefined) { + setIsSlow(Date.now() - lastActivityRef.current >= slowHintMs); + } }, 50); return () => clearInterval(interval); - }, [startedAt]); + }, [startedAt, activitySignal, slowHintMs]); useEffect(() => { const interval = setInterval(() => { @@ -148,6 +182,33 @@ export function GeneratingIndicator({ return () => clearInterval(interval); }, []); + if (!isOnline) { + return ( + + + + Connection lost — waiting to reconnect… + + + (Esc to stop + + + + {formatDuration(elapsed, 1)}) + + + ); + } + return ( {formatDuration(elapsed, 1)}) + {isSlow && ( + <> + + + Slow connection? Still trying… + + + )} ); } diff --git a/apps/code/src/renderer/features/sessions/components/SessionFooter.tsx b/apps/code/src/renderer/features/sessions/components/SessionFooter.tsx index 6b988222b4..16761fb71b 100644 --- a/apps/code/src/renderer/features/sessions/components/SessionFooter.tsx +++ b/apps/code/src/renderer/features/sessions/components/SessionFooter.tsx @@ -1,4 +1,5 @@ import type { ContextUsage } from "@features/sessions/hooks/useContextUsage"; +import { useConnectivity } from "@hooks/useConnectivity"; import { Brain, Pause } from "@phosphor-icons/react"; import { Box, Flex, Text } from "@radix-ui/themes"; import type { Task } from "@shared/types"; @@ -18,6 +19,8 @@ interface SessionFooterProps { pausedDurationMs?: number; isCompacting?: boolean; usage?: ContextUsage | null; + /** Changes when stream activity arrives; drives the slow-connection hint. */ + activitySignal?: number; } export function SessionFooter({ @@ -31,7 +34,9 @@ export function SessionFooter({ pausedDurationMs, isCompacting = false, usage, + activitySignal, }: SessionFooterProps) { + const { isOnline } = useConnectivity(); const rightSide = ( {task && } @@ -67,6 +72,8 @@ export function SessionFooter({ {queuedCount > 0 && ( diff --git a/apps/code/src/renderer/features/sessions/components/SessionView.tsx b/apps/code/src/renderer/features/sessions/components/SessionView.tsx index 49b9cdfa95..4698d81342 100644 --- a/apps/code/src/renderer/features/sessions/components/SessionView.tsx +++ b/apps/code/src/renderer/features/sessions/components/SessionView.tsx @@ -57,7 +57,9 @@ interface SessionViewProps { isPromptPending?: boolean | null; promptStartedAt?: number | null; onBeforeSubmit?: (text: string, clearEditor: () => void) => boolean; - onSendPrompt: (text: string) => void; + // Returning (or resolving to) `false` signals the send failed, so the editor + // restores the user's input. Any other value is treated as success. + onSendPrompt: (text: string) => unknown; onBashCommand?: (command: string) => void; onCancelPrompt: () => void; repoPath?: string | null; @@ -255,9 +257,9 @@ export function SessionView({ const handleSubmit = useCallback( (text: string) => { - if (text.trim()) { - onSendPrompt(text); - } + if (!text.trim()) return; + // Return the result so the editor can restore input on send failure. + return onSendPrompt(text); }, [onSendPrompt], ); @@ -473,6 +475,7 @@ export function SessionView({ taskId={taskId} task={task} slackThreadUrl={slackThreadUrl} + onRetryPrompt={onSendPrompt} /> diff --git a/apps/code/src/renderer/features/sessions/components/buildConversationItems.test.ts b/apps/code/src/renderer/features/sessions/components/buildConversationItems.test.ts index 0bdc3d3d88..9a2cde30b2 100644 --- a/apps/code/src/renderer/features/sessions/components/buildConversationItems.test.ts +++ b/apps/code/src/renderer/features/sessions/components/buildConversationItems.test.ts @@ -121,6 +121,73 @@ describe("buildConversationItems", () => { ]); }); + it("carries the prompt text and reason on a network-cancelled turn so it can be retried", () => { + const events: AcpMessage[] = [ + userPromptMsg(1, 5, "summarize the resume"), + { + type: "acp_message", + ts: 2, + message: { + jsonrpc: "2.0", + id: 5, + result: { + stopReason: "cancelled", + _meta: { interruptReason: "connection_lost" }, + }, + }, + }, + ]; + + const result = buildConversationItems(events, null); + const cancelled = result.items.find((i) => i.type === "turn_cancelled"); + + expect(cancelled).toMatchObject({ + type: "turn_cancelled", + interruptReason: "connection_lost", + promptText: "summarize the resume", + }); + }); + + it("labels a turn 'connection_lost' by prompt id when the cancelled signal carried no reason", () => { + // The turn_complete notification path carries stopReason but no interrupt + // reason — the renderer-known connectionLostPromptIds fills it in. + const events: AcpMessage[] = [ + userPromptMsg(1, 9, "summarize the resume"), + turnCompleteMsg(2, "cancelled"), + ]; + + const result = buildConversationItems(events, null, { + connectionLostPromptIds: [9], + }); + const cancelled = result.items.find((i) => i.type === "turn_cancelled"); + + expect(cancelled).toMatchObject({ + type: "turn_cancelled", + interruptReason: "connection_lost", + promptText: "summarize the resume", + }); + }); + + it("shows the network-failure footer on an end_turn turn flagged connection-lost", () => { + // A turn the agent reported as a clean end_turn, but which the app flagged + // as network-affected, still gets the inline footer + Retry. + const events: AcpMessage[] = [ + userPromptMsg(1, 11, "summarize the resume"), + promptResponseMsg(2, 11), + ]; + + const result = buildConversationItems(events, null, { + connectionLostPromptIds: [11], + }); + const cancelled = result.items.find((i) => i.type === "turn_cancelled"); + + expect(cancelled).toMatchObject({ + type: "turn_cancelled", + interruptReason: "connection_lost", + promptText: "summarize the resume", + }); + }); + it("marks cloud turns complete from structured turn completion notifications", () => { const result = buildConversationItems( [userPromptMsg(10, 42, "hello"), turnCompleteMsg(25)], diff --git a/apps/code/src/renderer/features/sessions/components/buildConversationItems.ts b/apps/code/src/renderer/features/sessions/components/buildConversationItems.ts index fbd0d1ee4b..c98874659a 100644 --- a/apps/code/src/renderer/features/sessions/components/buildConversationItems.ts +++ b/apps/code/src/renderer/features/sessions/components/buildConversationItems.ts @@ -54,7 +54,13 @@ export type ConversationItem = actionType: GitActionType; turnId: string; } - | { type: "turn_cancelled"; id: string; interruptReason?: string } + | { + type: "turn_cancelled"; + id: string; + interruptReason?: string; + /** Original prompt text, so a network-failed turn can offer a Retry. */ + promptText?: string; + } | UserShellExecute | { type: "queued"; id: string; message: QueuedMessage }; @@ -87,6 +93,8 @@ interface TurnState { isComplete: boolean; stopReason?: string; interruptReason?: string; + /** The user's prompt text for this turn (for a Retry on network failure). */ + promptText?: string; durationMs: number; toolCalls: Map; context: TurnContext; @@ -106,6 +114,8 @@ interface ItemBuilder { * event for the same id mutates the same card, regardless of which turn is * currently active. */ progressCards: Map; + /** Prompt ids of turns the app flagged as network-failed (renderer-known). */ + connectionLostPromptIds?: number[]; } function createItemBuilder(): ItemBuilder { @@ -162,6 +172,9 @@ function pushItem(b: ItemBuilder, update: RenderItem) { export interface BuildConversationOptions { /** Render `debug`-level console logs inline; without this only info/warn/error show up. */ showDebugLogs?: boolean; + /** Prompt ids of turns abandoned/truncated by a network outage; each is + * labeled "Failed due to network issue" even if its signal carried no reason. */ + connectionLostPromptIds?: number[]; } export function buildConversationItems( @@ -170,6 +183,7 @@ export function buildConversationItems( options?: BuildConversationOptions, ): BuildResult { const b = createItemBuilder(); + b.connectionLostPromptIds = options?.connectionLostPromptIds; for (const event of events) { const msg = event.message; @@ -259,6 +273,7 @@ function handlePromptRequest( context, gitAction, itemCount: 0, + promptText: userContent, }; b.pendingPrompts.set(msg.id, b.currentTurn); @@ -323,6 +338,16 @@ function completePromptTurn( const wasCancelled = turn.stopReason === "cancelled"; turn.context.turnCancelled = wasCancelled; + // A turn the app flagged as network-affected (cancelled by the offline + // give-up, or completed `end_turn` after a drop) gets the inline "Response + // may be incomplete. Failed due to network issue" footer with a Retry — + // even when the agent's signal carried no interrupt reason. + const wasNetworkAffected = + b.connectionLostPromptIds?.includes(turn.promptId) ?? false; + if (wasNetworkAffected && !turn.interruptReason) { + turn.interruptReason = "connection_lost"; + } + if (turn.gitAction.isGitAction && turn.gitAction.actionType) { b.items.push({ type: "git_action_result", @@ -332,11 +357,12 @@ function completePromptTurn( }); } - if (wasCancelled) { + if (wasCancelled || wasNetworkAffected) { b.items.push({ type: "turn_cancelled", id: `${turn.id}-cancelled`, interruptReason: turn.interruptReason, + promptText: turn.promptText, }); } diff --git a/apps/code/src/renderer/features/sessions/hooks/useSessionCallbacks.ts b/apps/code/src/renderer/features/sessions/hooks/useSessionCallbacks.ts index ae236342fb..ae36365b92 100644 --- a/apps/code/src/renderer/features/sessions/hooks/useSessionCallbacks.ts +++ b/apps/code/src/renderer/features/sessions/hooks/useSessionCallbacks.ts @@ -2,6 +2,7 @@ import { tryExecuteCodeCommand } from "@features/message-editor/commands"; import { useDraftStore } from "@features/message-editor/stores/draftStore"; import { useTaskViewed } from "@features/sidebar/hooks/useTaskViewed"; import { trpcClient } from "@renderer/trpc/client"; +import { isSendConnectivityError } from "@shared/errors"; import type { Task } from "@shared/types"; import { useNavigationStore } from "@stores/navigationStore"; import { logger } from "@utils/logger"; @@ -36,8 +37,10 @@ export function useSessionCallbacks({ const sessionRef = useRef(session); sessionRef.current = session; + // Returns false when the send fails so the editor can restore the user's + // input instead of silently dropping it. const handleSendPrompt = useCallback( - async (text: string) => { + async (text: string): Promise => { const currentSession = sessionRef.current; const currentEvents = currentSession?.events ?? []; const handled = await tryExecuteCodeCommand(text, { @@ -52,7 +55,7 @@ export function useSessionCallbacks({ : null, taskRun: task.latest_run ?? null, }); - if (handled) return; + if (handled) return true; try { markAsViewed(taskId); @@ -65,11 +68,17 @@ export function useSessionCallbacks({ if (isViewingTask) { markAsViewed(taskId); } + return true; } catch (error) { - const message = - error instanceof Error ? error.message : "Failed to send message"; - toast.error(message); + // A connectivity failure already restores the message to the composer + // (the editor re-injects it on a false result), so no toast is needed. + if (!isSendConnectivityError(error)) { + const message = + error instanceof Error ? error.message : "Failed to send message"; + toast.error(message); + } log.error("Failed to send prompt", error); + return false; } }, [taskId, repoPath, markActivity, markAsViewed, task.latest_run], diff --git a/apps/code/src/renderer/features/sessions/service/service.test.ts b/apps/code/src/renderer/features/sessions/service/service.test.ts index 1b7f411b4b..3a407ae63e 100644 --- a/apps/code/src/renderer/features/sessions/service/service.test.ts +++ b/apps/code/src/renderer/features/sessions/service/service.test.ts @@ -1,7 +1,9 @@ import type { ContentBlock } from "@agentclientprotocol/sdk"; import type { AgentSession } from "@features/sessions/stores/sessionStore"; +import { isSendConnectivityError } from "@shared/errors"; import type { Task } from "@shared/types"; import type { AcpMessage } from "@shared/types/session-events"; +import { notifyPromptComplete } from "@utils/notifications"; import { beforeEach, describe, expect, it, vi } from "vitest"; // --- Hoisted Mocks --- @@ -255,7 +257,7 @@ vi.mock("@utils/notifications", () => ({ notifyPromptComplete: vi.fn(), })); vi.mock("@renderer/utils/toast", () => ({ - toast: { error: vi.fn(), info: vi.fn() }, + toast: { error: vi.fn(), info: vi.fn(), warning: vi.fn() }, })); vi.mock("@utils/queryClient", () => ({ queryClient: { @@ -3340,13 +3342,59 @@ describe("SessionService", () => { }); describe("sendPrompt", () => { - it("throws when offline", async () => { - mockGetIsOnline.mockReturnValue(false); - const service = getSessionService(); + it("throws a connectivity error and does not send if still offline after the grace window", async () => { + vi.useFakeTimers(); + try { + mockGetIsOnline.mockReturnValue(false); + const service = getSessionService(); - await expect(service.sendPrompt("task-123", "Hello")).rejects.toThrow( - "No internet connection", - ); + const promise = service.sendPrompt("task-123", "Hello"); + const assertion = expect(promise).rejects.toSatisfy( + isSendConnectivityError, + ); + // Stays offline past the give-up window. + await vi.advanceTimersByTimeAsync(41_000); + await assertion; + + expect(mockTrpcAgent.prompt.mutate).not.toHaveBeenCalled(); + } finally { + vi.useRealTimers(); + } + }); + + it("waits for connectivity and sends once the connection returns", async () => { + vi.useFakeTimers(); + try { + // Offline for the guard + first poll, then back online. + mockGetIsOnline + .mockReturnValueOnce(false) + .mockReturnValueOnce(false) + .mockReturnValue(true); + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( + createMockSession(), + ); + mockTrpcAgent.prompt.mutate.mockResolvedValue({ + stopReason: "end_turn", + }); + const service = getSessionService(); + + const promise = service.sendPrompt("task-123", "Hello"); + await vi.advanceTimersByTimeAsync(3_000); + const result = await promise; + + expect(result.stopReason).toBe("end_turn"); + expect(mockTrpcAgent.prompt.mutate).toHaveBeenCalled(); + // While offline it shows the reconnecting indicator (pending + timer). + expect(mockSessionStoreSetters.updateSession).toHaveBeenCalledWith( + "run-123", + expect.objectContaining({ + isPromptPending: true, + promptStartedAt: expect.any(Number), + }), + ); + } finally { + vi.useRealTimers(); + } }); it("throws when no session exists", async () => { @@ -4024,9 +4072,9 @@ describe("SessionService", () => { mockSessionStoreSetters.setSession.mockImplementation((next) => { session = next as AgentSession; }); - mockSessionStoreSetters.dequeueMessagesAsText.mockReturnValue( - "follow up", - ); + mockSessionStoreSetters.dequeueMessages.mockReturnValue([ + { id: "q-1", content: "follow up", queuedAt: 1700000000 }, + ]); mockBuildAuthenticatedClient.mockReturnValue({ ...mockAuthenticatedClient, @@ -4092,6 +4140,296 @@ describe("SessionService", () => { ); }); }); + + it("re-enqueues queued local messages when the drain dispatch fails", async () => { + const service = getSessionService(); + + let session: AgentSession | undefined; + mockSessionStoreSetters.getSessionByTaskId.mockImplementation( + () => session, + ); + mockSessionStoreSetters.getSessions.mockImplementation(() => + session ? { "run-123": session } : {}, + ); + mockSessionStoreSetters.updateSession.mockImplementation( + (_taskRunId, updates) => { + if (session) session = { ...session, ...updates }; + }, + ); + mockSessionStoreSetters.setSession.mockImplementation((next) => { + session = next as AgentSession; + }); + + const queuedMessage = { + id: "q-1", + content: "follow up", + queuedAt: 1700000000, + }; + mockSessionStoreSetters.dequeueMessages.mockReturnValue([queuedMessage]); + + mockBuildAuthenticatedClient.mockReturnValue({ + ...mockAuthenticatedClient, + createTaskRun: vi.fn().mockResolvedValue({ id: "run-123" }), + appendTaskRunLog: vi.fn(), + }); + mockTrpcAgent.start.mutate.mockResolvedValue({ + channel: "agent-event:run-123", + configOptions: [], + }); + // The drain dispatch fails on a flaky network. + mockTrpcAgent.prompt.mutate.mockRejectedValue( + new Error("transient network failure"), + ); + + await service.connectToTask({ + task: createMockTask(), + repoPath: "/repo", + }); + + const onData = mockTrpcAgent.onSessionEvent.subscribe.mock.calls.at( + -1, + )?.[1]?.onData as ((payload: unknown) => void) | undefined; + expect(onData).toBeDefined(); + + session = createMockSession({ + taskRunId: "run-123", + taskId: "task-123", + status: "connected", + isCloud: false, + currentPromptId: 42, + isPromptPending: true, + messageQueue: [queuedMessage], + }); + + onData?.({ + type: "acp_message", + ts: 1700000001, + message: { + jsonrpc: "2.0", + method: "_posthog/turn_complete", + params: { sessionId: "acp-session", stopReason: "end_turn" }, + }, + }); + onData?.({ + type: "acp_message", + ts: 1700000002, + message: { jsonrpc: "2.0", id: 42, result: { stopReason: "end_turn" } }, + }); + + await vi.waitFor(() => { + expect( + mockSessionStoreSetters.prependQueuedMessages, + ).toHaveBeenCalledWith("task-123", [queuedMessage]); + }); + }); + }); + + describe("network-interrupted turn completion", () => { + it("flags only in-flight turns as network-dropped", () => { + const pending = createMockSession({ + taskRunId: "run-a", + taskId: "task-a", + isPromptPending: true, + }); + const idle = createMockSession({ + taskRunId: "run-b", + taskId: "task-b", + isPromptPending: false, + }); + mockSessionStoreSetters.getSessions.mockReturnValue({ + "run-a": pending, + "run-b": idle, + }); + const service = getSessionService(); + + service.markInflightTurnsNetworkDropped(); + + expect(mockSessionStoreSetters.updateSession).toHaveBeenCalledWith( + "run-a", + { networkDroppedDuringTurn: true }, + ); + expect(mockSessionStoreSetters.updateSession).not.toHaveBeenCalledWith( + "run-b", + expect.objectContaining({ networkDroppedDuringTurn: true }), + ); + }); + + async function setupLocalTurn(overrides: Partial) { + const service = getSessionService(); + let session: AgentSession | undefined; + mockSessionStoreSetters.getSessionByTaskId.mockImplementation( + () => session, + ); + mockSessionStoreSetters.getSessions.mockImplementation(() => + session ? { "run-123": session } : {}, + ); + mockSessionStoreSetters.updateSession.mockImplementation( + (_taskRunId, updates) => { + if (session) session = { ...session, ...updates }; + }, + ); + mockSessionStoreSetters.setSession.mockImplementation((next) => { + session = next as AgentSession; + }); + + mockBuildAuthenticatedClient.mockReturnValue({ + ...mockAuthenticatedClient, + createTaskRun: vi.fn().mockResolvedValue({ id: "run-123" }), + appendTaskRunLog: vi.fn(), + }); + mockTrpcAgent.start.mutate.mockResolvedValue({ + channel: "agent-event:run-123", + configOptions: [], + }); + + await service.connectToTask({ + task: createMockTask(), + repoPath: "/repo", + }); + + const onData = mockTrpcAgent.onSessionEvent.subscribe.mock.calls.at( + -1, + )?.[1]?.onData as ((payload: unknown) => void) | undefined; + + session = createMockSession({ + taskRunId: "run-123", + taskId: "task-123", + status: "connected", + isCloud: false, + currentPromptId: 7, + isPromptPending: true, + ...overrides, + }); + + const deliverResult = (stopReason = "end_turn") => + onData?.({ + type: "acp_message", + ts: 1700000002, + message: { + jsonrpc: "2.0", + id: 7, + result: { stopReason }, + }, + }); + + return { deliverResult }; + } + + it("suppresses the finished notification and flags the turn (no toast) when the network dropped mid-turn", async () => { + const { deliverResult } = await setupLocalTurn({ + networkDroppedDuringTurn: true, + }); + + deliverResult(); + + // No "finished" notification, no toast — the turn is flagged for the + // inline "Response may be incomplete. Failed due to network issue" footer. + expect(notifyPromptComplete).not.toHaveBeenCalled(); + expect(toast.warning).not.toHaveBeenCalled(); + expect(mockSessionStoreSetters.updateSession).toHaveBeenCalledWith( + "run-123", + expect.objectContaining({ connectionLostPromptIds: [7] }), + ); + }); + + it("does not warn when a network-dropped turn was cancelled by the user", async () => { + const { deliverResult } = await setupLocalTurn({ + networkDroppedDuringTurn: true, + }); + + deliverResult("cancelled"); + + // A user-initiated cancel was never surfaced as a completion, so the + // incomplete-warning would just be noise. + expect(toast.warning).not.toHaveBeenCalled(); + expect(notifyPromptComplete).not.toHaveBeenCalledWith( + expect.anything(), + "end_turn", + expect.anything(), + ); + }); + + it("notifies normally when the network stayed up through the turn", async () => { + const { deliverResult } = await setupLocalTurn({ + networkDroppedDuringTurn: false, + }); + + deliverResult(); + + expect(notifyPromptComplete).toHaveBeenCalledWith( + "Test Task", + "end_turn", + "task-123", + ); + expect(toast.warning).not.toHaveBeenCalled(); + }); + }); + + describe("offline turn give-up", () => { + function setupOfflinePending(isCloud: boolean) { + mockGetIsOnline.mockReturnValue(false); + const session = createMockSession({ + taskRunId: "run-1", + taskId: "task-1", + isCloud, + isPromptPending: true, + }); + mockSessionStoreSetters.getSessions.mockReturnValue({ "run-1": session }); + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue(session); + return getSessionService(); + } + + it("cancels with a network reason (no toast) when a local turn stays offline past the cap", async () => { + vi.useFakeTimers(); + try { + const service = setupOfflinePending(false); + + service.markInflightTurnsNetworkDropped(); + await vi.advanceTimersByTimeAsync(41_000); + + // Cancel is tagged as a connection loss so the turn renders "Failed due + // to network issue" with a Retry button, not "interrupted by user". + expect(mockTrpcAgent.cancelPrompt.mutate).toHaveBeenCalledWith({ + sessionId: "run-1", + reason: "connection_lost", + }); + // Feedback is the inline label + Retry button — no toast. + expect(toast.warning).not.toHaveBeenCalled(); + } finally { + vi.useRealTimers(); + } + }); + + it("does not give up if the connection returns before the cap", async () => { + vi.useFakeTimers(); + try { + const service = setupOfflinePending(false); + + service.markInflightTurnsNetworkDropped(); + mockGetIsOnline.mockReturnValue(true); + service.clearOfflineTurnGiveupTimers(); + await vi.advanceTimersByTimeAsync(41_000); + + expect(mockTrpcAgent.cancelPrompt.mutate).not.toHaveBeenCalled(); + expect(toast.warning).not.toHaveBeenCalled(); + } finally { + vi.useRealTimers(); + } + }); + + it("does not give up on cloud turns (they resume via SSE on reconnect)", async () => { + vi.useFakeTimers(); + try { + const service = setupOfflinePending(true); + + service.markInflightTurnsNetworkDropped(); + await vi.advanceTimersByTimeAsync(41_000); + + expect(toast.warning).not.toHaveBeenCalled(); + } finally { + vi.useRealTimers(); + } + }); }); describe("cancelPrompt", () => { diff --git a/apps/code/src/renderer/features/sessions/service/service.ts b/apps/code/src/renderer/features/sessions/service/service.ts index c0903429bd..2456f6da05 100644 --- a/apps/code/src/renderer/features/sessions/service/service.ts +++ b/apps/code/src/renderer/features/sessions/service/service.ts @@ -41,6 +41,7 @@ import { getIsOnline } from "@renderer/stores/connectivityStore"; import { trpc } from "@renderer/trpc"; import { trpcClient } from "@renderer/trpc/client"; import { toast } from "@renderer/utils/toast"; +import { SendConnectivityError } from "@shared/errors"; import { type CloudTaskPermissionRequestUpdate, type CloudTaskUpdatePayload, @@ -97,6 +98,16 @@ const LOCAL_SESSION_RECOVERY_FAILED_MESSAGE = const GITHUB_AUTHORIZATION_REQUIRED_CODE = "github_authorization_required"; const AUTO_RETRY_MAX_ATTEMPTS = 2; const AUTO_RETRY_DELAY_MS = 10_000; +// Grace window: when a send is attempted while offline, wait this long for the +// connection to come back before giving up, so a brief drop doesn't instantly +// fail the message. We only dispatch once connectivity returns — nothing is +// sent speculatively — so this never risks delivering a message twice. +const SEND_CONNECTIVITY_WAIT_MS = 10_000; +const SEND_CONNECTIVITY_POLL_MS = 1_000; +// How long a local turn may stay offline before we stop waiting, cancel it and +// surface a connection-lost error. The connection returning before this cancels +// the timer (the turn auto-resumes), so this only fires on a sustained outage. +const OFFLINE_TURN_GIVEUP_MS = 40_000; class GitHubAuthorizationRequiredForCloudHandoffError extends Error { constructor( @@ -282,6 +293,11 @@ export function resetSessionService(): void { export class SessionService { private connectingTasks = new Map>(); private localRepoPaths = new Map(); + // Per-turn timers that give up on a local turn stuck offline (keyed by taskRunId). + private offlineTurnGiveupTimers = new Map< + string, + ReturnType + >(); private localRecoveryAttempts = new Map>(); /** Re-entrance guard for cloud queue dispatch (per taskId). */ private dispatchingCloudQueues = new Set(); @@ -1196,11 +1212,7 @@ export class SessionService { if (isLive) { // Queued messages will start a new turn — suppress the "done" notification in that case. if (session.messageQueue.length === 0) { - notifyPromptComplete( - session.taskTitle, - "end_turn", - session.taskId, - ); + this.notifyTurnComplete(taskRunId, "end_turn"); } taskViewedApi.markActivity(session.taskId); } @@ -1316,7 +1328,7 @@ export class SessionService { // Only notify when queue is empty - queued messages will start a new turn if (stopReason && !hasQueuedMessages) { - notifyPromptComplete(session.taskTitle, stopReason, session.taskId); + this.notifyTurnComplete(taskRunId, stopReason, msg.id); } taskViewedApi.markActivity(session.taskId); @@ -1503,14 +1515,55 @@ export class SessionService { * Send a prompt to the agent. * Queues if a prompt is already pending. */ + /** + * Poll for connectivity up to `timeoutMs`. Returns true as soon as the + * connection is back, false if the window elapses while still offline. The + * connectivity service re-checks in the background every few seconds, so + * polling the cached status is enough. + */ + private async waitForConnectivity( + timeoutMs = SEND_CONNECTIVITY_WAIT_MS, + ): Promise { + if (getIsOnline()) return true; + const deadline = Date.now() + timeoutMs; + while (Date.now() < deadline) { + await new Promise((resolve) => + setTimeout(resolve, SEND_CONNECTIVITY_POLL_MS), + ); + if (getIsOnline()) return true; + } + return getIsOnline(); + } + async sendPrompt( taskId: string, prompt: string | ContentBlock[], ): Promise<{ stopReason: string }> { if (!getIsOnline()) { - throw new Error( - "No internet connection. Please check your connection and try again.", - ); + // Don't fail instantly on a drop. Surface the same "Connection lost — + // waiting to reconnect…" indicator (with timer) as a mid-turn drop while + // we wait for the connection, then send once it's back. If it's still + // down after the give-up window, surface a connectivity error. + const offlineSession = sessionStoreSetters.getSessionByTaskId(taskId); + if (offlineSession) { + sessionStoreSetters.updateSession(offlineSession.taskRunId, { + isPromptPending: true, + promptStartedAt: Date.now(), + pausedDurationMs: 0, + }); + } + const recovered = await this.waitForConnectivity(OFFLINE_TURN_GIVEUP_MS); + // Clear the transient reconnecting state; the normal flow re-applies + // pending when it actually dispatches. + if (offlineSession) { + sessionStoreSetters.updateSession(offlineSession.taskRunId, { + isPromptPending: false, + promptStartedAt: null, + }); + } + if (!recovered) { + throw new SendConnectivityError(); + } } let session = sessionStoreSetters.getSessionByTaskId(taskId); @@ -1609,22 +1662,27 @@ export class SessionService { /** * Send all queued messages as a single prompt. * Called internally when a turn completes and there are queued messages. - * Queue is cleared atomically before sending - if sending fails, messages are lost - * (this is acceptable since the user can re-type; avoiding complex retry logic). + * The queue is drained atomically before sending; if the dispatch fails + * (e.g. a flaky network), the messages are prepended back onto the queue so + * they aren't silently lost and drain naturally on the next attempt. */ private async sendQueuedMessages( taskId: string, ): Promise<{ stopReason: string }> { - const combinedText = sessionStoreSetters.dequeueMessagesAsText(taskId); - if (!combinedText) { + const messages = sessionStoreSetters.dequeueMessages(taskId); + if (messages.length === 0) { return { stopReason: "skipped" }; } + const combinedText = messages.map((msg) => msg.content).join("\n\n"); const session = sessionStoreSetters.getSessionByTaskId(taskId); if (!session) { - log.warn("No session found for queued messages, messages lost", { + // No live session to dispatch to — put the messages back so a later + // reconnect can drain them rather than dropping the user's input. + sessionStoreSetters.prependQueuedMessages(taskId, messages); + log.warn("No session found for queued messages, re-enqueued", { taskId, - lostMessageLength: combinedText.length, + requeuedCount: messages.length, }); return { stopReason: "no_session" }; } @@ -1652,10 +1710,12 @@ export class SessionService { try { return await this.sendLocalPrompt(session, blocks, combinedText); } catch (error) { - // Log that queued messages were lost due to send failure - log.error("Failed to send queued messages, messages lost", { + // Roll the drain back so the queued messages survive a transient send + // failure and retry on the next drain instead of being lost. + sessionStoreSetters.prependQueuedMessages(taskId, messages); + log.warn("Failed to send queued messages, re-enqueued for retry", { taskId, - lostMessageLength: combinedText.length, + requeuedCount: messages.length, error, }); throw error; @@ -1671,6 +1731,7 @@ export class SessionService { isPromptPending: true, promptStartedAt: Date.now(), pausedDurationMs: 0, + networkDroppedDuringTurn: false, }); const skillButtonId = extractSkillButtonId(blocks); @@ -1757,7 +1818,10 @@ export class SessionService { /** * Cancel the current prompt. */ - async cancelPrompt(taskId: string): Promise { + async cancelPrompt( + taskId: string, + reason?: "user_request" | "moving_to_worktree" | "connection_lost", + ): Promise { const session = sessionStoreSetters.getSessionByTaskId(taskId); if (!session) return false; @@ -1773,6 +1837,7 @@ export class SessionService { try { const result = await trpcClient.agent.cancelPrompt.mutate({ sessionId: session.taskRunId, + reason, }); const durationSeconds = Math.round( @@ -1923,6 +1988,7 @@ export class SessionService { promptStartedAt: Date.now(), pausedDurationMs: 0, agentIdleForRunId: undefined, + networkDroppedDuringTurn: false, }); this.cloudRunIdleTracker.markBusy(currentSessionBeforeSend); sessionStoreSetters.appendOptimisticItem(session.taskRunId, { @@ -3367,6 +3433,146 @@ export class SessionService { } } + /** + * Mark every in-flight turn as having seen a connectivity drop. Called on the + * offline edge so that when the turn later reports `end_turn` (the agent SDK + * can finalize a network-truncated response as a clean stop), we treat it as + * possibly-incomplete instead of notifying the user it finished. + */ + public markInflightTurnsNetworkDropped(): void { + const sessions = sessionStoreSetters.getSessions(); + for (const session of Object.values(sessions)) { + if (!session.isPromptPending) continue; + + if (!session.networkDroppedDuringTurn) { + sessionStoreSetters.updateSession(session.taskRunId, { + networkDroppedDuringTurn: true, + }); + } + + // Local turns can't make progress while the device is offline, so bound + // the wait: give up after the cap unless the connection returns first. + // Cloud turns keep running server-side and resume via SSE on reconnect, + // so we don't force-cancel those. + if ( + !session.isCloud && + !this.offlineTurnGiveupTimers.has(session.taskRunId) + ) { + const { taskRunId, taskId } = session; + const timer = setTimeout(() => { + this.failStalledOfflineTurn(taskRunId, taskId); + }, OFFLINE_TURN_GIVEUP_MS); + this.offlineTurnGiveupTimers.set(taskRunId, timer); + } + } + } + + private clearOfflineTurnGiveupTimer(taskRunId: string): void { + const timer = this.offlineTurnGiveupTimers.get(taskRunId); + if (timer) { + clearTimeout(timer); + this.offlineTurnGiveupTimers.delete(taskRunId); + } + } + + /** + * Cancel all pending offline give-up timers — called when connectivity is + * restored so in-flight turns resume rather than being cancelled. + */ + public clearOfflineTurnGiveupTimers(): void { + for (const timer of this.offlineTurnGiveupTimers.values()) { + clearTimeout(timer); + } + this.offlineTurnGiveupTimers.clear(); + } + + /** + * Stop waiting on a local turn that has been offline past the cap: cancel it + * (clearing the pending spinner) and tell the user the response couldn't + * finish. No-ops if the connection came back or the turn already ended. + */ + private failStalledOfflineTurn(taskRunId: string, taskId: string): void { + this.clearOfflineTurnGiveupTimer(taskRunId); + if (getIsOnline()) return; + const session = sessionStoreSetters.getSessions()[taskRunId]; + if (!session?.isPromptPending) return; + + log.warn("Giving up on a turn stalled offline past the cap", { + taskRunId, + taskId, + }); + + // Tag this turn's prompt id so the conversation builder labels it "Failed + // due to network issue" with a Retry — independent of whatever reason the + // agent's cancelled signal ends up carrying. Accumulated so the label + // survives later turns (e.g. the Retry itself). + if (typeof session.currentPromptId === "number") { + sessionStoreSetters.updateSession(taskRunId, { + connectionLostPromptIds: [ + ...(session.connectionLostPromptIds ?? []), + session.currentPromptId, + ], + }); + } + + // Cancel the hung turn. The inline "Failed due to network issue" label + + // Retry button is the feedback; the message isn't pulled into the composer. + void this.cancelPrompt(taskId, "connection_lost").catch((error) => { + log.warn("Failed to cancel stalled offline turn", { taskId, error }); + }); + } + + /** + * Fire the turn-complete notification unless the connection dropped during + * the turn — in that case the reported completion is suspect (the response + * may be truncated), so we flag the turn for the inline "Response may be + * incomplete. Failed due to network issue" footer instead of signalling + * success. + */ + private notifyTurnComplete( + taskRunId: string, + stopReason: string, + promptId?: number, + ): void { + // The turn ended, so any offline give-up timer for it is moot. + this.clearOfflineTurnGiveupTimer(taskRunId); + + const session = sessionStoreSetters.getSessions()[taskRunId]; + if (!session) return; + + const droppedDuringTurn = session.networkDroppedDuringTurn === true; + if (droppedDuringTurn) { + sessionStoreSetters.updateSession(taskRunId, { + networkDroppedDuringTurn: false, + }); + } + + // A turn the agent reported as a normal completion (`end_turn`) but which + // saw a network drop is suspect — the response may be truncated. Flag its + // prompt id so the conversation renders the inline "Response may be + // incomplete. Failed due to network issue" footer (with Retry), and don't + // signal a clean finish. (Cancelled/errored turns were never notified.) + if ( + stopReason === "end_turn" && + droppedDuringTurn && + typeof promptId === "number" + ) { + log.warn( + "Turn completed after a network drop; flagging as possibly incomplete", + { taskRunId, taskId: session.taskId, promptId }, + ); + sessionStoreSetters.updateSession(taskRunId, { + connectionLostPromptIds: [ + ...(session.connectionLostPromptIds ?? []), + promptId, + ], + }); + return; + } + + notifyPromptComplete(session.taskTitle, stopReason, session.taskId); + } + public updateSessionTaskTitle(taskId: string, taskTitle: string): void { const session = sessionStoreSetters.getSessionByTaskId(taskId); if (!session) return; diff --git a/apps/code/src/renderer/features/sessions/stores/sessionStore.ts b/apps/code/src/renderer/features/sessions/stores/sessionStore.ts index 718206228b..d64f2d4368 100644 --- a/apps/code/src/renderer/features/sessions/stores/sessionStore.ts +++ b/apps/code/src/renderer/features/sessions/stores/sessionStore.ts @@ -53,6 +53,15 @@ export interface AgentSession { isPromptPending: boolean; isCompacting: boolean; promptStartedAt: number | null; + /** Set when connectivity dropped while this turn was in flight, so its + * eventual completion is treated as possibly-incomplete rather than a clean + * finish (the agent may have finalized a truncated response as `end_turn`). */ + networkDroppedDuringTurn?: boolean; + /** Prompt ids of turns abandoned/truncated by a network outage. The + * conversation builder labels each "Failed due to network issue" (with a + * Retry) regardless of the reason the agent's signal carried. Persists for + * the session so the label survives later turns (e.g. a Retry). */ + connectionLostPromptIds?: number[]; /** JSON-RPC id of the currently in-flight session/prompt request. Used to * correlate late-arriving responses (e.g. from a cancelled prior turn) so * they don't clear the pending state of a newer turn. */ diff --git a/apps/code/src/shared/errors.ts b/apps/code/src/shared/errors.ts index 37a9c727d9..d888f63d4d 100644 --- a/apps/code/src/shared/errors.ts +++ b/apps/code/src/shared/errors.ts @@ -13,6 +13,29 @@ export function isNotAuthenticatedError(error: unknown): boolean { ); } +/** + * Thrown when a send can't be delivered because the device is offline (after a + * bounded grace wait for the connection to return). The renderer treats this + * specially: it restores the user's input and shows a connection-failure toast + * rather than dropping the message. + */ +export class SendConnectivityError extends Error { + constructor( + message = "No internet connection. Please check your connection and try again.", + ) { + super(message); + this.name = "SendConnectivityError"; + } +} + +export function isSendConnectivityError(error: unknown): boolean { + return ( + typeof error === "object" && + error !== null && + (error as { name?: unknown }).name === "SendConnectivityError" + ); +} + const AUTH_ERROR_PATTERNS = [ "authentication required", "failed to authenticate",