diff --git a/packages/workflow-executor/CLAUDE.md b/packages/workflow-executor/CLAUDE.md index 9a3dabe201..1acc1e94f7 100644 --- a/packages/workflow-executor/CLAUDE.md +++ b/packages/workflow-executor/CLAUDE.md @@ -94,6 +94,7 @@ src/ - **Pre-recorded AI decisions** — Record step executors support `preRecordedArgs` in the step definition to bypass AI calls. When provided, executors use the pre-recorded values (display names) directly instead of invoking the AI. Each record step type has its own typed `preRecordedArgs` shape. Validation happens via schema resolution — invalid display names throw `InvalidPreRecordedArgsError`. Partial args are supported: only the provided fields skip AI, the rest still use AI. - **Graceful shutdown** — `stop()` drains in-flight steps before closing resources. The `state` getter exposes the lifecycle: `idle → running → draining → stopped`. `stopTimeoutMs` (default 30s) prevents `stop()` from hanging forever if a step is stuck. The HTTP server stays up during drain so the frontend can still query run status. Signal handling (`SIGTERM`/`SIGINT`) is the consumer's responsibility — the Runner is a library class. - **Structured log context** — `BaseStepExecutor.execute()` stamps every log line with a shared `logCtx` (`runId`, `stepId`, `stepIndex`, `stepType`). Executors with type-specific identifiers add them via the `getExtraLogContext()` hook (default `{}`), keeping the base class free of step-specific knowledge — e.g. `McpStepExecutor` returns `{ mcpServerId, mcpServerName }` so MCP step logs unambiguously identify the targeted server (`mcpServerId` is canonical; `mcpServerName` is the human-readable Record key, not guaranteed unique at the DB level). `mcpServerName` is resolved by `RemoteToolFetcher.fetch()` from the scoped config Record key and forwarded to the executor constructor. +- **Revision-aware history reads** — On revision the orchestrator (server-side) marks the pivot card `revised` and every later entry `cancelled`, then appends clones of the still-valid steps (each clone's `originalStepIndex` points at the step it copies) plus a fresh re-execution of the revised step. Any consumer of `workflowHistory` must restrict to the live path (`!revised && !cancelled`) — skipping this leaks a superseded branch's context into a re-run. To find a step's RunStore execution, resolve own `stepIndex` first, then fall back to `originalStepIndex` (a clone the executor never ran inherits the copied step's record — mirrors the frontend's `carryForwardExecutorDataForCopiedSteps`). Own-index-first is essential: a re-executed step has its own entry, so it must never inherit the superseded original's record. Never key on `stepName` — LinkTo loops can put the same name on the live path twice. - **Boundary validation** — Types that cross a trust boundary (wire from the orchestrator, or mapper output) live under `src/types/validated/` as zod schemas with TS types inferred via `z.infer<>`. Strictness depends on origin: schemas the executor **produces** (mapper output) and **frontend** HTTP bodies use `.strict()` (catch our own bugs / input hygiene); the **orchestrator collection schema** instead **strips** unknown keys and requires only structural fields, with step-specific props optional and asserted at use-time by the consuming executor. This keeps the executor resilient to independent orchestrator drift — we fail at step execution, only when a step genuinely lacks what it needs, never in bulk up front for an unrelated add/remove. Validation runs where data enters (`forest-server-workflow-port.getCollectionSchema`, `run-to-available-step-mapper.toAvailableStepExecution`). On parse failure: throw `DomainValidationError` (extends `WorkflowExecutorError`) → bucketized as malformed (dispatch) or surfaced as a step error (execution). Types outside `validated/` are internal runtime state and not zod-validated. Note: `StepOutcome` is validated when it arrives as input via `previousSteps`; executor outputs are trusted by construction. ## Commands diff --git a/packages/workflow-executor/src/adapters/run-to-available-step-mapper.ts b/packages/workflow-executor/src/adapters/run-to-available-step-mapper.ts index 02c2d51f4d..ba90c1a671 100644 --- a/packages/workflow-executor/src/adapters/run-to-available-step-mapper.ts +++ b/packages/workflow-executor/src/adapters/run-to-available-step-mapper.ts @@ -81,6 +81,7 @@ function tryMapStep(s: ServerStepHistory): Step | null { return { stepDefinition: toStepDefinition(s.stepDefinition), stepOutcome: toStepOutcome(s), + ...(s.originalStepIndex !== undefined && { originalStepIndex: s.originalStepIndex }), }; } catch (err) { // Sub-workflow navigation steps (start-sub-workflow, close-sub-workflow) are not @@ -90,12 +91,14 @@ function tryMapStep(s: ServerStepHistory): Step | null { } } +// Mirrors the orchestrator's own read filter: revised and cancelled entries are not on the +// live path. function toPreviousSteps( history: ServerStepHistory[], pendingStepIndex: number, ): ReadonlyArray { return history - .filter(s => s.done && s.stepIndex < pendingStepIndex) + .filter(s => s.done && !s.revised && !s.cancelled && s.stepIndex < pendingStepIndex) .map(s => tryMapStep(s)) .filter((s): s is Step => s !== null); } diff --git a/packages/workflow-executor/src/adapters/server-types.ts b/packages/workflow-executor/src/adapters/server-types.ts index de4960b388..f0d56a802a 100644 --- a/packages/workflow-executor/src/adapters/server-types.ts +++ b/packages/workflow-executor/src/adapters/server-types.ts @@ -169,6 +169,8 @@ export interface ServerStepHistory { done: boolean; revised?: boolean; cancelled?: boolean; + // On a revision clone, the index of the step it copies — where that step's record lives. + originalStepIndex?: number; context?: Record; childrenWorkflowId?: string; stepDefinition: ServerWorkflowStep; diff --git a/packages/workflow-executor/src/executors/base-step-executor.ts b/packages/workflow-executor/src/executors/base-step-executor.ts index fb91e1549f..90c9bf1f0c 100644 --- a/packages/workflow-executor/src/executors/base-step-executor.ts +++ b/packages/workflow-executor/src/executors/base-step-executor.ts @@ -6,6 +6,7 @@ import type { StepExecutionResult, } from '../types/execution-context'; import type { ConfirmableStepExecutionData, StepExecutionData } from '../types/step-execution-data'; +import type { Step } from '../types/validated/execution'; import type { StepDefinition } from '../types/validated/step-definition'; import type { StepStatus } from '../types/validated/step-outcome'; import type { @@ -283,16 +284,34 @@ export default abstract class BaseStepExecutor { - const execution = allStepExecutions.find(e => e.stepIndex === stepOutcome.stepIndex); + .map(step => { + const execution = BaseStepExecutor.resolveStepExecution(step, allStepExecutions); - return StepSummaryBuilder.build(stepDefinition, stepOutcome, execution); + return StepSummaryBuilder.build(step.stepDefinition, step.stepOutcome, execution); }) .join('\n\n'); return [new SystemMessage(summary)]; } + // A step the executor ran has its execution at its own stepIndex. A revision clone never ran, + // so it inherits the record of the step it copied (originalStepIndex). Own-index-first is what + // stops a re-executed step (which has its own entry) from resurfacing the superseded + // original's record. + protected static resolveStepExecution( + step: Step, + executions: StepExecutionData[], + ): StepExecutionData | undefined { + const own = executions.find(e => e.stepIndex === step.stepOutcome.stepIndex); + if (own) return own; + + if (step.originalStepIndex !== undefined) { + return executions.find(e => e.stepIndex === step.originalStepIndex); + } + + return undefined; + } + private static mergeLeadingSystemMessages(messages: BaseMessage[]): BaseMessage[] { let i = 0; while (i < messages.length && messages[i] instanceof SystemMessage) i += 1; diff --git a/packages/workflow-executor/src/executors/record-step-executor.ts b/packages/workflow-executor/src/executors/record-step-executor.ts index 4430dfacc4..79d7a55c58 100644 --- a/packages/workflow-executor/src/executors/record-step-executor.ts +++ b/packages/workflow-executor/src/executors/record-step-executor.ts @@ -8,6 +8,7 @@ import { z } from 'zod'; import { InvalidAIResponseError, InvalidPreRecordedArgsError, NoRecordsError } from '../errors'; import BaseStepExecutor from './base-step-executor'; +import { StepType } from '../types/validated/step-definition'; export default abstract class RecordStepExecutor< TStep extends StepDefinition = StepDefinition, @@ -46,15 +47,22 @@ export default abstract class RecordStepExecutor< return this.selectRecordRef(records, prompt); } + // Candidate sources for the AI: the base record plus the record each live prior + // load-related step resolved — own stepIndex first, falling back to a clone's + // originalStepIndex. protected async getAvailableRecordRefs(): Promise { const stepExecutions = await this.context.runStore.getStepExecutions(this.context.runId); - const relatedRecords = stepExecutions.flatMap(e => { + const relatedRecords = this.context.previousSteps.flatMap(step => { + if (step.stepDefinition.type !== StepType.LoadRelatedRecord) return []; + + const execution = BaseStepExecutor.resolveStepExecution(step, stepExecutions); + if ( - e.type === 'load-related-record' && - e.executionResult !== undefined && - 'record' in e.executionResult + execution?.type === 'load-related-record' && + execution.executionResult !== undefined && + 'record' in execution.executionResult ) { - return [e.executionResult.record]; + return [execution.executionResult.record]; } return []; diff --git a/packages/workflow-executor/src/types/validated/execution.ts b/packages/workflow-executor/src/types/validated/execution.ts index 43b5c0c06a..0d215c84d3 100644 --- a/packages/workflow-executor/src/types/validated/execution.ts +++ b/packages/workflow-executor/src/types/validated/execution.ts @@ -25,6 +25,10 @@ export const StepSchema = z .object({ stepDefinition: StepDefinitionSchema, stepOutcome: StepOutcomeSchema, + // Set on a revision clone (a still-valid step the orchestrator re-injects); points at the + // step it copies. The executor never ran the clone, so its record lives at that index. + // Absent for steps the executor ran itself. + originalStepIndex: z.number().int().nonnegative().optional(), }) .strict(); export type Step = z.infer; diff --git a/packages/workflow-executor/test/adapters/run-to-available-step-mapper.test.ts b/packages/workflow-executor/test/adapters/run-to-available-step-mapper.test.ts index 8919712a43..dacf007274 100644 --- a/packages/workflow-executor/test/adapters/run-to-available-step-mapper.test.ts +++ b/packages/workflow-executor/test/adapters/run-to-available-step-mapper.test.ts @@ -501,6 +501,132 @@ describe('toAvailableStepExecution', () => { }); }); + describe('revision handling', () => { + // Revision model (orchestrator): the pivot card is stamped revised:true, every step after + // it is stamped cancelled:true, and clones of the still-valid card sub-steps are appended + // at the tail with originalStepIndex chaining to the FIRST original. The live path is + // filter(!revised && !cancelled). + + function makeClonedStepHistory( + overrides: Partial, + originalStepIndex: number, + ): ServerStepHistory { + return { ...makeStepHistory(overrides), originalStepIndex }; + } + + /** + * Canonical single-revision scenario (sub B1 of card B revised): + * idx 0 trunk task done ← live + * idx 1 card B done, revised ← pivot anchor (dead) + * idx 2 sub B1 done, cancelled ← dead branch + * idx 3 card C done, cancelled ← dead branch + * idx 4 card B clone done, originalStepIndex 1 ← live + * idx 5 sub B1 re-exec pending ← current + */ + function makeRevisedRun(): ServerHydratedWorkflowRun { + return makeRun({ + workflowHistory: [ + makeStepHistory({ stepName: 'trunk', stepIndex: 0, done: true }), + makeStepHistory({ stepName: 'card-b', stepIndex: 1, done: true, revised: true }), + makeStepHistory({ stepName: 'sub-b1', stepIndex: 2, done: true, cancelled: true }), + makeStepHistory({ stepName: 'card-c', stepIndex: 3, done: true, cancelled: true }), + makeClonedStepHistory({ stepName: 'card-b', stepIndex: 4, done: true }, 1), + makeClonedStepHistory({ stepName: 'sub-b1', stepIndex: 5, done: false }, 2), + ], + }); + } + + it('excludes cancelled steps from previousSteps', () => { + const run = makeRun({ + workflowHistory: [ + makeStepHistory({ stepName: 's0', stepIndex: 0, done: true }), + makeStepHistory({ stepName: 's1', stepIndex: 1, done: true, cancelled: true }), + makeStepHistory({ stepName: 's2', stepIndex: 2, done: false }), + ], + }); + + const result = toAvailableStepExecution(run); + + expect(result?.previousSteps).toHaveLength(1); + expect(result?.previousSteps[0].stepOutcome.stepId).toBe('s0'); + }); + + it('excludes the revised anchor while keeping its live clone', () => { + const result = toAvailableStepExecution(makeRevisedRun()); + + const indexes = result?.previousSteps.map(s => s.stepOutcome.stepIndex); + expect(indexes).toEqual([0, 4]); + }); + + it('returns empty previousSteps when the entry point is revised', () => { + // Entry-point revision: the pivot IS the first step, so nothing valid precedes the + // re-execution — context must collapse to a clean slate. + const run = makeRun({ + workflowHistory: [ + makeStepHistory({ stepName: 'entry', stepIndex: 0, done: true, revised: true }), + makeStepHistory({ stepName: 's1', stepIndex: 1, done: true, cancelled: true }), + makeClonedStepHistory({ stepName: 'entry', stepIndex: 2, done: false }, 0), + ], + }); + + const result = toAvailableStepExecution(run); + + expect(result?.previousSteps).toEqual([]); + }); + + it('does not attempt to map dead-branch steps (filtering precedes mapping)', () => { + // A cancelled step with an unmappable definition must not fail the run — it is dead. + const run = makeRun({ + workflowHistory: [ + makeStepHistory({ + stepName: 's0', + stepIndex: 0, + done: true, + cancelled: true, + stepDefinition: makeTaskStepDef({ + taskType: 'unknown-future-type' as ServerTaskTypeEnum, + title: 't', + }), + }), + makeStepHistory({ stepName: 's1', stepIndex: 1, done: false }), + ], + }); + + const result = toAvailableStepExecution(run); + + expect(result?.previousSteps).toEqual([]); + }); + + describe('originalStepIndex', () => { + it('is absent for a step the executor ran itself (no revision)', () => { + const run = makeRun({ + workflowHistory: [ + makeStepHistory({ stepName: 's0', stepIndex: 0, done: true }), + makeStepHistory({ stepName: 's1', stepIndex: 1, done: false }), + ], + }); + + const result = toAvailableStepExecution(run); + + expect(result?.previousSteps).toHaveLength(1); + expect(result?.previousSteps[0]).not.toHaveProperty('originalStepIndex'); + }); + + it('is set on a clone, pointing at the step it copies', () => { + const result = toAvailableStepExecution(makeRevisedRun()); + + // previousSteps = [trunk (ran, no original), card-b clone (copy of idx 1)] + expect(result?.previousSteps[0]).not.toHaveProperty('originalStepIndex'); + expect(result?.previousSteps[1]).toEqual( + expect.objectContaining({ + stepOutcome: expect.objectContaining({ stepIndex: 4 }), + originalStepIndex: 1, + }), + ); + }); + }); + }); + describe('user mapping', () => { it('should map server userProfile to StepUser with null → empty string', () => { const profile: ServerUserProfile = { diff --git a/packages/workflow-executor/test/executors/base-step-executor.test.ts b/packages/workflow-executor/test/executors/base-step-executor.test.ts index bef89c9760..0b210265f7 100644 --- a/packages/workflow-executor/test/executors/base-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/base-step-executor.test.ts @@ -4,6 +4,7 @@ import type { RunStore } from '../../src/ports/run-store'; import type { ExecutionContext, StepExecutionResult } from '../../src/types/execution-context'; import type { StepExecutionData } from '../../src/types/step-execution-data'; import type { RecordRef } from '../../src/types/validated/collection'; +import type { Step } from '../../src/types/validated/execution'; import type { StepDefinition } from '../../src/types/validated/step-definition'; import type { BaseStepStatus, StepOutcome } from '../../src/types/validated/step-outcome'; import type { BaseMessage, DynamicStructuredTool } from '@forestadmin/ai-proxy'; @@ -211,6 +212,119 @@ describe('BaseStepExecutor', () => { }); }); + describe('previous-steps summary after revision', () => { + // A revision clone never ran under its own (new) stepIndex — its execution lives at the + // copied step's index (originalStepIndex). The summary resolves own-index first, then the + // copied step; own-first is what stops a re-executed step from resurfacing the superseded + // original's execution detail. + + function makeCloneEntry( + overrides: { stepId?: string; stepIndex?: number; prompt?: string }, + originalStepIndex: number, + ): Step { + return { ...makeHistoryEntry(overrides), originalStepIndex }; + } + + it("resolves a clone summary from the copied step's execution", async () => { + const runStore = makeMockRunStore([ + { + type: 'condition', + stepIndex: 3, + executionParams: { answer: 'Yes', reasoning: 'REASONING-AT-IDX-3' }, + executionResult: { answer: 'Yes' }, + }, + ]); + const executor = new TestableExecutor( + makeContext({ + // clone runs under idx 7 but copies idx 3; the executor never ran idx 7 + previousSteps: [makeCloneEntry({ stepId: 'cond-1', stepIndex: 7 }, 3)], + runStore, + }), + ); + + const messages = await executor.buildPreviousStepsMessages(); + const content = messages[0].content as string; + + expect(content).toContain('Step "cond-1"'); + expect(content).toContain('REASONING-AT-IDX-3'); + }); + + it("uses the step's own execution, never the copied original's, when both exist", async () => { + const runStore = makeMockRunStore([ + { + type: 'condition', + stepIndex: 3, + executionParams: { answer: 'No', reasoning: 'SUPERSEDED-ORIGINAL' }, + executionResult: { answer: 'No' }, + }, + { + type: 'condition', + stepIndex: 7, + executionParams: { answer: 'Yes', reasoning: 'OWN-FRESH' }, + executionResult: { answer: 'Yes' }, + }, + ]); + const executor = new TestableExecutor( + makeContext({ + // a re-executed step carries originalStepIndex but has its OWN execution at idx 7 + previousSteps: [makeCloneEntry({ stepId: 'cond-1', stepIndex: 7 }, 3)], + runStore, + }), + ); + + const messages = await executor.buildPreviousStepsMessages(); + const content = messages[0].content as string; + + expect(content).toContain('OWN-FRESH'); + expect(content).not.toContain('SUPERSEDED-ORIGINAL'); + }); + + it('falls back to outcome History when neither own nor copied step has an execution', async () => { + const runStore = makeMockRunStore([]); + const executor = new TestableExecutor( + makeContext({ + previousSteps: [makeCloneEntry({ stepId: 'cond-1', stepIndex: 7 }, 3)], + runStore, + }), + ); + + const messages = await executor.buildPreviousStepsMessages(); + const content = messages[0].content as string; + + expect(content).toContain('Step "cond-1"'); + expect(content).toContain('History:'); + }); + + it('does not surface executions of steps absent from previousSteps (dead branch)', async () => { + const runStore = makeMockRunStore([ + { + type: 'condition', + stepIndex: 0, + executionParams: { answer: 'Yes', reasoning: 'LIVE-INPUT' }, + executionResult: { answer: 'Yes' }, + }, + { + type: 'condition', + stepIndex: 5, + executionParams: { answer: 'No', reasoning: 'DEAD-INPUT' }, + executionResult: { answer: 'No' }, + }, + ]); + const executor = new TestableExecutor( + makeContext({ + previousSteps: [makeHistoryEntry({ stepId: 'cond-1', stepIndex: 0 })], + runStore, + }), + ); + + const messages = await executor.buildPreviousStepsMessages(); + const content = messages[0].content as string; + + expect(content).toContain('LIVE-INPUT'); + expect(content).not.toContain('DEAD-INPUT'); + }); + }); + describe('execute error handling', () => { it('converts NoRecordsError to error outcome', async () => { const executor = new TestableExecutor(makeContext(), new NoRecordsError()); diff --git a/packages/workflow-executor/test/executors/load-related-record-step-executor.test.ts b/packages/workflow-executor/test/executors/load-related-record-step-executor.test.ts index 5ffe45226c..e8097e85cf 100644 --- a/packages/workflow-executor/test/executors/load-related-record-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/load-related-record-step-executor.test.ts @@ -4,6 +4,7 @@ import type { WorkflowPort } from '../../src/ports/workflow-port'; import type { ExecutionContext } from '../../src/types/execution-context'; import type { LoadRelatedRecordStepExecutionData } from '../../src/types/step-execution-data'; import type { CollectionSchema, RecordData, RecordRef } from '../../src/types/validated/collection'; +import type { Step } from '../../src/types/validated/execution'; import type { LoadRelatedRecordStepDefinition } from '../../src/types/validated/step-definition'; import { AgentPortError, RunStorePortError } from '../../src/errors'; @@ -196,6 +197,23 @@ function makePendingExecution( }; } +function makeLoadRelatedPreviousStep(stepIndex: number, originalStepIndex?: number): Step { + return { + stepDefinition: { + type: StepType.LoadRelatedRecord, + executionType: StepExecutionMode.FullyAutomated, + prompt: 'Load the order', + }, + stepOutcome: { + type: 'record', + stepId: `load-${stepIndex}`, + stepIndex, + status: 'success', + }, + ...(originalStepIndex !== undefined && { originalStepIndex }), + }; +} + describe('LoadRelatedRecordStepExecutor', () => { describe('executionType=FullyAutomated: BelongsTo — load direct (Branch B)', () => { it('fetches 1 related record and returns success', async () => { @@ -2398,7 +2416,14 @@ describe('LoadRelatedRecordStepExecutor', () => { customers: makeCollectionSchema(), orders: ordersSchema, }); - const context = makeContext({ baseRecordRef, model, runStore, workflowPort, agentPort }); + const context = makeContext({ + baseRecordRef, + model, + runStore, + workflowPort, + agentPort, + previousSteps: [makeLoadRelatedPreviousStep(2)], + }); const executor = new LoadRelatedRecordStepExecutor(context); const result = await executor.execute(); @@ -2709,7 +2734,13 @@ describe('LoadRelatedRecordStepExecutor', () => { customers: makeCollectionSchema(), orders: ordersSchema, }); - const context = makeContext({ baseRecordRef, model, runStore, workflowPort }); + const context = makeContext({ + baseRecordRef, + model, + runStore, + workflowPort, + previousSteps: [makeLoadRelatedPreviousStep(2), makeLoadRelatedPreviousStep(3)], + }); const executor = new LoadRelatedRecordStepExecutor(context); await executor.execute(); @@ -2829,4 +2860,55 @@ describe('LoadRelatedRecordStepExecutor', () => { expect(bindTools).toHaveBeenCalled(); }); }); + + describe('record pool after revision', () => { + it('re-executes a revised load step from the base record, not from the dead branch record', async () => { + // Given: the run loaded an owner before the user revised the "Load store" step. The + // owner's execution survives in the RunStore (dead branch), but the cleaned + // previousSteps no longer claims it. + const mockModel = makeMockModel({ relationName: 'Order', reasoning: 'reload' }); + const agentPort = makeMockAgentPort(); + const runStore = makeMockRunStore({ + getStepExecutions: jest.fn().mockResolvedValue([ + { + type: 'load-related-record', + stepIndex: 2, + executionResult: { + relation: { name: 'owner', displayName: 'Owner' }, + record: makeRecordRef({ collectionName: 'owners', recordId: [7], stepIndex: 2 }), + }, + selectedRecordRef: makeRecordRef(), + }, + ]), + }); + const context = makeContext({ + model: mockModel.model, + agentPort, + runStore, + previousSteps: [], + stepDefinition: makeStep({ executionType: StepExecutionMode.FullyAutomated }), + }); + const executor = new LoadRelatedRecordStepExecutor(context); + + // When + const result = await executor.execute(); + + // Then: the pool collapsed to the base record — no select-record round; the relation + // was selected and loaded directly from the base record (previously the dead branch's + // owner was offered as a source instead). + expect(result.stepOutcome.status).toBe('success'); + expect(mockModel.bindTools).toHaveBeenCalledTimes(1); + expect((mockModel.bindTools.mock.calls[0][0][0] as { name: string }).name).toBe( + 'select-relation', + ); + expect(agentPort.getSingleRelatedData).toHaveBeenCalledWith( + expect.objectContaining({ + collection: 'customers', + id: [42], + relation: 'order', + }), + expect.objectContaining({ id: 1 }), + ); + }); + }); }); diff --git a/packages/workflow-executor/test/executors/read-record-step-executor.test.ts b/packages/workflow-executor/test/executors/read-record-step-executor.test.ts index d483ce2d37..e5d789bc2f 100644 --- a/packages/workflow-executor/test/executors/read-record-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/read-record-step-executor.test.ts @@ -3,6 +3,7 @@ import type { RunStore } from '../../src/ports/run-store'; import type { WorkflowPort } from '../../src/ports/workflow-port'; import type { ExecutionContext } from '../../src/types/execution-context'; import type { CollectionSchema, RecordRef } from '../../src/types/validated/collection'; +import type { Step } from '../../src/types/validated/execution'; import type { ReadRecordStepDefinition } from '../../src/types/validated/step-definition'; import { AgentPortError, NoRecordsError, RecordNotFoundError } from '../../src/errors'; @@ -142,6 +143,23 @@ function makeContext( }; } +function makeLoadRelatedPreviousStep(stepIndex: number, originalStepIndex?: number): Step { + return { + stepDefinition: { + type: StepType.LoadRelatedRecord, + executionType: StepExecutionMode.FullyAutomated, + prompt: 'Load the order', + }, + stepOutcome: { + type: 'record', + stepId: `load-${stepIndex}`, + stepIndex, + status: 'success', + }, + ...(originalStepIndex !== undefined && { originalStepIndex }), + }; +} + describe('ReadRecordStepExecutor', () => { describe('single record, single field', () => { it('reads a single field and returns success', async () => { @@ -408,7 +426,13 @@ describe('ReadRecordStepExecutor', () => { customers: makeCollectionSchema(), orders: ordersSchema, }); - const context = makeContext({ baseRecordRef, model, runStore, workflowPort }); + const context = makeContext({ + baseRecordRef, + model, + runStore, + workflowPort, + previousSteps: [makeLoadRelatedPreviousStep(2)], + }); const executor = new ReadRecordStepExecutor(context); const result = await executor.execute(); @@ -501,7 +525,14 @@ describe('ReadRecordStepExecutor', () => { const agentPort = makeMockAgentPort({ orders: { values: { total: 150 } }, }); - const context = makeContext({ baseRecordRef, model, runStore, workflowPort, agentPort }); + const context = makeContext({ + baseRecordRef, + model, + runStore, + workflowPort, + agentPort, + previousSteps: [makeLoadRelatedPreviousStep(2)], + }); const executor = new ReadRecordStepExecutor(context); const result = await executor.execute(); @@ -576,7 +607,13 @@ describe('ReadRecordStepExecutor', () => { orders: ordersSchema, }); const executor = new ReadRecordStepExecutor( - makeContext({ baseRecordRef, model, runStore, workflowPort }), + makeContext({ + baseRecordRef, + model, + runStore, + workflowPort, + previousSteps: [makeLoadRelatedPreviousStep(5)], + }), ); await executor.execute(); @@ -631,7 +668,13 @@ describe('ReadRecordStepExecutor', () => { customers: makeCollectionSchema(), orders: ordersSchema, }); - const context = makeContext({ baseRecordRef, model, runStore, workflowPort }); + const context = makeContext({ + baseRecordRef, + model, + runStore, + workflowPort, + previousSteps: [makeLoadRelatedPreviousStep(1)], + }); const executor = new ReadRecordStepExecutor(context); const result = await executor.execute(); @@ -922,13 +965,18 @@ describe('ReadRecordStepExecutor', () => { { type: 'load-related-record', stepIndex: 1, - executionResult: { record: relatedRef }, + executionResult: { + relation: { name: 'order', displayName: 'Order' }, + record: relatedRef, + }, + selectedRecordRef: makeRecordRef(), }, ]), }); const context = makeContext({ model: mockModel.model, runStore, + previousSteps: [makeLoadRelatedPreviousStep(1)], stepDefinition: makeStep({ preRecordedArgs: { selectedRecordStepIndex: 1 }, }), @@ -1005,4 +1053,197 @@ describe('ReadRecordStepExecutor', () => { expect(mockModel.bindTools).toHaveBeenCalledTimes(1); }); }); + + describe('record pool after revision', () => { + // After a revision, dead-branch records must leave the pool. A live clone (new stepIndex, + // never run by the executor) inherits its record from the copied step (originalStepIndex); + // a re-executed step uses its OWN entry and never resurfaces the superseded original's + // record — own-index-first resolution. + function makeLoadRelatedExecution(stepIndex: number, recordId: number) { + return { + type: 'load-related-record', + stepIndex, + executionResult: { + relation: { name: 'order', displayName: 'Order' }, + record: makeRecordRef({ collectionName: 'orders', recordId: [recordId], stepIndex }), + }, + selectedRecordRef: makeRecordRef(), + }; + } + + const ordersSchema = () => + makeCollectionSchema({ + collectionName: 'orders', + collectionDisplayName: 'Orders', + fields: [{ fieldName: 'total', displayName: 'Total', isRelationship: false }], + }); + + it('ignores records persisted by dead-branch steps when no live step claims them', async () => { + // Given: the RunStore still holds a record loaded before the revision, but the cleaned + // previousSteps no longer contains any load-related step. + const mockModel = makeMockModel({ fieldNames: ['email'] }); + const runStore = makeMockRunStore({ + getStepExecutions: jest.fn().mockResolvedValue([makeLoadRelatedExecution(2, 99)]), + }); + const agentPort = makeMockAgentPort(); + const context = makeContext({ + model: mockModel.model, + runStore, + agentPort, + previousSteps: [], + }); + const executor = new ReadRecordStepExecutor(context); + + // When + const result = await executor.execute(); + + // Then: the pool collapsed to the base record — no select-record AI round happened, + // and the read targeted the base record, not the stale order. + expect(result.stepOutcome.status).toBe('success'); + expect(mockModel.bindTools).toHaveBeenCalledTimes(1); + expect((mockModel.bindTools.mock.calls[0][0][0] as { name: string }).name).toBe( + 'read-selected-record-fields', + ); + expect(agentPort.getRecord).toHaveBeenCalledTimes(1); + expect(agentPort.getRecord).toHaveBeenCalledWith( + expect.objectContaining({ collection: 'customers', id: [42] }), + expect.objectContaining({ id: 1 }), + ); + }); + + it("resolves a clone's record from the copied step's original RunStore entry", async () => { + // Given: the live previous step is a clone at idx 7 (never run by the executor) that + // copies idx 3, where its execution and record are stored. + const invoke = jest + .fn() + .mockResolvedValueOnce({ + tool_calls: [ + { name: 'select-record', args: { recordIdentifier: 'Step 3 - Orders #99' }, id: 'c1' }, + ], + }) + .mockResolvedValueOnce({ + tool_calls: [ + { name: 'read-selected-record-fields', args: { fieldNames: ['total'] }, id: 'c2' }, + ], + }); + const bindTools = jest.fn().mockReturnValue({ invoke }); + const model = { bindTools } as unknown as ExecutionContext['model']; + + const runStore = makeMockRunStore({ + getStepExecutions: jest.fn().mockResolvedValue([makeLoadRelatedExecution(3, 99)]), + }); + const agentPort = makeMockAgentPort({ orders: { values: { total: 150 } } }); + const context = makeContext({ + model, + runStore, + agentPort, + workflowPort: makeMockWorkflowPort({ + customers: makeCollectionSchema(), + orders: ordersSchema(), + }), + previousSteps: [makeLoadRelatedPreviousStep(7, 3)], + }); + const executor = new ReadRecordStepExecutor(context); + + // When + const result = await executor.execute(); + + // Then: the clone's record was offered and readable. + expect(result.stepOutcome.status).toBe('success'); + expect(agentPort.getRecord).toHaveBeenCalledWith( + expect.objectContaining({ collection: 'orders', id: [99] }), + expect.objectContaining({ id: 1 }), + ); + }); + + it('does not resurface the copied original record when the re-executed step has no record (Alban repro)', async () => { + // Given: a re-executed load step at idx 10 that copies idx 3 (originalStepIndex 3, where a + // record was loaded before the revision) but produced no record of its own (skipped / + // handled manually — its own entry has no executionResult.record). + const mockModel = makeMockModel({ fieldNames: ['email'] }); + const runStore = makeMockRunStore({ + getStepExecutions: jest + .fn() + .mockResolvedValue([ + makeLoadRelatedExecution(3, 99), + { type: 'load-related-record', stepIndex: 10, executionResult: { skipped: true } }, + ]), + }); + const agentPort = makeMockAgentPort(); + const context = makeContext({ + model: mockModel.model, + runStore, + agentPort, + previousSteps: [makeLoadRelatedPreviousStep(10, 3)], + }); + const executor = new ReadRecordStepExecutor(context); + + // When + const result = await executor.execute(); + + // Then: own-index-first resolves idx 10 (record-less), so the dead idx-3 record never + // enters the pool — it collapses to the base record, no select-record round. + expect(result.stepOutcome.status).toBe('success'); + expect(mockModel.bindTools).toHaveBeenCalledTimes(1); + expect((mockModel.bindTools.mock.calls[0][0][0] as { name: string }).name).toBe( + 'read-selected-record-fields', + ); + expect(agentPort.getRecord).toHaveBeenCalledWith( + expect.objectContaining({ collection: 'customers', id: [42] }), + expect.objectContaining({ id: 1 }), + ); + }); + + it('offers both records when the same step ran twice on the live path (LinkTo loop)', async () => { + // Given: a loop executed the same load step at idx 0 and idx 2 — two distinct live + // instances, two distinct records, each resolved from its own index; neither shadows the other. + const invoke = jest + .fn() + .mockResolvedValueOnce({ + tool_calls: [ + { name: 'select-record', args: { recordIdentifier: 'Step 2 - Orders #77' }, id: 'c1' }, + ], + }) + .mockResolvedValueOnce({ + tool_calls: [ + { name: 'read-selected-record-fields', args: { fieldNames: ['total'] }, id: 'c2' }, + ], + }); + const bindTools = jest.fn().mockReturnValue({ invoke }); + const model = { bindTools } as unknown as ExecutionContext['model']; + + const runStore = makeMockRunStore({ + getStepExecutions: jest + .fn() + .mockResolvedValue([makeLoadRelatedExecution(0, 99), makeLoadRelatedExecution(2, 77)]), + }); + const agentPort = makeMockAgentPort({ orders: { values: { total: 150 } } }); + const context = makeContext({ + model, + runStore, + agentPort, + workflowPort: makeMockWorkflowPort({ + customers: makeCollectionSchema(), + orders: ordersSchema(), + }), + previousSteps: [makeLoadRelatedPreviousStep(0), makeLoadRelatedPreviousStep(2)], + baseRecordRef: makeRecordRef({ stepIndex: 4 }), + }); + const executor = new ReadRecordStepExecutor(context); + + // When + const result = await executor.execute(); + + // Then + const selectTool = bindTools.mock.calls[0][0][0] as { + schema: { shape: { recordIdentifier: { options: string[] } } }; + }; + expect(selectTool.schema.shape.recordIdentifier.options).toEqual([ + 'Step 4 - Customers #42', + 'Step 0 - Orders #99', + 'Step 2 - Orders #77', + ]); + expect(result.stepOutcome.status).toBe('success'); + }); + }); }); diff --git a/packages/workflow-executor/test/executors/trigger-record-action-step-executor.test.ts b/packages/workflow-executor/test/executors/trigger-record-action-step-executor.test.ts index 59648d2751..5c4d417809 100644 --- a/packages/workflow-executor/test/executors/trigger-record-action-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/trigger-record-action-step-executor.test.ts @@ -4,6 +4,7 @@ import type { WorkflowPort } from '../../src/ports/workflow-port'; import type { ExecutionContext } from '../../src/types/execution-context'; import type { TriggerRecordActionStepExecutionData } from '../../src/types/step-execution-data'; import type { CollectionSchema, RecordRef } from '../../src/types/validated/collection'; +import type { Step } from '../../src/types/validated/execution'; import type { TriggerActionStepDefinition } from '../../src/types/validated/step-definition'; import { AgentPortError, RunStorePortError, StepStateError } from '../../src/errors'; @@ -144,6 +145,23 @@ function makeContext( }; } +function makeLoadRelatedPreviousStep(stepIndex: number, originalStepIndex?: number): Step { + return { + stepDefinition: { + type: StepType.LoadRelatedRecord, + executionType: StepExecutionMode.FullyAutomated, + prompt: 'Load the order', + }, + stepOutcome: { + type: 'record', + stepId: `load-${stepIndex}`, + stepIndex, + status: 'success', + }, + ...(originalStepIndex !== undefined && { originalStepIndex }), + }; +} + describe('TriggerRecordActionStepExecutor', () => { describe('executionType=FullyAutomated: trigger direct (Branch B)', () => { it('triggers the action and returns success', async () => { @@ -770,7 +788,14 @@ describe('TriggerRecordActionStepExecutor', () => { orders: ordersSchema, }); const agentPort = makeMockAgentPort(); - const context = makeContext({ baseRecordRef, model, runStore, workflowPort, agentPort }); + const context = makeContext({ + baseRecordRef, + model, + runStore, + workflowPort, + agentPort, + previousSteps: [makeLoadRelatedPreviousStep(2)], + }); const executor = new TriggerRecordActionStepExecutor(context); const result = await executor.execute(); diff --git a/packages/workflow-executor/test/executors/update-record-step-executor.test.ts b/packages/workflow-executor/test/executors/update-record-step-executor.test.ts index 48becbae9f..6b6cc4e759 100644 --- a/packages/workflow-executor/test/executors/update-record-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/update-record-step-executor.test.ts @@ -4,6 +4,7 @@ import type { WorkflowPort } from '../../src/ports/workflow-port'; import type { ExecutionContext } from '../../src/types/execution-context'; import type { UpdateRecordStepExecutionData } from '../../src/types/step-execution-data'; import type { CollectionSchema, RecordRef } from '../../src/types/validated/collection'; +import type { Step } from '../../src/types/validated/execution'; import type { UpdateRecordStepDefinition } from '../../src/types/validated/step-definition'; import { AgentPortError, RunStorePortError, StepStateError } from '../../src/errors'; @@ -141,6 +142,23 @@ function makeContext( }; } +function makeLoadRelatedPreviousStep(stepIndex: number, originalStepIndex?: number): Step { + return { + stepDefinition: { + type: StepType.LoadRelatedRecord, + executionType: StepExecutionMode.FullyAutomated, + prompt: 'Load the order', + }, + stepOutcome: { + type: 'record', + stepId: `load-${stepIndex}`, + stepIndex, + status: 'success', + }, + ...(originalStepIndex !== undefined && { originalStepIndex }), + }; +} + describe('UpdateRecordStepExecutor', () => { describe('executionType=FullyAutomated: update direct (Branch B)', () => { it('updates the record and returns success', async () => { @@ -546,7 +564,13 @@ describe('UpdateRecordStepExecutor', () => { customers: makeCollectionSchema(), orders: ordersSchema, }); - const context = makeContext({ baseRecordRef, model, runStore, workflowPort }); + const context = makeContext({ + baseRecordRef, + model, + runStore, + workflowPort, + previousSteps: [makeLoadRelatedPreviousStep(2)], + }); const executor = new UpdateRecordStepExecutor(context); const result = await executor.execute();