diff --git a/packages/workflow-executor/src/build-workflow-executor.ts b/packages/workflow-executor/src/build-workflow-executor.ts
index 8087d22d0d..dbb6f64cf5 100644
--- a/packages/workflow-executor/src/build-workflow-executor.ts
+++ b/packages/workflow-executor/src/build-workflow-executor.ts
@@ -14,6 +14,7 @@ import ForestServerWorkflowPort from './adapters/forest-server-workflow-port';
 import ForestadminClientActivityLogPortFactory from './adapters/forestadmin-client-activity-log-port-factory';
 import ServerAiAdapter from './adapters/server-ai-adapter';
 import {
+  DEFAULT_AI_INVOKE_TIMEOUT_MS,
   DEFAULT_FOREST_SERVER_URL,
   DEFAULT_POLLING_INTERVAL_MS,
   DEFAULT_STEP_TIMEOUT_MS,
@@ -43,6 +44,7 @@ export interface ExecutorOptions {
   logger?: Logger;
   stopTimeoutMs?: number;
   stepTimeoutMs?: number;
+  aiInvokeTimeoutMs?: number;
   // Max auto-chained steps per entry (see RunnerConfig.maxChainDepth). 0 disables chaining.
   maxChainDepth?: number;
   // Dev only: makes every AI call fail immediately so error paths can be exercised locally.
@@ -52,6 +54,20 @@ export interface ExecutorOptions {
 export type DatabaseExecutorOptions = ExecutorOptions &
   ({ database: SequelizeOptions & { uri: string } } | { database: SequelizeOptions });
 
+// Largest delay Node timers accept (2^31 - 1 ms). Beyond it setTimeout overflows and fires after
+// ~1ms, which would turn "a very long timeout" into "almost no timeout".
+const MAX_TIMEOUT_MS = 2 ** 31 - 1;
+
+// A bad timeout config (0, negative, non-finite, beyond the timer range) must fall back to the
+// default rather than silently disabling the timeout — `?? default` only catches null/undefined,
+// not 0/negative. Fractional values are truncated: Node's AbortSignal.timeout rejects non-integers.
+function positiveOrDefault(value: number | undefined, fallback: number): number {
+  if (typeof value !== 'number' || !Number.isFinite(value)) return fallback;
+
+  const ms = Math.trunc(value);
+
+  return ms > 0 && ms <= MAX_TIMEOUT_MS ? ms : fallback;
+}
+
 function buildCommonDependencies(options: ExecutorOptions) {
   const forestServerUrl = options.forestServerUrl ?? DEFAULT_FOREST_SERVER_URL;
   const logger = options.logger ?? new ConsoleLogger();
@@ -111,7 +127,8 @@ function buildCommonDependencies(options: ExecutorOptions) {
     envSecret: options.envSecret,
     authSecret: options.authSecret,
     stopTimeoutMs: options.stopTimeoutMs,
-    stepTimeoutMs: options.stepTimeoutMs ?? DEFAULT_STEP_TIMEOUT_MS,
+    stepTimeoutMs: positiveOrDefault(options.stepTimeoutMs, DEFAULT_STEP_TIMEOUT_MS),
+    aiInvokeTimeoutMs: positiveOrDefault(options.aiInvokeTimeoutMs, DEFAULT_AI_INVOKE_TIMEOUT_MS),
     maxChainDepth: options.maxChainDepth,
   };
 }
diff --git a/packages/workflow-executor/src/cli-core.ts b/packages/workflow-executor/src/cli-core.ts
index 1c683af027..88e1458ff5 100644
--- a/packages/workflow-executor/src/cli-core.ts
+++ b/packages/workflow-executor/src/cli-core.ts
@@ -13,6 +13,7 @@ import {
   type WorkflowExecutor,
 } from './build-workflow-executor';
 import {
+  DEFAULT_AI_INVOKE_TIMEOUT_MS,
   DEFAULT_FOREST_SERVER_URL,
   DEFAULT_HTTP_PORT,
   DEFAULT_MAX_CHAIN_DEPTH,
@@ -158,6 +159,7 @@ export function readEnvConfig(env: NodeJS.ProcessEnv, args: CliArgs): CliConfig
     pollingIntervalMs: parsePositiveIntEnv('POLLING_INTERVAL_MS', env.POLLING_INTERVAL_MS),
     stopTimeoutMs: parsePositiveIntEnv('STOP_TIMEOUT_MS', env.STOP_TIMEOUT_MS),
     stepTimeoutMs: parsePositiveIntEnv('STEP_TIMEOUT_MS', env.STEP_TIMEOUT_MS),
+    aiInvokeTimeoutMs: parsePositiveIntEnv('AI_INVOKE_TIMEOUT_MS', env.AI_INVOKE_TIMEOUT_MS),
     maxChainDepth: parsePositiveIntEnv('MAX_CHAIN_DEPTH', env.MAX_CHAIN_DEPTH),
     ...(aiConfigurations && { aiConfigurations }),
     ...(env.FORCE_AI_ERROR === 'true' && { forceAiError: true }),
@@ -194,6 +196,7 @@ Optional environment variables:
   POLLING_INTERVAL_MS     Default: ${DEFAULT_POLLING_INTERVAL_MS}
   STOP_TIMEOUT_MS         Default: ${DEFAULT_STOP_TIMEOUT_MS}
   STEP_TIMEOUT_MS         Max duration of a step in ms (default: ${DEFAULT_STEP_TIMEOUT_MS})
+  AI_INVOKE_TIMEOUT_MS    Max duration of a single AI provider invocation in ms (default: ${DEFAULT_AI_INVOKE_TIMEOUT_MS})
   MAX_CHAIN_DEPTH         Max steps auto-executed per run before yielding (default: ${DEFAULT_MAX_CHAIN_DEPTH})
   NO_COLOR                Set to any value to disable ANSI colors in pretty logs
   FORCE_AI_ERROR          Set to "true" to make every AI call fail (dev only, to test error paths)
diff --git a/packages/workflow-executor/src/defaults.ts b/packages/workflow-executor/src/defaults.ts
index b03e4877e4..746077301f 100644
--- a/packages/workflow-executor/src/defaults.ts
+++ b/packages/workflow-executor/src/defaults.ts
@@ -2,5 +2,6 @@ export const DEFAULT_HTTP_PORT = 3400;
 export const DEFAULT_FOREST_SERVER_URL = 'https://api.forestadmin.com';
 export const DEFAULT_POLLING_INTERVAL_MS = 30_000;
 export const DEFAULT_STEP_TIMEOUT_MS = 5 * 60_000;
+export const DEFAULT_AI_INVOKE_TIMEOUT_MS = 30_000;
 export const DEFAULT_STOP_TIMEOUT_MS = 30_000;
 export const DEFAULT_MAX_CHAIN_DEPTH = 50;
diff --git a/packages/workflow-executor/src/errors.ts b/packages/workflow-executor/src/errors.ts
index 4a8038a610..8ac3e6ee1c 100644
--- a/packages/workflow-executor/src/errors.ts
+++ b/packages/workflow-executor/src/errors.ts
@@ -209,6 +209,18 @@ export class StepTimeoutError extends WorkflowExecutorError {
   }
 }
 
+// Thrown when the AI provider does not respond within the configured timeout — distinct from
+// StepTimeoutError so we can surface a provider-specific message and tune the AI timeout
+// independently of the step timeout (AI hangs are common; record fetches are not).
+export class AiInvokeTimeoutError extends WorkflowExecutorError {
+  constructor(timeoutMs: number) {
+    super(
+      `AI provider did not respond within ${timeoutMs}ms`,
+      'The AI provider did not respond in time. Please try again, or contact your administrator if the problem persists.',
+    );
+  }
+}
+
 export class NoMcpToolsError extends WorkflowExecutorError {
   constructor(requestedMcpServerId?: string, loadedMcpServerIds?: readonly string[]) {
     const technical = requestedMcpServerId
diff --git a/packages/workflow-executor/src/executors/base-step-executor.ts b/packages/workflow-executor/src/executors/base-step-executor.ts
index a7d76228c2..3acf683b3b 100644
--- a/packages/workflow-executor/src/executors/base-step-executor.ts
+++ b/packages/workflow-executor/src/executors/base-step-executor.ts
@@ -17,6 +17,7 @@ import type {
 import { SystemMessage } from '@forestadmin/ai-proxy';
 
 import {
+  AiInvokeTimeoutError,
   InvalidAiRequestError,
   MalformedToolCallError,
   MissingToolCallError,
@@ -328,9 +329,23 @@ export default abstract class BaseStepExecutor {
     BaseStepExecutor.assertNoMidArraySystemMessages(messages);
 
     const modelWithTools = this.context.model.bindTools(tools, { tool_choice: 'any' });
-    const response = await modelWithTools.invoke(
-      BaseStepExecutor.mergeLeadingSystemMessages(messages),
-    );
+    const preparedMessages = BaseStepExecutor.mergeLeadingSystemMessages(messages);
+    const aiTimeoutMs = this.context.aiInvokeTimeoutMs;
+    const timeoutMs = aiTimeoutMs && aiTimeoutMs > 0 ? aiTimeoutMs : undefined;
+    const signal = timeoutMs ? AbortSignal.timeout(timeoutMs) : undefined;
+
+    let response;
+
+    try {
+      // `signal: undefined` is equivalent to passing no options — LangChain leaves the call un-timed.
+      response = await modelWithTools.invoke(preparedMessages, { signal });
+    } catch (err) {
+      // Detect the timeout via our own signal, not the thrown error's name: providers wrap an
+      // aborted request differently (AbortError, TimeoutError, APIUserAbortError, …).
+      if (timeoutMs !== undefined && signal?.aborted) throw new AiInvokeTimeoutError(timeoutMs);
+      throw err;
+    }
+
     const toolCall = response.tool_calls?.[0];
 
     if (toolCall !== undefined) {
diff --git a/packages/workflow-executor/src/executors/step-executor-factory.ts b/packages/workflow-executor/src/executors/step-executor-factory.ts
index 736b633215..fa7c24a649 100644
--- a/packages/workflow-executor/src/executors/step-executor-factory.ts
+++ b/packages/workflow-executor/src/executors/step-executor-factory.ts
@@ -41,6 +41,7 @@ export interface StepContextConfig {
   schemaCache: SchemaCache;
   logger: Logger;
   stepTimeoutMs?: number;
+  aiInvokeTimeoutMs?: number;
 }
 
 export default class StepExecutorFactory {
@@ -135,6 +136,7 @@ export default class StepExecutorFactory {
       logger: cfg.logger,
       incomingPendingData,
       stepTimeoutMs: cfg.stepTimeoutMs,
+      aiInvokeTimeoutMs: cfg.aiInvokeTimeoutMs,
       activityLogPort,
     };
   }
diff --git a/packages/workflow-executor/src/runner.ts b/packages/workflow-executor/src/runner.ts
index ba8e423e23..86ee4178b9 100644
--- a/packages/workflow-executor/src/runner.ts
+++ b/packages/workflow-executor/src/runner.ts
@@ -42,6 +42,10 @@ export interface RunnerConfig {
   // On timeout the step reports status:error; the underlying work is not aborted (Promise.race
   // limitation). Late rejections are caught and logged; late resolutions are silently discarded.
   stepTimeoutMs?: number;
+  // Per-AI-invocation timeout (used by BaseStepExecutor.invokeWithTools). Aborts the underlying
+  // HTTP request via AbortSignal so a hanging provider is killed quickly, before stepTimeoutMs
+  // would fire. 0/undefined disables.
+  aiInvokeTimeoutMs?: number;
   // Max number of ADDITIONAL steps auto-chained via /update-step response before yielding to the
   // next poll cycle (counted after the initial step). 0 disables chaining entirely. Default 50.
   maxChainDepth?: number;
@@ -416,6 +420,7 @@ export default class Runner {
       schemaCache: this.config.schemaCache,
       logger: this.logger,
       stepTimeoutMs: this.config.stepTimeoutMs,
+      aiInvokeTimeoutMs: this.config.aiInvokeTimeoutMs,
     };
   }
 }
diff --git a/packages/workflow-executor/src/types/execution-context.ts b/packages/workflow-executor/src/types/execution-context.ts
index 94ea4b3a6d..4a1ca06128 100644
--- a/packages/workflow-executor/src/types/execution-context.ts
+++ b/packages/workflow-executor/src/types/execution-context.ts
@@ -41,5 +41,6 @@ export interface ExecutionContext
   readonly logger: Logger;
   readonly incomingPendingData?: unknown;
   readonly stepTimeoutMs?: number;
+  readonly aiInvokeTimeoutMs?: number;
   readonly activityLogPort: ActivityLogPort;
 }
diff --git a/packages/workflow-executor/test/build-workflow-executor.test.ts b/packages/workflow-executor/test/build-workflow-executor.test.ts
index a3206aa002..31459263df 100644
--- a/packages/workflow-executor/test/build-workflow-executor.test.ts
+++ b/packages/workflow-executor/test/build-workflow-executor.test.ts
@@ -176,6 +176,27 @@ describe('buildInMemoryExecutor', () => {
     expect(MockedRunner).toHaveBeenCalledWith(expect.objectContaining({ stepTimeoutMs: 30_000 }));
   });
 
+  it('falls back to the default timeouts for non-positive or non-finite values', () => {
+    buildInMemoryExecutor({
+      ...BASE_OPTIONS,
+      stepTimeoutMs: 0,
+      aiInvokeTimeoutMs: Number.POSITIVE_INFINITY,
+    });
+
+    // A bad config must never silently disable the timeout — 0/negative/Infinity fall back.
+    expect(MockedRunner).toHaveBeenCalledWith(
+      expect.objectContaining({ stepTimeoutMs: 5 * 60_000, aiInvokeTimeoutMs: 30_000 }),
+    );
+  });
+
+  it('applies the 30s default when aiInvokeTimeoutMs is not configured', () => {
+    buildInMemoryExecutor(BASE_OPTIONS);
+
+    expect(MockedRunner).toHaveBeenCalledWith(
+      expect.objectContaining({ aiInvokeTimeoutMs: 30_000 }),
+    );
+  });
+
   it('passes secrets to Runner config', () => {
     buildInMemoryExecutor(BASE_OPTIONS);
 
diff --git a/packages/workflow-executor/test/executors/base-step-executor.test.ts b/packages/workflow-executor/test/executors/base-step-executor.test.ts
index 035b277d2b..d9e3da6efc 100644
--- a/packages/workflow-executor/test/executors/base-step-executor.test.ts
+++ b/packages/workflow-executor/test/executors/base-step-executor.test.ts
@@ -11,6 +11,7 @@ import type { BaseMessage, DynamicStructuredTool } from '@forestadmin/ai-proxy';
 import { HumanMessage, SystemMessage } from '@forestadmin/ai-proxy';
 
 import {
+  AiInvokeTimeoutError,
   InvalidAiRequestError,
   MalformedToolCallError,
   MissingToolCallError,
@@ -820,7 +821,7 @@ describe('BaseStepExecutor', () => {
 
       await executor.invokeWithTool(messages, dummyTool);
 
-      expect(invoke).toHaveBeenCalledWith(messages);
+      expect(invoke).toHaveBeenCalledWith(messages, { signal: undefined });
     });
 
     it('leaves messages unchanged when there is no SystemMessage', async () => {
@@ -832,7 +833,7 @@ describe('BaseStepExecutor', () => {
 
       await executor.invokeWithTool(messages, dummyTool);
 
-      expect(invoke).toHaveBeenCalledWith(messages);
+      expect(invoke).toHaveBeenCalledWith(messages, { signal: undefined });
     });
 
     it('throws when a SystemMessage appears after a non-system message', async () => {
@@ -854,6 +855,136 @@ describe('BaseStepExecutor', () => {
       );
     });
   });
+
+  describe('AI invoke timeout', () => {
+    // Model whose invoke rejects immediately with the given error (no abort involved).
+    function makeRejectingModel(error: unknown) {
+      const invoke = jest.fn().mockRejectedValue(error);
+
+      return {
+        model: {
+          bindTools: jest.fn().mockReturnValue({ invoke }),
+        } as unknown as ExecutionContext['model'],
+        invoke,
+      };
+    }
+
+    // Model whose invoke hangs until the call's AbortSignal fires, then rejects — like a provider
+    // SDK forwarding an aborted request. `rejectWith` simulates the provider wrapping the abort
+    // under its own error type; defaults to the signal's reason (our AiInvokeTimeoutError).
+    function makeAbortAwareModel(rejectWith?: unknown) {
+      const invoke = jest.fn().mockImplementation(
+        (_messages, options?: { signal?: AbortSignal }) =>
+          new Promise((_resolve, reject) => {
+            const { signal } = options ?? {};
+            signal?.addEventListener('abort', () => reject(rejectWith ?? signal.reason));
+          }),
+      );
+
+      return {
+        model: {
+          bindTools: jest.fn().mockReturnValue({ invoke }),
+        } as unknown as ExecutionContext['model'],
+        invoke,
+      };
+    }
+
+    it('throws AiInvokeTimeoutError when the model invoke hangs past aiInvokeTimeoutMs', async () => {
+      const { model } = makeAbortAwareModel();
+      const executor = new TestableExecutor(makeContext({ model, aiInvokeTimeoutMs: 20 }));
+
+      const err = await executor.invokeWithTool(dummyMessages, dummyTool).catch(e => e);
+
+      expect(err).toBeInstanceOf(AiInvokeTimeoutError);
+      expect((err as Error).message).toContain('20ms');
+    });
+
+    it('maps to AiInvokeTimeoutError even when the provider wraps the abort under its own error name', async () => {
+      // e.g. Anthropic surfaces an aborted request as APIUserAbortError, not AbortError/TimeoutError —
+      // detection is by signal.aborted, not by the thrown error's name.
+      const providerErr = Object.assign(new Error('Request was aborted.'), {
+        name: 'APIUserAbortError',
+      });
+      const { model } = makeAbortAwareModel(providerErr);
+      const executor = new TestableExecutor(makeContext({ model, aiInvokeTimeoutMs: 20 }));
+
+      const err = await executor.invokeWithTool(dummyMessages, dummyTool).catch(e => e);
+
+      expect(err).toBeInstanceOf(AiInvokeTimeoutError);
+    });
+
+    it('passes an AbortSignal as the second arg to model.invoke', async () => {
+      const { model, invoke } = makeMockModel({
+        tool_calls: [{ name: 'tool', args: {}, id: 'c1' }],
+      });
+      const executor = new TestableExecutor(makeContext({ model, aiInvokeTimeoutMs: 5_000 }));
+
+      await executor.invokeWithTool(dummyMessages, dummyTool);
+
+      expect(invoke).toHaveBeenCalledWith(expect.any(Array), {
+        signal: expect.any(AbortSignal),
+      });
+    });
+
+    it('passes an undefined signal (un-timed) when aiInvokeTimeoutMs is unset', async () => {
+      const { model, invoke } = makeMockModel({
+        tool_calls: [{ name: 'tool', args: {}, id: 'c1' }],
+      });
+      const executor = new TestableExecutor(makeContext({ model, aiInvokeTimeoutMs: undefined }));
+
+      await executor.invokeWithTool(dummyMessages, dummyTool);
+
+      expect(invoke).toHaveBeenCalledTimes(1);
+      expect(invoke).toHaveBeenCalledWith(expect.any(Array), { signal: undefined });
+    });
+
+    it('treats aiInvokeTimeoutMs <= 0 as disabled (signal undefined, abort not mapped)', async () => {
+      const abortErr = Object.assign(new Error('Aborted'), { name: 'AbortError' });
+      const invoke = jest
+        .fn()
+        .mockResolvedValueOnce({ tool_calls: [{ name: 'tool', args: {}, id: 'c1' }] })
+        .mockRejectedValueOnce(abortErr);
+      const model = {
+        bindTools: jest.fn().mockReturnValue({ invoke }),
+      } as unknown as ExecutionContext['model'];
+      const executor = new TestableExecutor(makeContext({ model, aiInvokeTimeoutMs: 0 }));
+
+      await executor.invokeWithTool(dummyMessages, dummyTool);
+      expect(invoke.mock.calls[0]).toEqual([expect.any(Array), { signal: undefined }]);
+
+      // With the timeout disabled, an abort is not ours to translate — it bubbles up untouched.
+      await expect(executor.invokeWithTool(dummyMessages, dummyTool)).rejects.toBe(abortErr);
+    });
+
+    it('rethrows non-abort errors without wrapping them', async () => {
+      const apiError = Object.assign(new Error('OpenAI 503'), { status: 503, name: 'APIError' });
+      const { model } = makeRejectingModel(apiError);
+      const executor = new TestableExecutor(makeContext({ model, aiInvokeTimeoutMs: 5_000 }));
+
+      await expect(executor.invokeWithTool(dummyMessages, dummyTool)).rejects.toBe(apiError);
+    });
+
+    // End-to-end: a timed-out AI invoke must surface through execute() as an error outcome
+    // carrying the AI-specific userMessage (distinct from the step-timeout message).
+    it('surfaces a timed-out AI invoke as an error outcome with the AI-specific userMessage', async () => {
+      class InvokingExecutor extends TestableExecutor {
+        protected override async doExecute() {
+          await this.invokeWithTool(dummyMessages, dummyTool);
+
+          return this.buildOutcomeResult({ status: 'success' });
+        }
+      }
+      const { model } = makeAbortAwareModel();
+      const executor = new InvokingExecutor(makeContext({ model, aiInvokeTimeoutMs: 20 }));
+
+      const result = await executor.execute();
+
+      expect(result.stepOutcome.status).toBe('error');
+      expect(result.stepOutcome.error).toBe(
+        'The AI provider did not respond in time. Please try again, or contact your administrator if the problem persists.',
+      );
+    });
+  });
 });
 
 describe('patchAndReloadPendingData', () => {