Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion packages/workflow-executor/src/build-workflow-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
DEFAULT_AI_INVOKE_TIMEOUT_MS,
DEFAULT_FOREST_SERVER_URL,
DEFAULT_POLLING_INTERVAL_MS,
DEFAULT_SCHEMA_CACHE_TTL_MS,
DEFAULT_STEP_TIMEOUT_MS,
} from './defaults';
import ExecutorHttpServer from './http/executor-http-server';
Expand Down Expand Up @@ -47,6 +48,8 @@ export interface ExecutorOptions {
aiInvokeTimeoutMs?: number;
// Max auto-chained steps per entry (see RunnerConfig.maxChainDepth). 0 disables chaining.
maxChainDepth?: number;
// Collection schema cache TTL in ms. Lower it to pick up orchestrator schema changes sooner.
schemaCacheTtlMs?: number;
// Dev only: makes every AI call fail immediately so error paths can be exercised locally.
forceAiError?: boolean;
}
Expand Down Expand Up @@ -91,7 +94,10 @@ function buildCommonDependencies(options: ExecutorOptions) {
aiModelPort = new ServerAiAdapter({ forestServerUrl, envSecret: options.envSecret });
}

const schemaCache = new SchemaCache();
// A TTL of 0/negative/non-finite would silently make the cache always-stale, so fall back.
const schemaCache = new SchemaCache(
positiveOrDefault(options.schemaCacheTtlMs, DEFAULT_SCHEMA_CACHE_TTL_MS),
);

const agentPort = new AgentClientAgentPort({
agentUrl: options.agentUrl,
Expand Down
3 changes: 3 additions & 0 deletions packages/workflow-executor/src/cli-core.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import {
DEFAULT_HTTP_PORT,
DEFAULT_MAX_CHAIN_DEPTH,
DEFAULT_POLLING_INTERVAL_MS,
DEFAULT_SCHEMA_CACHE_TTL_MS,
DEFAULT_STEP_TIMEOUT_MS,
DEFAULT_STOP_TIMEOUT_MS,
} from './defaults';
Expand Down Expand Up @@ -161,6 +162,7 @@ export function readEnvConfig(env: NodeJS.ProcessEnv, args: CliArgs): CliConfig
stepTimeoutMs: parsePositiveIntEnv('STEP_TIMEOUT_MS', env.STEP_TIMEOUT_MS),
aiInvokeTimeoutMs: parsePositiveIntEnv('AI_INVOKE_TIMEOUT_MS', env.AI_INVOKE_TIMEOUT_MS),
maxChainDepth: parsePositiveIntEnv('MAX_CHAIN_DEPTH', env.MAX_CHAIN_DEPTH),
schemaCacheTtlMs: parsePositiveIntEnv('SCHEMA_CACHE_TTL_MS', env.SCHEMA_CACHE_TTL_MS),
...(aiConfigurations && { aiConfigurations }),
...(env.FORCE_AI_ERROR === 'true' && { forceAiError: true }),
};
Expand Down Expand Up @@ -198,6 +200,7 @@ Optional environment variables:
STEP_TIMEOUT_MS Max duration of a step in ms (default: ${DEFAULT_STEP_TIMEOUT_MS})
AI_INVOKE_TIMEOUT_MS Max duration of a single AI provider invocation in ms (default: ${DEFAULT_AI_INVOKE_TIMEOUT_MS})
MAX_CHAIN_DEPTH Max steps auto-executed per run before yielding (default: ${DEFAULT_MAX_CHAIN_DEPTH})
SCHEMA_CACHE_TTL_MS Collection schema cache TTL in ms (default: ${DEFAULT_SCHEMA_CACHE_TTL_MS})
Comment thread
hercemer42 marked this conversation as resolved.
NO_COLOR Set to any value to disable ANSI colors in pretty logs
FORCE_AI_ERROR Set to "true" to make every AI call fail (dev only, to test error paths)

Expand Down
1 change: 1 addition & 0 deletions packages/workflow-executor/src/defaults.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,3 +5,4 @@ export const DEFAULT_STEP_TIMEOUT_MS = 5 * 60_000;
export const DEFAULT_AI_INVOKE_TIMEOUT_MS = 30_000;
export const DEFAULT_STOP_TIMEOUT_MS = 30_000;
export const DEFAULT_MAX_CHAIN_DEPTH = 50;
export const DEFAULT_SCHEMA_CACHE_TTL_MS = 10 * 60_000;
Comment thread
hercemer42 marked this conversation as resolved.
4 changes: 2 additions & 2 deletions packages/workflow-executor/src/schema-cache.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import type { CollectionSchema } from './types/validated/collection';

const DEFAULT_TTL_MS = 10 * 60 * 1000; // 10 minutes
import { DEFAULT_SCHEMA_CACHE_TTL_MS } from './defaults';

export default class SchemaCache {
private readonly store = new Map<string, { schema: CollectionSchema; fetchedAt: number }>();
private readonly ttlMs: number;
private readonly now: () => number;

constructor(ttlMs: number = DEFAULT_TTL_MS, now: () => number = Date.now) {
constructor(ttlMs: number = DEFAULT_SCHEMA_CACHE_TTL_MS, now: () => number = Date.now) {
this.ttlMs = ttlMs;
this.now = now;
}
Expand Down
22 changes: 22 additions & 0 deletions packages/workflow-executor/test/build-workflow-executor.test.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import ForestServerWorkflowPort from '../src/adapters/forest-server-workflow-port';
import { buildDatabaseExecutor, buildInMemoryExecutor } from '../src/build-workflow-executor';
import { DEFAULT_SCHEMA_CACHE_TTL_MS } from '../src/defaults';
import Runner from '../src/runner';
import SchemaCache from '../src/schema-cache';
import DatabaseStore from '../src/stores/database-store';
Expand All @@ -9,6 +10,7 @@ jest.mock('../src/runner');
jest.mock('../src/stores/in-memory-store');
jest.mock('../src/stores/database-store');
jest.mock('../src/adapters/agent-client-agent-port');
jest.mock('../src/schema-cache');
jest.mock('../src/adapters/forest-server-workflow-port');
jest.mock('../src/http/executor-http-server');
jest.mock('../src/adapters/ai-client-adapter');
Expand All @@ -20,6 +22,7 @@ jest.mock('sequelize', () => ({
}));

const MockedRunner = Runner as jest.MockedClass<typeof Runner>;
const MockedSchemaCache = SchemaCache as jest.MockedClass<typeof SchemaCache>;

const BASE_OPTIONS = {
envSecret: 'a'.repeat(64),
Expand Down Expand Up @@ -176,6 +179,25 @@ describe('buildInMemoryExecutor', () => {
expect(MockedRunner).toHaveBeenCalledWith(expect.objectContaining({ stepTimeoutMs: 30_000 }));
});

it('builds the SchemaCache with the default TTL when schemaCacheTtlMs is not configured', () => {
buildInMemoryExecutor(BASE_OPTIONS);

expect(MockedSchemaCache).toHaveBeenCalledWith(DEFAULT_SCHEMA_CACHE_TTL_MS);
});

it('builds the SchemaCache with a caller-provided schemaCacheTtlMs over the default', () => {
buildInMemoryExecutor({ ...BASE_OPTIONS, schemaCacheTtlMs: 5_000 });

expect(MockedSchemaCache).toHaveBeenCalledWith(5_000);
});

it('falls back to the default TTL for a non-positive or non-finite schemaCacheTtlMs', () => {
buildInMemoryExecutor({ ...BASE_OPTIONS, schemaCacheTtlMs: 0 });

// A 0/negative/Infinity TTL must not silently make the cache always-stale.
expect(MockedSchemaCache).toHaveBeenCalledWith(DEFAULT_SCHEMA_CACHE_TTL_MS);
});

it('falls back to the default timeouts for non-positive or non-finite values', () => {
buildInMemoryExecutor({
...BASE_OPTIONS,
Expand Down
16 changes: 16 additions & 0 deletions packages/workflow-executor/test/cli.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import {
DEFAULT_HTTP_PORT,
DEFAULT_MAX_CHAIN_DEPTH,
DEFAULT_POLLING_INTERVAL_MS,
DEFAULT_SCHEMA_CACHE_TTL_MS,
DEFAULT_STEP_TIMEOUT_MS,
DEFAULT_STOP_TIMEOUT_MS,
} from '../src/defaults';
Expand Down Expand Up @@ -141,6 +142,7 @@ describe('readEnvConfig', () => {
STOP_TIMEOUT_MS: '10000',
STEP_TIMEOUT_MS: '60000',
MAX_CHAIN_DEPTH: '10',
SCHEMA_CACHE_TTL_MS: '120000',
},
args,
);
Expand All @@ -150,6 +152,19 @@ describe('readEnvConfig', () => {
expect(config.executorOptions.stopTimeoutMs).toBe(10000);
expect(config.executorOptions.stepTimeoutMs).toBe(60000);
expect(config.executorOptions.maxChainDepth).toBe(10);
expect(config.executorOptions.schemaCacheTtlMs).toBe(120000);
});

it('leaves schemaCacheTtlMs undefined when SCHEMA_CACHE_TTL_MS is unset (default applied downstream in build)', () => {
const config = readEnvConfig(baseEnv, args);

expect(config.executorOptions.schemaCacheTtlMs).toBeUndefined();
});

it('throws ConfigurationError when SCHEMA_CACHE_TTL_MS is non-numeric', () => {
expect(() => readEnvConfig({ ...baseEnv, SCHEMA_CACHE_TTL_MS: 'abc' }, args)).toThrow(
/SCHEMA_CACHE_TTL_MS must be a positive integer/,
);
});

it('leaves stepTimeoutMs undefined when STEP_TIMEOU_MS is unset (default applied downstream in build)', () => {
Expand Down Expand Up @@ -292,6 +307,7 @@ describe('printHelp / printVersion', () => {
expect(output).toContain(`Default: ${DEFAULT_STOP_TIMEOUT_MS}`);
expect(output).toContain(`default: ${DEFAULT_STEP_TIMEOUT_MS}`);
expect(output).toContain(`default: ${DEFAULT_MAX_CHAIN_DEPTH}`);
expect(output).toContain(`default: ${DEFAULT_SCHEMA_CACHE_TTL_MS}`);
});

it('printVersion prints a version string', () => {
Expand Down
Loading