1 change: 1 addition & 0 deletions packages/workflow-executor/CLAUDE.md
@@ -94,6 +94,7 @@ src/
- **Pre-recorded AI decisions** — Record step executors support `preRecordedArgs` in the step definition to bypass AI calls. When provided, executors use the pre-recorded values (display names) directly instead of invoking the AI. Each record step type has its own typed `preRecordedArgs` shape. Validation happens via schema resolution — invalid display names throw `InvalidPreRecordedArgsError`. Partial args are supported: only the provided fields skip AI, the rest still use AI.
- **Graceful shutdown** — `stop()` drains in-flight steps before closing resources. The `state` getter exposes the lifecycle: `idle → running → draining → stopped`. `stopTimeoutMs` (default 30s) prevents `stop()` from hanging forever if a step is stuck. The HTTP server stays up during drain so the frontend can still query run status. Signal handling (`SIGTERM`/`SIGINT`) is the consumer's responsibility — the Runner is a library class.
- **Structured log context** — `BaseStepExecutor.execute()` stamps every log line with a shared `logCtx` (`runId`, `stepId`, `stepIndex`, `stepType`). Executors with type-specific identifiers add them via the `getExtraLogContext()` hook (default `{}`), keeping the base class free of step-specific knowledge — e.g. `McpStepExecutor` returns `{ mcpServerId, mcpServerName }` so MCP step logs unambiguously identify the targeted server (`mcpServerId` is canonical; `mcpServerName` is the human-readable Record key, not guaranteed unique at the DB level). `mcpServerName` is resolved by `RemoteToolFetcher.fetch()` from the scoped config Record key and forwarded to the executor constructor.
- **Revision-aware history reads** — On revision the orchestrator (server-side) marks the pivot card `revised` and every later entry `cancelled`, then appends clones of the still-valid steps (each clone's `originalStepIndex` points at the step it copies) plus a fresh re-execution of the revised step. Any consumer of `workflowHistory` must restrict to the live path (`!revised && !cancelled`) — skipping this leaks a superseded branch's context into a re-run. To find a step's RunStore execution, resolve own `stepIndex` first, then fall back to `originalStepIndex` (a clone the executor never ran inherits the copied step's record — mirrors the frontend's `carryForwardExecutorDataForCopiedSteps`). Own-index-first is essential: a re-executed step has its own entry, so it must never inherit the superseded original's record. Never key on `stepName` — LinkTo loops can put the same name on the live path twice.
- **Boundary validation** — Types that cross a trust boundary (wire from the orchestrator, or mapper output) live under `src/types/validated/` as zod schemas with TS types inferred via `z.infer<>`. Strictness depends on origin: schemas the executor **produces** (mapper output) and **frontend** HTTP bodies use `.strict()` (catch our own bugs / input hygiene); the **orchestrator collection schema** instead **strips** unknown keys and requires only structural fields, with step-specific props optional and asserted at use-time by the consuming executor. This keeps the executor resilient to independent orchestrator drift — we fail at step execution, only when a step genuinely lacks what it needs, never in bulk up front for an unrelated add/remove. Validation runs where data enters (`forest-server-workflow-port.getCollectionSchema`, `run-to-available-step-mapper.toAvailableStepExecution`). On parse failure: throw `DomainValidationError` (extends `WorkflowExecutorError`) → bucketized as malformed (dispatch) or surfaced as a step error (execution). Types outside `validated/` are internal runtime state and not zod-validated. Note: `StepOutcome` is validated when it arrives as input via `previousSteps`; executor outputs are trusted by construction.
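
The revision-aware read rule described in the bullets above (restrict to the live path, then resolve own `stepIndex` before `originalStepIndex`) can be sketched roughly as follows. This is a minimal illustration, not the package's implementation: `HistoryEntry`, `ExecutionRecord`, `livePath`, and `resolveExecution` are hypothetical, simplified names, and the data mirrors a single-revision run in which a card is revised and a clone of it is appended.

```typescript
// Hypothetical, simplified shapes for illustration; the real types carry more fields.
interface HistoryEntry {
  stepIndex: number;
  done: boolean;
  revised?: boolean;
  cancelled?: boolean;
  originalStepIndex?: number;
}

interface ExecutionRecord {
  stepIndex: number;
  detail: string;
}

// Live path: done entries before the pending step that are neither revised nor cancelled.
function livePath(history: HistoryEntry[], pendingStepIndex: number): HistoryEntry[] {
  return history.filter(
    s => s.done && !s.revised && !s.cancelled && s.stepIndex < pendingStepIndex,
  );
}

// Own stepIndex first; only an entry with no record of its own falls back to the copied step.
function resolveExecution(
  step: HistoryEntry,
  executions: ExecutionRecord[],
): ExecutionRecord | undefined {
  const own = executions.find(e => e.stepIndex === step.stepIndex);
  if (own) return own;
  if (step.originalStepIndex === undefined) return undefined;

  return executions.find(e => e.stepIndex === step.originalStepIndex);
}

const history: HistoryEntry[] = [
  { stepIndex: 0, done: true }, // trunk, live
  { stepIndex: 1, done: true, revised: true }, // pivot card, dead
  { stepIndex: 2, done: true, cancelled: true }, // dead branch
  { stepIndex: 3, done: true, cancelled: true }, // dead branch
  { stepIndex: 4, done: true, originalStepIndex: 1 }, // clone of the pivot card, live
  { stepIndex: 5, done: false, originalStepIndex: 2 }, // pending re-execution
];

const executions: ExecutionRecord[] = [
  { stepIndex: 0, detail: 'trunk' },
  { stepIndex: 1, detail: 'card B' },
  { stepIndex: 2, detail: 'superseded sub B1' },
];

const live = livePath(history, 5);
const liveIndexes = live.map(s => s.stepIndex); // [0, 4]
const resolved = live.map(s => resolveExecution(s, executions)?.detail); // ['trunk', 'card B']

// Once the re-executed step has its own record, that record wins over the superseded original.
const rerunDetail = resolveExecution(
  { stepIndex: 5, done: true, originalStepIndex: 2 },
  [...executions, { stepIndex: 5, detail: 'fresh sub B1' }],
)?.detail; // 'fresh sub B1'
```

Reversing the lookup order in this sketch would make the last call return `'superseded sub B1'`, which is the leak the own-index-first rule is meant to prevent.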

## Commands
@@ -81,6 +81,7 @@ function tryMapStep(s: ServerStepHistory): Step | null {
return {
stepDefinition: toStepDefinition(s.stepDefinition),
stepOutcome: toStepOutcome(s),
...(s.originalStepIndex !== undefined && { originalStepIndex: s.originalStepIndex }),
};
} catch (err) {
// Sub-workflow navigation steps (start-sub-workflow, close-sub-workflow) are not
@@ -90,12 +91,14 @@ function tryMapStep(s: ServerStepHistory): Step | null {
}
}

// Mirrors the orchestrator's own read filter: revised and cancelled entries are not on the
// live path.
function toPreviousSteps(
history: ServerStepHistory[],
pendingStepIndex: number,
): ReadonlyArray<Step> {
return history
- .filter(s => s.done && s.stepIndex < pendingStepIndex)
+ .filter(s => s.done && !s.revised && !s.cancelled && s.stepIndex < pendingStepIndex)
.map(s => tryMapStep(s))
.filter((s): s is Step => s !== null);
}
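
As a side note on the conditional spread used in `tryMapStep`, here is a small sketch of how it differs from assigning the field directly. The types and function names below (`ServerEntry`, `MappedStep`, `mapWithSpread`, `mapWithAssignment`) are hypothetical stand-ins, not the package's own; the point is only that the spread leaves the key absent when there is nothing to copy, while a direct assignment creates the key with an `undefined` value.

```typescript
// Hypothetical, simplified shapes; the real Step carries stepDefinition and stepOutcome.
interface ServerEntry {
  stepIndex: number;
  originalStepIndex?: number;
}

interface MappedStep {
  stepIndex: number;
  originalStepIndex?: number;
}

// Conditional spread: spreading `false` adds nothing, so the key is omitted entirely.
function mapWithSpread(s: ServerEntry): MappedStep {
  return {
    stepIndex: s.stepIndex,
    ...(s.originalStepIndex !== undefined && { originalStepIndex: s.originalStepIndex }),
  };
}

// Direct assignment: the key always exists, possibly holding `undefined`.
function mapWithAssignment(s: ServerEntry): MappedStep {
  return { stepIndex: s.stepIndex, originalStepIndex: s.originalStepIndex };
}

const ran: ServerEntry = { stepIndex: 0 };

const spreadHasKey = 'originalStepIndex' in mapWithSpread(ran); // false
const assignedHasKey = 'originalStepIndex' in mapWithAssignment(ran); // true

// The explicit `!== undefined` check matters: 0 is a valid index and must be kept.
const cloneOfEntryPoint = mapWithSpread({ stepIndex: 2, originalStepIndex: 0 });
```

An assertion such as `not.toHaveProperty('originalStepIndex')` passes only for the spread variant, and a truthiness check in place of `!== undefined` would drop a clone that points at index 0.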
2 changes: 2 additions & 0 deletions packages/workflow-executor/src/adapters/server-types.ts
@@ -169,6 +169,8 @@ export interface ServerStepHistory {
done: boolean;
revised?: boolean;
cancelled?: boolean;
// On a revision clone, the index of the step it copies — where that step's record lives.
originalStepIndex?: number;
context?: Record<string, unknown>;
childrenWorkflowId?: string;
stepDefinition: ServerWorkflowStep;
25 changes: 22 additions & 3 deletions packages/workflow-executor/src/executors/base-step-executor.ts
@@ -6,6 +6,7 @@ import type {
StepExecutionResult,
} from '../types/execution-context';
import type { ConfirmableStepExecutionData, StepExecutionData } from '../types/step-execution-data';
import type { Step } from '../types/validated/execution';
import type { StepDefinition } from '../types/validated/step-definition';
import type { StepStatus } from '../types/validated/step-outcome';
import type {
@@ -283,16 +284,34 @@ export default abstract class BaseStepExecutor<TStep extends StepDefinition = St

const allStepExecutions = await this.context.runStore.getStepExecutions(this.context.runId);
const summary = this.context.previousSteps
- .map(({ stepDefinition, stepOutcome }) => {
- const execution = allStepExecutions.find(e => e.stepIndex === stepOutcome.stepIndex);
+ .map(step => {
+ const execution = BaseStepExecutor.resolveStepExecution(step, allStepExecutions);

- return StepSummaryBuilder.build(stepDefinition, stepOutcome, execution);
+ return StepSummaryBuilder.build(step.stepDefinition, step.stepOutcome, execution);
})
.join('\n\n');

return [new SystemMessage(summary)];
}

// A step the executor ran has its execution at its own stepIndex. A revision clone never ran,
// so it inherits the record of the step it copied (originalStepIndex). Own-index-first is what
// stops a re-executed step (which has its own entry) from resurfacing the superseded
// original's record.
protected static resolveStepExecution(
step: Step,
executions: StepExecutionData[],
): StepExecutionData | undefined {
const own = executions.find(e => e.stepIndex === step.stepOutcome.stepIndex);
if (own) return own;

if (step.originalStepIndex !== undefined) {
return executions.find(e => e.stepIndex === step.originalStepIndex);
}

return undefined;
}

private static mergeLeadingSystemMessages(messages: BaseMessage[]): BaseMessage[] {
let i = 0;
while (i < messages.length && messages[i] instanceof SystemMessage) i += 1;
18 changes: 13 additions & 5 deletions packages/workflow-executor/src/executors/record-step-executor.ts
@@ -8,6 +8,7 @@ import { z } from 'zod';

import { InvalidAIResponseError, InvalidPreRecordedArgsError, NoRecordsError } from '../errors';
import BaseStepExecutor from './base-step-executor';
import { StepType } from '../types/validated/step-definition';

export default abstract class RecordStepExecutor<
TStep extends StepDefinition = StepDefinition,
@@ -46,15 +47,22 @@ export default abstract class RecordStepExecutor<
return this.selectRecordRef(records, prompt);
}

// Candidate sources for the AI: the base record plus the record each live prior
// load-related step resolved — own stepIndex first, falling back to a clone's
// originalStepIndex.
protected async getAvailableRecordRefs(): Promise<RecordRef[]> {
const stepExecutions = await this.context.runStore.getStepExecutions(this.context.runId);
- const relatedRecords = stepExecutions.flatMap(e => {
+ const relatedRecords = this.context.previousSteps.flatMap(step => {
if (step.stepDefinition.type !== StepType.LoadRelatedRecord) return [];
Review comment (Member):
We should filter cancelled and revised steps here.

Reply (Author, @hercemer42, Jun 4, 2026):
There's a filter in the adapter; it should be enough?

const execution = BaseStepExecutor.resolveStepExecution(step, stepExecutions);

if (
- e.type === 'load-related-record' &&
- e.executionResult !== undefined &&
- 'record' in e.executionResult
+ execution?.type === 'load-related-record' &&
+ execution.executionResult !== undefined &&
+ 'record' in execution.executionResult
) {
- return [e.executionResult.record];
+ return [execution.executionResult.record];
}

return [];
4 changes: 4 additions & 0 deletions packages/workflow-executor/src/types/validated/execution.ts
@@ -25,6 +25,10 @@ export const StepSchema = z
.object({
stepDefinition: StepDefinitionSchema,
stepOutcome: StepOutcomeSchema,
// Set on a revision clone (a still-valid step the orchestrator re-injects); points at the
// step it copies. The executor never ran the clone, so its record lives at that index.
// Absent for steps the executor ran itself.
originalStepIndex: z.number().int().nonnegative().optional(),
})
.strict();
export type Step = z.infer<typeof StepSchema>;
@@ -501,6 +501,132 @@ describe('toAvailableStepExecution', () => {
});
});

describe('revision handling', () => {
// Revision model (orchestrator): the pivot card is stamped revised:true, every step after
// it is stamped cancelled:true, and clones of the still-valid card sub-steps are appended
// at the tail with originalStepIndex chaining to the FIRST original. The live path is
// filter(!revised && !cancelled).

function makeClonedStepHistory(
overrides: Partial<ServerStepHistory>,
originalStepIndex: number,
): ServerStepHistory {
return { ...makeStepHistory(overrides), originalStepIndex };
}

/**
* Canonical single-revision scenario (sub B1 of card B revised):
* idx 0 trunk task done ← live
* idx 1 card B done, revised ← pivot anchor (dead)
* idx 2 sub B1 done, cancelled ← dead branch
* idx 3 card C done, cancelled ← dead branch
* idx 4 card B clone done, originalStepIndex 1 ← live
* idx 5 sub B1 re-exec pending ← current
*/
function makeRevisedRun(): ServerHydratedWorkflowRun {
return makeRun({
workflowHistory: [
makeStepHistory({ stepName: 'trunk', stepIndex: 0, done: true }),
makeStepHistory({ stepName: 'card-b', stepIndex: 1, done: true, revised: true }),
makeStepHistory({ stepName: 'sub-b1', stepIndex: 2, done: true, cancelled: true }),
makeStepHistory({ stepName: 'card-c', stepIndex: 3, done: true, cancelled: true }),
makeClonedStepHistory({ stepName: 'card-b', stepIndex: 4, done: true }, 1),
makeClonedStepHistory({ stepName: 'sub-b1', stepIndex: 5, done: false }, 2),
],
});
}

it('excludes cancelled steps from previousSteps', () => {
const run = makeRun({
workflowHistory: [
makeStepHistory({ stepName: 's0', stepIndex: 0, done: true }),
makeStepHistory({ stepName: 's1', stepIndex: 1, done: true, cancelled: true }),
makeStepHistory({ stepName: 's2', stepIndex: 2, done: false }),
],
});

const result = toAvailableStepExecution(run);

expect(result?.previousSteps).toHaveLength(1);
expect(result?.previousSteps[0].stepOutcome.stepId).toBe('s0');
});

it('excludes the revised anchor while keeping its live clone', () => {
const result = toAvailableStepExecution(makeRevisedRun());

const indexes = result?.previousSteps.map(s => s.stepOutcome.stepIndex);
expect(indexes).toEqual([0, 4]);
});

it('returns empty previousSteps when the entry point is revised', () => {
// Entry-point revision: the pivot IS the first step, so nothing valid precedes the
// re-execution — context must collapse to a clean slate.
const run = makeRun({
workflowHistory: [
makeStepHistory({ stepName: 'entry', stepIndex: 0, done: true, revised: true }),
makeStepHistory({ stepName: 's1', stepIndex: 1, done: true, cancelled: true }),
makeClonedStepHistory({ stepName: 'entry', stepIndex: 2, done: false }, 0),
],
});

const result = toAvailableStepExecution(run);

expect(result?.previousSteps).toEqual([]);
});

it('does not attempt to map dead-branch steps (filtering precedes mapping)', () => {
// A cancelled step with an unmappable definition must not fail the run — it is dead.
const run = makeRun({
workflowHistory: [
makeStepHistory({
stepName: 's0',
stepIndex: 0,
done: true,
cancelled: true,
stepDefinition: makeTaskStepDef({
taskType: 'unknown-future-type' as ServerTaskTypeEnum,
title: 't',
}),
}),
makeStepHistory({ stepName: 's1', stepIndex: 1, done: false }),
],
});

const result = toAvailableStepExecution(run);

expect(result?.previousSteps).toEqual([]);
});

describe('originalStepIndex', () => {
it('is absent for a step the executor ran itself (no revision)', () => {
const run = makeRun({
workflowHistory: [
makeStepHistory({ stepName: 's0', stepIndex: 0, done: true }),
makeStepHistory({ stepName: 's1', stepIndex: 1, done: false }),
],
});

const result = toAvailableStepExecution(run);

expect(result?.previousSteps).toHaveLength(1);
expect(result?.previousSteps[0]).not.toHaveProperty('originalStepIndex');
});

it('is set on a clone, pointing at the step it copies', () => {
const result = toAvailableStepExecution(makeRevisedRun());

// previousSteps = [trunk (ran, no original), card-b clone (copy of idx 1)]
expect(result?.previousSteps[0]).not.toHaveProperty('originalStepIndex');
expect(result?.previousSteps[1]).toEqual(
expect.objectContaining({
stepOutcome: expect.objectContaining({ stepIndex: 4 }),
originalStepIndex: 1,
}),
);
});
});
});

describe('user mapping', () => {
it('should map server userProfile to StepUser with null → empty string', () => {
const profile: ServerUserProfile = {
114 changes: 114 additions & 0 deletions packages/workflow-executor/test/executors/base-step-executor.test.ts
@@ -4,6 +4,7 @@ import type { RunStore } from '../../src/ports/run-store';
import type { ExecutionContext, StepExecutionResult } from '../../src/types/execution-context';
import type { StepExecutionData } from '../../src/types/step-execution-data';
import type { RecordRef } from '../../src/types/validated/collection';
import type { Step } from '../../src/types/validated/execution';
import type { StepDefinition } from '../../src/types/validated/step-definition';
import type { BaseStepStatus, StepOutcome } from '../../src/types/validated/step-outcome';
import type { BaseMessage, DynamicStructuredTool } from '@forestadmin/ai-proxy';
@@ -211,6 +212,119 @@ describe('BaseStepExecutor', () => {
});
});

describe('previous-steps summary after revision', () => {
// A revision clone never ran under its own (new) stepIndex — its execution lives at the
// copied step's index (originalStepIndex). The summary resolves own-index first, then the
// copied step; own-first is what stops a re-executed step from resurfacing the superseded
// original's execution detail.

function makeCloneEntry(
overrides: { stepId?: string; stepIndex?: number; prompt?: string },
originalStepIndex: number,
): Step {
return { ...makeHistoryEntry(overrides), originalStepIndex };
}

it("resolves a clone summary from the copied step's execution", async () => {
const runStore = makeMockRunStore([
{
type: 'condition',
stepIndex: 3,
executionParams: { answer: 'Yes', reasoning: 'REASONING-AT-IDX-3' },
executionResult: { answer: 'Yes' },
},
]);
const executor = new TestableExecutor(
makeContext({
// clone runs under idx 7 but copies idx 3; the executor never ran idx 7
previousSteps: [makeCloneEntry({ stepId: 'cond-1', stepIndex: 7 }, 3)],
runStore,
}),
);

const messages = await executor.buildPreviousStepsMessages();
const content = messages[0].content as string;

expect(content).toContain('Step "cond-1"');
expect(content).toContain('REASONING-AT-IDX-3');
});

it("uses the step's own execution, never the copied original's, when both exist", async () => {
const runStore = makeMockRunStore([
{
type: 'condition',
stepIndex: 3,
executionParams: { answer: 'No', reasoning: 'SUPERSEDED-ORIGINAL' },
executionResult: { answer: 'No' },
},
{
type: 'condition',
stepIndex: 7,
executionParams: { answer: 'Yes', reasoning: 'OWN-FRESH' },
executionResult: { answer: 'Yes' },
},
]);
const executor = new TestableExecutor(
makeContext({
// a re-executed step carries originalStepIndex but has its OWN execution at idx 7
previousSteps: [makeCloneEntry({ stepId: 'cond-1', stepIndex: 7 }, 3)],
runStore,
}),
);

const messages = await executor.buildPreviousStepsMessages();
const content = messages[0].content as string;

expect(content).toContain('OWN-FRESH');
expect(content).not.toContain('SUPERSEDED-ORIGINAL');
});

it('falls back to outcome History when neither own nor copied step has an execution', async () => {
const runStore = makeMockRunStore([]);
const executor = new TestableExecutor(
makeContext({
previousSteps: [makeCloneEntry({ stepId: 'cond-1', stepIndex: 7 }, 3)],
runStore,
}),
);

const messages = await executor.buildPreviousStepsMessages();
const content = messages[0].content as string;

expect(content).toContain('Step "cond-1"');
expect(content).toContain('History:');
});

it('does not surface executions of steps absent from previousSteps (dead branch)', async () => {
const runStore = makeMockRunStore([
{
type: 'condition',
stepIndex: 0,
executionParams: { answer: 'Yes', reasoning: 'LIVE-INPUT' },
executionResult: { answer: 'Yes' },
},
{
type: 'condition',
stepIndex: 5,
executionParams: { answer: 'No', reasoning: 'DEAD-INPUT' },
executionResult: { answer: 'No' },
},
]);
const executor = new TestableExecutor(
makeContext({
previousSteps: [makeHistoryEntry({ stepId: 'cond-1', stepIndex: 0 })],
runStore,
}),
);

const messages = await executor.buildPreviousStepsMessages();
const content = messages[0].content as string;

expect(content).toContain('LIVE-INPUT');
expect(content).not.toContain('DEAD-INPUT');
});
});

describe('execute error handling', () => {
it('converts NoRecordsError to error outcome', async () => {
const executor = new TestableExecutor(makeContext(), new NoRecordsError());