From 1718cb45e0c5ce75919bba930d16b8ad910e529c Mon Sep 17 00:00:00 2001 From: Matt Date: Thu, 28 May 2026 16:51:20 +0200 Subject: [PATCH 1/5] fix(workflow-executor): add per-invocation AI timeout to surface hanging provider errors [PRD-409] MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When the AI provider hangs (no response, internal retries, or holds the connection open), the previous code relied on the global STEP_TIMEOUT_MS (default 5 min) to fail the step. From the user's perspective this looks like an infinite spinner. Add a dedicated timeout on each AI invocation (default 60s, configurable via AI_INVOKE_TIMEOUT_MS) using AbortController + signal so the underlying HTTP request is actually cancelled. On timeout, throws the new AiInvokeTimeoutError, which BaseStepExecutor.execute() converts to an error outcome with a user-friendly message — the orchestrator then sets context.error on the step and the frontend exits its isLoading state immediately. fixes PRD-409 Co-Authored-By: Claude Opus 4.7 (1M context) --- .../src/build-workflow-executor.ts | 3 + packages/workflow-executor/src/cli-core.ts | 3 + packages/workflow-executor/src/defaults.ts | 1 + packages/workflow-executor/src/errors.ts | 12 ++ .../src/executors/base-step-executor.ts | 27 ++++- .../src/executors/step-executor-factory.ts | 2 + packages/workflow-executor/src/runner.ts | 5 + .../src/types/execution-context.ts | 1 + .../test/executors/base-step-executor.test.ts | 108 ++++++++++++++++++ 9 files changed, 159 insertions(+), 3 deletions(-) diff --git a/packages/workflow-executor/src/build-workflow-executor.ts b/packages/workflow-executor/src/build-workflow-executor.ts index 8087d22d0d..65976bd3d7 100644 --- a/packages/workflow-executor/src/build-workflow-executor.ts +++ b/packages/workflow-executor/src/build-workflow-executor.ts @@ -14,6 +14,7 @@ import ForestServerWorkflowPort from './adapters/forest-server-workflow-port'; import ForestadminClientActivityLogPortFactory from './adapters/forestadmin-client-activity-log-port-factory'; import ServerAiAdapter from './adapters/server-ai-adapter'; import { + DEFAULT_AI_INVOKE_TIMEOUT_MS, DEFAULT_FOREST_SERVER_URL, DEFAULT_POLLING_INTERVAL_MS, DEFAULT_STEP_TIMEOUT_MS, @@ -43,6 +44,7 @@ export interface ExecutorOptions { logger?: Logger; stopTimeoutMs?: number; stepTimeoutMs?: number; + aiInvokeTimeoutMs?: number; // Max auto-chained steps per entry (see RunnerConfig.maxChainDepth). 0 disables chaining. maxChainDepth?: number; // Dev only: makes every AI call fail immediately so error paths can be exercised locally. @@ -112,6 +114,7 @@ function buildCommonDependencies(options: ExecutorOptions) { authSecret: options.authSecret, stopTimeoutMs: options.stopTimeoutMs, stepTimeoutMs: options.stepTimeoutMs ?? DEFAULT_STEP_TIMEOUT_MS, + aiInvokeTimeoutMs: options.aiInvokeTimeoutMs ?? DEFAULT_AI_INVOKE_TIMEOUT_MS, maxChainDepth: options.maxChainDepth, }; } diff --git a/packages/workflow-executor/src/cli-core.ts b/packages/workflow-executor/src/cli-core.ts index 1c683af027..88e1458ff5 100644 --- a/packages/workflow-executor/src/cli-core.ts +++ b/packages/workflow-executor/src/cli-core.ts @@ -13,6 +13,7 @@ import { type WorkflowExecutor, } from './build-workflow-executor'; import { + DEFAULT_AI_INVOKE_TIMEOUT_MS, DEFAULT_FOREST_SERVER_URL, DEFAULT_HTTP_PORT, DEFAULT_MAX_CHAIN_DEPTH, @@ -158,6 +159,7 @@ export function readEnvConfig(env: NodeJS.ProcessEnv, args: CliArgs): CliConfig pollingIntervalMs: parsePositiveIntEnv('POLLING_INTERVAL_MS', env.POLLING_INTERVAL_MS), stopTimeoutMs: parsePositiveIntEnv('STOP_TIMEOUT_MS', env.STOP_TIMEOUT_MS), stepTimeoutMs: parsePositiveIntEnv('STEP_TIMEOUT_MS', env.STEP_TIMEOUT_MS), + aiInvokeTimeoutMs: parsePositiveIntEnv('AI_INVOKE_TIMEOUT_MS', env.AI_INVOKE_TIMEOUT_MS), maxChainDepth: parsePositiveIntEnv('MAX_CHAIN_DEPTH', env.MAX_CHAIN_DEPTH), ...(aiConfigurations && { aiConfigurations }), ...(env.FORCE_AI_ERROR === 'true' && { forceAiError: true }), @@ -194,6 +196,7 @@ Optional environment variables: POLLING_INTERVAL_MS Default: ${DEFAULT_POLLING_INTERVAL_MS} STOP_TIMEOUT_MS Default: ${DEFAULT_STOP_TIMEOUT_MS} STEP_TIMEOUT_MS Max duration of a step in ms (default: ${DEFAULT_STEP_TIMEOUT_MS}) + AI_INVOKE_TIMEOUT_MS Max duration of a single AI provider invocation in ms (default: ${DEFAULT_AI_INVOKE_TIMEOUT_MS}) MAX_CHAIN_DEPTH Max steps auto-executed per run before yielding (default: ${DEFAULT_MAX_CHAIN_DEPTH}) NO_COLOR Set to any value to disable ANSI colors in pretty logs FORCE_AI_ERROR Set to "true" to make every AI call fail (dev only, to test error paths) diff --git a/packages/workflow-executor/src/defaults.ts b/packages/workflow-executor/src/defaults.ts index b03e4877e4..10b278fd9c 100644 --- a/packages/workflow-executor/src/defaults.ts +++ b/packages/workflow-executor/src/defaults.ts @@ -2,5 +2,6 @@ export const DEFAULT_HTTP_PORT = 3400; export const DEFAULT_FOREST_SERVER_URL = 'https://api.forestadmin.com'; export const DEFAULT_POLLING_INTERVAL_MS = 30_000; export const DEFAULT_STEP_TIMEOUT_MS = 5 * 60_000; +export const DEFAULT_AI_INVOKE_TIMEOUT_MS = 60_000; export const DEFAULT_STOP_TIMEOUT_MS = 30_000; export const DEFAULT_MAX_CHAIN_DEPTH = 50; diff --git a/packages/workflow-executor/src/errors.ts b/packages/workflow-executor/src/errors.ts index 4a8038a610..8ac3e6ee1c 100644 --- a/packages/workflow-executor/src/errors.ts +++ b/packages/workflow-executor/src/errors.ts @@ -209,6 +209,18 @@ export class StepTimeoutError extends WorkflowExecutorError { } } +// Thrown when the AI provider does not respond within the configured timeout — distinct from +// StepTimeoutError so we can surface a provider-specific message and tune the AI timeout +// independently of the step timeout (AI hangs are common; record fetches are not). +export class AiInvokeTimeoutError extends WorkflowExecutorError { + constructor(timeoutMs: number) { + super( + `AI provider did not respond within ${timeoutMs}ms`, + 'The AI provider did not respond in time. Please try again, or contact your administrator if the problem persists.', + ); + } +} + export class NoMcpToolsError extends WorkflowExecutorError { constructor(requestedMcpServerId?: string, loadedMcpServerIds?: readonly string[]) { const technical = requestedMcpServerId diff --git a/packages/workflow-executor/src/executors/base-step-executor.ts b/packages/workflow-executor/src/executors/base-step-executor.ts index a7d76228c2..88643c3a2b 100644 --- a/packages/workflow-executor/src/executors/base-step-executor.ts +++ b/packages/workflow-executor/src/executors/base-step-executor.ts @@ -17,6 +17,7 @@ import type { import { SystemMessage } from '@forestadmin/ai-proxy'; import { + AiInvokeTimeoutError, InvalidAiRequestError, MalformedToolCallError, MissingToolCallError, @@ -328,9 +329,29 @@ export default abstract class BaseStepExecutor { BaseStepExecutor.assertNoMidArraySystemMessages(messages); const modelWithTools = this.context.model.bindTools(tools, { tool_choice: 'any' }); - const response = await modelWithTools.invoke( - BaseStepExecutor.mergeLeadingSystemMessages(messages), - ); + const preparedMessages = BaseStepExecutor.mergeLeadingSystemMessages(messages); + const aiTimeoutMs = this.context.aiInvokeTimeoutMs; + + let response; + + if (!aiTimeoutMs || aiTimeoutMs <= 0) { + response = await modelWithTools.invoke(preparedMessages); + } else { + const controller = new AbortController(); + const timer = setTimeout(() => controller.abort(), aiTimeoutMs); + + try { + response = await modelWithTools.invoke(preparedMessages, { + signal: controller.signal, + }); + } catch (err) { + if (controller.signal.aborted) throw new AiInvokeTimeoutError(aiTimeoutMs); + throw err; + } finally { + clearTimeout(timer); + } + } + const toolCall = response.tool_calls?.[0]; if (toolCall !== undefined) { diff --git a/packages/workflow-executor/src/executors/step-executor-factory.ts b/packages/workflow-executor/src/executors/step-executor-factory.ts index 736b633215..fa7c24a649 100644 --- a/packages/workflow-executor/src/executors/step-executor-factory.ts +++ b/packages/workflow-executor/src/executors/step-executor-factory.ts @@ -41,6 +41,7 @@ export interface StepContextConfig { schemaCache: SchemaCache; logger: Logger; stepTimeoutMs?: number; + aiInvokeTimeoutMs?: number; } export default class StepExecutorFactory { @@ -135,6 +136,7 @@ export default class StepExecutorFactory { logger: cfg.logger, incomingPendingData, stepTimeoutMs: cfg.stepTimeoutMs, + aiInvokeTimeoutMs: cfg.aiInvokeTimeoutMs, activityLogPort, }; } diff --git a/packages/workflow-executor/src/runner.ts b/packages/workflow-executor/src/runner.ts index ba8e423e23..86ee4178b9 100644 --- a/packages/workflow-executor/src/runner.ts +++ b/packages/workflow-executor/src/runner.ts @@ -42,6 +42,10 @@ export interface RunnerConfig { // On timeout the step reports status:error; the underlying work is not aborted (Promise.race // limitation). Late rejections are caught and logged; late resolutions are silently discarded. stepTimeoutMs?: number; + // Per-AI-invocation timeout (used by BaseStepExecutor.invokeWithTools). Aborts the underlying + // HTTP request via AbortSignal so a hanging provider is killed quickly, before stepTimeoutMs + // would fire. 0/undefined disables. + aiInvokeTimeoutMs?: number; // Max number of ADDITIONAL steps auto-chained via /update-step response before yielding to the // next poll cycle (counted after the initial step). 0 disables chaining entirely. Default 50. maxChainDepth?: number; @@ -416,6 +420,7 @@ export default class Runner { schemaCache: this.config.schemaCache, logger: this.logger, stepTimeoutMs: this.config.stepTimeoutMs, + aiInvokeTimeoutMs: this.config.aiInvokeTimeoutMs, }; } } diff --git a/packages/workflow-executor/src/types/execution-context.ts b/packages/workflow-executor/src/types/execution-context.ts index 94ea4b3a6d..4a1ca06128 100644 --- a/packages/workflow-executor/src/types/execution-context.ts +++ b/packages/workflow-executor/src/types/execution-context.ts @@ -41,5 +41,6 @@ export interface ExecutionContext readonly logger: Logger; readonly incomingPendingData?: unknown; readonly stepTimeoutMs?: number; + readonly aiInvokeTimeoutMs?: number; readonly activityLogPort: ActivityLogPort; } diff --git a/packages/workflow-executor/test/executors/base-step-executor.test.ts b/packages/workflow-executor/test/executors/base-step-executor.test.ts index 035b277d2b..5119696a6b 100644 --- a/packages/workflow-executor/test/executors/base-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/base-step-executor.test.ts @@ -11,6 +11,7 @@ import type { BaseMessage, DynamicStructuredTool } from '@forestadmin/ai-proxy'; import { HumanMessage, SystemMessage } from '@forestadmin/ai-proxy'; import { + AiInvokeTimeoutError, InvalidAiRequestError, MalformedToolCallError, MissingToolCallError, @@ -854,6 +855,113 @@ describe('BaseStepExecutor', () => { ); }); }); + + describe('AI invoke timeout', () => { + // Mocks a model.invoke that never resolves on its own but rejects with AbortError + // when its received AbortSignal fires — mimics LangChain's behavior on signal.abort(). + function makeHangingModel() { + const invoke = jest.fn().mockImplementation( + (_messages, opts) => + new Promise((_resolve, reject) => { + opts?.signal?.addEventListener('abort', () => { + const err = new Error('Aborted'); + err.name = 'AbortError'; + reject(err); + }); + }), + ); + + return { + model: { + bindTools: jest.fn().mockReturnValue({ invoke }), + } as unknown as ExecutionContext['model'], + invoke, + }; + } + + it('throws AiInvokeTimeoutError when model.invoke hangs beyond aiInvokeTimeoutMs', async () => { + jest.useFakeTimers(); + + try { + const { model } = makeHangingModel(); + const executor = new TestableExecutor(makeContext({ model, aiInvokeTimeoutMs: 100 })); + const promise = executor.invokeWithTool(dummyMessages, dummyTool); + // Attach catch synchronously so a late rejection (after advanceTimersByTime) doesn't + // produce an unhandled rejection warning. + const caught = promise.catch(err => err); + jest.advanceTimersByTime(150); + const err = await caught; + + expect(err).toBeInstanceOf(AiInvokeTimeoutError); + expect((err as Error).message).toContain('100ms'); + } finally { + jest.useRealTimers(); + } + }); + + it('passes the AbortSignal as the second arg to model.invoke', async () => { + const { model, invoke } = makeMockModel({ + tool_calls: [{ name: 'tool', args: {}, id: 'c1' }], + }); + const executor = new TestableExecutor(makeContext({ model, aiInvokeTimeoutMs: 5_000 })); + + await executor.invokeWithTool(dummyMessages, dummyTool); + + expect(invoke).toHaveBeenCalledWith( + expect.any(Array), + expect.objectContaining({ signal: expect.any(AbortSignal) }), + ); + }); + + it('does not pass any options to model.invoke when aiInvokeTimeoutMs is unset', async () => { + const { model, invoke } = makeMockModel({ + tool_calls: [{ name: 'tool', args: {}, id: 'c1' }], + }); + const executor = new TestableExecutor(makeContext({ model, aiInvokeTimeoutMs: undefined })); + + await executor.invokeWithTool(dummyMessages, dummyTool); + + expect(invoke).toHaveBeenCalledTimes(1); + expect(invoke.mock.calls[0]).toHaveLength(1); + }); + + it('treats aiInvokeTimeoutMs <= 0 as disabled', async () => { + const { model, invoke } = makeMockModel({ + tool_calls: [{ name: 'tool', args: {}, id: 'c1' }], + }); + const executor = new TestableExecutor(makeContext({ model, aiInvokeTimeoutMs: 0 })); + + await executor.invokeWithTool(dummyMessages, dummyTool); + + expect(invoke.mock.calls[0]).toHaveLength(1); + }); + + it('rethrows non-abort errors without wrapping them', async () => { + const apiError = Object.assign(new Error('OpenAI 503'), { status: 503, name: 'APIError' }); + const invoke = jest.fn().mockRejectedValue(apiError); + const model = { + bindTools: jest.fn().mockReturnValue({ invoke }), + } as unknown as ExecutionContext['model']; + const executor = new TestableExecutor(makeContext({ model, aiInvokeTimeoutMs: 5_000 })); + + await expect(executor.invokeWithTool(dummyMessages, dummyTool)).rejects.toBe(apiError); + }); + + it('clears the timer after a successful invoke (no unref leak)', async () => { + const { model } = makeMockModel({ + tool_calls: [{ name: 'tool', args: {}, id: 'c1' }], + }); + const clearSpy = jest.spyOn(global, 'clearTimeout'); + const executor = new TestableExecutor(makeContext({ model, aiInvokeTimeoutMs: 5_000 })); + + try { + await executor.invokeWithTool(dummyMessages, dummyTool); + expect(clearSpy).toHaveBeenCalled(); + } finally { + clearSpy.mockRestore(); + } + }); + }); }); describe('patchAndReloadPendingData', () => { From b3066e32af10ad1cc1b8d20908fead95da3670d8 Mon Sep 17 00:00:00 2001 From: Matt Date: Tue, 2 Jun 2026 12:11:10 +0200 Subject: [PATCH 2/5] refactor(workflow-executor): delegate AI invoke timeout to LangChain Replace the manual AbortController + setTimeout in invokeWithTools with LangChain's native `timeout` call option, which it converts to an AbortSignal.timeout(ms) and forwards to the underlying HTTP request (real cancellation, not just a race). Lowers invokeWithTools complexity. Map the resulting TimeoutError/AbortError to AiInvokeTimeoutError to keep the user-facing message. Lower the default to 30s. Co-Authored-By: Claude Opus 4.8 (1M context) --- packages/workflow-executor/src/defaults.ts | 2 +- .../src/executors/base-step-executor.ts | 30 +++--- .../test/executors/base-step-executor.test.ts | 99 ++++++++----------- 3 files changed, 56 insertions(+), 75 deletions(-) diff --git a/packages/workflow-executor/src/defaults.ts b/packages/workflow-executor/src/defaults.ts index 10b278fd9c..746077301f 100644 --- a/packages/workflow-executor/src/defaults.ts +++ b/packages/workflow-executor/src/defaults.ts @@ -2,6 +2,6 @@ export const DEFAULT_HTTP_PORT = 3400; export const DEFAULT_FOREST_SERVER_URL = 'https://api.forestadmin.com'; export const DEFAULT_POLLING_INTERVAL_MS = 30_000; export const DEFAULT_STEP_TIMEOUT_MS = 5 * 60_000; -export const DEFAULT_AI_INVOKE_TIMEOUT_MS = 60_000; +export const DEFAULT_AI_INVOKE_TIMEOUT_MS = 30_000; export const DEFAULT_STOP_TIMEOUT_MS = 30_000; export const DEFAULT_MAX_CHAIN_DEPTH = 50; diff --git a/packages/workflow-executor/src/executors/base-step-executor.ts b/packages/workflow-executor/src/executors/base-step-executor.ts index 88643c3a2b..594b5a039d 100644 --- a/packages/workflow-executor/src/executors/base-step-executor.ts +++ b/packages/workflow-executor/src/executors/base-step-executor.ts @@ -331,25 +331,27 @@ export default abstract class BaseStepExecutor 0); let response; - if (!aiTimeoutMs || aiTimeoutMs <= 0) { - response = await modelWithTools.invoke(preparedMessages); - } else { - const controller = new AbortController(); - const timer = setTimeout(() => controller.abort(), aiTimeoutMs); + try { + // LangChain turns the `timeout` call option into an AbortSignal (AbortSignal.timeout) and + // forwards it down to the underlying HTTP request, so a hanging provider is actually + // cancelled — not merely raced. 0/undefined leaves the call un-timed. + response = timeoutEnabled + ? await modelWithTools.invoke(preparedMessages, { timeout: aiTimeoutMs }) + : await modelWithTools.invoke(preparedMessages); + } catch (err) { + // On timeout the abort surfaces as TimeoutError (from AbortSignal.timeout) or AbortError. + // No other abort source exists on this path, so map either to our user-facing error. + const name = (err as { name?: string } | undefined)?.name; - try { - response = await modelWithTools.invoke(preparedMessages, { - signal: controller.signal, - }); - } catch (err) { - if (controller.signal.aborted) throw new AiInvokeTimeoutError(aiTimeoutMs); - throw err; - } finally { - clearTimeout(timer); + if (timeoutEnabled && (name === 'TimeoutError' || name === 'AbortError')) { + throw new AiInvokeTimeoutError(aiTimeoutMs as number); } + + throw err; } const toolCall = response.tool_calls?.[0]; diff --git a/packages/workflow-executor/test/executors/base-step-executor.test.ts b/packages/workflow-executor/test/executors/base-step-executor.test.ts index 5119696a6b..1c8ef0dd3f 100644 --- a/packages/workflow-executor/test/executors/base-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/base-step-executor.test.ts @@ -857,19 +857,10 @@ describe('BaseStepExecutor', () => { }); describe('AI invoke timeout', () => { - // Mocks a model.invoke that never resolves on its own but rejects with AbortError - // when its received AbortSignal fires — mimics LangChain's behavior on signal.abort(). - function makeHangingModel() { - const invoke = jest.fn().mockImplementation( - (_messages, opts) => - new Promise((_resolve, reject) => { - opts?.signal?.addEventListener('abort', () => { - const err = new Error('Aborted'); - err.name = 'AbortError'; - reject(err); - }); - }), - ); + // Builds a model whose invoke rejects with the given error — mimics LangChain surfacing the + // abort it raises when the `timeout` call option fires (AbortSignal.timeout). + function makeRejectingModel(error: unknown) { + const invoke = jest.fn().mockRejectedValue(error); return { model: { @@ -879,27 +870,29 @@ describe('BaseStepExecutor', () => { }; } - it('throws AiInvokeTimeoutError when model.invoke hangs beyond aiInvokeTimeoutMs', async () => { - jest.useFakeTimers(); - - try { - const { model } = makeHangingModel(); - const executor = new TestableExecutor(makeContext({ model, aiInvokeTimeoutMs: 100 })); - const promise = executor.invokeWithTool(dummyMessages, dummyTool); - // Attach catch synchronously so a late rejection (after advanceTimersByTime) doesn't - // produce an unhandled rejection warning. - const caught = promise.catch(err => err); - jest.advanceTimersByTime(150); - const err = await caught; - - expect(err).toBeInstanceOf(AiInvokeTimeoutError); - expect((err as Error).message).toContain('100ms'); - } finally { - jest.useRealTimers(); - } + it('maps a TimeoutError from invoke to AiInvokeTimeoutError', async () => { + const timeoutErr = Object.assign(new Error('Aborted'), { name: 'TimeoutError' }); + const { model } = makeRejectingModel(timeoutErr); + const executor = new TestableExecutor(makeContext({ model, aiInvokeTimeoutMs: 100 })); + + const err = await executor.invokeWithTool(dummyMessages, dummyTool).catch(e => e); + + expect(err).toBeInstanceOf(AiInvokeTimeoutError); + expect((err as Error).message).toContain('100ms'); + }); + + it('maps an AbortError from invoke to AiInvokeTimeoutError', async () => { + const abortErr = Object.assign(new Error('Aborted'), { name: 'AbortError' }); + const { model } = makeRejectingModel(abortErr); + const executor = new TestableExecutor(makeContext({ model, aiInvokeTimeoutMs: 100 })); + + const err = await executor.invokeWithTool(dummyMessages, dummyTool).catch(e => e); + + expect(err).toBeInstanceOf(AiInvokeTimeoutError); + expect((err as Error).message).toContain('100ms'); }); - it('passes the AbortSignal as the second arg to model.invoke', async () => { + it('passes { timeout: aiInvokeTimeoutMs } as the second arg to model.invoke', async () => { const { model, invoke } = makeMockModel({ tool_calls: [{ name: 'tool', args: {}, id: 'c1' }], }); @@ -907,10 +900,7 @@ describe('BaseStepExecutor', () => { await executor.invokeWithTool(dummyMessages, dummyTool); - expect(invoke).toHaveBeenCalledWith( - expect.any(Array), - expect.objectContaining({ signal: expect.any(AbortSignal) }), - ); + expect(invoke).toHaveBeenCalledWith(expect.any(Array), { timeout: 5_000 }); }); it('does not pass any options to model.invoke when aiInvokeTimeoutMs is unset', async () => { @@ -925,42 +915,31 @@ describe('BaseStepExecutor', () => { expect(invoke.mock.calls[0]).toHaveLength(1); }); - it('treats aiInvokeTimeoutMs <= 0 as disabled', async () => { - const { model, invoke } = makeMockModel({ - tool_calls: [{ name: 'tool', args: {}, id: 'c1' }], - }); + it('treats aiInvokeTimeoutMs <= 0 as disabled (no options, abort not mapped)', async () => { + const abortErr = Object.assign(new Error('Aborted'), { name: 'AbortError' }); + const invoke = jest + .fn() + .mockResolvedValueOnce({ tool_calls: [{ name: 'tool', args: {}, id: 'c1' }] }) + .mockRejectedValueOnce(abortErr); + const model = { + bindTools: jest.fn().mockReturnValue({ invoke }), + } as unknown as ExecutionContext['model']; const executor = new TestableExecutor(makeContext({ model, aiInvokeTimeoutMs: 0 })); await executor.invokeWithTool(dummyMessages, dummyTool); - expect(invoke.mock.calls[0]).toHaveLength(1); + + // With the timeout disabled, an abort is not ours to translate — it bubbles up untouched. + await expect(executor.invokeWithTool(dummyMessages, dummyTool)).rejects.toBe(abortErr); }); it('rethrows non-abort errors without wrapping them', async () => { const apiError = Object.assign(new Error('OpenAI 503'), { status: 503, name: 'APIError' }); - const invoke = jest.fn().mockRejectedValue(apiError); - const model = { - bindTools: jest.fn().mockReturnValue({ invoke }), - } as unknown as ExecutionContext['model']; + const { model } = makeRejectingModel(apiError); const executor = new TestableExecutor(makeContext({ model, aiInvokeTimeoutMs: 5_000 })); await expect(executor.invokeWithTool(dummyMessages, dummyTool)).rejects.toBe(apiError); }); - - it('clears the timer after a successful invoke (no unref leak)', async () => { - const { model } = makeMockModel({ - tool_calls: [{ name: 'tool', args: {}, id: 'c1' }], - }); - const clearSpy = jest.spyOn(global, 'clearTimeout'); - const executor = new TestableExecutor(makeContext({ model, aiInvokeTimeoutMs: 5_000 })); - - try { - await executor.invokeWithTool(dummyMessages, dummyTool); - expect(clearSpy).toHaveBeenCalled(); - } finally { - clearSpy.mockRestore(); - } - }); }); }); From de2ca047098b79e690e26367f35af8d80dfcee57 Mon Sep 17 00:00:00 2001 From: alban bertolini Date: Wed, 3 Jun 2026 12:53:34 +0200 Subject: [PATCH 3/5] refactor(workflow-executor): detect AI invoke timeout via our own AbortSignal MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Instead of guessing the thrown error's name (providers wrap an aborted request differently — AbortError, TimeoutError, APIUserAbortError, APIConnectionTimeoutError…), pass an AbortSignal.timeout we own and detect the timeout via signal.aborted. Provider-agnostic, and LangChain forwards the signal so a hanging provider is genuinely cancelled. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../src/executors/base-step-executor.ts | 18 ++---- .../test/executors/base-step-executor.test.ts | 63 +++++++++++++------ 2 files changed, 47 insertions(+), 34 deletions(-) diff --git a/packages/workflow-executor/src/executors/base-step-executor.ts b/packages/workflow-executor/src/executors/base-step-executor.ts index 594b5a039d..5dcff8d506 100644 --- a/packages/workflow-executor/src/executors/base-step-executor.ts +++ b/packages/workflow-executor/src/executors/base-step-executor.ts @@ -332,25 +332,15 @@ export default abstract class BaseStepExecutor 0); + const signal = timeoutEnabled ? AbortSignal.timeout(aiTimeoutMs as number) : undefined; let response; try { - // LangChain turns the `timeout` call option into an AbortSignal (AbortSignal.timeout) and - // forwards it down to the underlying HTTP request, so a hanging provider is actually - // cancelled — not merely raced. 0/undefined leaves the call un-timed. - response = timeoutEnabled - ? await modelWithTools.invoke(preparedMessages, { timeout: aiTimeoutMs }) - : await modelWithTools.invoke(preparedMessages); + // `signal: undefined` is equivalent to passing no options — LangChain leaves the call un-timed. + response = await modelWithTools.invoke(preparedMessages, { signal }); } catch (err) { - // On timeout the abort surfaces as TimeoutError (from AbortSignal.timeout) or AbortError. - // No other abort source exists on this path, so map either to our user-facing error. - const name = (err as { name?: string } | undefined)?.name; - - if (timeoutEnabled && (name === 'TimeoutError' || name === 'AbortError')) { - throw new AiInvokeTimeoutError(aiTimeoutMs as number); - } - + if (signal?.aborted) throw new AiInvokeTimeoutError(aiTimeoutMs as number); throw err; } diff --git a/packages/workflow-executor/test/executors/base-step-executor.test.ts b/packages/workflow-executor/test/executors/base-step-executor.test.ts index 1c8ef0dd3f..acfa7cbc73 100644 --- a/packages/workflow-executor/test/executors/base-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/base-step-executor.test.ts @@ -821,7 +821,7 @@ describe('BaseStepExecutor', () => { await executor.invokeWithTool(messages, dummyTool); - expect(invoke).toHaveBeenCalledWith(messages); + expect(invoke).toHaveBeenCalledWith(messages, { signal: undefined }); }); it('leaves messages unchanged when there is no SystemMessage', async () => { @@ -833,7 +833,7 @@ describe('BaseStepExecutor', () => { await executor.invokeWithTool(messages, dummyTool); - expect(invoke).toHaveBeenCalledWith(messages); + expect(invoke).toHaveBeenCalledWith(messages, { signal: undefined }); }); it('throws when a SystemMessage appears after a non-system message', async () => { @@ -857,8 +857,7 @@ describe('BaseStepExecutor', () => { }); describe('AI invoke timeout', () => { - // Builds a model whose invoke rejects with the given error — mimics LangChain surfacing the - // abort it raises when the `timeout` call option fires (AbortSignal.timeout). + // Model whose invoke rejects immediately with the given error (no abort involved). function makeRejectingModel(error: unknown) { const invoke = jest.fn().mockRejectedValue(error); @@ -870,29 +869,51 @@ describe('BaseStepExecutor', () => { }; } - it('maps a TimeoutError from invoke to AiInvokeTimeoutError', async () => { - const timeoutErr = Object.assign(new Error('Aborted'), { name: 'TimeoutError' }); - const { model } = makeRejectingModel(timeoutErr); - const executor = new TestableExecutor(makeContext({ model, aiInvokeTimeoutMs: 100 })); + // Model whose invoke hangs until the call's AbortSignal fires, then rejects — like a provider + // SDK forwarding an aborted request. `rejectWith` simulates the provider wrapping the abort + // under its own error type; defaults to the signal's reason (our AiInvokeTimeoutError). + function makeAbortAwareModel(rejectWith?: unknown) { + const invoke = jest.fn().mockImplementation( + (_messages, options?: { signal?: AbortSignal }) => + new Promise((_resolve, reject) => { + const { signal } = options ?? {}; + signal?.addEventListener('abort', () => reject(rejectWith ?? signal.reason)); + }), + ); + + return { + model: { + bindTools: jest.fn().mockReturnValue({ invoke }), + } as unknown as ExecutionContext['model'], + invoke, + }; + } + + it('throws AiInvokeTimeoutError when the model invoke hangs past aiInvokeTimeoutMs', async () => { + const { model } = makeAbortAwareModel(); + const executor = new TestableExecutor(makeContext({ model, aiInvokeTimeoutMs: 20 })); const err = await executor.invokeWithTool(dummyMessages, dummyTool).catch(e => e); expect(err).toBeInstanceOf(AiInvokeTimeoutError); - expect((err as Error).message).toContain('100ms'); + expect((err as Error).message).toContain('20ms'); }); - it('maps an AbortError from invoke to AiInvokeTimeoutError', async () => { - const abortErr = Object.assign(new Error('Aborted'), { name: 'AbortError' }); - const { model } = makeRejectingModel(abortErr); - const executor = new TestableExecutor(makeContext({ model, aiInvokeTimeoutMs: 100 })); + it('maps to AiInvokeTimeoutError even when the provider wraps the abort under its own error name', async () => { + // e.g. Anthropic surfaces an aborted request as APIUserAbortError, not AbortError/TimeoutError — + // detection is by signal.aborted, not by the thrown error's name. + const providerErr = Object.assign(new Error('Request was aborted.'), { + name: 'APIUserAbortError', + }); + const { model } = makeAbortAwareModel(providerErr); + const executor = new TestableExecutor(makeContext({ model, aiInvokeTimeoutMs: 20 })); const err = await executor.invokeWithTool(dummyMessages, dummyTool).catch(e => e); expect(err).toBeInstanceOf(AiInvokeTimeoutError); - expect((err as Error).message).toContain('100ms'); }); - it('passes { timeout: aiInvokeTimeoutMs } as the second arg to model.invoke', async () => { + it('passes an AbortSignal as the second arg to model.invoke', async () => { const { model, invoke } = makeMockModel({ tool_calls: [{ name: 'tool', args: {}, id: 'c1' }], }); @@ -900,10 +921,12 @@ describe('BaseStepExecutor', () => { await executor.invokeWithTool(dummyMessages, dummyTool); - expect(invoke).toHaveBeenCalledWith(expect.any(Array), { timeout: 5_000 }); + expect(invoke).toHaveBeenCalledWith(expect.any(Array), { + signal: expect.any(AbortSignal), + }); }); - it('does not pass any options to model.invoke when aiInvokeTimeoutMs is unset', async () => { + it('passes an undefined signal (un-timed) when aiInvokeTimeoutMs is unset', async () => { const { model, invoke } = makeMockModel({ tool_calls: [{ name: 'tool', args: {}, id: 'c1' }], }); @@ -912,10 +935,10 @@ describe('BaseStepExecutor', () => { await executor.invokeWithTool(dummyMessages, dummyTool); expect(invoke).toHaveBeenCalledTimes(1); - expect(invoke.mock.calls[0]).toHaveLength(1); + expect(invoke).toHaveBeenCalledWith(expect.any(Array), { signal: undefined }); }); - it('treats aiInvokeTimeoutMs <= 0 as disabled (no options, abort not mapped)', async () => { + it('treats aiInvokeTimeoutMs <= 0 as disabled (signal undefined, abort not mapped)', async () => { const abortErr = Object.assign(new Error('Aborted'), { name: 'AbortError' }); const invoke = jest .fn() @@ -927,7 +950,7 @@ describe('BaseStepExecutor', () => { const executor = new TestableExecutor(makeContext({ model, aiInvokeTimeoutMs: 0 })); await executor.invokeWithTool(dummyMessages, dummyTool); - expect(invoke.mock.calls[0]).toHaveLength(1); + expect(invoke.mock.calls[0]).toEqual([expect.any(Array), { signal: undefined }]); // With the timeout disabled, an abort is not ours to translate — it bubbles up untouched. await expect(executor.invokeWithTool(dummyMessages, dummyTool)).rejects.toBe(abortErr); From ad37d16e00364ec808c8e750b5662d36bb4be27c Mon Sep 17 00:00:00 2001 From: alban bertolini Date: Wed, 3 Jun 2026 13:11:17 +0200 Subject: [PATCH 4/5] fix(workflow-executor): bad timeout config falls back to default, never disables - build-workflow-executor: clamp stepTimeoutMs/aiInvokeTimeoutMs to a positive value or the default (`?? default` only caught null/undefined, so a 0/negative programmatic value silently disabled the timeout) - base-step-executor: drop the `as number` casts via inline narrowing; add a WHY comment on the signal.aborted timeout detection - tests: build clamps non-positive timeouts to default; AI invoke timeout now surfaces end-to-end through execute() as an error outcome with its userMessage Co-Authored-By: Claude Opus 4.8 (1M context) --- .../src/build-workflow-executor.ts | 10 +++++++-- .../src/executors/base-step-executor.ts | 8 ++++--- .../test/build-workflow-executor.test.ts | 17 +++++++++++++++ .../test/executors/base-step-executor.test.ts | 21 +++++++++++++++++++ 4 files changed, 51 insertions(+), 5 deletions(-) diff --git a/packages/workflow-executor/src/build-workflow-executor.ts b/packages/workflow-executor/src/build-workflow-executor.ts index 65976bd3d7..eb4edfca5d 100644 --- a/packages/workflow-executor/src/build-workflow-executor.ts +++ b/packages/workflow-executor/src/build-workflow-executor.ts @@ -54,6 +54,12 @@ export interface ExecutorOptions { export type DatabaseExecutorOptions = ExecutorOptions & ({ database: SequelizeOptions & { uri: string } } | { database: SequelizeOptions }); +// A bad timeout config (0, negative, non-finite) must fall back to the default rather than +// silently disabling the timeout — `?? default` only catches null/undefined, not 0/negative. +function positiveOrDefault(value: number | undefined, fallback: number): number { + return typeof value === 'number' && value > 0 ? value : fallback; +} + function buildCommonDependencies(options: ExecutorOptions) { const forestServerUrl = options.forestServerUrl ?? DEFAULT_FOREST_SERVER_URL; const logger = options.logger ?? new ConsoleLogger(); @@ -113,8 +119,8 @@ function buildCommonDependencies(options: ExecutorOptions) { envSecret: options.envSecret, authSecret: options.authSecret, stopTimeoutMs: options.stopTimeoutMs, - stepTimeoutMs: options.stepTimeoutMs ?? DEFAULT_STEP_TIMEOUT_MS, - aiInvokeTimeoutMs: options.aiInvokeTimeoutMs ?? DEFAULT_AI_INVOKE_TIMEOUT_MS, + stepTimeoutMs: positiveOrDefault(options.stepTimeoutMs, DEFAULT_STEP_TIMEOUT_MS), + aiInvokeTimeoutMs: positiveOrDefault(options.aiInvokeTimeoutMs, DEFAULT_AI_INVOKE_TIMEOUT_MS), maxChainDepth: options.maxChainDepth, }; } diff --git a/packages/workflow-executor/src/executors/base-step-executor.ts b/packages/workflow-executor/src/executors/base-step-executor.ts index 5dcff8d506..3acf683b3b 100644 --- a/packages/workflow-executor/src/executors/base-step-executor.ts +++ b/packages/workflow-executor/src/executors/base-step-executor.ts @@ -331,8 +331,8 @@ export default abstract class BaseStepExecutor 0); - const signal = timeoutEnabled ? AbortSignal.timeout(aiTimeoutMs as number) : undefined; + const timeoutMs = aiTimeoutMs && aiTimeoutMs > 0 ? aiTimeoutMs : undefined; + const signal = timeoutMs ? AbortSignal.timeout(timeoutMs) : undefined; let response; @@ -340,7 +340,9 @@ export default abstract class BaseStepExecutor { expect(MockedRunner).toHaveBeenCalledWith(expect.objectContaining({ stepTimeoutMs: 30_000 })); }); + it('falls back to the default timeouts when a non-positive value is configured', () => { + buildInMemoryExecutor({ ...BASE_OPTIONS, stepTimeoutMs: 0, aiInvokeTimeoutMs: -1 }); + + // A bad config must never silently disable the timeout — it falls back to the default. + expect(MockedRunner).toHaveBeenCalledWith( + expect.objectContaining({ stepTimeoutMs: 5 * 60_000, aiInvokeTimeoutMs: 30_000 }), + ); + }); + + it('applies the 30s default when aiInvokeTimeoutMs is not configured', () => { + buildInMemoryExecutor(BASE_OPTIONS); + + expect(MockedRunner).toHaveBeenCalledWith( + expect.objectContaining({ aiInvokeTimeoutMs: 30_000 }), + ); + }); + it('passes secrets to Runner config', () => { buildInMemoryExecutor(BASE_OPTIONS); diff --git a/packages/workflow-executor/test/executors/base-step-executor.test.ts b/packages/workflow-executor/test/executors/base-step-executor.test.ts index acfa7cbc73..d9e3da6efc 100644 --- a/packages/workflow-executor/test/executors/base-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/base-step-executor.test.ts @@ -963,6 +963,27 @@ describe('BaseStepExecutor', () => { await expect(executor.invokeWithTool(dummyMessages, dummyTool)).rejects.toBe(apiError); }); + + // End-to-end: a timed-out AI invoke must surface through execute() as an error outcome + // carrying the AI-specific userMessage (distinct from the step-timeout message). + it('surfaces a timed-out AI invoke as an error outcome with the AI-specific userMessage', async () => { + class InvokingExecutor extends TestableExecutor { + protected override async doExecute(): Promise { + await this.invokeWithTool(dummyMessages, dummyTool); + + return this.buildOutcomeResult({ status: 'success' }); + } + } + const { model } = makeAbortAwareModel(); + const executor = new InvokingExecutor(makeContext({ model, aiInvokeTimeoutMs: 20 })); + + const result = await executor.execute(); + + expect(result.stepOutcome.status).toBe('error'); + expect(result.stepOutcome.error).toBe( + 'The AI provider did not respond in time. Please try again, or contact your administrator if the problem persists.', + ); + }); }); }); From 7e7b418d52689b06f3eb32ffcb4418e874401073 Mon Sep 17 00:00:00 2001 From: alban bertolini Date: Wed, 3 Jun 2026 13:20:21 +0200 Subject: [PATCH 5/5] fix(workflow-executor): treat non-finite timeout config as bad (fall back to default) positiveOrDefault accepted Infinity (typeof Infinity === 'number' && Infinity > 0), which disabled the timeout instead of using the default. Guard with Number.isFinite. Co-Authored-By: Claude Opus 4.8 (1M context) --- .../workflow-executor/src/build-workflow-executor.ts | 2 +- .../test/build-workflow-executor.test.ts | 10 +++++++--- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/packages/workflow-executor/src/build-workflow-executor.ts b/packages/workflow-executor/src/build-workflow-executor.ts index eb4edfca5d..dbb6f64cf5 100644 --- a/packages/workflow-executor/src/build-workflow-executor.ts +++ b/packages/workflow-executor/src/build-workflow-executor.ts @@ -57,7 +57,7 @@ export type DatabaseExecutorOptions = ExecutorOptions & // A bad timeout config (0, negative, non-finite) must fall back to the default rather than // silently disabling the timeout — `?? default` only catches null/undefined, not 0/negative. function positiveOrDefault(value: number | undefined, fallback: number): number { - return typeof value === 'number' && value > 0 ? value : fallback; + return typeof value === 'number' && Number.isFinite(value) && value > 0 ? value : fallback; } function buildCommonDependencies(options: ExecutorOptions) { diff --git a/packages/workflow-executor/test/build-workflow-executor.test.ts b/packages/workflow-executor/test/build-workflow-executor.test.ts index 4fefce03cb..31459263df 100644 --- a/packages/workflow-executor/test/build-workflow-executor.test.ts +++ b/packages/workflow-executor/test/build-workflow-executor.test.ts @@ -176,10 +176,14 @@ describe('buildInMemoryExecutor', () => { expect(MockedRunner).toHaveBeenCalledWith(expect.objectContaining({ stepTimeoutMs: 30_000 })); }); - it('falls back to the default timeouts when a non-positive value is configured', () => { - buildInMemoryExecutor({ ...BASE_OPTIONS, stepTimeoutMs: 0, aiInvokeTimeoutMs: -1 }); + it('falls back to the default timeouts for non-positive or non-finite values', () => { + buildInMemoryExecutor({ + ...BASE_OPTIONS, + stepTimeoutMs: 0, + aiInvokeTimeoutMs: Number.POSITIVE_INFINITY, + }); - // A bad config must never silently disable the timeout — it falls back to the default. + // A bad config must never silently disable the timeout — 0/negative/Infinity fall back. expect(MockedRunner).toHaveBeenCalledWith( expect.objectContaining({ stepTimeoutMs: 5 * 60_000, aiInvokeTimeoutMs: 30_000 }), );