From 791af148dd9aea615a896f5c748b9d60bab29347 Mon Sep 17 00:00:00 2001 From: Pierre Merlet Date: Tue, 2 Jun 2026 17:49:39 +0200 Subject: [PATCH 1/8] refactor(workflow-executor): rebuild field displayNames dynamically Persist only technical names (fieldName / action name) in step execution data and re-derive displayName from the collection schema at read time, so labels edited in the rendering never go stale on replay or resume. - step-execution-data: strip displayName from persisted refs; add a Displayed*/Hydrated view union for read-time consumption - executors (update/trigger/load-related/read): stop persisting displayName - add hydrateStepExecutionData() + makeSchemaGetter(), used by runner.getRunStepExecutions and the AI step summaries - drop displayName from the load-related-record PATCH validator No DB migration: execution data is a single JSON column and old rows with a leftover displayName still read fine (the technical name is the source of truth). fixes PRD-426 Co-Authored-By: Claude Opus 4.8 --- .../src/executors/base-step-executor.ts | 21 +- .../load-related-record-step-executor.ts | 17 +- .../executors/read-record-step-executor.ts | 5 +- .../summary/step-execution-formatters.ts | 8 +- .../executors/summary/step-summary-builder.ts | 4 +- .../trigger-record-action-step-executor.ts | 15 +- .../executors/update-record-step-executor.ts | 6 +- .../src/http/pending-data-validators.ts | 1 - .../src/hydrate-step-execution-data.ts | 135 +++++++++++ packages/workflow-executor/src/index.ts | 9 + packages/workflow-executor/src/runner.ts | 10 +- .../src/types/step-execution-data.ts | 59 ++++- .../load-related-record-step-executor.test.ts | 25 +- .../read-record-step-executor.test.ts | 43 ++-- .../step-execution-formatters.test.ts | 26 +-- .../executors/step-summary-builder.test.ts | 34 +-- ...rigger-record-action-step-executor.test.ts | 21 +- .../update-record-step-executor.test.ts | 22 +- .../test/hydrate-step-execution-data.test.ts | 213 ++++++++++++++++++ .../integration/workflow-execution.test.ts | 4 +- .../workflow-executor/test/runner.test.ts | 32 +++ .../test/stores/database-store.test.ts | 4 +- 22 files changed, 561 insertions(+), 153 deletions(-) create mode 100644 packages/workflow-executor/src/hydrate-step-execution-data.ts create mode 100644 packages/workflow-executor/test/hydrate-step-execution-data.test.ts diff --git a/packages/workflow-executor/src/executors/base-step-executor.ts b/packages/workflow-executor/src/executors/base-step-executor.ts index 8f6de37859..7d98742704 100644 --- a/packages/workflow-executor/src/executors/base-step-executor.ts +++ b/packages/workflow-executor/src/executors/base-step-executor.ts @@ -25,6 +25,7 @@ import { extractErrorMessage, } from '../errors'; import patchBodySchemas from '../http/pending-data-validators'; +import hydrateStepExecutionData, { makeSchemaGetter } from '../hydrate-step-execution-data'; import StepSummaryBuilder from './summary/step-summary-builder'; type WithPendingData = StepExecutionData & { pendingData?: object }; @@ -288,15 +289,23 @@ export default abstract class BaseStepExecutor { + const getSchema = makeSchemaGetter( + this.context.schemaCache, + this.context.workflowPort, + this.context.runId, + ); + const summaries = await Promise.all( + this.context.previousSteps.map(async ({ stepDefinition, stepOutcome }) => { const execution = allStepExecutions.find(e => e.stepIndex === stepOutcome.stepIndex); + const hydrated = execution + ? await hydrateStepExecutionData(execution, getSchema) + : undefined; - return StepSummaryBuilder.build(stepDefinition, stepOutcome, execution); - }) - .join('\n\n'); + return StepSummaryBuilder.build(stepDefinition, stepOutcome, hydrated); + }), + ); - return [new SystemMessage(summary)]; + return [new SystemMessage(summaries.join('\n\n'))]; } protected async invokeWithTools>( diff --git a/packages/workflow-executor/src/executors/load-related-record-step-executor.ts b/packages/workflow-executor/src/executors/load-related-record-step-executor.ts index 23d42be8c5..3094e91258 100644 --- a/packages/workflow-executor/src/executors/load-related-record-step-executor.ts +++ b/packages/workflow-executor/src/executors/load-related-record-step-executor.ts @@ -100,7 +100,6 @@ export default class LoadRelatedRecordStepExecutor extends RecordStepExecutor { - const { selectedRecordRef, name, displayName } = target; + const { selectedRecordRef, name } = target; const { relatedData, bestIndex, suggestedFields } = await this.selectBestFromRelatedData( target, @@ -121,7 +120,7 @@ export default class LoadRelatedRecordStepExecutor extends RecordStepExecutor, + target: Pick, existingExecution: LoadRelatedRecordStepExecutionData | undefined, ): Promise { - const { selectedRecordRef, name, displayName } = target; + const { selectedRecordRef, name } = target; await this.context.runStore.saveStepExecution(this.context.runId, { ...existingExecution, type: 'load-related-record', stepIndex: this.context.stepIndex, - executionParams: { displayName, name }, - executionResult: { relation: { name, displayName }, record }, + executionParams: { name }, + executionResult: { relation: { name }, record }, selectedRecordRef, }); diff --git a/packages/workflow-executor/src/executors/read-record-step-executor.ts b/packages/workflow-executor/src/executors/read-record-step-executor.ts index 950f6c6bc5..b331f5c12b 100644 --- a/packages/workflow-executor/src/executors/read-record-step-executor.ts +++ b/packages/workflow-executor/src/executors/read-record-step-executor.ts @@ -64,7 +64,7 @@ export default class ReadRecordStepExecutor extends RecordStepExecutor ({ name, displayName })), + fields: fieldResults.map(({ name }) => ({ name })), }, executionResult: { fields: fieldResults }, selectedRecordRef, @@ -129,12 +129,11 @@ export default class ReadRecordStepExecutor extends RecordStepExecutor { const field = this.findField(schema, name); - if (!field) return { error: `Field not found: ${name}`, name, displayName: name }; + if (!field) return { error: `Field not found: ${name}`, name }; return { value: values[field.fieldName], name: field.fieldName, - displayName: field.displayName, }; }); } diff --git a/packages/workflow-executor/src/executors/summary/step-execution-formatters.ts b/packages/workflow-executor/src/executors/summary/step-execution-formatters.ts index c9df62c0a8..5b9dd78e0a 100644 --- a/packages/workflow-executor/src/executors/summary/step-execution-formatters.ts +++ b/packages/workflow-executor/src/executors/summary/step-execution-formatters.ts @@ -1,14 +1,14 @@ import type { + DisplayedLoadRelatedRecordStepExecutionData, GuidanceStepExecutionData, - LoadRelatedRecordStepExecutionData, + HydratedStepExecutionData, McpStepExecutionData, - StepExecutionData, } from '../../types/step-execution-data'; export default class StepExecutionFormatters { // Returns null when no custom format is defined for the step type or when execution data // doesn't satisfy formatter preconditions — caller falls back to generic Input:/Output:. - static format(execution: StepExecutionData): string | null { + static format(execution: HydratedStepExecutionData): string | null { switch (execution.type) { case 'load-related-record': return StepExecutionFormatters.formatLoadRelatedRecord(execution); @@ -42,7 +42,7 @@ export default class StepExecutionFormatters { } private static formatLoadRelatedRecord( - execution: LoadRelatedRecordStepExecutionData, + execution: DisplayedLoadRelatedRecordStepExecutionData, ): string | null { const { executionResult } = execution; diff --git a/packages/workflow-executor/src/executors/summary/step-summary-builder.ts b/packages/workflow-executor/src/executors/summary/step-summary-builder.ts index aafbbb6013..23031e8999 100644 --- a/packages/workflow-executor/src/executors/summary/step-summary-builder.ts +++ b/packages/workflow-executor/src/executors/summary/step-summary-builder.ts @@ -1,4 +1,4 @@ -import type { StepExecutionData } from '../../types/step-execution-data'; +import type { HydratedStepExecutionData } from '../../types/step-execution-data'; import type { StepDefinition } from '../../types/validated/step-definition'; import type { StepOutcome } from '../../types/validated/step-outcome'; @@ -8,7 +8,7 @@ export default class StepSummaryBuilder { static build( step: StepDefinition, stepOutcome: StepOutcome, - execution: StepExecutionData | undefined, + execution: HydratedStepExecutionData | undefined, ): string { const prompt = step.prompt ?? '(no prompt)'; const header = `Step "${stepOutcome.stepId}" (index ${stepOutcome.stepIndex}):`; diff --git a/packages/workflow-executor/src/executors/trigger-record-action-step-executor.ts b/packages/workflow-executor/src/executors/trigger-record-action-step-executor.ts index 4b75633601..4c463e37c7 100644 --- a/packages/workflow-executor/src/executors/trigger-record-action-step-executor.ts +++ b/packages/workflow-executor/src/executors/trigger-record-action-step-executor.ts @@ -81,7 +81,6 @@ export default class TriggerRecordActionStepExecutor extends RecordStepExecutor< const target: ActionTarget = { selectedRecordRef, - displayName: pendingData.displayName, name: pendingData.name, }; @@ -109,7 +108,7 @@ export default class TriggerRecordActionStepExecutor extends RecordStepExecutor< ? { actionName: preRecordedArgs.actionDisplayName } : await this.selectAction(schema, step.prompt); const name = this.resolveActionName(schema, args.actionName); - const target: ActionTarget = { selectedRecordRef, displayName: args.actionName, name }; + const target: ActionTarget = { selectedRecordRef, name }; // Branch B -- automaticExecution: executor runs the action itself, so it cannot // handle forms (no UI to fill them). Reject form-bearing actions here. When the @@ -123,7 +122,7 @@ export default class TriggerRecordActionStepExecutor extends RecordStepExecutor< }, this.context.user, ); - if (hasForm) throw new UnsupportedActionFormError(target.displayName); + if (hasForm) throw new UnsupportedActionFormError(args.actionName); return this.executeOnExecutor(target); } @@ -132,7 +131,7 @@ export default class TriggerRecordActionStepExecutor extends RecordStepExecutor< await this.context.runStore.saveStepExecution(this.context.runId, { type: 'trigger-action', stepIndex: this.context.stepIndex, - pendingData: { displayName: target.displayName, name: target.name }, + pendingData: { name: target.name }, selectedRecordRef: target.selectedRecordRef, }); @@ -141,7 +140,7 @@ export default class TriggerRecordActionStepExecutor extends RecordStepExecutor< /** Branch B — executor runs the action via agentPort, then persists the result. */ private async executeOnExecutor(target: ActionTarget): Promise { - const { selectedRecordRef, displayName, name } = target; + const { selectedRecordRef, name } = target; await this.context.runStore.saveStepExecution(this.context.runId, { type: 'trigger-action', @@ -162,7 +161,7 @@ export default class TriggerRecordActionStepExecutor extends RecordStepExecutor< await this.context.runStore.saveStepExecution(this.context.runId, { type: 'trigger-action', stepIndex: this.context.stepIndex, - executionParams: { displayName, name }, + executionParams: { name }, executionResult: { success: true, actionResult }, selectedRecordRef, idempotencyPhase: 'done', @@ -177,13 +176,13 @@ export default class TriggerRecordActionStepExecutor extends RecordStepExecutor< actionResult: unknown, existingExecution: TriggerRecordActionStepExecutionData, ): Promise { - const { selectedRecordRef, displayName, name } = target; + const { selectedRecordRef, name } = target; await this.context.runStore.saveStepExecution(this.context.runId, { ...existingExecution, type: 'trigger-action', stepIndex: this.context.stepIndex, - executionParams: { displayName, name }, + executionParams: { name }, executionResult: { success: true, actionResult }, selectedRecordRef, }); diff --git a/packages/workflow-executor/src/executors/update-record-step-executor.ts b/packages/workflow-executor/src/executors/update-record-step-executor.ts index f27b17783b..14d9eb2cc2 100644 --- a/packages/workflow-executor/src/executors/update-record-step-executor.ts +++ b/packages/workflow-executor/src/executors/update-record-step-executor.ts @@ -175,7 +175,6 @@ export default class UpdateRecordStepExecutor extends RecordStepExecutor { - const { selectedRecordRef, displayName, name, value } = target; + const { selectedRecordRef, name, value } = target; await this.context.runStore.saveStepExecution(this.context.runId, { ...existingExecution, @@ -228,7 +226,7 @@ export default class UpdateRecordStepExecutor extends RecordStepExecutor> // User may intentionally switch to a different relation than the one the AI selected. // The executor re-derives relatedCollectionName from FieldSchema when processing the confirmation. name: z.string().optional(), - displayName: z.string().optional(), // User may override the AI-selected record; must be non-empty when provided. selectedRecordId: z .array(z.union([z.string(), z.number()])) diff --git a/packages/workflow-executor/src/hydrate-step-execution-data.ts b/packages/workflow-executor/src/hydrate-step-execution-data.ts new file mode 100644 index 0000000000..9fec69cd30 --- /dev/null +++ b/packages/workflow-executor/src/hydrate-step-execution-data.ts @@ -0,0 +1,135 @@ +import type { WorkflowPort } from './ports/workflow-port'; +import type SchemaCache from './schema-cache'; +import type { + DisplayedLoadRelatedRecordStepExecutionData, + HydratedStepExecutionData, + LoadRelatedRecordStepExecutionData, + StepExecutionData, +} from './types/step-execution-data'; +import type { CollectionSchema } from './types/validated/collection'; + +// displayName is not persisted — it is rebuilt from the current schema so labels never go stale. + +export type SchemaGetter = (collectionName: string) => Promise; + +export function makeSchemaGetter( + schemaCache: SchemaCache, + workflowPort: WorkflowPort, + runId: string, +): SchemaGetter { + return async (collectionName: string) => { + const cached = schemaCache.get(collectionName); + if (cached) return cached; + + const schema = await workflowPort.getCollectionSchema(collectionName, runId); + schemaCache.set(collectionName, schema); + + return schema; + }; +} + +function fieldDisplayName(schema: CollectionSchema | null, name: string): string { + return schema?.fields.find(f => f.fieldName === name)?.displayName ?? name; +} + +function actionDisplayName(schema: CollectionSchema | null, name: string): string { + return schema?.actions.find(a => a.name === name)?.displayName ?? name; +} + +function hydrateRelationResult( + result: LoadRelatedRecordStepExecutionData['executionResult'], + schema: CollectionSchema | null, +): DisplayedLoadRelatedRecordStepExecutionData['executionResult'] { + if (result === undefined) return undefined; + if ('skipped' in result) return result; + + return { + ...result, + relation: { ...result.relation, displayName: fieldDisplayName(schema, result.relation.name) }, + }; +} + +export default async function hydrateStepExecutionData( + execution: StepExecutionData, + getSchema: SchemaGetter, +): Promise { + if ( + execution.type === 'condition' || + execution.type === 'mcp' || + execution.type === 'record' || + execution.type === 'guidance' + ) { + return execution; + } + + // A missing/renamed collection must not break a read — fall back to technical names. + let schema: CollectionSchema | null = null; + + try { + schema = await getSchema(execution.selectedRecordRef.collectionName); + } catch { + schema = null; + } + + switch (execution.type) { + case 'read-record': + return { + ...execution, + executionParams: { + fields: execution.executionParams.fields.map(f => ({ + ...f, + displayName: fieldDisplayName(schema, f.name), + })), + }, + executionResult: { + fields: execution.executionResult.fields.map(f => ({ + ...f, + displayName: fieldDisplayName(schema, f.name), + })), + }, + }; + + case 'update-record': + return { + ...execution, + executionParams: execution.executionParams && { + ...execution.executionParams, + displayName: fieldDisplayName(schema, execution.executionParams.name), + }, + pendingData: execution.pendingData && { + ...execution.pendingData, + displayName: fieldDisplayName(schema, execution.pendingData.name), + }, + }; + + case 'trigger-action': + return { + ...execution, + executionParams: execution.executionParams && { + ...execution.executionParams, + displayName: actionDisplayName(schema, execution.executionParams.name), + }, + pendingData: execution.pendingData && { + ...execution.pendingData, + displayName: actionDisplayName(schema, execution.pendingData.name), + }, + }; + + case 'load-related-record': + return { + ...execution, + pendingData: execution.pendingData && { + ...execution.pendingData, + displayName: fieldDisplayName(schema, execution.pendingData.name), + }, + executionParams: execution.executionParams && { + ...execution.executionParams, + displayName: fieldDisplayName(schema, execution.executionParams.name), + }, + executionResult: hydrateRelationResult(execution.executionResult, schema), + }; + + default: + return execution; + } +} diff --git a/packages/workflow-executor/src/index.ts b/packages/workflow-executor/src/index.ts index 607cf2fe9c..469f162d17 100644 --- a/packages/workflow-executor/src/index.ts +++ b/packages/workflow-executor/src/index.ts @@ -40,6 +40,15 @@ export type { GuidanceStepExecutionData, ExecutedStepExecutionData, StepExecutionData, + DisplayedFieldRef, + DisplayedActionRef, + DisplayedRelationRef, + DisplayedFieldReadResult, + DisplayedReadRecordStepExecutionData, + DisplayedUpdateRecordStepExecutionData, + DisplayedTriggerRecordActionStepExecutionData, + DisplayedLoadRelatedRecordStepExecutionData, + HydratedStepExecutionData, } from './types/step-execution-data'; export type { diff --git a/packages/workflow-executor/src/runner.ts b/packages/workflow-executor/src/runner.ts index d43df7562a..b685b552da 100644 --- a/packages/workflow-executor/src/runner.ts +++ b/packages/workflow-executor/src/runner.ts @@ -12,7 +12,7 @@ import type { } from './ports/workflow-port'; import type SchemaCache from './schema-cache'; import type { AvailableStepExecution, StepExecutionResult } from './types/execution-context'; -import type { StepExecutionData } from './types/step-execution-data'; +import type { HydratedStepExecutionData } from './types/step-execution-data'; import type { StepOutcome } from './types/validated/step-outcome'; import type { RemoteTool } from '@forestadmin/ai-proxy'; @@ -25,6 +25,7 @@ import { extractErrorMessage, } from './errors'; import StepExecutorFactory from './executors/step-executor-factory'; +import hydrateStepExecutionData, { makeSchemaGetter } from './hydrate-step-execution-data'; import InFlightRunRegistry from './in-flight-run-registry'; import { stepTypeToOutcomeType } from './types/validated/step-outcome'; import validateSecrets from './validate-secrets'; @@ -155,8 +156,11 @@ export default class Runner { } } - async getRunStepExecutions(runId: string): Promise { - return this.config.runStore.getStepExecutions(runId); + async getRunStepExecutions(runId: string): Promise { + const executions = await this.config.runStore.getStepExecutions(runId); + const getSchema = makeSchemaGetter(this.config.schemaCache, this.config.workflowPort, runId); + + return Promise.all(executions.map(execution => hydrateStepExecutionData(execution, getSchema))); } async triggerPoll( diff --git a/packages/workflow-executor/src/types/step-execution-data.ts b/packages/workflow-executor/src/types/step-execution-data.ts index da1bce172c..db168f0bcc 100644 --- a/packages/workflow-executor/src/types/step-execution-data.ts +++ b/packages/workflow-executor/src/types/step-execution-data.ts @@ -24,9 +24,10 @@ export interface ConditionStepExecutionData extends BaseStepExecutionData { // -- Shared -- +// Persisted refs hold the technical name only; displayName is re-derived from the schema at read +// time (see hydrate-step-execution-data.ts). export interface FieldRef { name: string; - displayName: string; } // -- Read Record -- @@ -63,14 +64,12 @@ export interface UpdateRecordStepExecutionData extends MutatingStepExecutionData export interface ActionRef { name: string; - displayName: string; } // Intentionally separate from ActionRef/FieldRef: expected to gain relation-specific // fields (e.g. relationType) in a future iteration. export interface RelationRef { name: string; - displayName: string; } export interface TriggerRecordActionStepExecutionData extends MutatingStepExecutionData { @@ -153,3 +152,57 @@ export type StepExecutionData = | GuidanceStepExecutionData; export type ExecutedStepExecutionData = StepExecutionData; + +// -- Hydrated view (displayName re-derived from the schema at read time) -- + +export interface DisplayedFieldRef extends FieldRef { + displayName: string; +} + +export interface DisplayedActionRef extends ActionRef { + displayName: string; +} + +export interface DisplayedRelationRef extends RelationRef { + displayName: string; +} + +export type DisplayedFieldReadResult = FieldReadResult & { displayName: string }; + +export interface DisplayedReadRecordStepExecutionData + extends Omit { + executionParams: { fields: DisplayedFieldRef[] }; + executionResult: { fields: DisplayedFieldReadResult[] }; +} + +export interface DisplayedUpdateRecordStepExecutionData + extends Omit { + executionParams?: DisplayedFieldRef & { value: unknown }; + pendingData?: DisplayedFieldRef & { value: unknown; userConfirmed?: boolean }; +} + +export interface DisplayedTriggerRecordActionStepExecutionData + extends Omit { + executionParams?: DisplayedActionRef; + pendingData?: DisplayedActionRef & { userConfirmed?: boolean; actionResult?: unknown }; +} + +export interface DisplayedLoadRelatedRecordStepExecutionData + extends Omit< + LoadRelatedRecordStepExecutionData, + 'executionParams' | 'executionResult' | 'pendingData' + > { + pendingData?: LoadRelatedRecordPendingData & { displayName: string }; + executionParams?: DisplayedRelationRef; + executionResult?: { relation: DisplayedRelationRef; record: RecordRef } | { skipped: true }; +} + +export type HydratedStepExecutionData = + | ConditionStepExecutionData + | DisplayedReadRecordStepExecutionData + | DisplayedUpdateRecordStepExecutionData + | DisplayedTriggerRecordActionStepExecutionData + | RecordStepExecutionData + | DisplayedLoadRelatedRecordStepExecutionData + | McpStepExecutionData + | GuidanceStepExecutionData; diff --git a/packages/workflow-executor/test/executors/load-related-record-step-executor.test.ts b/packages/workflow-executor/test/executors/load-related-record-step-executor.test.ts index e7fdab82ef..a51026e0b4 100644 --- a/packages/workflow-executor/test/executors/load-related-record-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/load-related-record-step-executor.test.ts @@ -163,7 +163,6 @@ function makePendingExecution( type: 'load-related-record', stepIndex: 0, pendingData: { - displayName: 'Order', name: 'order', selectedRecordId: [99], suggestedFields: ['status', 'amount'], @@ -199,7 +198,7 @@ describe('LoadRelatedRecordStepExecutor', () => { expect.objectContaining({ type: 'load-related-record', stepIndex: 0, - executionParams: { displayName: 'Order', name: 'order' }, + executionParams: { name: 'order' }, executionResult: expect.objectContaining({ record: expect.objectContaining({ collectionName: 'orders', @@ -610,7 +609,6 @@ describe('LoadRelatedRecordStepExecutor', () => { type: 'load-related-record', stepIndex: 0, pendingData: { - displayName: 'Order', name: 'order', selectedRecordId: [99], suggestedFields: [], @@ -682,7 +680,6 @@ describe('LoadRelatedRecordStepExecutor', () => { 'run-1', expect.objectContaining({ pendingData: { - displayName: 'Order', name: 'order', selectedRecordId: [2], // record at index 1 suggestedFields: ['status'], @@ -761,7 +758,6 @@ describe('LoadRelatedRecordStepExecutor', () => { const agentPort = makeMockAgentPort(); const execution = makePendingExecution({ pendingData: { - displayName: 'Order', name: 'order', suggestedFields: ['status', 'amount'], selectedRecordId: [99], @@ -782,12 +778,11 @@ describe('LoadRelatedRecordStepExecutor', () => { 'run-1', expect.objectContaining({ type: 'load-related-record', - executionParams: { displayName: 'Order', name: 'order' }, + executionParams: { name: 'order' }, executionResult: expect.objectContaining({ record: expect.objectContaining({ collectionName: 'orders', recordId: [99] }), }), pendingData: expect.objectContaining({ - displayName: 'Order', name: 'order', selectedRecordId: [99], }), @@ -799,7 +794,6 @@ describe('LoadRelatedRecordStepExecutor', () => { const agentPort = makeMockAgentPort(); const execution = makePendingExecution({ pendingData: { - displayName: 'Order', name: 'order', suggestedFields: ['status', 'amount'], selectedRecordId: [42], @@ -842,7 +836,6 @@ describe('LoadRelatedRecordStepExecutor', () => { }); const execution = makePendingExecution({ pendingData: { - displayName: 'Order', name: 'order', suggestedFields: [], selectedRecordId: [99], @@ -883,7 +876,6 @@ describe('LoadRelatedRecordStepExecutor', () => { }); const execution = makePendingExecution({ pendingData: { - displayName: 'Order', name: 'order', suggestedFields: [], selectedRecordId: [99], @@ -928,7 +920,6 @@ describe('LoadRelatedRecordStepExecutor', () => { // User overrode AI's suggestion of 'order' to 'address' via PATCH const execution = makePendingExecution({ pendingData: { - displayName: 'Address', name: 'address', suggestedFields: [], selectedRecordId: [77], @@ -959,7 +950,6 @@ describe('LoadRelatedRecordStepExecutor', () => { const execution = makePendingExecution({ selectedRecordRef: { collectionName: 'customers', recordId: [42], stepIndex: 0 }, pendingData: { - displayName: 'Order', name: 'order', suggestedFields: [], selectedRecordId: [99], @@ -987,7 +977,6 @@ describe('LoadRelatedRecordStepExecutor', () => { const agentPort = makeMockAgentPort(); const execution = makePendingExecution({ pendingData: { - displayName: 'Order', name: 'order', suggestedFields: ['status', 'amount'], selectedRecordId: [99], @@ -1008,7 +997,7 @@ describe('LoadRelatedRecordStepExecutor', () => { 'run-1', expect.objectContaining({ executionResult: { skipped: true }, - pendingData: expect.objectContaining({ displayName: 'Order', name: 'order' }), + pendingData: expect.objectContaining({ name: 'order' }), }), ); }); @@ -1175,7 +1164,6 @@ describe('LoadRelatedRecordStepExecutor', () => { it('returns error outcome when saveStepExecution fails after load (Branch A confirmed)', async () => { const execution = makePendingExecution({ pendingData: { - displayName: 'Order', name: 'order', suggestedFields: ['status', 'amount'], selectedRecordId: [99], @@ -1395,7 +1383,7 @@ describe('LoadRelatedRecordStepExecutor', () => { type: 'load-related-record', stepIndex: 2, executionResult: { - relation: { name: 'order', displayName: 'Order' }, + relation: { name: 'order' }, record: relatedRecord, }, selectedRecordRef: makeRecordRef(), @@ -1424,7 +1412,6 @@ describe('LoadRelatedRecordStepExecutor', () => { 'run-1', expect.objectContaining({ pendingData: expect.objectContaining({ - displayName: 'Invoice', name: 'invoice', selectedRecordId: [55], }), @@ -1543,7 +1530,6 @@ describe('LoadRelatedRecordStepExecutor', () => { it('returns error outcome when saveStepExecution fails on user reject (Branch A)', async () => { const execution = makePendingExecution({ pendingData: { - displayName: 'Order', name: 'order', suggestedFields: ['status', 'amount'], selectedRecordId: [99], @@ -1626,7 +1612,6 @@ describe('LoadRelatedRecordStepExecutor', () => { stepIndex: 3, selectedRecordRef: makeRecordRef(), pendingData: { - displayName: 'Invoice', name: 'invoice', selectedRecordId: [55], }, @@ -1676,7 +1661,7 @@ describe('LoadRelatedRecordStepExecutor', () => { type: 'load-related-record', stepIndex: 2, executionResult: { - relation: { name: 'order', displayName: 'Order' }, + relation: { name: 'order' }, record: completedRecord, }, selectedRecordRef: makeRecordRef(), diff --git a/packages/workflow-executor/test/executors/read-record-step-executor.test.ts b/packages/workflow-executor/test/executors/read-record-step-executor.test.ts index 03e25f4a5b..ad8b35f487 100644 --- a/packages/workflow-executor/test/executors/read-record-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/read-record-step-executor.test.ts @@ -157,9 +157,9 @@ describe('ReadRecordStepExecutor', () => { expect.objectContaining({ type: 'read-record', stepIndex: 0, - executionParams: { fields: [{ name: 'email', displayName: 'Email' }] }, + executionParams: { fields: [{ name: 'email' }] }, executionResult: { - fields: [{ value: 'john@example.com', name: 'email', displayName: 'Email' }], + fields: [{ value: 'john@example.com', name: 'email' }], }, }), ); @@ -180,15 +180,12 @@ describe('ReadRecordStepExecutor', () => { 'run-1', expect.objectContaining({ executionParams: { - fields: [ - { name: 'email', displayName: 'Email' }, - { name: 'name', displayName: 'Full Name' }, - ], + fields: [{ name: 'email' }, { name: 'name' }], }, executionResult: { fields: [ - { value: 'john@example.com', name: 'email', displayName: 'Email' }, - { value: 'John Doe', name: 'name', displayName: 'Full Name' }, + { value: 'john@example.com', name: 'email' }, + { value: 'John Doe', name: 'name' }, ], }, }), @@ -209,9 +206,9 @@ describe('ReadRecordStepExecutor', () => { expect(runStore.saveStepExecution).toHaveBeenCalledWith( 'run-1', expect.objectContaining({ - executionParams: { fields: [{ name: 'name', displayName: 'Full Name' }] }, + executionParams: { fields: [{ name: 'name' }] }, executionResult: { - fields: [{ value: 'John Doe', name: 'name', displayName: 'Full Name' }], + fields: [{ value: 'John Doe', name: 'name' }], }, }), ); @@ -282,11 +279,10 @@ describe('ReadRecordStepExecutor', () => { expect.objectContaining({ executionResult: { fields: [ - { value: 'john@example.com', name: 'email', displayName: 'Email' }, + { value: 'john@example.com', name: 'email' }, { error: 'Field not found: nonexistent', name: 'nonexistent', - displayName: 'nonexistent', }, ], }, @@ -396,7 +392,7 @@ describe('ReadRecordStepExecutor', () => { type: 'load-related-record', stepIndex: 2, executionResult: { - relation: { name: 'order', displayName: 'Order' }, + relation: { name: 'order' }, record: relatedRecord, }, selectedRecordRef: makeRecordRef(), @@ -434,7 +430,7 @@ describe('ReadRecordStepExecutor', () => { 'run-1', expect.objectContaining({ executionResult: { - fields: [{ value: 'john@example.com', name: 'email', displayName: 'Email' }], + fields: [{ value: 'john@example.com', name: 'email' }], }, selectedRecordRef: expect.objectContaining({ recordId: [42], @@ -487,7 +483,7 @@ describe('ReadRecordStepExecutor', () => { type: 'load-related-record', stepIndex: 2, executionResult: { - relation: { name: 'order', displayName: 'Order' }, + relation: { name: 'order' }, record: relatedRecord, }, selectedRecordRef: makeRecordRef(), @@ -511,7 +507,7 @@ describe('ReadRecordStepExecutor', () => { 'run-1', expect.objectContaining({ executionResult: { - fields: [{ value: 150, name: 'total', displayName: 'Total' }], + fields: [{ value: 150, name: 'total' }], }, selectedRecordRef: expect.objectContaining({ recordId: [99], @@ -564,7 +560,7 @@ describe('ReadRecordStepExecutor', () => { type: 'load-related-record', stepIndex: 5, executionResult: { - relation: { name: 'order', displayName: 'Order' }, + relation: { name: 'order' }, record: relatedRecord, }, selectedRecordRef: makeRecordRef(), @@ -620,7 +616,7 @@ describe('ReadRecordStepExecutor', () => { type: 'load-related-record', stepIndex: 1, executionResult: { - relation: { name: 'order', displayName: 'Order' }, + relation: { name: 'order' }, record: relatedRecord, }, selectedRecordRef: makeRecordRef(), @@ -867,15 +863,12 @@ describe('ReadRecordStepExecutor', () => { type: 'read-record', stepIndex: 3, executionParams: { - fields: [ - { name: 'email', displayName: 'Email' }, - { name: 'name', displayName: 'Full Name' }, - ], + fields: [{ name: 'email' }, { name: 'name' }], }, executionResult: { fields: [ - { value: 'john@example.com', name: 'email', displayName: 'Email' }, - { value: 'John Doe', name: 'name', displayName: 'Full Name' }, + { value: 'john@example.com', name: 'email' }, + { value: 'John Doe', name: 'name' }, ], }, selectedRecordRef: { @@ -908,7 +901,7 @@ describe('ReadRecordStepExecutor', () => { 'run-1', expect.objectContaining({ executionResult: { - fields: [{ value: 'john@example.com', name: 'email', displayName: 'Email' }], + fields: [{ value: 'john@example.com', name: 'email' }], }, }), ); diff --git a/packages/workflow-executor/test/executors/step-execution-formatters.test.ts b/packages/workflow-executor/test/executors/step-execution-formatters.test.ts index 9a79847e02..b2d7fcc54b 100644 --- a/packages/workflow-executor/test/executors/step-execution-formatters.test.ts +++ b/packages/workflow-executor/test/executors/step-execution-formatters.test.ts @@ -1,4 +1,4 @@ -import type { StepExecutionData } from '../../src/types/step-execution-data'; +import type { HydratedStepExecutionData } from '../../src/types/step-execution-data'; import StepExecutionFormatters from '../../src/executors/summary/step-execution-formatters'; @@ -6,7 +6,7 @@ describe('StepExecutionFormatters', () => { describe('format', () => { describe('load-related-record', () => { it('returns the full Loaded: line for a completed execution', () => { - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'load-related-record', stepIndex: 1, selectedRecordRef: { collectionName: 'customers', recordId: [42], stepIndex: 0 }, @@ -22,7 +22,7 @@ describe('StepExecutionFormatters', () => { }); it('returns null for a skipped execution', () => { - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'load-related-record', stepIndex: 1, selectedRecordRef: { collectionName: 'customers', recordId: [42], stepIndex: 0 }, @@ -33,7 +33,7 @@ describe('StepExecutionFormatters', () => { }); it('returns null when executionResult is absent (pending phase)', () => { - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'load-related-record', stepIndex: 1, selectedRecordRef: { collectionName: 'customers', recordId: [42], stepIndex: 0 }, @@ -48,7 +48,7 @@ describe('StepExecutionFormatters', () => { }); it('formats composite record IDs joined by ", "', () => { - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'load-related-record', stepIndex: 1, selectedRecordRef: { collectionName: 'customers', recordId: [42, 'abc'], stepIndex: 0 }, @@ -66,7 +66,7 @@ describe('StepExecutionFormatters', () => { describe('mcp', () => { it('returns the Result: line when formattedResponse is present', () => { - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'mcp', stepIndex: 2, executionParams: { @@ -85,7 +85,7 @@ describe('StepExecutionFormatters', () => { }); it('returns a generic Executed: line when formattedResponse is absent', () => { - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'mcp', stepIndex: 2, executionParams: { @@ -102,7 +102,7 @@ describe('StepExecutionFormatters', () => { }); it('returns null when executionResult is absent (pending phase)', () => { - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'mcp', stepIndex: 2, pendingData: { name: 'search_records', sourceId: 'mcp-server-1', input: {} }, @@ -112,7 +112,7 @@ describe('StepExecutionFormatters', () => { }); it('returns null for a skipped execution', () => { - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'mcp', stepIndex: 2, executionResult: { skipped: true }, @@ -124,7 +124,7 @@ describe('StepExecutionFormatters', () => { describe('types without a custom formatter', () => { it('returns null for condition type', () => { - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'condition', stepIndex: 0, executionParams: { answer: 'Yes' }, @@ -135,7 +135,7 @@ describe('StepExecutionFormatters', () => { }); it('returns null for record type', () => { - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'record', stepIndex: 0, executionResult: { success: true }, @@ -147,7 +147,7 @@ describe('StepExecutionFormatters', () => { describe('guidance', () => { it('returns the user input line when executionResult is present', () => { - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'guidance', stepIndex: 0, executionResult: { userInput: 'I called the client and confirmed the delivery date.' }, @@ -159,7 +159,7 @@ describe('StepExecutionFormatters', () => { }); it('returns null when executionResult is absent', () => { - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'guidance', stepIndex: 0, }; diff --git a/packages/workflow-executor/test/executors/step-summary-builder.test.ts b/packages/workflow-executor/test/executors/step-summary-builder.test.ts index 3b1770eb37..fd0a504bac 100644 --- a/packages/workflow-executor/test/executors/step-summary-builder.test.ts +++ b/packages/workflow-executor/test/executors/step-summary-builder.test.ts @@ -1,4 +1,4 @@ -import type { StepExecutionData } from '../../src/types/step-execution-data'; +import type { HydratedStepExecutionData } from '../../src/types/step-execution-data'; import type { StepDefinition } from '../../src/types/validated/step-definition'; import type { StepOutcome } from '../../src/types/validated/step-outcome'; @@ -22,7 +22,7 @@ describe('StepSummaryBuilder', () => { it('renders header, prompt, Input, and Output for a condition step with execution data', () => { const step = makeConditionStep('Approve?'); const outcome = makeConditionOutcome('cond-1', 0); - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'condition', stepIndex: 0, executionParams: { answer: 'Yes', reasoning: 'Order is valid' }, @@ -45,7 +45,7 @@ describe('StepSummaryBuilder', () => { stepIndex: 0, status: 'success', }; - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'record', stepIndex: 0, executionResult: { success: true }, @@ -119,7 +119,7 @@ describe('StepSummaryBuilder', () => { stepIndex: 0, status: 'success', }; - const execution: StepExecutionData = { type: 'record', stepIndex: 0 }; + const execution: HydratedStepExecutionData = { type: 'record', stepIndex: 0 }; const result = StepSummaryBuilder.build(step, outcome, execution); @@ -137,7 +137,7 @@ describe('StepSummaryBuilder', () => { stepIndex: 0, status: 'awaiting-input', }; - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'update-record', stepIndex: 0, pendingData: { displayName: 'Status', name: 'status', value: 'active' }, @@ -163,7 +163,7 @@ describe('StepSummaryBuilder', () => { stepIndex: 0, status: 'awaiting-input', }; - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'trigger-action', stepIndex: 0, pendingData: { displayName: 'Archive Customer', name: 'archive' }, @@ -189,7 +189,7 @@ describe('StepSummaryBuilder', () => { stepIndex: 1, status: 'success', }; - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'load-related-record', stepIndex: 1, selectedRecordRef: { collectionName: 'customers', recordId: [42], stepIndex: 0 }, @@ -221,7 +221,7 @@ describe('StepSummaryBuilder', () => { stepIndex: 1, status: 'success', }; - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'load-related-record', stepIndex: 1, selectedRecordRef: { collectionName: 'customers', recordId: [42], stepIndex: 0 }, @@ -245,7 +245,7 @@ describe('StepSummaryBuilder', () => { stepIndex: 1, status: 'awaiting-input', }; - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'load-related-record', stepIndex: 1, selectedRecordRef: { collectionName: 'customers', recordId: [42], stepIndex: 0 }, @@ -277,7 +277,7 @@ describe('StepSummaryBuilder', () => { stepIndex: 0, status: 'success', }; - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'update-record', stepIndex: 0, pendingData: { displayName: 'Status', name: 'status', value: 'active' }, @@ -304,7 +304,7 @@ describe('StepSummaryBuilder', () => { stepIndex: 0, status: 'success', }; - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'trigger-action', stepIndex: 0, pendingData: { displayName: 'Archive Customer', name: 'archive' }, @@ -326,7 +326,7 @@ describe('StepSummaryBuilder', () => { stepIndex: 0, status: 'success', }; - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'update-record', stepIndex: 0, idempotencyPhase: 'done', @@ -348,7 +348,7 @@ describe('StepSummaryBuilder', () => { stepIndex: 0, status: 'awaiting-input', }; - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'update-record', stepIndex: 0, pendingData: { displayName: 'Status', name: 'status', value: 'active' }, @@ -372,7 +372,7 @@ describe('StepSummaryBuilder', () => { stepIndex: 0, status: 'success', }; - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'trigger-action', stepIndex: 0, pendingData: { displayName: 'Archive Customer', name: 'archive' }, @@ -393,7 +393,7 @@ describe('StepSummaryBuilder', () => { stepIndex: 0, status: 'success', }; - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'update-record', stepIndex: 0, pendingData: { displayName: 'Status', name: 'status', value: 'active' }, @@ -417,7 +417,7 @@ describe('StepSummaryBuilder', () => { stepIndex: 1, status: 'success', }; - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'load-related-record', stepIndex: 1, selectedRecordRef: { collectionName: 'customers', recordId: [42], stepIndex: 0 }, @@ -437,7 +437,7 @@ describe('StepSummaryBuilder', () => { it('shows "(no prompt)" when step has no prompt', () => { const step: StepDefinition = { type: StepType.Condition, options: ['A', 'B'] }; const outcome = makeConditionOutcome('cond-1', 0); - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'condition', stepIndex: 0, executionParams: { answer: 'A', reasoning: 'Only option' }, diff --git a/packages/workflow-executor/test/executors/trigger-record-action-step-executor.test.ts b/packages/workflow-executor/test/executors/trigger-record-action-step-executor.test.ts index f2c62ee988..85d4da145b 100644 --- a/packages/workflow-executor/test/executors/trigger-record-action-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/trigger-record-action-step-executor.test.ts @@ -174,7 +174,6 @@ describe('TriggerRecordActionStepExecutor', () => { type: 'trigger-action', stepIndex: 0, executionParams: { - displayName: 'Send Welcome Email', name: 'send-welcome-email', }, executionResult: { success: true, actionResult: { message: 'Email sent' } }, @@ -209,7 +208,6 @@ describe('TriggerRecordActionStepExecutor', () => { type: 'trigger-action', stepIndex: 0, pendingData: { - displayName: 'Send Welcome Email', name: 'send-welcome-email', }, selectedRecordRef: expect.objectContaining({ @@ -246,7 +244,6 @@ describe('TriggerRecordActionStepExecutor', () => { type: 'trigger-action', stepIndex: 0, pendingData: { - displayName: 'Send Welcome Email', name: 'send-welcome-email', userConfirmed: true, actionResult: { success: 'ok', html: '

Email queued

' }, @@ -269,7 +266,6 @@ describe('TriggerRecordActionStepExecutor', () => { expect.objectContaining({ type: 'trigger-action', executionParams: { - displayName: 'Send Welcome Email', name: 'send-welcome-email', }, executionResult: { @@ -277,7 +273,6 @@ describe('TriggerRecordActionStepExecutor', () => { actionResult: { success: 'ok', html: '

Email queued

' }, }, pendingData: { - displayName: 'Send Welcome Email', name: 'send-welcome-email', userConfirmed: true, actionResult: { success: 'ok', html: '

Email queued

' }, @@ -292,7 +287,6 @@ describe('TriggerRecordActionStepExecutor', () => { type: 'trigger-action', stepIndex: 0, pendingData: { - displayName: 'Send Welcome Email', name: 'send-welcome-email', userConfirmed: true, actionResult: null, @@ -323,7 +317,6 @@ describe('TriggerRecordActionStepExecutor', () => { type: 'trigger-action', stepIndex: 0, pendingData: { - displayName: 'Send Welcome Email', name: 'send-welcome-email', userConfirmed: true, }, @@ -353,7 +346,6 @@ describe('TriggerRecordActionStepExecutor', () => { type: 'trigger-action', stepIndex: 0, pendingData: { - displayName: 'Send Welcome Email', name: 'send-welcome-email', userConfirmed: false, }, @@ -374,7 +366,6 @@ describe('TriggerRecordActionStepExecutor', () => { expect.objectContaining({ executionResult: { skipped: true }, pendingData: { - displayName: 'Send Welcome Email', name: 'send-welcome-email', userConfirmed: false, }, @@ -404,7 +395,7 @@ describe('TriggerRecordActionStepExecutor', () => { { type: 'trigger-action', stepIndex: 5, - pendingData: { displayName: 'Send Welcome Email' }, + pendingData: { name: 'send-welcome-email' }, selectedRecordRef: makeRecordRef(), }, ]), @@ -522,7 +513,7 @@ describe('TriggerRecordActionStepExecutor', () => { 'run-1', expect.objectContaining({ type: 'trigger-action', - pendingData: { displayName: 'Send Welcome Email', name: 'send-welcome-email' }, + pendingData: { name: 'send-welcome-email' }, }), ); }); @@ -754,7 +745,7 @@ describe('TriggerRecordActionStepExecutor', () => { type: 'load-related-record', stepIndex: 2, executionResult: { - relation: { name: 'order', displayName: 'Order' }, + relation: { name: 'order' }, record: relatedRecord, }, selectedRecordRef: makeRecordRef(), @@ -783,7 +774,7 @@ describe('TriggerRecordActionStepExecutor', () => { expect(runStore.saveStepExecution).toHaveBeenCalledWith( 'run-1', expect.objectContaining({ - pendingData: { displayName: 'Cancel Order', name: 'cancel-order' }, + pendingData: { name: 'cancel-order' }, selectedRecordRef: expect.objectContaining({ recordId: [99], collectionName: 'orders', @@ -892,7 +883,6 @@ describe('TriggerRecordActionStepExecutor', () => { type: 'trigger-action', stepIndex: 0, pendingData: { - displayName: 'Send Welcome Email', name: 'send-welcome-email', userConfirmed: false, }, @@ -943,7 +933,6 @@ describe('TriggerRecordActionStepExecutor', () => { type: 'trigger-action', stepIndex: 0, pendingData: { - displayName: 'Send Welcome Email', name: 'send-welcome-email', userConfirmed: true, actionResult: { success: 'ok' }, @@ -1105,7 +1094,7 @@ describe('TriggerRecordActionStepExecutor', () => { const doneExecution: TriggerRecordActionStepExecutionData = { type: 'trigger-action', stepIndex: 0, - executionParams: { displayName: 'Send Welcome Email', name: 'send-welcome-email' }, + executionParams: { name: 'send-welcome-email' }, executionResult: { success: true, actionResult: undefined }, selectedRecordRef: makeRecordRef(), idempotencyPhase: 'done', diff --git a/packages/workflow-executor/test/executors/update-record-step-executor.test.ts b/packages/workflow-executor/test/executors/update-record-step-executor.test.ts index 9c1866e092..21359a8189 100644 --- a/packages/workflow-executor/test/executors/update-record-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/update-record-step-executor.test.ts @@ -169,7 +169,7 @@ describe('UpdateRecordStepExecutor', () => { expect.objectContaining({ type: 'update-record', stepIndex: 0, - executionParams: { displayName: 'Status', name: 'status', value: 'active' }, + executionParams: { name: 'status', value: 'active' }, executionResult: { updatedValues }, selectedRecordRef: expect.objectContaining({ collectionName: 'customers', @@ -200,7 +200,7 @@ describe('UpdateRecordStepExecutor', () => { expect.objectContaining({ type: 'update-record', stepIndex: 0, - pendingData: { displayName: 'Status', name: 'status', value: 'active' }, + pendingData: { name: 'status', value: 'active' }, selectedRecordRef: expect.objectContaining({ collectionName: 'customers', recordId: [42], @@ -218,7 +218,6 @@ describe('UpdateRecordStepExecutor', () => { type: 'update-record', stepIndex: 0, pendingData: { - displayName: 'Status', name: 'status', value: 'active', userConfirmed: true, @@ -242,10 +241,9 @@ describe('UpdateRecordStepExecutor', () => { 'run-1', expect.objectContaining({ type: 'update-record', - executionParams: { displayName: 'Status', name: 'status', value: 'active' }, + executionParams: { name: 'status', value: 'active' }, executionResult: { updatedValues }, pendingData: { - displayName: 'Status', name: 'status', value: 'active', userConfirmed: true, @@ -262,7 +260,6 @@ describe('UpdateRecordStepExecutor', () => { type: 'update-record', stepIndex: 0, pendingData: { - displayName: 'Status', name: 'status', value: 'active', userConfirmed: false, @@ -284,7 +281,6 @@ describe('UpdateRecordStepExecutor', () => { expect.objectContaining({ executionResult: { skipped: true }, pendingData: { - displayName: 'Status', name: 'status', value: 'active', userConfirmed: false, @@ -315,7 +311,7 @@ describe('UpdateRecordStepExecutor', () => { { type: 'update-record', stepIndex: 5, - pendingData: { displayName: 'Status', name: 'status', value: 'active' }, + pendingData: { name: 'status', value: 'active' }, selectedRecordRef: makeRecordRef(), }, ]), @@ -413,7 +409,7 @@ describe('UpdateRecordStepExecutor', () => { type: 'load-related-record', stepIndex: 2, executionResult: { - relation: { name: 'order', displayName: 'Order' }, + relation: { name: 'order' }, record: relatedRecord, }, selectedRecordRef: makeRecordRef(), @@ -442,7 +438,6 @@ describe('UpdateRecordStepExecutor', () => { 'run-1', expect.objectContaining({ pendingData: { - displayName: 'Order Status', name: 'status', value: 'shipped', }, @@ -619,7 +614,6 @@ describe('UpdateRecordStepExecutor', () => { type: 'update-record', stepIndex: 0, pendingData: { - displayName: 'Status', name: 'status', value: 'active', userConfirmed: true, @@ -669,7 +663,6 @@ describe('UpdateRecordStepExecutor', () => { type: 'update-record', stepIndex: 0, pendingData: { - displayName: 'Status', name: 'status', value: 'active', userConfirmed: true, @@ -789,7 +782,6 @@ describe('UpdateRecordStepExecutor', () => { type: 'update-record', stepIndex: 0, pendingData: { - displayName: 'Status', name: 'status', value: 'active', userConfirmed: false, @@ -1302,7 +1294,7 @@ describe('UpdateRecordStepExecutor', () => { { type: 'update-record', stepIndex: 0, - pendingData: { displayName: 'Status', name: 'status', value: 'active' }, + pendingData: { name: 'status', value: 'active' }, selectedRecordRef: makeRecordRef(), }, ]), @@ -1331,7 +1323,7 @@ describe('UpdateRecordStepExecutor', () => { const doneExecution: UpdateRecordStepExecutionData = { type: 'update-record', stepIndex: 0, - executionParams: { displayName: 'Status', name: 'status', value: 'active' }, + executionParams: { name: 'status', value: 'active' }, executionResult: { updatedValues: { status: 'active' } }, selectedRecordRef: makeRecordRef(), idempotencyPhase: 'done', diff --git a/packages/workflow-executor/test/hydrate-step-execution-data.test.ts b/packages/workflow-executor/test/hydrate-step-execution-data.test.ts new file mode 100644 index 0000000000..d915004500 --- /dev/null +++ b/packages/workflow-executor/test/hydrate-step-execution-data.test.ts @@ -0,0 +1,213 @@ +import type { SchemaGetter } from '../src/hydrate-step-execution-data'; +import type { WorkflowPort } from '../src/ports/workflow-port'; +import type { StepExecutionData } from '../src/types/step-execution-data'; +import type { CollectionSchema, RecordRef } from '../src/types/validated/collection'; + +import hydrateStepExecutionData, { makeSchemaGetter } from '../src/hydrate-step-execution-data'; +import SchemaCache from '../src/schema-cache'; + +function makeSchema(overrides: Partial = {}): CollectionSchema { + return { + collectionName: 'customers', + collectionDisplayName: 'Customers', + primaryKeyFields: ['id'], + fields: [ + { fieldName: 'status', displayName: 'Status', isRelationship: false }, + { fieldName: 'email', displayName: 'Email', isRelationship: false }, + { + fieldName: 'orders', + displayName: 'Orders', + isRelationship: true, + relationType: 'HasMany', + relatedCollectionName: 'orders', + }, + ], + actions: [{ name: 'send-email', displayName: 'Send Email', endpoint: '/forest/send' }], + ...overrides, + }; +} + +const recordRef: RecordRef = { collectionName: 'customers', recordId: [42], stepIndex: 0 }; + +// A getter that always returns the same schema, recording which collections were requested. +function makeGetter(schema: CollectionSchema = makeSchema()): SchemaGetter & jest.Mock { + return jest.fn().mockResolvedValue(schema) as SchemaGetter & jest.Mock; +} + +describe('hydrateStepExecutionData', () => { + it('re-derives the field displayName for update-record params and pending data', async () => { + const execution: StepExecutionData = { + type: 'update-record', + stepIndex: 1, + selectedRecordRef: recordRef, + executionParams: { name: 'status', value: 'active' }, + pendingData: { name: 'status', value: 'active', userConfirmed: true }, + }; + + const result = await hydrateStepExecutionData(execution, makeGetter()); + + expect(result).toMatchObject({ + type: 'update-record', + executionParams: { name: 'status', displayName: 'Status', value: 'active' }, + pendingData: { name: 'status', displayName: 'Status', value: 'active', userConfirmed: true }, + }); + }); + + it('re-derives the action displayName for trigger-action', async () => { + const execution: StepExecutionData = { + type: 'trigger-action', + stepIndex: 2, + selectedRecordRef: recordRef, + executionParams: { name: 'send-email' }, + pendingData: { name: 'send-email' }, + }; + + const result = await hydrateStepExecutionData(execution, makeGetter()); + + expect(result).toMatchObject({ + executionParams: { name: 'send-email', displayName: 'Send Email' }, + pendingData: { name: 'send-email', displayName: 'Send Email' }, + }); + }); + + it('re-derives field displayNames for every read-record field (success and error)', async () => { + const execution: StepExecutionData = { + type: 'read-record', + stepIndex: 0, + selectedRecordRef: recordRef, + executionParams: { fields: [{ name: 'email' }, { name: 'ghost' }] }, + executionResult: { + fields: [ + { name: 'email', value: 'a@b.c' }, + { name: 'ghost', error: 'Field not found: ghost' }, + ], + }, + }; + + const result = await hydrateStepExecutionData(execution, makeGetter()); + + expect(result).toMatchObject({ + executionParams: { + fields: [ + { name: 'email', displayName: 'Email' }, + // Unknown field falls back to its technical name. + { name: 'ghost', displayName: 'ghost' }, + ], + }, + executionResult: { + fields: [ + { name: 'email', displayName: 'Email', value: 'a@b.c' }, + { name: 'ghost', displayName: 'ghost', error: 'Field not found: ghost' }, + ], + }, + }); + }); + + it('re-derives the relation displayName for load-related-record params, pending and result', async () => { + const execution: StepExecutionData = { + type: 'load-related-record', + stepIndex: 3, + selectedRecordRef: recordRef, + pendingData: { name: 'orders', selectedRecordId: [7] }, + executionParams: { name: 'orders' }, + executionResult: { + relation: { name: 'orders' }, + record: { collectionName: 'orders', recordId: [7], stepIndex: 3 }, + }, + }; + + const result = await hydrateStepExecutionData(execution, makeGetter()); + + expect(result).toMatchObject({ + pendingData: { name: 'orders', displayName: 'Orders', selectedRecordId: [7] }, + executionParams: { name: 'orders', displayName: 'Orders' }, + executionResult: { + relation: { name: 'orders', displayName: 'Orders' }, + record: { collectionName: 'orders', recordId: [7] }, + }, + }); + }); + + it('reflects a CHANGED schema rather than any previously persisted label', async () => { + const execution: StepExecutionData = { + type: 'update-record', + stepIndex: 1, + selectedRecordRef: recordRef, + // Simulate an old row that still carries a (now stale) displayName alongside the name. + executionParams: { name: 'status', value: 'active', displayName: 'OLD LABEL' } as never, + }; + const renamed = makeSchema({ + fields: [{ fieldName: 'status', displayName: 'Lifecycle Stage', isRelationship: false }], + }); + + const result = await hydrateStepExecutionData(execution, makeGetter(renamed)); + + // The stale 'OLD LABEL' is ignored; the label is rebuilt from the (renamed) schema. + expect(result).toMatchObject({ + type: 'update-record', + executionParams: { name: 'status', displayName: 'Lifecycle Stage' }, + }); + }); + + it('falls back to technical names when the schema cannot be fetched', async () => { + const execution: StepExecutionData = { + type: 'trigger-action', + stepIndex: 2, + selectedRecordRef: recordRef, + executionParams: { name: 'send-email' }, + }; + const failingGetter: SchemaGetter = jest.fn().mockRejectedValue(new Error('boom')); + + const result = await hydrateStepExecutionData(execution, failingGetter); + + expect(result).toMatchObject({ + executionParams: { name: 'send-email', displayName: 'send-email' }, + }); + }); + + it('returns condition / mcp / guidance / record executions unchanged without fetching a schema', async () => { + const getter = makeGetter(); + const condition: StepExecutionData = { + type: 'condition', + stepIndex: 0, + executionParams: { answer: 'yes' }, + }; + + const result = await hydrateStepExecutionData(condition, getter); + + expect(result).toEqual(condition); + expect(getter).not.toHaveBeenCalled(); + }); + + it('leaves a skipped load-related result untouched', async () => { + const execution: StepExecutionData = { + type: 'load-related-record', + stepIndex: 3, + selectedRecordRef: recordRef, + executionResult: { skipped: true }, + }; + + const result = await hydrateStepExecutionData(execution, makeGetter()); + + expect(result).toMatchObject({ executionResult: { skipped: true } }); + }); +}); + +describe('makeSchemaGetter', () => { + it('serves the workflowPort schema on a miss and the cache on a hit', async () => { + const schema = makeSchema(); + const cache = new SchemaCache(); + const workflowPort = { + getCollectionSchema: jest.fn().mockResolvedValue(schema), + } as unknown as WorkflowPort; + + const getSchema = makeSchemaGetter(cache, workflowPort, 'run-1'); + + await expect(getSchema('customers')).resolves.toBe(schema); + await expect(getSchema('customers')).resolves.toBe(schema); + + // Second call is served from the cache — the port is hit exactly once. + expect(workflowPort.getCollectionSchema).toHaveBeenCalledTimes(1); + expect(workflowPort.getCollectionSchema).toHaveBeenCalledWith('customers', 'run-1'); + }); +}); diff --git a/packages/workflow-executor/test/integration/workflow-execution.test.ts b/packages/workflow-executor/test/integration/workflow-execution.test.ts index 103f25c14c..fcfb5820e3 100644 --- a/packages/workflow-executor/test/integration/workflow-execution.test.ts +++ b/packages/workflow-executor/test/integration/workflow-execution.test.ts @@ -293,7 +293,7 @@ describe('workflow execution (integration)', () => { type: 'read-record', stepIndex: 0, executionResult: { - fields: [{ value: 'john@example.com', name: 'email', displayName: 'Email' }], + fields: [{ value: 'john@example.com', name: 'email' }], }, }), ); @@ -533,7 +533,7 @@ describe('workflow execution (integration)', () => { expect.objectContaining({ type: 'load-related-record', executionResult: { - relation: { name: 'order', displayName: 'Order' }, + relation: { name: 'order' }, record: { collectionName: 'orders', recordId: [99], stepIndex: 0 }, }, }), diff --git a/packages/workflow-executor/test/runner.test.ts b/packages/workflow-executor/test/runner.test.ts index f1bc218aa0..94e89ed61f 100644 --- a/packages/workflow-executor/test/runner.test.ts +++ b/packages/workflow-executor/test/runner.test.ts @@ -1775,6 +1775,38 @@ describe('getRunStepExecutions', () => { expect(result).toEqual(steps); expect(runStore.getStepExecutions).toHaveBeenCalledWith('run-1'); }); + + it('re-derives displayName from the current schema (not from persisted data)', async () => { + // Persisted execution carries ONLY the technical fieldName. + const persisted = [ + { + type: 'update-record' as const, + stepIndex: 0, + selectedRecordRef: { collectionName: 'customers', recordId: [1], stepIndex: 0 }, + executionParams: { name: 'status', value: 'active' }, + }, + ]; + const runStore = createMockRunStore({ + getStepExecutions: jest.fn().mockResolvedValue(persisted), + }); + const workflowPort = createMockWorkflowPort(); + workflowPort.getCollectionSchema.mockResolvedValue({ + collectionName: 'customers', + collectionDisplayName: 'Customers', + primaryKeyFields: ['id'], + fields: [{ fieldName: 'status', displayName: 'Lifecycle Stage', isRelationship: false }], + actions: [], + }); + runner = new Runner(createRunnerConfig({ runStore, workflowPort })); + + const result = await runner.getRunStepExecutions('run-1'); + + expect(result[0]).toMatchObject({ + type: 'update-record', + executionParams: { name: 'status', displayName: 'Lifecycle Stage', value: 'active' }, + }); + expect(workflowPort.getCollectionSchema).toHaveBeenCalledWith('customers', 'run-1'); + }); }); // --------------------------------------------------------------------------- diff --git a/packages/workflow-executor/test/stores/database-store.test.ts b/packages/workflow-executor/test/stores/database-store.test.ts index cab80c9bae..c68ac03d25 100644 --- a/packages/workflow-executor/test/stores/database-store.test.ts +++ b/packages/workflow-executor/test/stores/database-store.test.ts @@ -92,9 +92,9 @@ describe('DatabaseStore (SQLite)', () => { const step: StepExecutionData = { type: 'update-record', stepIndex: 0, - executionParams: { displayName: 'Status', name: 'status', value: 'active' }, + executionParams: { name: 'status', value: 'active' }, executionResult: { updatedValues: { status: 'active', nested: { deep: true } } }, - pendingData: { displayName: 'Status', name: 'status', value: 'active' }, + pendingData: { name: 'status', value: 'active' }, selectedRecordRef: { collectionName: 'users', recordId: ['42'], stepIndex: 0 }, }; From d1640c9e993fb9aa7b1195b24e4cda86a46a7417 Mon Sep 17 00:00:00 2001 From: Pierre Merlet Date: Tue, 2 Jun 2026 18:32:14 +0200 Subject: [PATCH 2/8] test(workflow-executor): cover hydrate-step-execution-data optional-field and fallthrough paths Adds cases for the awaiting-input / executing / automatic phases (where executionParams or pendingData is absent), the relation-result undefined branch, the action-vs-field lookup, and the unrecognized-type fallthrough. Brings hydrate-step-execution-data.ts to 100% statement/branch/function/line coverage. Co-Authored-By: Claude Opus 4.8 --- .../test/hydrate-step-execution-data.test.ts | 135 ++++++++++++++++++ 1 file changed, 135 insertions(+) diff --git a/packages/workflow-executor/test/hydrate-step-execution-data.test.ts b/packages/workflow-executor/test/hydrate-step-execution-data.test.ts index c9300a7503..2d5bfd55a7 100644 --- a/packages/workflow-executor/test/hydrate-step-execution-data.test.ts +++ b/packages/workflow-executor/test/hydrate-step-execution-data.test.ts @@ -191,6 +191,141 @@ describe('hydrateStepExecutionData', () => { expect(result).toMatchObject({ executionResult: { skipped: true } }); }); + + // --- optional-field phases: only the present fields get a displayName, absent ones stay absent + + it('hydrates only pendingData when update-record is awaiting input (no executionParams)', async () => { + const execution: StepExecutionData = { + type: 'update-record', + stepIndex: 1, + selectedRecordRef: recordRef, + pendingData: { name: 'status', value: 'active' }, + }; + + const result = await hydrateStepExecutionData(execution, makeGetter()); + + expect(result).toMatchObject({ + pendingData: { name: 'status', displayName: 'Status', value: 'active' }, + }); + expect((result as { executionParams?: unknown }).executionParams).toBeUndefined(); + }); + + it('hydrates only executionParams when update-record is done (no pendingData)', async () => { + const execution: StepExecutionData = { + type: 'update-record', + stepIndex: 1, + selectedRecordRef: recordRef, + executionParams: { name: 'status', value: 'active' }, + executionResult: { updatedValues: { status: 'active' } }, + }; + + const result = await hydrateStepExecutionData(execution, makeGetter()); + + expect(result).toMatchObject({ + executionParams: { name: 'status', displayName: 'Status', value: 'active' }, + executionResult: { updatedValues: { status: 'active' } }, + }); + expect((result as { pendingData?: unknown }).pendingData).toBeUndefined(); + }); + + it('leaves both fields undefined for the update-record executing phase', async () => { + const execution: StepExecutionData = { + type: 'update-record', + stepIndex: 1, + selectedRecordRef: recordRef, + idempotencyPhase: 'executing', + }; + + const result = await hydrateStepExecutionData(execution, makeGetter()); + + expect(result).toMatchObject({ type: 'update-record', idempotencyPhase: 'executing' }); + expect((result as { executionParams?: unknown }).executionParams).toBeUndefined(); + expect((result as { pendingData?: unknown }).pendingData).toBeUndefined(); + }); + + it('leaves both fields undefined for the trigger-action executing phase', async () => { + const execution: StepExecutionData = { + type: 'trigger-action', + stepIndex: 2, + selectedRecordRef: recordRef, + idempotencyPhase: 'executing', + }; + + const result = await hydrateStepExecutionData(execution, makeGetter()); + + expect((result as { executionParams?: unknown }).executionParams).toBeUndefined(); + expect((result as { pendingData?: unknown }).pendingData).toBeUndefined(); + }); + + it('hydrates load-related automatic execution (executionParams + result, no pendingData)', async () => { + const execution: StepExecutionData = { + type: 'load-related-record', + stepIndex: 3, + selectedRecordRef: recordRef, + executionParams: { name: 'orders' }, + executionResult: { + relation: { name: 'orders' }, + record: { collectionName: 'orders', recordId: [7], stepIndex: 3 }, + }, + }; + + const result = await hydrateStepExecutionData(execution, makeGetter()); + + expect(result).toMatchObject({ + executionParams: { name: 'orders', displayName: 'Orders' }, + executionResult: { relation: { name: 'orders', displayName: 'Orders' } }, + }); + expect((result as { pendingData?: unknown }).pendingData).toBeUndefined(); + }); + + it('hydrates load-related pendingData while it is still awaiting input (no executionResult)', async () => { + const execution: StepExecutionData = { + type: 'load-related-record', + stepIndex: 3, + selectedRecordRef: recordRef, + pendingData: { name: 'orders', selectedRecordId: [7] }, + }; + + const result = await hydrateStepExecutionData(execution, makeGetter()); + + expect(result).toMatchObject({ + pendingData: { name: 'orders', displayName: 'Orders', selectedRecordId: [7] }, + }); + // hydrateRelationResult passes an absent executionResult straight through. + expect((result as { executionResult?: unknown }).executionResult).toBeUndefined(); + expect((result as { executionParams?: unknown }).executionParams).toBeUndefined(); + }); + + it('resolves the action displayName from schema.actions, not schema.fields', async () => { + const execution: StepExecutionData = { + type: 'trigger-action', + stepIndex: 2, + selectedRecordRef: recordRef, + // A field shares the name of no action; ensure the action lookup is used (and a matching + // field name would NOT leak its label into an action ref). + executionParams: { name: 'send-email' }, + }; + + const result = await hydrateStepExecutionData(execution, makeGetter()); + + expect(result).toMatchObject({ + executionParams: { name: 'send-email', displayName: 'Send Email' }, + }); + }); + + it('returns an unrecognized step type unchanged after fetching the schema', async () => { + const getter = makeGetter(); + const future = { + type: 'future-step', + stepIndex: 9, + selectedRecordRef: recordRef, + } as unknown as StepExecutionData; + + const result = await hydrateStepExecutionData(future, getter); + + expect(result).toBe(future); + expect(getter).toHaveBeenCalledWith('customers'); + }); }); describe('makeSchemaGetter', () => { From b4c38326a1d28d95d117e8d62e9cfcc0f7bf4a75 Mon Sep 17 00:00:00 2001 From: Pierre Merlet Date: Tue, 2 Jun 2026 18:44:50 +0200 Subject: [PATCH 3/8] fix(workflow-executor): harden displayName hydration for production reads QA hardening of the read-time hydration path: - De-duplicate concurrent schema fetches: makeSchemaGetter now shares an in-flight promise per collection, so hydrating N steps of one collection (Promise.all in getRunStepExecutions / buildPreviousStepsMessages) triggers a single orchestrator call instead of N. - Tolerate malformed/legacy rows: a throw while hydrating one execution is caught and the raw execution returned, so one bad row no longer 500s the whole GET /runs/:runId response. - Observability: schema-fetch failures (warn) and per-row hydration failures (error) are now logged with collection/type/stepIndex context instead of failing silently. Tests cover the stampede de-dup, failed-fetch retry/no-cache, malformed-row resilience, and both log paths. hydrate-step-execution-data.ts stays at 100% coverage. Co-Authored-By: Claude Opus 4.8 --- .../src/executors/base-step-executor.ts | 2 +- .../src/hydrate-step-execution-data.ts | 107 +++++++++++++----- packages/workflow-executor/src/runner.ts | 4 +- .../test/hydrate-step-execution-data.test.ts | 104 +++++++++++++++++ .../workflow-executor/test/runner.test.ts | 67 +++++++++++ 5 files changed, 255 insertions(+), 29 deletions(-) diff --git a/packages/workflow-executor/src/executors/base-step-executor.ts b/packages/workflow-executor/src/executors/base-step-executor.ts index 1f9af0a6b5..cae0db948c 100644 --- a/packages/workflow-executor/src/executors/base-step-executor.ts +++ b/packages/workflow-executor/src/executors/base-step-executor.ts @@ -291,7 +291,7 @@ export default abstract class BaseStepExecutor { const execution = allStepExecutions.find(e => e.stepIndex === stepOutcome.stepIndex); const hydrated = execution - ? await hydrateStepExecutionData(execution, getSchema) + ? await hydrateStepExecutionData(execution, getSchema, this.context.logger) : undefined; return StepSummaryBuilder.build(stepDefinition, stepOutcome, hydrated); diff --git a/packages/workflow-executor/src/hydrate-step-execution-data.ts b/packages/workflow-executor/src/hydrate-step-execution-data.ts index 9fec69cd30..08699253ac 100644 --- a/packages/workflow-executor/src/hydrate-step-execution-data.ts +++ b/packages/workflow-executor/src/hydrate-step-execution-data.ts @@ -1,3 +1,4 @@ +import type { Logger } from './ports/logger-port'; import type { WorkflowPort } from './ports/workflow-port'; import type SchemaCache from './schema-cache'; import type { @@ -8,6 +9,8 @@ import type { } from './types/step-execution-data'; import type { CollectionSchema } from './types/validated/collection'; +import { extractErrorMessage } from './errors'; + // displayName is not persisted — it is rebuilt from the current schema so labels never go stale. export type SchemaGetter = (collectionName: string) => Promise; @@ -17,14 +20,29 @@ export function makeSchemaGetter( workflowPort: WorkflowPort, runId: string, ): SchemaGetter { - return async (collectionName: string) => { + // De-duplicates concurrent fetches for the same collection within a single read (e.g. the + // Promise.all over a run's steps): the first miss starts the fetch, the rest share its promise. + const inFlight = new Map>(); + + return (collectionName: string) => { const cached = schemaCache.get(collectionName); - if (cached) return cached; + if (cached) return Promise.resolve(cached); + + const pending = inFlight.get(collectionName); + if (pending) return pending; + + const promise = workflowPort + .getCollectionSchema(collectionName, runId) + .then(schema => { + schemaCache.set(collectionName, schema); - const schema = await workflowPort.getCollectionSchema(collectionName, runId); - schemaCache.set(collectionName, schema); + return schema; + }) + .finally(() => inFlight.delete(collectionName)); - return schema; + inFlight.set(collectionName, promise); + + return promise; }; } @@ -49,28 +67,16 @@ function hydrateRelationResult( }; } -export default async function hydrateStepExecutionData( - execution: StepExecutionData, - getSchema: SchemaGetter, -): Promise { - if ( - execution.type === 'condition' || - execution.type === 'mcp' || - execution.type === 'record' || - execution.type === 'guidance' - ) { - return execution; - } - - // A missing/renamed collection must not break a read — fall back to technical names. - let schema: CollectionSchema | null = null; - - try { - schema = await getSchema(execution.selectedRecordRef.collectionName); - } catch { - schema = null; - } - +// Pure transform — assumes the schema fetch already happened (or failed to null). Split from the +// async wrapper so the wrapper can guard it: a throw here (malformed row) is caught, logged, and +// the raw execution returned rather than failing the whole read. +function hydrate( + execution: Exclude< + StepExecutionData, + { type: 'condition' } | { type: 'mcp' } | { type: 'record' } | { type: 'guidance' } + >, + schema: CollectionSchema | null, +): HydratedStepExecutionData { switch (execution.type) { case 'read-record': return { @@ -133,3 +139,50 @@ export default async function hydrateStepExecutionData( return execution; } } + +export default async function hydrateStepExecutionData( + execution: StepExecutionData, + getSchema: SchemaGetter, + logger?: Logger, +): Promise { + if ( + execution.type === 'condition' || + execution.type === 'mcp' || + execution.type === 'record' || + execution.type === 'guidance' + ) { + return execution; + } + + const { collectionName } = execution.selectedRecordRef; + + // A missing/renamed collection must not break a read — fall back to technical names. + let schema: CollectionSchema | null = null; + + try { + schema = await getSchema(collectionName); + } catch (error) { + logger?.warn( + 'Failed to fetch collection schema for displayName hydration; using technical names', + { + collection: collectionName, + stepIndex: execution.stepIndex, + error: extractErrorMessage(error), + }, + ); + schema = null; + } + + try { + return hydrate(execution, schema); + } catch (error) { + // A single malformed/legacy row must not take down the whole read — return it un-hydrated. + logger?.error('Failed to hydrate step execution displayNames; returning the raw execution', { + type: execution.type, + stepIndex: execution.stepIndex, + error: extractErrorMessage(error), + }); + + return execution as HydratedStepExecutionData; + } +} diff --git a/packages/workflow-executor/src/runner.ts b/packages/workflow-executor/src/runner.ts index 0d6f0a1f6c..ba7ee432af 100644 --- a/packages/workflow-executor/src/runner.ts +++ b/packages/workflow-executor/src/runner.ts @@ -157,7 +157,9 @@ export default class Runner { const executions = await this.config.runStore.getStepExecutions(runId); const getSchema = makeSchemaGetter(this.config.schemaCache, this.config.workflowPort, runId); - return Promise.all(executions.map(execution => hydrateStepExecutionData(execution, getSchema))); + return Promise.all( + executions.map(execution => hydrateStepExecutionData(execution, getSchema, this.logger)), + ); } async triggerPoll( diff --git a/packages/workflow-executor/test/hydrate-step-execution-data.test.ts b/packages/workflow-executor/test/hydrate-step-execution-data.test.ts index 2d5bfd55a7..389429cc02 100644 --- a/packages/workflow-executor/test/hydrate-step-execution-data.test.ts +++ b/packages/workflow-executor/test/hydrate-step-execution-data.test.ts @@ -1,4 +1,5 @@ import type { SchemaGetter } from '../src/hydrate-step-execution-data'; +import type { Logger } from '../src/ports/logger-port'; import type { WorkflowPort } from '../src/ports/workflow-port'; import type { StepExecutionData } from '../src/types/step-execution-data'; import type { CollectionSchema, RecordRef } from '../src/types/validated/collection'; @@ -6,6 +7,10 @@ import type { CollectionSchema, RecordRef } from '../src/types/validated/collect import hydrateStepExecutionData, { makeSchemaGetter } from '../src/hydrate-step-execution-data'; import SchemaCache from '../src/schema-cache'; +function makeLogger(): jest.Mocked { + return { error: jest.fn(), warn: jest.fn(), info: jest.fn() }; +} + function makeSchema(overrides: Partial = {}): CollectionSchema { return { collectionName: 'customers', @@ -326,6 +331,47 @@ describe('hydrateStepExecutionData', () => { expect(result).toBe(future); expect(getter).toHaveBeenCalledWith('customers'); }); + + // --- resilience & observability + + it('warns (but does not throw) when the schema fetch fails', async () => { + const logger = makeLogger(); + const failingGetter: SchemaGetter = jest.fn().mockRejectedValue(new Error('endpoint down')); + const execution: StepExecutionData = { + type: 'trigger-action', + stepIndex: 2, + selectedRecordRef: recordRef, + executionParams: { name: 'send-email' }, + }; + + const result = await hydrateStepExecutionData(execution, failingGetter, logger); + + expect(result).toMatchObject({ + executionParams: { name: 'send-email', displayName: 'send-email' }, + }); + expect(logger.warn).toHaveBeenCalledWith( + expect.stringContaining('Failed to fetch collection schema'), + expect.objectContaining({ collection: 'customers', stepIndex: 2 }), + ); + }); + + it('returns the raw execution and logs an error when a malformed row cannot be hydrated', async () => { + const logger = makeLogger(); + // A corrupted/legacy read-record row missing executionParams would throw inside hydration. + const malformed = { + type: 'read-record', + stepIndex: 4, + selectedRecordRef: recordRef, + } as unknown as StepExecutionData; + + const result = await hydrateStepExecutionData(malformed, makeGetter(), logger); + + expect(result).toBe(malformed); + expect(logger.error).toHaveBeenCalledWith( + expect.stringContaining('Failed to hydrate'), + expect.objectContaining({ type: 'read-record', stepIndex: 4 }), + ); + }); }); describe('makeSchemaGetter', () => { @@ -345,4 +391,62 @@ describe('makeSchemaGetter', () => { expect(workflowPort.getCollectionSchema).toHaveBeenCalledTimes(1); expect(workflowPort.getCollectionSchema).toHaveBeenCalledWith('customers', 'run-1'); }); + + it('de-duplicates concurrent fetches for the same collection (no stampede)', async () => { + const cache = new SchemaCache(); + let resolveFetch: (s: CollectionSchema) => void = () => undefined; + const deferred = new Promise(resolve => { + resolveFetch = resolve; + }); + const getCollectionSchema = jest.fn().mockReturnValue(deferred); + const getSchema = makeSchemaGetter( + cache, + { getCollectionSchema } as unknown as WorkflowPort, + 'run-1', + ); + + // Five concurrent misses while the first fetch is still in flight. + const all = Promise.all([ + getSchema('customers'), + getSchema('customers'), + getSchema('customers'), + getSchema('customers'), + getSchema('customers'), + ]); + resolveFetch(makeSchema()); + const results = await all; + + expect(getCollectionSchema).toHaveBeenCalledTimes(1); + results.forEach(r => expect(r).toMatchObject({ collectionName: 'customers' })); + }); + + it('clears the in-flight entry so a failed fetch can be retried', async () => { + const cache = new SchemaCache(); + const getCollectionSchema = jest + .fn() + .mockRejectedValueOnce(new Error('transient')) + .mockResolvedValueOnce(makeSchema()); + const getSchema = makeSchemaGetter( + cache, + { getCollectionSchema } as unknown as WorkflowPort, + 'run-1', + ); + + await expect(getSchema('customers')).rejects.toThrow('transient'); + await expect(getSchema('customers')).resolves.toMatchObject({ collectionName: 'customers' }); + expect(getCollectionSchema).toHaveBeenCalledTimes(2); + }); + + it('does not cache a failed fetch', async () => { + const cache = new SchemaCache(); + const getCollectionSchema = jest.fn().mockRejectedValue(new Error('down')); + const getSchema = makeSchemaGetter( + cache, + { getCollectionSchema } as unknown as WorkflowPort, + 'run-1', + ); + + await expect(getSchema('customers')).rejects.toThrow('down'); + expect(cache.get('customers')).toBeUndefined(); + }); }); diff --git a/packages/workflow-executor/test/runner.test.ts b/packages/workflow-executor/test/runner.test.ts index 95eba2c3fa..5fad285cca 100644 --- a/packages/workflow-executor/test/runner.test.ts +++ b/packages/workflow-executor/test/runner.test.ts @@ -2085,6 +2085,73 @@ describe('getRunStepExecutions', () => { }); expect(workflowPort.getCollectionSchema).toHaveBeenCalledWith('customers', 'run-1'); }); + + it('fetches the schema once for many steps sharing a collection (no stampede)', async () => { + const ref = { collectionName: 'customers', recordId: [1], stepIndex: 0 }; + const persisted = Array.from({ length: 6 }, (_, i) => ({ + type: 'update-record' as const, + stepIndex: i, + selectedRecordRef: { ...ref, stepIndex: i }, + executionParams: { name: 'status', value: 'active' }, + })); + const runStore = createMockRunStore({ + getStepExecutions: jest.fn().mockResolvedValue(persisted), + }); + const workflowPort = createMockWorkflowPort(); + workflowPort.getCollectionSchema.mockResolvedValue({ + collectionName: 'customers', + collectionDisplayName: 'Customers', + primaryKeyFields: ['id'], + fields: [{ fieldName: 'status', displayName: 'Status', isRelationship: false }], + actions: [], + }); + runner = new Runner(createRunnerConfig({ runStore, workflowPort })); + + const result = await runner.getRunStepExecutions('run-1'); + + expect(result).toHaveLength(6); + expect(workflowPort.getCollectionSchema).toHaveBeenCalledTimes(1); + }); + + it('does not fail the whole read when one persisted row is malformed', async () => { + const ref = { collectionName: 'customers', recordId: [1], stepIndex: 0 }; + const persisted = [ + { + type: 'update-record' as const, + stepIndex: 0, + selectedRecordRef: ref, + executionParams: { name: 'status', value: 'active' }, + }, + // Corrupted/legacy read-record row missing executionParams — would throw during hydration. + { type: 'read-record' as const, stepIndex: 1, selectedRecordRef: ref }, + ]; + const runStore = createMockRunStore({ + getStepExecutions: jest.fn().mockResolvedValue(persisted), + }); + const workflowPort = createMockWorkflowPort(); + workflowPort.getCollectionSchema.mockResolvedValue({ + collectionName: 'customers', + collectionDisplayName: 'Customers', + primaryKeyFields: ['id'], + fields: [{ fieldName: 'status', displayName: 'Status', isRelationship: false }], + actions: [], + }); + const logger = createMockLogger(); + runner = new Runner(createRunnerConfig({ runStore, workflowPort, logger })); + + const result = await runner.getRunStepExecutions('run-1'); + + // The good row is hydrated; the malformed row is returned raw rather than 500-ing the read. + expect(result).toHaveLength(2); + expect(result[0]).toMatchObject({ + executionParams: { name: 'status', displayName: 'Status', value: 'active' }, + }); + expect(result[1]).toMatchObject({ type: 'read-record', stepIndex: 1 }); + expect(logger.error).toHaveBeenCalledWith( + expect.stringContaining('Failed to hydrate'), + expect.objectContaining({ type: 'read-record', stepIndex: 1 }), + ); + }); }); // --------------------------------------------------------------------------- From e1b2ed95ee856de1b5215817e97e07134f45b03f Mon Sep 17 00:00:00 2001 From: Pierre Merlet Date: Tue, 2 Jun 2026 19:06:39 +0200 Subject: [PATCH 4/8] fix(workflow-executor): scope schema cache by rendering to prevent cross-rendering label leaks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit displayName, and the visible field/action set, are rendering-specific (forestadmin-server resolves them from the run's rendering layout). The process-wide SchemaCache was keyed by collectionName only, so within one environment a run on rendering A could serve its labels / visible fields to a run on rendering B for the cache TTL — wrong labels and, when a rendering hides fields, a wrong field set fed to the AI. - SchemaCache is now keyed by (renderingId, collectionName) via nested maps; the bare iterator is replaced by entriesForRendering(renderingId). - makeSchemaGetter takes renderingId (cache scope) alongside runId (server fetch, which resolves the run's rendering). RecordStepExecutor.getCollectionSchema delegates to it for consistent keying + dedup; buildPreviousStepsMessages and getRunStepExecutions pass the rendering. - getRunStepExecutions(runId, renderingId): the HTTP read path passes the requesting user's renderingId. AgentClientAgentPort scopes resolveSchema/buildActionEndpoints by user.renderingId. Technical identifiers (fieldName, action name, primary keys, endpoints) are environment-level and were never corrupted; this fix isolates the rendering-scoped labels and field membership. Tests: SchemaCache rendering isolation + entriesForRendering, makeSchemaGetter per-rendering isolation, and an end-to-end runner test proving two renderings sharing one cache don't contaminate. schema-cache.ts and hydrate-step-execution-data.ts at 100% coverage. Co-Authored-By: Claude Opus 4.8 --- .../src/adapters/agent-client-agent-port.ts | 16 ++-- .../src/executors/base-step-executor.ts | 1 + .../src/executors/record-step-executor.ts | 18 ++-- .../src/http/executor-http-server.ts | 3 +- .../src/hydrate-step-execution-data.ts | 7 +- packages/workflow-executor/src/runner.ts | 14 ++- .../workflow-executor/src/schema-cache.ts | 43 ++++++--- .../adapters/agent-client-agent-port.test.ts | 12 +-- .../test/http/executor-http-server.test.ts | 5 +- .../test/hydrate-step-execution-data.test.ts | 26 +++++- .../workflow-executor/test/runner.test.ts | 52 ++++++++++- .../test/schema-cache.test.ts | 89 ++++++++++++------- 12 files changed, 209 insertions(+), 77 deletions(-) diff --git a/packages/workflow-executor/src/adapters/agent-client-agent-port.ts b/packages/workflow-executor/src/adapters/agent-client-agent-port.ts index 24bec25231..c1889cdea0 100644 --- a/packages/workflow-executor/src/adapters/agent-client-agent-port.ts +++ b/packages/workflow-executor/src/adapters/agent-client-agent-port.ts @@ -73,7 +73,7 @@ export default class AgentClientAgentPort implements AgentPort { async getRecord({ collection, id, fields }: GetRecordQuery, user: StepUser): Promise { return this.callAgent('getRecord', async () => { const client = this.createClient(user); - const schema = this.resolveSchema(collection); + const schema = this.resolveSchema(collection, user.renderingId); const records = await client.collection(collection).list>({ filters: buildPkFilter(schema.primaryKeyFields, id), pagination: { size: 1, number: 1 }, @@ -116,10 +116,10 @@ export default class AgentClientAgentPort implements AgentPort { ): Promise { return this.callAgent('getRelatedData', async () => { const client = this.createClient(user); - const parentSchema = this.resolveSchema(collection); + const parentSchema = this.resolveSchema(collection, user.renderingId); const relationField = parentSchema.fields.find(f => f.fieldName === relation); const relatedCollectionName = relationField?.relatedCollectionName ?? relation; - const relatedSchema = this.resolveSchema(relatedCollectionName); + const relatedSchema = this.resolveSchema(relatedCollectionName, user.renderingId); const records = await client .collection(collection) @@ -186,7 +186,7 @@ export default class AgentClientAgentPort implements AgentPort { return createRemoteAgentClient({ url: this.agentUrl, token, - actionEndpoints: this.buildActionEndpoints(), + actionEndpoints: this.buildActionEndpoints(user.renderingId), }); } @@ -212,10 +212,10 @@ export default class AgentClientAgentPort implements AgentPort { } } - private buildActionEndpoints(): ActionEndpointsByCollection { + private buildActionEndpoints(renderingId: number): ActionEndpointsByCollection { const endpoints: ActionEndpointsByCollection = {}; - for (const [collectionName, schema] of this.schemaCache) { + for (const [collectionName, schema] of this.schemaCache.entriesForRendering(renderingId)) { endpoints[collectionName] = {}; for (const action of schema.actions) { @@ -238,8 +238,8 @@ export default class AgentClientAgentPort implements AgentPort { return endpoints; } - private resolveSchema(collectionName: string): CollectionSchema { - const cached = this.schemaCache.get(collectionName); + private resolveSchema(collectionName: string, renderingId: number): CollectionSchema { + const cached = this.schemaCache.get(renderingId, collectionName); if (!cached) { // eslint-disable-next-line no-console diff --git a/packages/workflow-executor/src/executors/base-step-executor.ts b/packages/workflow-executor/src/executors/base-step-executor.ts index cae0db948c..406a45870f 100644 --- a/packages/workflow-executor/src/executors/base-step-executor.ts +++ b/packages/workflow-executor/src/executors/base-step-executor.ts @@ -286,6 +286,7 @@ export default abstract class BaseStepExecutor { diff --git a/packages/workflow-executor/src/executors/record-step-executor.ts b/packages/workflow-executor/src/executors/record-step-executor.ts index 4430dfacc4..fd777c3a77 100644 --- a/packages/workflow-executor/src/executors/record-step-executor.ts +++ b/packages/workflow-executor/src/executors/record-step-executor.ts @@ -1,3 +1,4 @@ +import type { SchemaGetter } from '../hydrate-step-execution-data'; import type { StepExecutionResult } from '../types/execution-context'; import type { CollectionSchema, FieldSchema, RecordRef } from '../types/validated/collection'; import type { StepDefinition } from '../types/validated/step-definition'; @@ -7,11 +8,14 @@ import { DynamicStructuredTool, HumanMessage, SystemMessage } from '@forestadmin import { z } from 'zod'; import { InvalidAIResponseError, InvalidPreRecordedArgsError, NoRecordsError } from '../errors'; +import { makeSchemaGetter } from '../hydrate-step-execution-data'; import BaseStepExecutor from './base-step-executor'; export default abstract class RecordStepExecutor< TStep extends StepDefinition = StepDefinition, > extends BaseStepExecutor { + private schemaGetter?: SchemaGetter; + protected buildOutcomeResult(outcome: { status: RecordStepStatus; error?: string; @@ -64,16 +68,16 @@ export default abstract class RecordStepExecutor< } protected async getCollectionSchema(collectionName: string): Promise { - const cached = this.context.schemaCache.get(collectionName); - if (cached) return cached; - - const schema = await this.context.workflowPort.getCollectionSchema( - collectionName, + // Lazily build a rendering-scoped getter once per executor instance (one runId/rendering), + // shared across this step's getCollectionSchema calls — same keying as the read path. + this.schemaGetter ??= makeSchemaGetter( + this.context.schemaCache, + this.context.workflowPort, this.context.runId, + this.context.user.renderingId, ); - this.context.schemaCache.set(collectionName, schema); - return schema; + return this.schemaGetter(collectionName); } protected findField(schema: CollectionSchema, name: string): FieldSchema | undefined { diff --git a/packages/workflow-executor/src/http/executor-http-server.ts b/packages/workflow-executor/src/http/executor-http-server.ts index 5438cb6da8..6c12f57de4 100644 --- a/packages/workflow-executor/src/http/executor-http-server.ts +++ b/packages/workflow-executor/src/http/executor-http-server.ts @@ -159,7 +159,8 @@ export default class ExecutorHttpServer { } private async handleGetRun(ctx: Koa.Context): Promise { - const steps = await this.options.runner.getRunStepExecutions(ctx.params.runId); + const { renderingId } = ctx.state.user as StepUser; + const steps = await this.options.runner.getRunStepExecutions(ctx.params.runId, renderingId); ctx.body = { steps }; } diff --git a/packages/workflow-executor/src/hydrate-step-execution-data.ts b/packages/workflow-executor/src/hydrate-step-execution-data.ts index 08699253ac..00372fa55b 100644 --- a/packages/workflow-executor/src/hydrate-step-execution-data.ts +++ b/packages/workflow-executor/src/hydrate-step-execution-data.ts @@ -15,17 +15,20 @@ import { extractErrorMessage } from './errors'; export type SchemaGetter = (collectionName: string) => Promise; +// runId scopes the server fetch (it resolves the run's rendering); renderingId scopes the cache so +// entries are never shared across renderings of one environment. export function makeSchemaGetter( schemaCache: SchemaCache, workflowPort: WorkflowPort, runId: string, + renderingId: number, ): SchemaGetter { // De-duplicates concurrent fetches for the same collection within a single read (e.g. the // Promise.all over a run's steps): the first miss starts the fetch, the rest share its promise. const inFlight = new Map>(); return (collectionName: string) => { - const cached = schemaCache.get(collectionName); + const cached = schemaCache.get(renderingId, collectionName); if (cached) return Promise.resolve(cached); const pending = inFlight.get(collectionName); @@ -34,7 +37,7 @@ export function makeSchemaGetter( const promise = workflowPort .getCollectionSchema(collectionName, runId) .then(schema => { - schemaCache.set(collectionName, schema); + schemaCache.set(renderingId, collectionName, schema); return schema; }) diff --git a/packages/workflow-executor/src/runner.ts b/packages/workflow-executor/src/runner.ts index ba7ee432af..57c3159b45 100644 --- a/packages/workflow-executor/src/runner.ts +++ b/packages/workflow-executor/src/runner.ts @@ -153,9 +153,19 @@ export default class Runner { } } - async getRunStepExecutions(runId: string): Promise { + // renderingId (from the requesting user) scopes the schema cache; the schema content itself is + // resolved server-side from the run's rendering via runId in getCollectionSchema. + async getRunStepExecutions( + runId: string, + renderingId: number, + ): Promise { const executions = await this.config.runStore.getStepExecutions(runId); - const getSchema = makeSchemaGetter(this.config.schemaCache, this.config.workflowPort, runId); + const getSchema = makeSchemaGetter( + this.config.schemaCache, + this.config.workflowPort, + runId, + renderingId, + ); return Promise.all( executions.map(execution => hydrateStepExecutionData(execution, getSchema, this.logger)), diff --git a/packages/workflow-executor/src/schema-cache.ts b/packages/workflow-executor/src/schema-cache.ts index 68b1a3db0b..39cd188f89 100644 --- a/packages/workflow-executor/src/schema-cache.ts +++ b/packages/workflow-executor/src/schema-cache.ts @@ -2,8 +2,17 @@ import type { CollectionSchema } from './types/validated/collection'; const DEFAULT_TTL_MS = 10 * 60 * 1000; // 10 minutes +interface Entry { + schema: CollectionSchema; + fetchedAt: number; +} + +// Scoped by renderingId: collection/field/action displayNames AND the visible field set are +// rendering-specific (see forestadmin-server getCollectionSchema, which resolves them from the +// run's rendering layout). The same collection name in two renderings of one environment must NOT +// share an entry, or one team's labels/visible fields would leak into another's run. export default class SchemaCache { - private readonly store = new Map(); + private readonly store = new Map>(); private readonly ttlMs: number; private readonly now: () => number; @@ -12,13 +21,14 @@ export default class SchemaCache { this.now = now; } - get(collectionName: string): CollectionSchema | undefined { - const entry = this.store.get(collectionName); + get(renderingId: number, collectionName: string): CollectionSchema | undefined { + const byCollection = this.store.get(renderingId); + const entry = byCollection?.get(collectionName); if (!entry) return undefined; if (this.now() - entry.fetchedAt > this.ttlMs) { - this.store.delete(collectionName); + byCollection!.delete(collectionName); return undefined; } @@ -26,19 +36,30 @@ export default class SchemaCache { return entry.schema; } - set(collectionName: string, schema: CollectionSchema): void { - this.store.set(collectionName, { schema, fetchedAt: this.now() }); + set(renderingId: number, collectionName: string, schema: CollectionSchema): void { + let byCollection = this.store.get(renderingId); + + if (!byCollection) { + byCollection = new Map(); + this.store.set(renderingId, byCollection); + } + + byCollection.set(collectionName, { schema, fetchedAt: this.now() }); } - // Yields non-expired entries; deletes stale ones along the way. - *[Symbol.iterator](): IterableIterator<[string, CollectionSchema]> { + // Yields the non-expired entries for a single rendering; deletes stale ones along the way. + *entriesForRendering(renderingId: number): IterableIterator<[string, CollectionSchema]> { + const byCollection = this.store.get(renderingId); + + if (!byCollection) return; + const now = this.now(); - for (const [key, entry] of this.store) { + for (const [collectionName, entry] of byCollection) { if (now - entry.fetchedAt <= this.ttlMs) { - yield [key, entry.schema]; + yield [collectionName, entry.schema]; } else { - this.store.delete(key); + byCollection.delete(collectionName); } } } diff --git a/packages/workflow-executor/test/adapters/agent-client-agent-port.test.ts b/packages/workflow-executor/test/adapters/agent-client-agent-port.test.ts index aacaff0652..981cd41db7 100644 --- a/packages/workflow-executor/test/adapters/agent-client-agent-port.test.ts +++ b/packages/workflow-executor/test/adapters/agent-client-agent-port.test.ts @@ -46,7 +46,7 @@ describe('AgentClientAgentPort', () => { mockedCreateRemoteAgentClient.mockReturnValue(mocks.client as any); const schemaCache = new SchemaCache(); - schemaCache.set('users', { + schemaCache.set(1, 'users', { collectionName: 'users', collectionDisplayName: 'Users', primaryKeyFields: ['id'], @@ -59,7 +59,7 @@ describe('AgentClientAgentPort', () => { { name: 'archive', displayName: 'Archive', endpoint: '/forest/actions/archive' }, ], }); - schemaCache.set('orders', { + schemaCache.set(1, 'orders', { collectionName: 'orders', collectionDisplayName: 'Orders', primaryKeyFields: ['tenantId', 'orderId'], @@ -69,7 +69,7 @@ describe('AgentClientAgentPort', () => { ], actions: [], }); - schemaCache.set('posts', { + schemaCache.set(1, 'posts', { collectionName: 'posts', collectionDisplayName: 'Posts', primaryKeyFields: ['id'], @@ -369,7 +369,7 @@ describe('AgentClientAgentPort', () => { it('should restore snake_case field names in recordId and values when agent returns camelCase keys', async () => { const cache = new SchemaCache(); - cache.set('users', { + cache.set(1, 'users', { collectionName: 'users', collectionDisplayName: 'Users', primaryKeyFields: ['id'], @@ -383,7 +383,7 @@ describe('AgentClientAgentPort', () => { ], actions: [], }); - cache.set('posts', { + cache.set(1, 'posts', { collectionName: 'posts', collectionDisplayName: 'Posts', primaryKeyFields: ['post_id'], @@ -487,7 +487,7 @@ describe('AgentClientAgentPort', () => { describe('buildActionEndpoints', () => { it('passes fields and hooks from schema to agent-client (supports Ruby agent fallback)', async () => { const schemaCache = new SchemaCache(); - schemaCache.set('users', { + schemaCache.set(1, 'users', { collectionName: 'users', collectionDisplayName: 'Users', primaryKeyFields: ['id'], diff --git a/packages/workflow-executor/test/http/executor-http-server.test.ts b/packages/workflow-executor/test/http/executor-http-server.test.ts index c15abf8780..505e7711dd 100644 --- a/packages/workflow-executor/test/http/executor-http-server.test.ts +++ b/packages/workflow-executor/test/http/executor-http-server.test.ts @@ -289,7 +289,7 @@ describe('ExecutorHttpServer', () => { }); const server = createServer({ runner }); - const token = signToken({ id: 1 }); + const token = signToken({ id: 1, renderingId: 7 }); const response = await request(server.callback) .get('/runs/run-1') @@ -297,7 +297,8 @@ describe('ExecutorHttpServer', () => { expect(response.status).toBe(200); expect(response.body).toEqual({ steps }); - expect(runner.getRunStepExecutions).toHaveBeenCalledWith('run-1'); + // Schema hydration is scoped to the requesting user's rendering. + expect(runner.getRunStepExecutions).toHaveBeenCalledWith('run-1', 7); }); it('should return 500 when getRunStepExecutions rejects', async () => { diff --git a/packages/workflow-executor/test/hydrate-step-execution-data.test.ts b/packages/workflow-executor/test/hydrate-step-execution-data.test.ts index 389429cc02..06b68cc698 100644 --- a/packages/workflow-executor/test/hydrate-step-execution-data.test.ts +++ b/packages/workflow-executor/test/hydrate-step-execution-data.test.ts @@ -382,7 +382,7 @@ describe('makeSchemaGetter', () => { getCollectionSchema: jest.fn().mockResolvedValue(schema), } as unknown as WorkflowPort; - const getSchema = makeSchemaGetter(cache, workflowPort, 'run-1'); + const getSchema = makeSchemaGetter(cache, workflowPort, 'run-1', 1); await expect(getSchema('customers')).resolves.toBe(schema); await expect(getSchema('customers')).resolves.toBe(schema); @@ -392,6 +392,25 @@ describe('makeSchemaGetter', () => { expect(workflowPort.getCollectionSchema).toHaveBeenCalledWith('customers', 'run-1'); }); + it('scopes the cache by rendering: two renderings of one collection do not share', async () => { + const cache = new SchemaCache(); + const getCollectionSchema = jest + .fn() + .mockResolvedValueOnce(makeSchema({ collectionDisplayName: 'Clients' })) + .mockResolvedValueOnce(makeSchema({ collectionDisplayName: 'Accounts' })); + const workflowPort = { getCollectionSchema } as unknown as WorkflowPort; + + const r1 = await makeSchemaGetter(cache, workflowPort, 'run-1', 1)('customers'); + const r2 = await makeSchemaGetter(cache, workflowPort, 'run-2', 2)('customers'); + + // Each rendering fetched and cached independently — no cross-rendering hit. + expect(getCollectionSchema).toHaveBeenCalledTimes(2); + expect(r1.collectionDisplayName).toBe('Clients'); + expect(r2.collectionDisplayName).toBe('Accounts'); + expect(cache.get(1, 'customers')?.collectionDisplayName).toBe('Clients'); + expect(cache.get(2, 'customers')?.collectionDisplayName).toBe('Accounts'); + }); + it('de-duplicates concurrent fetches for the same collection (no stampede)', async () => { const cache = new SchemaCache(); let resolveFetch: (s: CollectionSchema) => void = () => undefined; @@ -403,6 +422,7 @@ describe('makeSchemaGetter', () => { cache, { getCollectionSchema } as unknown as WorkflowPort, 'run-1', + 1, ); // Five concurrent misses while the first fetch is still in flight. @@ -430,6 +450,7 @@ describe('makeSchemaGetter', () => { cache, { getCollectionSchema } as unknown as WorkflowPort, 'run-1', + 1, ); await expect(getSchema('customers')).rejects.toThrow('transient'); @@ -444,9 +465,10 @@ describe('makeSchemaGetter', () => { cache, { getCollectionSchema } as unknown as WorkflowPort, 'run-1', + 1, ); await expect(getSchema('customers')).rejects.toThrow('down'); - expect(cache.get('customers')).toBeUndefined(); + expect(cache.get(1, 'customers')).toBeUndefined(); }); }); diff --git a/packages/workflow-executor/test/runner.test.ts b/packages/workflow-executor/test/runner.test.ts index 5fad285cca..b359a1db64 100644 --- a/packages/workflow-executor/test/runner.test.ts +++ b/packages/workflow-executor/test/runner.test.ts @@ -2048,7 +2048,7 @@ describe('getRunStepExecutions', () => { }); runner = new Runner(createRunnerConfig({ runStore })); - const result = await runner.getRunStepExecutions('run-1'); + const result = await runner.getRunStepExecutions('run-1', 1); expect(result).toEqual(steps); expect(runStore.getStepExecutions).toHaveBeenCalledWith('run-1'); @@ -2077,7 +2077,7 @@ describe('getRunStepExecutions', () => { }); runner = new Runner(createRunnerConfig({ runStore, workflowPort })); - const result = await runner.getRunStepExecutions('run-1'); + const result = await runner.getRunStepExecutions('run-1', 1); expect(result[0]).toMatchObject({ type: 'update-record', @@ -2107,7 +2107,7 @@ describe('getRunStepExecutions', () => { }); runner = new Runner(createRunnerConfig({ runStore, workflowPort })); - const result = await runner.getRunStepExecutions('run-1'); + const result = await runner.getRunStepExecutions('run-1', 1); expect(result).toHaveLength(6); expect(workflowPort.getCollectionSchema).toHaveBeenCalledTimes(1); @@ -2139,7 +2139,7 @@ describe('getRunStepExecutions', () => { const logger = createMockLogger(); runner = new Runner(createRunnerConfig({ runStore, workflowPort, logger })); - const result = await runner.getRunStepExecutions('run-1'); + const result = await runner.getRunStepExecutions('run-1', 1); // The good row is hydrated; the malformed row is returned raw rather than 500-ing the read. expect(result).toHaveLength(2); @@ -2152,6 +2152,50 @@ describe('getRunStepExecutions', () => { expect.objectContaining({ type: 'read-record', stepIndex: 1 }), ); }); + + it('does not contaminate labels across renderings sharing one executor cache', async () => { + const ref = { collectionName: 'customers', recordId: [1], stepIndex: 0 }; + const execution = { + type: 'update-record' as const, + stepIndex: 0, + selectedRecordRef: ref, + executionParams: { name: 'status', value: 'active' }, + }; + const runStore = createMockRunStore({ + getStepExecutions: jest.fn().mockResolvedValue([execution]), + }); + const workflowPort = createMockWorkflowPort(); + // The server resolves the schema from the run's rendering (via runId) — different label per run. + workflowPort.getCollectionSchema.mockImplementation((_collection: string, runId: string) => + Promise.resolve({ + collectionName: 'customers', + collectionDisplayName: 'Customers', + primaryKeyFields: ['id'], + fields: [ + { + fieldName: 'status', + displayName: runId === 'run-A' ? 'Lifecycle Stage' : 'État', + isRelationship: false, + }, + ], + actions: [], + }), + ); + // One Runner == one shared SchemaCache, exercised by two renderings of the same environment. + runner = new Runner(createRunnerConfig({ runStore, workflowPort })); + + const resultA = await runner.getRunStepExecutions('run-A', 1); + const resultB = await runner.getRunStepExecutions('run-B', 2); + + expect(resultA[0]).toMatchObject({ + executionParams: { name: 'status', displayName: 'Lifecycle Stage' }, + }); + expect(resultB[0]).toMatchObject({ + executionParams: { name: 'status', displayName: 'État' }, + }); + // Each rendering triggered its own fetch — rendering 2 did NOT read rendering 1's cached entry. + expect(workflowPort.getCollectionSchema).toHaveBeenCalledTimes(2); + }); }); // --------------------------------------------------------------------------- diff --git a/packages/workflow-executor/test/schema-cache.test.ts b/packages/workflow-executor/test/schema-cache.test.ts index 216721640d..178f2927ae 100644 --- a/packages/workflow-executor/test/schema-cache.test.ts +++ b/packages/workflow-executor/test/schema-cache.test.ts @@ -2,10 +2,13 @@ import type { CollectionSchema } from '../src/types/validated/collection'; import SchemaCache from '../src/schema-cache'; -function makeSchema(collectionName: string): CollectionSchema { +const R1 = 1; +const R2 = 2; + +function makeSchema(collectionName: string, displayName = collectionName): CollectionSchema { return { collectionName, - collectionDisplayName: collectionName, + collectionDisplayName: displayName, primaryKeyFields: ['id'], fields: [], actions: [], @@ -17,16 +20,16 @@ describe('SchemaCache', () => { it('returns undefined for unknown keys', () => { const cache = new SchemaCache(); - expect(cache.get('unknown')).toBeUndefined(); + expect(cache.get(R1, 'unknown')).toBeUndefined(); }); it('returns the schema after set', () => { const cache = new SchemaCache(); const schema = makeSchema('customers'); - cache.set('customers', schema); + cache.set(R1, 'customers', schema); - expect(cache.get('customers')).toBe(schema); + expect(cache.get(R1, 'customers')).toBe(schema); }); it('overwrites existing entry on set', () => { @@ -34,10 +37,33 @@ describe('SchemaCache', () => { const old = makeSchema('customers'); const updated = { ...makeSchema('customers'), primaryKeyFields: ['uid'] }; - cache.set('customers', old); - cache.set('customers', updated); + cache.set(R1, 'customers', old); + cache.set(R1, 'customers', updated); + + expect(cache.get(R1, 'customers')).toBe(updated); + }); + }); + + describe('rendering isolation', () => { + it('does not share entries for the same collection across renderings', () => { + const cache = new SchemaCache(); + const r1Schema = makeSchema('customers', 'Clients'); + const r2Schema = makeSchema('customers', 'Accounts'); + + cache.set(R1, 'customers', r1Schema); + cache.set(R2, 'customers', r2Schema); + + // Each rendering sees its own labels; no cross-contamination. + expect(cache.get(R1, 'customers')).toBe(r1Schema); + expect(cache.get(R2, 'customers')).toBe(r2Schema); + }); + + it('a miss in one rendering is unaffected by another rendering populating the same collection', () => { + const cache = new SchemaCache(); + + cache.set(R1, 'customers', makeSchema('customers')); - expect(cache.get('customers')).toBe(updated); + expect(cache.get(R2, 'customers')).toBeUndefined(); }); }); @@ -47,85 +73,84 @@ describe('SchemaCache', () => { const cache = new SchemaCache(1000, () => time); const schema = makeSchema('customers'); - cache.set('customers', schema); + cache.set(R1, 'customers', schema); time = 999; - expect(cache.get('customers')).toBe(schema); + expect(cache.get(R1, 'customers')).toBe(schema); }); it('returns undefined after TTL expires', () => { let time = 0; const cache = new SchemaCache(1000, () => time); - cache.set('customers', makeSchema('customers')); + cache.set(R1, 'customers', makeSchema('customers')); time = 1001; - expect(cache.get('customers')).toBeUndefined(); + expect(cache.get(R1, 'customers')).toBeUndefined(); }); it('deletes the expired entry from the store on get', () => { let time = 0; const cache = new SchemaCache(1000, () => time); - cache.set('customers', makeSchema('customers')); + cache.set(R1, 'customers', makeSchema('customers')); time = 1001; - cache.get('customers'); // triggers delete + cache.get(R1, 'customers'); // triggers delete time = 0; // rewind — entry should still be gone - expect(cache.get('customers')).toBeUndefined(); + expect(cache.get(R1, 'customers')).toBeUndefined(); }); it('refreshes TTL when entry is re-set', () => { let time = 0; const cache = new SchemaCache(1000, () => time); - cache.set('customers', makeSchema('customers')); + cache.set(R1, 'customers', makeSchema('customers')); time = 800; - cache.set('customers', makeSchema('customers')); // re-set refreshes + cache.set(R1, 'customers', makeSchema('customers')); // re-set refreshes time = 1500; // 700ms since re-set, within TTL - expect(cache.get('customers')).toBeDefined(); + expect(cache.get(R1, 'customers')).toBeDefined(); }); }); - describe('iterator', () => { - it('yields all non-expired entries', () => { + describe('entriesForRendering', () => { + it('yields all non-expired entries for the given rendering only', () => { const cache = new SchemaCache(); - cache.set('customers', makeSchema('customers')); - cache.set('orders', makeSchema('orders')); + cache.set(R1, 'customers', makeSchema('customers')); + cache.set(R1, 'orders', makeSchema('orders')); + cache.set(R2, 'customers', makeSchema('customers')); // other rendering — excluded - const entries = [...cache]; + const entries = [...cache.entriesForRendering(R1)]; - expect(entries).toHaveLength(2); - expect(entries[0][0]).toBe('customers'); - expect(entries[1][0]).toBe('orders'); + expect(entries.map(([name]) => name)).toEqual(['customers', 'orders']); }); it('skips and deletes expired entries', () => { let time = 0; const cache = new SchemaCache(1000, () => time); - cache.set('fresh', makeSchema('fresh')); + cache.set(R1, 'fresh', makeSchema('fresh')); time = 500; - cache.set('also-fresh', makeSchema('also-fresh')); + cache.set(R1, 'also-fresh', makeSchema('also-fresh')); time = 1100; // 'fresh' expired, 'also-fresh' still valid - const entries = [...cache]; + const entries = [...cache.entriesForRendering(R1)]; expect(entries).toHaveLength(1); expect(entries[0][0]).toBe('also-fresh'); // expired entry was cleaned up time = 0; - expect(cache.get('fresh')).toBeUndefined(); + expect(cache.get(R1, 'fresh')).toBeUndefined(); }); - it('returns empty for a fresh cache', () => { + it('returns empty for a rendering with no entries', () => { const cache = new SchemaCache(); - expect([...cache]).toHaveLength(0); + expect([...cache.entriesForRendering(R1)]).toHaveLength(0); }); }); }); From dd8b8263399a7430bf374505764510d41684d07e Mon Sep 17 00:00:00 2001 From: Pierre Merlet Date: Tue, 2 Jun 2026 19:08:52 +0200 Subject: [PATCH 5/8] docs(workflow-executor): note the schema-cache TTL freshness trade-off on reads Record that label edits can take up to the 10-minute TTL to surface on the hydration read path as an accepted trade-off, so it reads as intentional rather than a defect. Co-Authored-By: Claude Opus 4.8 --- packages/workflow-executor/src/schema-cache.ts | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/packages/workflow-executor/src/schema-cache.ts b/packages/workflow-executor/src/schema-cache.ts index 39cd188f89..134251387c 100644 --- a/packages/workflow-executor/src/schema-cache.ts +++ b/packages/workflow-executor/src/schema-cache.ts @@ -1,6 +1,8 @@ import type { CollectionSchema } from './types/validated/collection'; -const DEFAULT_TTL_MS = 10 * 60 * 1000; // 10 minutes +// 10 minutes. By design, a label edited in the rendering can take up to this long to appear on the +// hydration read path (GET /runs/:runId) — an accepted freshness/load trade-off, not a bug. +const DEFAULT_TTL_MS = 10 * 60 * 1000; interface Entry { schema: CollectionSchema; From 9d0ce00ad045a0d69f3930ac363721d21b3641bb Mon Sep 17 00:00:00 2001 From: Pierre Merlet Date: Tue, 2 Jun 2026 19:21:05 +0200 Subject: [PATCH 6/8] fix(workflow-executor): validate renderingId from the JWT on the run-read route Addresses Macroscope review: handleGetRun cast the JWT payload to StepUser and passed renderingId downstream without runtime validation. A token missing renderingId (or with a non-numeric value) would send undefined/NaN into the schema cache. Mirror the bearerUserId guard used in handleTrigger: coerce to number, and return 400 when it is not finite. Co-Authored-By: Claude Opus 4.8 --- .../src/http/executor-http-server.ts | 12 +++++++++- .../test/http/executor-http-server.test.ts | 24 +++++++++++++++---- .../integration/workflow-execution.test.ts | 2 +- 3 files changed, 31 insertions(+), 7 deletions(-) diff --git a/packages/workflow-executor/src/http/executor-http-server.ts b/packages/workflow-executor/src/http/executor-http-server.ts index 6c12f57de4..cad94feb2d 100644 --- a/packages/workflow-executor/src/http/executor-http-server.ts +++ b/packages/workflow-executor/src/http/executor-http-server.ts @@ -159,7 +159,17 @@ export default class ExecutorHttpServer { } private async handleGetRun(ctx: Koa.Context): Promise { - const { renderingId } = ctx.state.user as StepUser; + const rawRenderingId = (ctx.state.user as { renderingId?: unknown })?.renderingId; + const renderingId = + typeof rawRenderingId === 'number' ? rawRenderingId : Number(rawRenderingId); + + if (!Number.isFinite(renderingId)) { + ctx.status = 400; + ctx.body = { error: 'Missing or invalid renderingId in token' }; + + return; + } + const steps = await this.options.runner.getRunStepExecutions(ctx.params.runId, renderingId); ctx.body = { steps }; } diff --git a/packages/workflow-executor/test/http/executor-http-server.test.ts b/packages/workflow-executor/test/http/executor-http-server.test.ts index 505e7711dd..a5ac5e750d 100644 --- a/packages/workflow-executor/test/http/executor-http-server.test.ts +++ b/packages/workflow-executor/test/http/executor-http-server.test.ts @@ -105,7 +105,7 @@ describe('ExecutorHttpServer', () => { it('should accept valid token in Authorization header', async () => { const server = createServer(); - const token = signToken({ id: 1 }); + const token = signToken({ id: 1, renderingId: 7 }); const response = await request(server.callback) .get('/runs/run-1') @@ -116,7 +116,7 @@ describe('ExecutorHttpServer', () => { it('should accept valid token in forest_session_token cookie', async () => { const server = createServer(); - const token = signToken({ id: 1 }); + const token = signToken({ id: 1, renderingId: 7 }); const response = await request(server.callback) .get('/runs/run-1') @@ -217,7 +217,7 @@ describe('ExecutorHttpServer', () => { it('calls hasRunAccess with the correct runId and decoded user', async () => { const workflowPort = createMockWorkflowPort(); const server = createServer({ workflowPort }); - const token = signToken({ id: 1 }); + const token = signToken({ id: 1, renderingId: 7 }); const response = await request(server.callback) .get('/runs/run-42') @@ -233,7 +233,7 @@ describe('ExecutorHttpServer', () => { it('calls hasRunAccess with decoded user from cookie token', async () => { const workflowPort = createMockWorkflowPort(); const server = createServer({ workflowPort }); - const token = signToken({ id: 1 }); + const token = signToken({ id: 1, renderingId: 7 }); const response = await request(server.callback) .get('/runs/run-cookie') @@ -307,7 +307,7 @@ describe('ExecutorHttpServer', () => { }); const server = createServer({ runner }); - const token = signToken({ id: 1 }); + const token = signToken({ id: 1, renderingId: 7 }); const response = await request(server.callback) .get('/runs/run-1') @@ -316,6 +316,20 @@ describe('ExecutorHttpServer', () => { expect(response.status).toBe(500); expect(response.body).toEqual({ error: 'Internal server error' }); }); + + it('should return 400 when the token has no renderingId', async () => { + const runner = createMockRunner(); + const server = createServer({ runner }); + const token = signToken({ id: 1 }); // no renderingId + + const response = await request(server.callback) + .get('/runs/run-1') + .set('Authorization', `Bearer ${token}`); + + expect(response.status).toBe(400); + expect(response.body).toEqual({ error: 'Missing or invalid renderingId in token' }); + expect(runner.getRunStepExecutions).not.toHaveBeenCalled(); + }); }); describe('POST /runs/:runId/trigger', () => { diff --git a/packages/workflow-executor/test/integration/workflow-execution.test.ts b/packages/workflow-executor/test/integration/workflow-execution.test.ts index 3741e109be..d8da788def 100644 --- a/packages/workflow-executor/test/integration/workflow-execution.test.ts +++ b/packages/workflow-executor/test/integration/workflow-execution.test.ts @@ -696,7 +696,7 @@ describe('workflow execution (integration)', () => { const { server, runStore } = createIntegrationSetup({ workflowPort }); await runStore.init(); - const token = signToken({ id: STEP_USER.id }); + const token = signToken({ id: STEP_USER.id, renderingId: STEP_USER.renderingId }); // Trigger the step first await request(server.callback) From e4b4fcaa8c3397860caa21153e627108e1481250 Mon Sep 17 00:00:00 2001 From: Pierre Merlet Date: Tue, 2 Jun 2026 19:31:20 +0200 Subject: [PATCH 7/8] fix(workflow-executor): make schema-fetch de-dup global instead of per-getter MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The in-flight de-dup previously lived on each getter (one per getRunStepExecutions call / per executor step), so it only collapsed concurrent misses within a single read. Two independent concurrent readers of the same cold (rendering, collection) — two reads of one run, or two runs of one rendering — still stampeded the orchestrator until the first fetch resolved. Move the read-through + de-dup into SchemaCache.getOrFetch, keyed by (renderingId, collectionName) on the one shared cache. All callers now share a single in-flight promise per (rendering, collection); makeSchemaGetter is a thin wrapper. A rejected fetch is not cached and clears the in-flight slot so the next caller retries. Tests: getOrFetch (hit/miss/concurrent-share/per-rendering/failed-retry) and a cross-getter global de-dup test. schema-cache.ts and hydrate-step-execution-data.ts at 100% coverage. Co-Authored-By: Claude Opus 4.8 --- .../src/hydrate-step-execution-data.ts | 31 ++------ .../workflow-executor/src/schema-cache.ts | 40 ++++++++++ .../test/hydrate-step-execution-data.test.ts | 21 +++++ .../test/schema-cache.test.ts | 78 +++++++++++++++++++ 4 files changed, 145 insertions(+), 25 deletions(-) diff --git a/packages/workflow-executor/src/hydrate-step-execution-data.ts b/packages/workflow-executor/src/hydrate-step-execution-data.ts index 00372fa55b..4908e8baff 100644 --- a/packages/workflow-executor/src/hydrate-step-execution-data.ts +++ b/packages/workflow-executor/src/hydrate-step-execution-data.ts @@ -16,37 +16,18 @@ import { extractErrorMessage } from './errors'; export type SchemaGetter = (collectionName: string) => Promise; // runId scopes the server fetch (it resolves the run's rendering); renderingId scopes the cache so -// entries are never shared across renderings of one environment. +// entries are never shared across renderings of one environment. Fetch de-duplication and caching +// live in SchemaCache.getOrFetch, so concurrent callers share one fetch regardless of getter. export function makeSchemaGetter( schemaCache: SchemaCache, workflowPort: WorkflowPort, runId: string, renderingId: number, ): SchemaGetter { - // De-duplicates concurrent fetches for the same collection within a single read (e.g. the - // Promise.all over a run's steps): the first miss starts the fetch, the rest share its promise. - const inFlight = new Map>(); - - return (collectionName: string) => { - const cached = schemaCache.get(renderingId, collectionName); - if (cached) return Promise.resolve(cached); - - const pending = inFlight.get(collectionName); - if (pending) return pending; - - const promise = workflowPort - .getCollectionSchema(collectionName, runId) - .then(schema => { - schemaCache.set(renderingId, collectionName, schema); - - return schema; - }) - .finally(() => inFlight.delete(collectionName)); - - inFlight.set(collectionName, promise); - - return promise; - }; + return (collectionName: string) => + schemaCache.getOrFetch(renderingId, collectionName, () => + workflowPort.getCollectionSchema(collectionName, runId), + ); } function fieldDisplayName(schema: CollectionSchema | null, name: string): string { diff --git a/packages/workflow-executor/src/schema-cache.ts b/packages/workflow-executor/src/schema-cache.ts index 134251387c..03cb329332 100644 --- a/packages/workflow-executor/src/schema-cache.ts +++ b/packages/workflow-executor/src/schema-cache.ts @@ -15,6 +15,9 @@ interface Entry { // share an entry, or one team's labels/visible fields would leak into another's run. export default class SchemaCache { private readonly store = new Map>(); + // In-flight fetches, so concurrent callers for the same (rendering, collection) — across + // different getters, reads, or runs — share one fetch instead of stampeding a cold cache. + private readonly inFlight = new Map>>(); private readonly ttlMs: number; private readonly now: () => number; @@ -23,6 +26,43 @@ export default class SchemaCache { this.now = now; } + // Read-through with global de-duplication. A rejected fetch is not cached and clears the + // in-flight slot, so the next caller retries. + getOrFetch( + renderingId: number, + collectionName: string, + fetch: () => Promise, + ): Promise { + const cached = this.get(renderingId, collectionName); + if (cached) return Promise.resolve(cached); + + const pending = this.inFlight.get(renderingId)?.get(collectionName); + if (pending) return pending; + + let byCollection = this.inFlight.get(renderingId); + + if (!byCollection) { + byCollection = new Map(); + this.inFlight.set(renderingId, byCollection); + } + + const inFlight = byCollection; + const promise = fetch() + .then(schema => { + this.set(renderingId, collectionName, schema); + + return schema; + }) + .finally(() => { + inFlight.delete(collectionName); + if (inFlight.size === 0) this.inFlight.delete(renderingId); + }); + + inFlight.set(collectionName, promise); + + return promise; + } + get(renderingId: number, collectionName: string): CollectionSchema | undefined { const byCollection = this.store.get(renderingId); const entry = byCollection?.get(collectionName); diff --git a/packages/workflow-executor/test/hydrate-step-execution-data.test.ts b/packages/workflow-executor/test/hydrate-step-execution-data.test.ts index 06b68cc698..7b6e1ec9f3 100644 --- a/packages/workflow-executor/test/hydrate-step-execution-data.test.ts +++ b/packages/workflow-executor/test/hydrate-step-execution-data.test.ts @@ -440,6 +440,27 @@ describe('makeSchemaGetter', () => { results.forEach(r => expect(r).toMatchObject({ collectionName: 'customers' })); }); + it('de-duplicates across independent getters sharing one cache (global, no stampede)', async () => { + // Two separate getters (e.g. two concurrent reads / two runs of one rendering) over the SAME + // cache must still collapse to a single fetch while one is in flight. + const cache = new SchemaCache(); + let resolveFetch: (s: CollectionSchema) => void = () => undefined; + const deferred = new Promise(resolve => { + resolveFetch = resolve; + }); + const getCollectionSchema = jest.fn().mockReturnValue(deferred); + const workflowPort = { getCollectionSchema } as unknown as WorkflowPort; + + const getterA = makeSchemaGetter(cache, workflowPort, 'run-1', 1); + const getterB = makeSchemaGetter(cache, workflowPort, 'run-2', 1); // same rendering, different getter + + const all = Promise.all([getterA('customers'), getterB('customers')]); + resolveFetch(makeSchema()); + await all; + + expect(getCollectionSchema).toHaveBeenCalledTimes(1); + }); + it('clears the in-flight entry so a failed fetch can be retried', async () => { const cache = new SchemaCache(); const getCollectionSchema = jest diff --git a/packages/workflow-executor/test/schema-cache.test.ts b/packages/workflow-executor/test/schema-cache.test.ts index 178f2927ae..1ee803346c 100644 --- a/packages/workflow-executor/test/schema-cache.test.ts +++ b/packages/workflow-executor/test/schema-cache.test.ts @@ -153,4 +153,82 @@ describe('SchemaCache', () => { expect([...cache.entriesForRendering(R1)]).toHaveLength(0); }); }); + + describe('getOrFetch', () => { + it('returns the cached schema without fetching on a hit', async () => { + const cache = new SchemaCache(); + const cached = makeSchema('customers'); + cache.set(R1, 'customers', cached); + const fetch = jest.fn(); + + await expect(cache.getOrFetch(R1, 'customers', fetch)).resolves.toBe(cached); + expect(fetch).not.toHaveBeenCalled(); + }); + + it('fetches and caches on a miss', async () => { + const cache = new SchemaCache(); + const fetched = makeSchema('customers'); + const fetch = jest.fn().mockResolvedValue(fetched); + + await expect(cache.getOrFetch(R1, 'customers', fetch)).resolves.toBe(fetched); + expect(cache.get(R1, 'customers')).toBe(fetched); + // Second call is now a cache hit — no second fetch. + await cache.getOrFetch(R1, 'customers', fetch); + expect(fetch).toHaveBeenCalledTimes(1); + }); + + it('shares one in-flight fetch among concurrent callers', async () => { + const cache = new SchemaCache(); + let resolveFetch: (s: CollectionSchema) => void = () => undefined; + const fetch = jest.fn().mockReturnValue( + new Promise(r => { + resolveFetch = r; + }), + ); + + const all = Promise.all([ + cache.getOrFetch(R1, 'customers', fetch), + cache.getOrFetch(R1, 'customers', fetch), + cache.getOrFetch(R1, 'customers', fetch), + ]); + resolveFetch(makeSchema('customers')); + await all; + + expect(fetch).toHaveBeenCalledTimes(1); + }); + + it('keeps fetches for the same collection in different renderings separate', async () => { + const cache = new SchemaCache(); + const fetch = jest + .fn() + .mockResolvedValueOnce(makeSchema('customers', 'Clients')) + .mockResolvedValueOnce(makeSchema('customers', 'Accounts')); + + const [a, b] = await Promise.all([ + cache.getOrFetch(R1, 'customers', fetch), + cache.getOrFetch(R2, 'customers', fetch), + ]); + + expect(fetch).toHaveBeenCalledTimes(2); + expect(a.collectionDisplayName).toBe('Clients'); + expect(b.collectionDisplayName).toBe('Accounts'); + }); + + it('does not cache a rejected fetch and clears the in-flight slot for retry', async () => { + const cache = new SchemaCache(); + const fetch = jest + .fn() + .mockRejectedValueOnce(new Error('transient')) + .mockResolvedValueOnce(makeSchema('customers')); + + await expect(cache.getOrFetch(R1, 'customers', fetch)).rejects.toThrow('transient'); + expect(cache.get(R1, 'customers')).toBeUndefined(); + + // In-flight slot was cleared — a retry triggers a fresh fetch and succeeds. + await expect(cache.getOrFetch(R1, 'customers', fetch)).resolves.toMatchObject({ + collectionName: 'customers', + }); + expect(fetch).toHaveBeenCalledTimes(2); + }); + }); }); From be7436772c354157f87c779b7c8af65d13f6c6f1 Mon Sep 17 00:00:00 2001 From: Pierre Merlet Date: Tue, 2 Jun 2026 19:42:09 +0200 Subject: [PATCH 8/8] style(workflow-executor): trim over-explanatory comments from the hydration code Co-Authored-By: Claude Opus 4.8 --- .../src/executors/record-step-executor.ts | 2 -- .../src/hydrate-step-execution-data.ts | 10 +++------- packages/workflow-executor/src/runner.ts | 2 -- packages/workflow-executor/src/schema-cache.ts | 15 ++++----------- 4 files changed, 7 insertions(+), 22 deletions(-) diff --git a/packages/workflow-executor/src/executors/record-step-executor.ts b/packages/workflow-executor/src/executors/record-step-executor.ts index fd777c3a77..3027e0f45c 100644 --- a/packages/workflow-executor/src/executors/record-step-executor.ts +++ b/packages/workflow-executor/src/executors/record-step-executor.ts @@ -68,8 +68,6 @@ export default abstract class RecordStepExecutor< } protected async getCollectionSchema(collectionName: string): Promise { - // Lazily build a rendering-scoped getter once per executor instance (one runId/rendering), - // shared across this step's getCollectionSchema calls — same keying as the read path. this.schemaGetter ??= makeSchemaGetter( this.context.schemaCache, this.context.workflowPort, diff --git a/packages/workflow-executor/src/hydrate-step-execution-data.ts b/packages/workflow-executor/src/hydrate-step-execution-data.ts index 4908e8baff..c670c382c9 100644 --- a/packages/workflow-executor/src/hydrate-step-execution-data.ts +++ b/packages/workflow-executor/src/hydrate-step-execution-data.ts @@ -15,9 +15,7 @@ import { extractErrorMessage } from './errors'; export type SchemaGetter = (collectionName: string) => Promise; -// runId scopes the server fetch (it resolves the run's rendering); renderingId scopes the cache so -// entries are never shared across renderings of one environment. Fetch de-duplication and caching -// live in SchemaCache.getOrFetch, so concurrent callers share one fetch regardless of getter. +// runId fetches the run's rendering server-side; renderingId scopes the cache. export function makeSchemaGetter( schemaCache: SchemaCache, workflowPort: WorkflowPort, @@ -51,9 +49,8 @@ function hydrateRelationResult( }; } -// Pure transform — assumes the schema fetch already happened (or failed to null). Split from the -// async wrapper so the wrapper can guard it: a throw here (malformed row) is caught, logged, and -// the raw execution returned rather than failing the whole read. +// Split from the async wrapper so a throw on a malformed row can be caught and the raw execution +// returned instead of failing the whole read. function hydrate( execution: Exclude< StepExecutionData, @@ -160,7 +157,6 @@ export default async function hydrateStepExecutionData( try { return hydrate(execution, schema); } catch (error) { - // A single malformed/legacy row must not take down the whole read — return it un-hydrated. logger?.error('Failed to hydrate step execution displayNames; returning the raw execution', { type: execution.type, stepIndex: execution.stepIndex, diff --git a/packages/workflow-executor/src/runner.ts b/packages/workflow-executor/src/runner.ts index 57c3159b45..3f2e59a49a 100644 --- a/packages/workflow-executor/src/runner.ts +++ b/packages/workflow-executor/src/runner.ts @@ -153,8 +153,6 @@ export default class Runner { } } - // renderingId (from the requesting user) scopes the schema cache; the schema content itself is - // resolved server-side from the run's rendering via runId in getCollectionSchema. async getRunStepExecutions( runId: string, renderingId: number, diff --git a/packages/workflow-executor/src/schema-cache.ts b/packages/workflow-executor/src/schema-cache.ts index 03cb329332..38c4ee6305 100644 --- a/packages/workflow-executor/src/schema-cache.ts +++ b/packages/workflow-executor/src/schema-cache.ts @@ -1,7 +1,6 @@ import type { CollectionSchema } from './types/validated/collection'; -// 10 minutes. By design, a label edited in the rendering can take up to this long to appear on the -// hydration read path (GET /runs/:runId) — an accepted freshness/load trade-off, not a bug. +// A label edit can take up to the TTL to surface on reads — an accepted freshness/load trade-off. const DEFAULT_TTL_MS = 10 * 60 * 1000; interface Entry { @@ -9,14 +8,10 @@ interface Entry { fetchedAt: number; } -// Scoped by renderingId: collection/field/action displayNames AND the visible field set are -// rendering-specific (see forestadmin-server getCollectionSchema, which resolves them from the -// run's rendering layout). The same collection name in two renderings of one environment must NOT -// share an entry, or one team's labels/visible fields would leak into another's run. +// Keyed by renderingId: displayNames and the visible field set are rendering-specific, so the same +// collection in two renderings of one environment must not share an entry. export default class SchemaCache { private readonly store = new Map>(); - // In-flight fetches, so concurrent callers for the same (rendering, collection) — across - // different getters, reads, or runs — share one fetch instead of stampeding a cold cache. private readonly inFlight = new Map>>(); private readonly ttlMs: number; private readonly now: () => number; @@ -26,8 +21,7 @@ export default class SchemaCache { this.now = now; } - // Read-through with global de-duplication. A rejected fetch is not cached and clears the - // in-flight slot, so the next caller retries. + // Read-through with global de-dup; a rejected fetch is not cached (retryable). getOrFetch( renderingId: number, collectionName: string, @@ -89,7 +83,6 @@ export default class SchemaCache { byCollection.set(collectionName, { schema, fetchedAt: this.now() }); } - // Yields the non-expired entries for a single rendering; deletes stale ones along the way. *entriesForRendering(renderingId: number): IterableIterator<[string, CollectionSchema]> { const byCollection = this.store.get(renderingId);