diff --git a/packages/workflow-executor/src/build-workflow-executor.ts b/packages/workflow-executor/src/build-workflow-executor.ts index dbb6f64cf5..c5f44cd43c 100644 --- a/packages/workflow-executor/src/build-workflow-executor.ts +++ b/packages/workflow-executor/src/build-workflow-executor.ts @@ -17,6 +17,7 @@ import { DEFAULT_AI_INVOKE_TIMEOUT_MS, DEFAULT_FOREST_SERVER_URL, DEFAULT_POLLING_INTERVAL_MS, + DEFAULT_SCHEMA_CACHE_TTL_MS, DEFAULT_STEP_TIMEOUT_MS, } from './defaults'; import ExecutorHttpServer from './http/executor-http-server'; @@ -47,6 +48,8 @@ export interface ExecutorOptions { aiInvokeTimeoutMs?: number; // Max auto-chained steps per entry (see RunnerConfig.maxChainDepth). 0 disables chaining. maxChainDepth?: number; + // Collection schema cache TTL in ms. Lower it to pick up orchestrator schema changes sooner. + schemaCacheTtlMs?: number; // Dev only: makes every AI call fail immediately so error paths can be exercised locally. forceAiError?: boolean; } @@ -91,7 +94,10 @@ function buildCommonDependencies(options: ExecutorOptions) { aiModelPort = new ServerAiAdapter({ forestServerUrl, envSecret: options.envSecret }); } - const schemaCache = new SchemaCache(); + // A TTL of 0/negative/non-finite would silently make the cache always-stale, so fall back. + const schemaCache = new SchemaCache( + positiveOrDefault(options.schemaCacheTtlMs, DEFAULT_SCHEMA_CACHE_TTL_MS), + ); const agentPort = new AgentClientAgentPort({ agentUrl: options.agentUrl, diff --git a/packages/workflow-executor/src/cli-core.ts b/packages/workflow-executor/src/cli-core.ts index 88e1458ff5..6d4e442a74 100644 --- a/packages/workflow-executor/src/cli-core.ts +++ b/packages/workflow-executor/src/cli-core.ts @@ -18,6 +18,7 @@ import { DEFAULT_HTTP_PORT, DEFAULT_MAX_CHAIN_DEPTH, DEFAULT_POLLING_INTERVAL_MS, + DEFAULT_SCHEMA_CACHE_TTL_MS, DEFAULT_STEP_TIMEOUT_MS, DEFAULT_STOP_TIMEOUT_MS, } from './defaults'; @@ -161,6 +162,7 @@ export function readEnvConfig(env: NodeJS.ProcessEnv, args: CliArgs): CliConfig stepTimeoutMs: parsePositiveIntEnv('STEP_TIMEOUT_MS', env.STEP_TIMEOUT_MS), aiInvokeTimeoutMs: parsePositiveIntEnv('AI_INVOKE_TIMEOUT_MS', env.AI_INVOKE_TIMEOUT_MS), maxChainDepth: parsePositiveIntEnv('MAX_CHAIN_DEPTH', env.MAX_CHAIN_DEPTH), + schemaCacheTtlMs: parsePositiveIntEnv('SCHEMA_CACHE_TTL_MS', env.SCHEMA_CACHE_TTL_MS), ...(aiConfigurations && { aiConfigurations }), ...(env.FORCE_AI_ERROR === 'true' && { forceAiError: true }), }; @@ -198,6 +200,7 @@ Optional environment variables: STEP_TIMEOUT_MS Max duration of a step in ms (default: ${DEFAULT_STEP_TIMEOUT_MS}) AI_INVOKE_TIMEOUT_MS Max duration of a single AI provider invocation in ms (default: ${DEFAULT_AI_INVOKE_TIMEOUT_MS}) MAX_CHAIN_DEPTH Max steps auto-executed per run before yielding (default: ${DEFAULT_MAX_CHAIN_DEPTH}) + SCHEMA_CACHE_TTL_MS Collection schema cache TTL in ms (default: ${DEFAULT_SCHEMA_CACHE_TTL_MS}) NO_COLOR Set to any value to disable ANSI colors in pretty logs FORCE_AI_ERROR Set to "true" to make every AI call fail (dev only, to test error paths) diff --git a/packages/workflow-executor/src/defaults.ts b/packages/workflow-executor/src/defaults.ts index 746077301f..b0e9784dda 100644 --- a/packages/workflow-executor/src/defaults.ts +++ b/packages/workflow-executor/src/defaults.ts @@ -5,3 +5,4 @@ export const DEFAULT_STEP_TIMEOUT_MS = 5 * 60_000; export const DEFAULT_AI_INVOKE_TIMEOUT_MS = 30_000; export const DEFAULT_STOP_TIMEOUT_MS = 30_000; export const DEFAULT_MAX_CHAIN_DEPTH = 50; +export const DEFAULT_SCHEMA_CACHE_TTL_MS = 10 * 60_000; diff --git a/packages/workflow-executor/src/schema-cache.ts b/packages/workflow-executor/src/schema-cache.ts index 68b1a3db0b..d6f04056e7 100644 --- a/packages/workflow-executor/src/schema-cache.ts +++ b/packages/workflow-executor/src/schema-cache.ts @@ -1,13 +1,13 @@ import type { CollectionSchema } from './types/validated/collection'; -const DEFAULT_TTL_MS = 10 * 60 * 1000; // 10 minutes +import { DEFAULT_SCHEMA_CACHE_TTL_MS } from './defaults'; export default class SchemaCache { private readonly store = new Map(); private readonly ttlMs: number; private readonly now: () => number; - constructor(ttlMs: number = DEFAULT_TTL_MS, now: () => number = Date.now) { + constructor(ttlMs: number = DEFAULT_SCHEMA_CACHE_TTL_MS, now: () => number = Date.now) { this.ttlMs = ttlMs; this.now = now; } diff --git a/packages/workflow-executor/test/build-workflow-executor.test.ts b/packages/workflow-executor/test/build-workflow-executor.test.ts index 31459263df..8666a691a7 100644 --- a/packages/workflow-executor/test/build-workflow-executor.test.ts +++ b/packages/workflow-executor/test/build-workflow-executor.test.ts @@ -1,5 +1,6 @@ import ForestServerWorkflowPort from '../src/adapters/forest-server-workflow-port'; import { buildDatabaseExecutor, buildInMemoryExecutor } from '../src/build-workflow-executor'; +import { DEFAULT_SCHEMA_CACHE_TTL_MS } from '../src/defaults'; import Runner from '../src/runner'; import SchemaCache from '../src/schema-cache'; import DatabaseStore from '../src/stores/database-store'; @@ -9,6 +10,7 @@ jest.mock('../src/runner'); jest.mock('../src/stores/in-memory-store'); jest.mock('../src/stores/database-store'); jest.mock('../src/adapters/agent-client-agent-port'); +jest.mock('../src/schema-cache'); jest.mock('../src/adapters/forest-server-workflow-port'); jest.mock('../src/http/executor-http-server'); jest.mock('../src/adapters/ai-client-adapter'); @@ -20,6 +22,7 @@ jest.mock('sequelize', () => ({ })); const MockedRunner = Runner as jest.MockedClass; +const MockedSchemaCache = SchemaCache as jest.MockedClass; const BASE_OPTIONS = { envSecret: 'a'.repeat(64), @@ -176,6 +179,25 @@ describe('buildInMemoryExecutor', () => { expect(MockedRunner).toHaveBeenCalledWith(expect.objectContaining({ stepTimeoutMs: 30_000 })); }); + it('builds the SchemaCache with the default TTL when schemaCacheTtlMs is not configured', () => { + buildInMemoryExecutor(BASE_OPTIONS); + + expect(MockedSchemaCache).toHaveBeenCalledWith(DEFAULT_SCHEMA_CACHE_TTL_MS); + }); + + it('builds the SchemaCache with a caller-provided schemaCacheTtlMs over the default', () => { + buildInMemoryExecutor({ ...BASE_OPTIONS, schemaCacheTtlMs: 5_000 }); + + expect(MockedSchemaCache).toHaveBeenCalledWith(5_000); + }); + + it('falls back to the default TTL for a non-positive or non-finite schemaCacheTtlMs', () => { + buildInMemoryExecutor({ ...BASE_OPTIONS, schemaCacheTtlMs: 0 }); + + // A 0/negative/Infinity TTL must not silently make the cache always-stale. + expect(MockedSchemaCache).toHaveBeenCalledWith(DEFAULT_SCHEMA_CACHE_TTL_MS); + }); + it('falls back to the default timeouts for non-positive or non-finite values', () => { buildInMemoryExecutor({ ...BASE_OPTIONS, diff --git a/packages/workflow-executor/test/cli.test.ts b/packages/workflow-executor/test/cli.test.ts index 2a74307cd2..a09b44fb39 100644 --- a/packages/workflow-executor/test/cli.test.ts +++ b/packages/workflow-executor/test/cli.test.ts @@ -17,6 +17,7 @@ import { DEFAULT_HTTP_PORT, DEFAULT_MAX_CHAIN_DEPTH, DEFAULT_POLLING_INTERVAL_MS, + DEFAULT_SCHEMA_CACHE_TTL_MS, DEFAULT_STEP_TIMEOUT_MS, DEFAULT_STOP_TIMEOUT_MS, } from '../src/defaults'; @@ -141,6 +142,7 @@ describe('readEnvConfig', () => { STOP_TIMEOUT_MS: '10000', STEP_TIMEOUT_MS: '60000', MAX_CHAIN_DEPTH: '10', + SCHEMA_CACHE_TTL_MS: '120000', }, args, ); @@ -150,6 +152,19 @@ describe('readEnvConfig', () => { expect(config.executorOptions.stopTimeoutMs).toBe(10000); expect(config.executorOptions.stepTimeoutMs).toBe(60000); expect(config.executorOptions.maxChainDepth).toBe(10); + expect(config.executorOptions.schemaCacheTtlMs).toBe(120000); + }); + + it('leaves schemaCacheTtlMs undefined when SCHEMA_CACHE_TTL_MS is unset (default applied downstream in build)', () => { + const config = readEnvConfig(baseEnv, args); + + expect(config.executorOptions.schemaCacheTtlMs).toBeUndefined(); + }); + + it('throws ConfigurationError when SCHEMA_CACHE_TTL_MS is non-numeric', () => { + expect(() => readEnvConfig({ ...baseEnv, SCHEMA_CACHE_TTL_MS: 'abc' }, args)).toThrow( + /SCHEMA_CACHE_TTL_MS must be a positive integer/, + ); }); it('leaves stepTimeoutMs undefined when STEP_TIMEOU_MS is unset (default applied downstream in build)', () => { @@ -292,6 +307,7 @@ describe('printHelp / printVersion', () => { expect(output).toContain(`Default: ${DEFAULT_STOP_TIMEOUT_MS}`); expect(output).toContain(`default: ${DEFAULT_STEP_TIMEOUT_MS}`); expect(output).toContain(`default: ${DEFAULT_MAX_CHAIN_DEPTH}`); + expect(output).toContain(`default: ${DEFAULT_SCHEMA_CACHE_TTL_MS}`); }); it('printVersion prints a version string', () => {