Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/workflow-executor/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ src/
- **Pre-recorded AI decisions** — Record step executors support `preRecordedArgs` in the step definition to bypass AI calls. When provided, executors use the pre-recorded values (display names) directly instead of invoking the AI. Each record step type has its own typed `preRecordedArgs` shape. Validation happens via schema resolution — invalid display names throw `InvalidPreRecordedArgsError`. Partial args are supported: only the provided fields skip AI, the rest still use AI.
- **Graceful shutdown** — `stop()` drains in-flight steps before closing resources. The `state` getter exposes the lifecycle: `idle → running → draining → stopped`. `stopTimeoutMs` (default 30s) prevents `stop()` from hanging forever if a step is stuck. The HTTP server stays up during drain so the frontend can still query run status. Signal handling (`SIGTERM`/`SIGINT`) is the consumer's responsibility — the Runner is a library class.
- **Structured log context** — `BaseStepExecutor.execute()` stamps every log line with a shared `logCtx` (`runId`, `stepId`, `stepIndex`, `stepType`). Executors with type-specific identifiers add them via the `getExtraLogContext()` hook (default `{}`), keeping the base class free of step-specific knowledge — e.g. `McpStepExecutor` returns `{ mcpServerId, mcpServerName }` so MCP step logs unambiguously identify the targeted server (`mcpServerId` is canonical; `mcpServerName` is the human-readable Record key, not guaranteed unique at the DB level). `mcpServerName` is resolved by `RemoteToolFetcher.fetch()` from the scoped config Record key and forwarded to the executor constructor.
- **Logger shape** — `Logger = (level, message, context?) => void` with `LoggerLevel = 'Debug' | 'Info' | 'Warn' | 'Error'` (signature aligned with `@forestadmin/agent`; the executor extends it with a structured `context` object). Adapters `createConsoleLogger(minLevel)` (JSON) and `createPrettyLogger(minLevel)` (colorized) are factories that close over a min-level filter — calls below the threshold are dropped at the source. CLI minimum level comes from `LOG_LEVEL` env var (default `Info`); library consumers pass `loggerLevel` in `ExecutorOptions`.
- **Boundary validation** — Types that cross a trust boundary (wire from the orchestrator, or mapper output) live under `src/types/validated/` as zod schemas with TS types inferred via `z.infer<>`. Strictness depends on origin: schemas the executor **produces** (mapper output) and **frontend** HTTP bodies use `.strict()` (catch our own bugs / input hygiene); the **orchestrator collection schema** instead **strips** unknown keys and requires only structural fields, with step-specific props optional and asserted at use-time by the consuming executor. This keeps the executor resilient to independent orchestrator drift — we fail at step execution, only when a step genuinely lacks what it needs, never in bulk up front for an unrelated add/remove. Validation runs where data enters (`forest-server-workflow-port.getCollectionSchema`, `run-to-available-step-mapper.toAvailableStepExecution`). On parse failure: throw `DomainValidationError` (extends `WorkflowExecutorError`) → bucketized as malformed (dispatch) or surfaced as a step error (execution). Types outside `validated/` are internal runtime state and not zod-validated. Note: `StepOutcome` is validated when it arrives as input via `previousSteps`; executor outputs are trusted by construction.

## Commands
Expand Down
5 changes: 4 additions & 1 deletion packages/workflow-executor/example/.env.executors.example
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,7 @@ NODE_TLS_REJECT_UNAUTHORIZED=0


# Optional — default shown.
POLLING_INTERVAL_MS=5000
POLLING_INTERVAL_MS=5000

# Optional — minimum log level (Debug | Info | Warn | Error). Default: Info.
# LOG_LEVEL=Info
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ x-executor-env: &executor-env
POLLING_INTERVAL_MS: ${POLLING_INTERVAL_MS:-5000}
HTTP_PORT: "3400"
NODE_TLS_REJECT_UNAUTHORIZED: 0
LOG_LEVEL: ${LOG_LEVEL}

x-executor-common: &executor-common
image: forest-workflow-executor:local
Expand Down
29 changes: 18 additions & 11 deletions packages/workflow-executor/src/adapters/console-logger.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,22 @@
import type { Logger } from '../ports/logger-port';
import type { Logger, LoggerLevel } from '../ports/logger-port';

export default class ConsoleLogger implements Logger {
error(message: string, context: Record<string, unknown>): void {
console.error(JSON.stringify({ message, timestamp: new Date().toISOString(), ...context }));
}
const ORDER: LoggerLevel[] = ['Debug', 'Info', 'Warn', 'Error'];

warn(message: string, context: Record<string, unknown>): void {
console.warn(JSON.stringify({ message, timestamp: new Date().toISOString(), ...context }));
}
export default function createConsoleLogger(minLevel: LoggerLevel = 'Info'): Logger {
const minIndex = ORDER.indexOf(minLevel);

info(message: string, context: Record<string, unknown>): void {
console.info(JSON.stringify({ message, timestamp: new Date().toISOString(), ...context }));
}
return (level, message, context) => {
if (ORDER.indexOf(level) < minIndex) return;

const payload = JSON.stringify({
level,
message,
timestamp: new Date().toISOString(),
...(context ?? {}),
});

if (level === 'Error') console.error(payload);
else if (level === 'Warn') console.warn(payload);
else console.info(payload);
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import type { HttpOptions } from '@forestadmin/forestadmin-client';
import { ServerUtils } from '@forestadmin/forestadmin-client';
import { z } from 'zod';

import ConsoleLogger from './console-logger';
import createConsoleLogger from './console-logger';
import toAvailableStepExecution from './run-to-available-step-mapper';
import toUpdateStepRequest from './step-outcome-to-update-step-mapper';
import withRetry from './with-retry';
Expand Down Expand Up @@ -55,7 +55,7 @@ export default class ForestServerWorkflowPort implements WorkflowPort {

constructor(params: { envSecret: string; forestServerUrl: string; logger?: Logger }) {
this.options = { envSecret: params.envSecret, forestServerUrl: params.forestServerUrl };
this.logger = params.logger ?? new ConsoleLogger();
this.logger = params.logger ?? createConsoleLogger();
}

async getAvailableRuns(): Promise<AvailableRunsBatch> {
Expand All @@ -74,7 +74,7 @@ export default class ForestServerWorkflowPort implements WorkflowPort {
if (error instanceof WorkflowExecutorError) {
malformed.push(this.toMalformedInfo(run, error));
} else {
this.logger.error('Failed to hydrate pending run — unexpected error', {
this.logger('Error', 'Failed to hydrate pending run — unexpected error', {
runId: run.id,
error: extractErrorMessage(error),
});
Expand Down Expand Up @@ -171,7 +171,7 @@ export default class ForestServerWorkflowPort implements WorkflowPort {
} catch (error) {
// The outcome was recorded server-side; only the chain parse failed. Fall back to the
// next poll cycle — don't let a malformed chain response mask the successful update.
this.logger.error('Failed to parse chained next step from /update-step response', {
this.logger('Error', 'Failed to parse chained next step from /update-step response', {
runId: String(run.id),
error: extractErrorMessage(error),
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ export default class ForestadminClientActivityLogPort implements ActivityLogPort

return { id: response.id, index: response.attributes.index };
} catch (cause) {
this.logger.error('activity log create failed', {
this.logger('Error', 'activity log create failed', {
action: args.action,
collectionId: args.collectionId,
status: (cause as { status?: number }).status,
Expand All @@ -68,7 +68,7 @@ export default class ForestadminClientActivityLogPort implements ActivityLogPort
{ logger: this.logger, extraRetryStatuses: [404] },
);
} catch (err) {
this.logger.error('activity log mark-as-completed failed', {
this.logger('Error', 'activity log mark-as-completed failed', {
handleId: handle.id,
error: extractErrorMessage(err),
});
Expand All @@ -90,7 +90,7 @@ export default class ForestadminClientActivityLogPort implements ActivityLogPort
{ logger: this.logger, extraRetryStatuses: [404] },
);
} catch (err) {
this.logger.error('activity log mark-as-failed failed', {
this.logger('Error', 'activity log mark-as-failed failed', {
handleId: handle.id,
stepErrorMessage: errorMessage,
error: extractErrorMessage(err),
Expand Down
70 changes: 38 additions & 32 deletions packages/workflow-executor/src/adapters/pretty-logger.ts
Original file line number Diff line number Diff line change
@@ -1,38 +1,44 @@
import type { Logger } from '../ports/logger-port';
import type { Logger, LoggerLevel } from '../ports/logger-port';

import pc from 'picocolors';

const ORDER: LoggerLevel[] = ['Debug', 'Info', 'Warn', 'Error'];

const LABEL: Record<LoggerLevel, string> = {
Debug: pc.blue('debug'),
Info: pc.cyan('info '),
Warn: pc.yellow('warn '),
Error: pc.red('error'),
};

function formatContext(context: Record<string, unknown>): string {
const parts = Object.entries(context).map(([key, value]) => `${key}=${JSON.stringify(value)}`);
if (parts.length === 0) return '';

return pc.dim(parts.join(' '));
}

function format(level: LoggerLevel, message: string, context: Record<string, unknown>): string {
const timestamp = pc.dim(new Date().toISOString().substring(11, 19));
const contextStr = formatContext(context);

return contextStr
? `${timestamp} ${LABEL[level]} ${message} ${contextStr}`
: `${timestamp} ${LABEL[level]} ${message}`;
}

// Colorized logger for TTY/dev. Pair with ConsoleLogger for piped output.
// CLI auto-picks via process.stdout.isTTY + --pretty/--json flags. NO_COLOR is honored.
export default class PrettyLogger implements Logger {
info(message: string, context: Record<string, unknown>): void {
// eslint-disable-next-line no-console
console.info(this.format(pc.cyan('info '), message, context));
}

warn(message: string, context: Record<string, unknown>): void {
// eslint-disable-next-line no-console
console.warn(this.format(pc.yellow('warn '), message, context));
}

error(message: string, context: Record<string, unknown>): void {
// eslint-disable-next-line no-console
console.error(this.format(pc.red('error'), message, context));
}

private format(level: string, message: string, context: Record<string, unknown>): string {
const timestamp = pc.dim(new Date().toISOString().substring(11, 19));
const contextStr = this.formatContext(context);

return contextStr
? `${timestamp} ${level} ${message} ${contextStr}`
: `${timestamp} ${level} ${message}`;
}

private formatContext(context: Record<string, unknown>): string {
const parts = Object.entries(context).map(([key, value]) => `${key}=${JSON.stringify(value)}`);
if (parts.length === 0) return '';

return pc.dim(parts.join(' '));
}
export default function createPrettyLogger(minLevel: LoggerLevel = 'Info'): Logger {
const minIndex = ORDER.indexOf(minLevel);

return (level, message, context) => {
if (ORDER.indexOf(level) < minIndex) return;

const line = format(level, message, context ?? {});

if (level === 'Error') console.error(line);
else if (level === 'Warn') console.warn(line);
else console.info(line);
};
}
2 changes: 1 addition & 1 deletion packages/workflow-executor/src/adapters/with-retry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export default async function withRetry<T>(
} catch (err) {
lastError = err;
if (!isRetryable(err, extraRetryStatuses) || attempt === RETRY_DELAYS_MS.length) throw err;
logger.warn(`"${label}" failed, retrying`, {
logger('Warn', `"${label}" failed, retrying`, {
attempt: attempt + 1,
status: (err as { status?: number }).status,
error: extractErrorMessage(err),
Expand Down
22 changes: 12 additions & 10 deletions packages/workflow-executor/src/build-workflow-executor.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { Logger } from './ports/logger-port';
import type { Logger, LoggerLevel } from './ports/logger-port';
import type { RunnerState } from './runner';
import type { AiConfiguration } from '@forestadmin/ai-proxy';
import type { Options as SequelizeOptions } from 'sequelize';
Expand All @@ -9,13 +9,14 @@ import { Sequelize } from 'sequelize';
import AgentClientAgentPort from './adapters/agent-client-agent-port';
import AiClientAdapter from './adapters/ai-client-adapter';
import AlwaysErrorAiModelPort from './adapters/always-error-ai-model-port';
import ConsoleLogger from './adapters/console-logger';
import createConsoleLogger from './adapters/console-logger';
import ForestServerWorkflowPort from './adapters/forest-server-workflow-port';
import ForestadminClientActivityLogPortFactory from './adapters/forestadmin-client-activity-log-port-factory';
import ServerAiAdapter from './adapters/server-ai-adapter';
import {
DEFAULT_AI_INVOKE_TIMEOUT_MS,
DEFAULT_FOREST_SERVER_URL,
DEFAULT_LOGGER_LEVEL,
DEFAULT_POLLING_INTERVAL_MS,
DEFAULT_SCHEMA_CACHE_TTL_MS,
DEFAULT_STEP_TIMEOUT_MS,
Expand Down Expand Up @@ -43,6 +44,7 @@ export interface ExecutorOptions {
aiConfigurations?: AiConfiguration[];
pollingIntervalMs?: number;
logger?: Logger;
loggerLevel?: LoggerLevel;
stopTimeoutMs?: number;
stepTimeoutMs?: number;
aiInvokeTimeoutMs?: number;
Expand All @@ -65,7 +67,7 @@ function positiveOrDefault(value: number | undefined, fallback: number): number

function buildCommonDependencies(options: ExecutorOptions) {
const forestServerUrl = options.forestServerUrl ?? DEFAULT_FOREST_SERVER_URL;
const logger = options.logger ?? new ConsoleLogger();
const logger = options.logger ?? createConsoleLogger(options.loggerLevel ?? DEFAULT_LOGGER_LEVEL);

const workflowPort = new ForestServerWorkflowPort({
envSecret: options.envSecret,
Expand All @@ -76,12 +78,12 @@ function buildCommonDependencies(options: ExecutorOptions) {
const forceAiError = options.forceAiError && process.env.NODE_ENV !== 'production';

if (forceAiError) {
logger.info(
logger(
'Info',
'FORCE_AI_ERROR is enabled — AI calls will always fail. Do not use in production.',
{},
);
} else if (options.forceAiError && process.env.NODE_ENV === 'production') {
logger.info('FORCE_AI_ERROR is set but ignored in production.', {});
logger('Info', 'FORCE_AI_ERROR is set but ignored in production.');
}

let aiModelPort;
Expand Down Expand Up @@ -142,7 +144,7 @@ function createWorkflowExecutor(
try {
await server.stop();
} catch (err) {
logger.error('HTTP server close failed during shutdown', {
logger('Error', 'HTTP server close failed during shutdown', {
error: err instanceof Error ? err.message : String(err),
});
}
Expand All @@ -151,14 +153,14 @@ function createWorkflowExecutor(
};

const onSignal = async () => {
logger.info?.('Received shutdown signal, stopping gracefully...', {});
logger('Info', 'Received shutdown signal, stopping gracefully...');

try {
if (!shutdownPromise) shutdownPromise = shutdown();
await shutdownPromise;
process.exitCode = 0;
} catch (error) {
logger.error('Graceful shutdown failed', {
logger('Error', 'Graceful shutdown failed', {
error: error instanceof Error ? error.message : String(error),
});
process.exitCode = 1;
Expand All @@ -167,7 +169,7 @@ function createWorkflowExecutor(
// Safety net: force exit if the event loop doesn't drain
// eslint-disable-next-line no-console
setTimeout(() => {
logger.error('Process did not exit after shutdown — forcing exit', {});
logger('Error', 'Process did not exit after shutdown — forcing exit');
process.exit(process.exitCode ?? 1);
}, FORCE_EXIT_DELAY_MS).unref();
};
Expand Down
Loading
Loading