diff --git a/packages/workflow-executor/src/adapters/agent-client-agent-port.ts b/packages/workflow-executor/src/adapters/agent-client-agent-port.ts index 24bec25231..c1889cdea0 100644 --- a/packages/workflow-executor/src/adapters/agent-client-agent-port.ts +++ b/packages/workflow-executor/src/adapters/agent-client-agent-port.ts @@ -73,7 +73,7 @@ export default class AgentClientAgentPort implements AgentPort { async getRecord({ collection, id, fields }: GetRecordQuery, user: StepUser): Promise { return this.callAgent('getRecord', async () => { const client = this.createClient(user); - const schema = this.resolveSchema(collection); + const schema = this.resolveSchema(collection, user.renderingId); const records = await client.collection(collection).list>({ filters: buildPkFilter(schema.primaryKeyFields, id), pagination: { size: 1, number: 1 }, @@ -116,10 +116,10 @@ export default class AgentClientAgentPort implements AgentPort { ): Promise { return this.callAgent('getRelatedData', async () => { const client = this.createClient(user); - const parentSchema = this.resolveSchema(collection); + const parentSchema = this.resolveSchema(collection, user.renderingId); const relationField = parentSchema.fields.find(f => f.fieldName === relation); const relatedCollectionName = relationField?.relatedCollectionName ?? relation; - const relatedSchema = this.resolveSchema(relatedCollectionName); + const relatedSchema = this.resolveSchema(relatedCollectionName, user.renderingId); const records = await client .collection(collection) @@ -186,7 +186,7 @@ export default class AgentClientAgentPort implements AgentPort { return createRemoteAgentClient({ url: this.agentUrl, token, - actionEndpoints: this.buildActionEndpoints(), + actionEndpoints: this.buildActionEndpoints(user.renderingId), }); } @@ -212,10 +212,10 @@ export default class AgentClientAgentPort implements AgentPort { } } - private buildActionEndpoints(): ActionEndpointsByCollection { + private buildActionEndpoints(renderingId: number): ActionEndpointsByCollection { const endpoints: ActionEndpointsByCollection = {}; - for (const [collectionName, schema] of this.schemaCache) { + for (const [collectionName, schema] of this.schemaCache.entriesForRendering(renderingId)) { endpoints[collectionName] = {}; for (const action of schema.actions) { @@ -238,8 +238,8 @@ export default class AgentClientAgentPort implements AgentPort { return endpoints; } - private resolveSchema(collectionName: string): CollectionSchema { - const cached = this.schemaCache.get(collectionName); + private resolveSchema(collectionName: string, renderingId: number): CollectionSchema { + const cached = this.schemaCache.get(renderingId, collectionName); if (!cached) { // eslint-disable-next-line no-console diff --git a/packages/workflow-executor/src/executors/base-step-executor.ts b/packages/workflow-executor/src/executors/base-step-executor.ts index 37b5754c60..406a45870f 100644 --- a/packages/workflow-executor/src/executors/base-step-executor.ts +++ b/packages/workflow-executor/src/executors/base-step-executor.ts @@ -26,6 +26,7 @@ import { extractErrorMessage, } from '../errors'; import patchBodySchemas from '../http/pending-data-validators'; +import hydrateStepExecutionData, { makeSchemaGetter } from '../hydrate-step-execution-data'; import StepSummaryBuilder from './summary/step-summary-builder'; export default abstract class BaseStepExecutor @@ -281,15 +282,24 @@ export default abstract class BaseStepExecutor { + const getSchema = makeSchemaGetter( + this.context.schemaCache, + this.context.workflowPort, + this.context.runId, + this.context.user.renderingId, + ); + const summaries = await Promise.all( + this.context.previousSteps.map(async ({ stepDefinition, stepOutcome }) => { const execution = allStepExecutions.find(e => e.stepIndex === stepOutcome.stepIndex); + const hydrated = execution + ? await hydrateStepExecutionData(execution, getSchema, this.context.logger) + : undefined; - return StepSummaryBuilder.build(stepDefinition, stepOutcome, execution); - }) - .join('\n\n'); + return StepSummaryBuilder.build(stepDefinition, stepOutcome, hydrated); + }), + ); - return [new SystemMessage(summary)]; + return [new SystemMessage(summaries.join('\n\n'))]; } private static mergeLeadingSystemMessages(messages: BaseMessage[]): BaseMessage[] { diff --git a/packages/workflow-executor/src/executors/load-related-record-step-executor.ts b/packages/workflow-executor/src/executors/load-related-record-step-executor.ts index 2d985faf9c..ed58e56eb9 100644 --- a/packages/workflow-executor/src/executors/load-related-record-step-executor.ts +++ b/packages/workflow-executor/src/executors/load-related-record-step-executor.ts @@ -101,7 +101,6 @@ export default class LoadRelatedRecordStepExecutor extends RecordStepExecutor { - const { selectedRecordRef, name, displayName } = target; + const { selectedRecordRef, name } = target; const { relatedData, bestIndex, suggestedFields } = await this.selectBestFromRelatedData( target, @@ -122,7 +121,7 @@ export default class LoadRelatedRecordStepExecutor extends RecordStepExecutor f.fieldName === name); @@ -162,7 +161,7 @@ export default class LoadRelatedRecordStepExecutor extends RecordStepExecutor, + target: Pick, existingExecution: LoadRelatedRecordStepExecutionData | undefined, ): Promise { - const { selectedRecordRef, name, displayName } = target; + const { selectedRecordRef, name } = target; await this.context.runStore.saveStepExecution(this.context.runId, { ...existingExecution, type: 'load-related-record', stepIndex: this.context.stepIndex, - executionParams: { displayName, name }, - executionResult: { relation: { name, displayName }, record }, + executionParams: { name }, + executionResult: { relation: { name }, record }, selectedRecordRef, }); diff --git a/packages/workflow-executor/src/executors/read-record-step-executor.ts b/packages/workflow-executor/src/executors/read-record-step-executor.ts index 950f6c6bc5..b331f5c12b 100644 --- a/packages/workflow-executor/src/executors/read-record-step-executor.ts +++ b/packages/workflow-executor/src/executors/read-record-step-executor.ts @@ -64,7 +64,7 @@ export default class ReadRecordStepExecutor extends RecordStepExecutor ({ name, displayName })), + fields: fieldResults.map(({ name }) => ({ name })), }, executionResult: { fields: fieldResults }, selectedRecordRef, @@ -129,12 +129,11 @@ export default class ReadRecordStepExecutor extends RecordStepExecutor { const field = this.findField(schema, name); - if (!field) return { error: `Field not found: ${name}`, name, displayName: name }; + if (!field) return { error: `Field not found: ${name}`, name }; return { value: values[field.fieldName], name: field.fieldName, - displayName: field.displayName, }; }); } diff --git a/packages/workflow-executor/src/executors/record-step-executor.ts b/packages/workflow-executor/src/executors/record-step-executor.ts index 4430dfacc4..3027e0f45c 100644 --- a/packages/workflow-executor/src/executors/record-step-executor.ts +++ b/packages/workflow-executor/src/executors/record-step-executor.ts @@ -1,3 +1,4 @@ +import type { SchemaGetter } from '../hydrate-step-execution-data'; import type { StepExecutionResult } from '../types/execution-context'; import type { CollectionSchema, FieldSchema, RecordRef } from '../types/validated/collection'; import type { StepDefinition } from '../types/validated/step-definition'; @@ -7,11 +8,14 @@ import { DynamicStructuredTool, HumanMessage, SystemMessage } from '@forestadmin import { z } from 'zod'; import { InvalidAIResponseError, InvalidPreRecordedArgsError, NoRecordsError } from '../errors'; +import { makeSchemaGetter } from '../hydrate-step-execution-data'; import BaseStepExecutor from './base-step-executor'; export default abstract class RecordStepExecutor< TStep extends StepDefinition = StepDefinition, > extends BaseStepExecutor { + private schemaGetter?: SchemaGetter; + protected buildOutcomeResult(outcome: { status: RecordStepStatus; error?: string; @@ -64,16 +68,14 @@ export default abstract class RecordStepExecutor< } protected async getCollectionSchema(collectionName: string): Promise { - const cached = this.context.schemaCache.get(collectionName); - if (cached) return cached; - - const schema = await this.context.workflowPort.getCollectionSchema( - collectionName, + this.schemaGetter ??= makeSchemaGetter( + this.context.schemaCache, + this.context.workflowPort, this.context.runId, + this.context.user.renderingId, ); - this.context.schemaCache.set(collectionName, schema); - return schema; + return this.schemaGetter(collectionName); } protected findField(schema: CollectionSchema, name: string): FieldSchema | undefined { diff --git a/packages/workflow-executor/src/executors/summary/step-execution-formatters.ts b/packages/workflow-executor/src/executors/summary/step-execution-formatters.ts index c9df62c0a8..5b9dd78e0a 100644 --- a/packages/workflow-executor/src/executors/summary/step-execution-formatters.ts +++ b/packages/workflow-executor/src/executors/summary/step-execution-formatters.ts @@ -1,14 +1,14 @@ import type { + DisplayedLoadRelatedRecordStepExecutionData, GuidanceStepExecutionData, - LoadRelatedRecordStepExecutionData, + HydratedStepExecutionData, McpStepExecutionData, - StepExecutionData, } from '../../types/step-execution-data'; export default class StepExecutionFormatters { // Returns null when no custom format is defined for the step type or when execution data // doesn't satisfy formatter preconditions — caller falls back to generic Input:/Output:. - static format(execution: StepExecutionData): string | null { + static format(execution: HydratedStepExecutionData): string | null { switch (execution.type) { case 'load-related-record': return StepExecutionFormatters.formatLoadRelatedRecord(execution); @@ -42,7 +42,7 @@ export default class StepExecutionFormatters { } private static formatLoadRelatedRecord( - execution: LoadRelatedRecordStepExecutionData, + execution: DisplayedLoadRelatedRecordStepExecutionData, ): string | null { const { executionResult } = execution; diff --git a/packages/workflow-executor/src/executors/summary/step-summary-builder.ts b/packages/workflow-executor/src/executors/summary/step-summary-builder.ts index aafbbb6013..23031e8999 100644 --- a/packages/workflow-executor/src/executors/summary/step-summary-builder.ts +++ b/packages/workflow-executor/src/executors/summary/step-summary-builder.ts @@ -1,4 +1,4 @@ -import type { StepExecutionData } from '../../types/step-execution-data'; +import type { HydratedStepExecutionData } from '../../types/step-execution-data'; import type { StepDefinition } from '../../types/validated/step-definition'; import type { StepOutcome } from '../../types/validated/step-outcome'; @@ -8,7 +8,7 @@ export default class StepSummaryBuilder { static build( step: StepDefinition, stepOutcome: StepOutcome, - execution: StepExecutionData | undefined, + execution: HydratedStepExecutionData | undefined, ): string { const prompt = step.prompt ?? '(no prompt)'; const header = `Step "${stepOutcome.stepId}" (index ${stepOutcome.stepIndex}):`; diff --git a/packages/workflow-executor/src/executors/trigger-record-action-step-executor.ts b/packages/workflow-executor/src/executors/trigger-record-action-step-executor.ts index 9e2df88d24..82bcbb7fa0 100644 --- a/packages/workflow-executor/src/executors/trigger-record-action-step-executor.ts +++ b/packages/workflow-executor/src/executors/trigger-record-action-step-executor.ts @@ -84,7 +84,6 @@ export default class TriggerRecordActionStepExecutor extends RecordStepExecutor< const target: ActionTarget = { selectedRecordRef, - displayName: pendingData.displayName, name: pendingData.name, }; @@ -112,7 +111,7 @@ export default class TriggerRecordActionStepExecutor extends RecordStepExecutor< ? { actionName: preRecordedArgs.actionDisplayName } : await this.selectAction(schema, step.prompt); const name = this.resolveActionName(schema, args.actionName); - const target: ActionTarget = { selectedRecordRef, displayName: args.actionName, name }; + const target: ActionTarget = { selectedRecordRef, name }; // Branch B -- fully automated: executor runs the action itself, so it cannot // handle forms (no UI to fill them). Reject form-bearing actions here. When the @@ -126,7 +125,7 @@ export default class TriggerRecordActionStepExecutor extends RecordStepExecutor< }, this.context.user, ); - if (hasForm) throw new UnsupportedActionFormError(target.displayName); + if (hasForm) throw new UnsupportedActionFormError(args.actionName); return this.executeOnExecutor(target); } @@ -135,7 +134,7 @@ export default class TriggerRecordActionStepExecutor extends RecordStepExecutor< await this.context.runStore.saveStepExecution(this.context.runId, { type: 'trigger-action', stepIndex: this.context.stepIndex, - pendingData: { displayName: target.displayName, name: target.name }, + pendingData: { name: target.name }, selectedRecordRef: target.selectedRecordRef, }); @@ -144,7 +143,7 @@ export default class TriggerRecordActionStepExecutor extends RecordStepExecutor< /** Branch B — executor runs the action via agentPort, then persists the result. */ private async executeOnExecutor(target: ActionTarget): Promise { - const { selectedRecordRef, displayName, name } = target; + const { selectedRecordRef, name } = target; await this.context.runStore.saveStepExecution(this.context.runId, { type: 'trigger-action', @@ -165,7 +164,7 @@ export default class TriggerRecordActionStepExecutor extends RecordStepExecutor< await this.context.runStore.saveStepExecution(this.context.runId, { type: 'trigger-action', stepIndex: this.context.stepIndex, - executionParams: { displayName, name }, + executionParams: { name }, executionResult: { success: true, actionResult }, selectedRecordRef, idempotencyPhase: 'done', @@ -180,13 +179,13 @@ export default class TriggerRecordActionStepExecutor extends RecordStepExecutor< actionResult: unknown, existingExecution: TriggerRecordActionStepExecutionData, ): Promise { - const { selectedRecordRef, displayName, name } = target; + const { selectedRecordRef, name } = target; await this.context.runStore.saveStepExecution(this.context.runId, { ...existingExecution, type: 'trigger-action', stepIndex: this.context.stepIndex, - executionParams: { displayName, name }, + executionParams: { name }, executionResult: { success: true, actionResult }, selectedRecordRef, }); diff --git a/packages/workflow-executor/src/executors/update-record-step-executor.ts b/packages/workflow-executor/src/executors/update-record-step-executor.ts index 0b2bccbc8c..9eacac1cf4 100644 --- a/packages/workflow-executor/src/executors/update-record-step-executor.ts +++ b/packages/workflow-executor/src/executors/update-record-step-executor.ts @@ -189,9 +189,7 @@ export default class UpdateRecordStepExecutor extends RecordStepExecutor { const schema = await this.getCollectionSchema(selectedRecordRef.collectionName); - const fieldSchema = - this.findField(schema, pendingData?.name ?? '') ?? - this.findField(schema, pendingData?.displayName ?? ''); + const fieldSchema = this.findField(schema, pendingData?.name ?? ''); return coerceFieldValue(fieldSchema, value, selectedRecordRef.collectionName); } @@ -227,7 +225,6 @@ export default class UpdateRecordStepExecutor extends RecordStepExecutor { - const { selectedRecordRef, displayName, name, value } = target; + const { selectedRecordRef, name, value } = target; await this.context.runStore.saveStepExecution(this.context.runId, { ...existingExecution, @@ -280,7 +276,7 @@ export default class UpdateRecordStepExecutor extends RecordStepExecutor { - const steps = await this.options.runner.getRunStepExecutions(ctx.params.runId); + const rawRenderingId = (ctx.state.user as { renderingId?: unknown })?.renderingId; + const renderingId = + typeof rawRenderingId === 'number' ? rawRenderingId : Number(rawRenderingId); + + if (!Number.isFinite(renderingId)) { + ctx.status = 400; + ctx.body = { error: 'Missing or invalid renderingId in token' }; + + return; + } + + const steps = await this.options.runner.getRunStepExecutions(ctx.params.runId, renderingId); ctx.body = { steps }; } diff --git a/packages/workflow-executor/src/http/pending-data-validators.ts b/packages/workflow-executor/src/http/pending-data-validators.ts index d739e1b40d..8d4ffaa7d7 100644 --- a/packages/workflow-executor/src/http/pending-data-validators.ts +++ b/packages/workflow-executor/src/http/pending-data-validators.ts @@ -27,8 +27,7 @@ const loadRelatedRecordPatchSchema = z .object({ userConfirmed: z.boolean(), // User may intentionally switch to a different relation than the one the AI selected. - // The executor re-derives relatedCollectionName and displayName from FieldSchema when - // processing the confirmation. + // The executor re-derives relatedCollectionName from FieldSchema when processing the confirmation. name: z.string().min(1).optional(), // User may override the AI-selected record; must be non-empty when provided. // Required when overriding the relation name — the original record ID belongs to a diff --git a/packages/workflow-executor/src/hydrate-step-execution-data.ts b/packages/workflow-executor/src/hydrate-step-execution-data.ts new file mode 100644 index 0000000000..c670c382c9 --- /dev/null +++ b/packages/workflow-executor/src/hydrate-step-execution-data.ts @@ -0,0 +1,168 @@ +import type { Logger } from './ports/logger-port'; +import type { WorkflowPort } from './ports/workflow-port'; +import type SchemaCache from './schema-cache'; +import type { + DisplayedLoadRelatedRecordStepExecutionData, + HydratedStepExecutionData, + LoadRelatedRecordStepExecutionData, + StepExecutionData, +} from './types/step-execution-data'; +import type { CollectionSchema } from './types/validated/collection'; + +import { extractErrorMessage } from './errors'; + +// displayName is not persisted — it is rebuilt from the current schema so labels never go stale. + +export type SchemaGetter = (collectionName: string) => Promise; + +// runId fetches the run's rendering server-side; renderingId scopes the cache. +export function makeSchemaGetter( + schemaCache: SchemaCache, + workflowPort: WorkflowPort, + runId: string, + renderingId: number, +): SchemaGetter { + return (collectionName: string) => + schemaCache.getOrFetch(renderingId, collectionName, () => + workflowPort.getCollectionSchema(collectionName, runId), + ); +} + +function fieldDisplayName(schema: CollectionSchema | null, name: string): string { + return schema?.fields.find(f => f.fieldName === name)?.displayName ?? name; +} + +function actionDisplayName(schema: CollectionSchema | null, name: string): string { + return schema?.actions.find(a => a.name === name)?.displayName ?? name; +} + +function hydrateRelationResult( + result: LoadRelatedRecordStepExecutionData['executionResult'], + schema: CollectionSchema | null, +): DisplayedLoadRelatedRecordStepExecutionData['executionResult'] { + if (result === undefined) return undefined; + if ('skipped' in result) return result; + + return { + ...result, + relation: { ...result.relation, displayName: fieldDisplayName(schema, result.relation.name) }, + }; +} + +// Split from the async wrapper so a throw on a malformed row can be caught and the raw execution +// returned instead of failing the whole read. +function hydrate( + execution: Exclude< + StepExecutionData, + { type: 'condition' } | { type: 'mcp' } | { type: 'record' } | { type: 'guidance' } + >, + schema: CollectionSchema | null, +): HydratedStepExecutionData { + switch (execution.type) { + case 'read-record': + return { + ...execution, + executionParams: { + fields: execution.executionParams.fields.map(f => ({ + ...f, + displayName: fieldDisplayName(schema, f.name), + })), + }, + executionResult: { + fields: execution.executionResult.fields.map(f => ({ + ...f, + displayName: fieldDisplayName(schema, f.name), + })), + }, + }; + + case 'update-record': + return { + ...execution, + executionParams: execution.executionParams && { + ...execution.executionParams, + displayName: fieldDisplayName(schema, execution.executionParams.name), + }, + pendingData: execution.pendingData && { + ...execution.pendingData, + displayName: fieldDisplayName(schema, execution.pendingData.name), + }, + }; + + case 'trigger-action': + return { + ...execution, + executionParams: execution.executionParams && { + ...execution.executionParams, + displayName: actionDisplayName(schema, execution.executionParams.name), + }, + pendingData: execution.pendingData && { + ...execution.pendingData, + displayName: actionDisplayName(schema, execution.pendingData.name), + }, + }; + + case 'load-related-record': + return { + ...execution, + pendingData: execution.pendingData && { + ...execution.pendingData, + displayName: fieldDisplayName(schema, execution.pendingData.name), + }, + executionParams: execution.executionParams && { + ...execution.executionParams, + displayName: fieldDisplayName(schema, execution.executionParams.name), + }, + executionResult: hydrateRelationResult(execution.executionResult, schema), + }; + + default: + return execution; + } +} + +export default async function hydrateStepExecutionData( + execution: StepExecutionData, + getSchema: SchemaGetter, + logger?: Logger, +): Promise { + if ( + execution.type === 'condition' || + execution.type === 'mcp' || + execution.type === 'record' || + execution.type === 'guidance' + ) { + return execution; + } + + const { collectionName } = execution.selectedRecordRef; + + // A missing/renamed collection must not break a read — fall back to technical names. + let schema: CollectionSchema | null = null; + + try { + schema = await getSchema(collectionName); + } catch (error) { + logger?.warn( + 'Failed to fetch collection schema for displayName hydration; using technical names', + { + collection: collectionName, + stepIndex: execution.stepIndex, + error: extractErrorMessage(error), + }, + ); + schema = null; + } + + try { + return hydrate(execution, schema); + } catch (error) { + logger?.error('Failed to hydrate step execution displayNames; returning the raw execution', { + type: execution.type, + stepIndex: execution.stepIndex, + error: extractErrorMessage(error), + }); + + return execution as HydratedStepExecutionData; + } +} diff --git a/packages/workflow-executor/src/index.ts b/packages/workflow-executor/src/index.ts index 14435b4850..a1034c9e07 100644 --- a/packages/workflow-executor/src/index.ts +++ b/packages/workflow-executor/src/index.ts @@ -40,6 +40,15 @@ export type { GuidanceStepExecutionData, ExecutedStepExecutionData, StepExecutionData, + DisplayedFieldRef, + DisplayedActionRef, + DisplayedRelationRef, + DisplayedFieldReadResult, + DisplayedReadRecordStepExecutionData, + DisplayedUpdateRecordStepExecutionData, + DisplayedTriggerRecordActionStepExecutionData, + DisplayedLoadRelatedRecordStepExecutionData, + HydratedStepExecutionData, } from './types/step-execution-data'; export type { diff --git a/packages/workflow-executor/src/runner.ts b/packages/workflow-executor/src/runner.ts index c950b19f95..3f2e59a49a 100644 --- a/packages/workflow-executor/src/runner.ts +++ b/packages/workflow-executor/src/runner.ts @@ -7,7 +7,7 @@ import type { RunStore } from './ports/run-store'; import type { AvailableRunDispatch, MalformedRunInfo, WorkflowPort } from './ports/workflow-port'; import type SchemaCache from './schema-cache'; import type { AvailableStepExecution, StepExecutionResult } from './types/execution-context'; -import type { StepExecutionData } from './types/step-execution-data'; +import type { HydratedStepExecutionData } from './types/step-execution-data'; import type { StepOutcome } from './types/validated/step-outcome'; import ConsoleLogger from './adapters/console-logger'; @@ -20,6 +20,7 @@ import { extractErrorMessage, } from './errors'; import StepExecutorFactory from './executors/step-executor-factory'; +import hydrateStepExecutionData, { makeSchemaGetter } from './hydrate-step-execution-data'; import InFlightRunRegistry from './in-flight-run-registry'; import RemoteToolFetcher from './remote-tool-fetcher'; import { stepTypeToOutcomeType } from './types/validated/step-outcome'; @@ -152,8 +153,21 @@ export default class Runner { } } - async getRunStepExecutions(runId: string): Promise { - return this.config.runStore.getStepExecutions(runId); + async getRunStepExecutions( + runId: string, + renderingId: number, + ): Promise { + const executions = await this.config.runStore.getStepExecutions(runId); + const getSchema = makeSchemaGetter( + this.config.schemaCache, + this.config.workflowPort, + runId, + renderingId, + ); + + return Promise.all( + executions.map(execution => hydrateStepExecutionData(execution, getSchema, this.logger)), + ); } async triggerPoll( diff --git a/packages/workflow-executor/src/schema-cache.ts b/packages/workflow-executor/src/schema-cache.ts index 68b1a3db0b..38c4ee6305 100644 --- a/packages/workflow-executor/src/schema-cache.ts +++ b/packages/workflow-executor/src/schema-cache.ts @@ -1,9 +1,18 @@ import type { CollectionSchema } from './types/validated/collection'; -const DEFAULT_TTL_MS = 10 * 60 * 1000; // 10 minutes +// A label edit can take up to the TTL to surface on reads — an accepted freshness/load trade-off. +const DEFAULT_TTL_MS = 10 * 60 * 1000; +interface Entry { + schema: CollectionSchema; + fetchedAt: number; +} + +// Keyed by renderingId: displayNames and the visible field set are rendering-specific, so the same +// collection in two renderings of one environment must not share an entry. export default class SchemaCache { - private readonly store = new Map(); + private readonly store = new Map>(); + private readonly inFlight = new Map>>(); private readonly ttlMs: number; private readonly now: () => number; @@ -12,13 +21,50 @@ export default class SchemaCache { this.now = now; } - get(collectionName: string): CollectionSchema | undefined { - const entry = this.store.get(collectionName); + // Read-through with global de-dup; a rejected fetch is not cached (retryable). + getOrFetch( + renderingId: number, + collectionName: string, + fetch: () => Promise, + ): Promise { + const cached = this.get(renderingId, collectionName); + if (cached) return Promise.resolve(cached); + + const pending = this.inFlight.get(renderingId)?.get(collectionName); + if (pending) return pending; + + let byCollection = this.inFlight.get(renderingId); + + if (!byCollection) { + byCollection = new Map(); + this.inFlight.set(renderingId, byCollection); + } + + const inFlight = byCollection; + const promise = fetch() + .then(schema => { + this.set(renderingId, collectionName, schema); + + return schema; + }) + .finally(() => { + inFlight.delete(collectionName); + if (inFlight.size === 0) this.inFlight.delete(renderingId); + }); + + inFlight.set(collectionName, promise); + + return promise; + } + + get(renderingId: number, collectionName: string): CollectionSchema | undefined { + const byCollection = this.store.get(renderingId); + const entry = byCollection?.get(collectionName); if (!entry) return undefined; if (this.now() - entry.fetchedAt > this.ttlMs) { - this.store.delete(collectionName); + byCollection!.delete(collectionName); return undefined; } @@ -26,19 +72,29 @@ export default class SchemaCache { return entry.schema; } - set(collectionName: string, schema: CollectionSchema): void { - this.store.set(collectionName, { schema, fetchedAt: this.now() }); + set(renderingId: number, collectionName: string, schema: CollectionSchema): void { + let byCollection = this.store.get(renderingId); + + if (!byCollection) { + byCollection = new Map(); + this.store.set(renderingId, byCollection); + } + + byCollection.set(collectionName, { schema, fetchedAt: this.now() }); } - // Yields non-expired entries; deletes stale ones along the way. - *[Symbol.iterator](): IterableIterator<[string, CollectionSchema]> { + *entriesForRendering(renderingId: number): IterableIterator<[string, CollectionSchema]> { + const byCollection = this.store.get(renderingId); + + if (!byCollection) return; + const now = this.now(); - for (const [key, entry] of this.store) { + for (const [collectionName, entry] of byCollection) { if (now - entry.fetchedAt <= this.ttlMs) { - yield [key, entry.schema]; + yield [collectionName, entry.schema]; } else { - this.store.delete(key); + byCollection.delete(collectionName); } } } diff --git a/packages/workflow-executor/src/types/step-execution-data.ts b/packages/workflow-executor/src/types/step-execution-data.ts index 1c37b1460d..26ea1256f6 100644 --- a/packages/workflow-executor/src/types/step-execution-data.ts +++ b/packages/workflow-executor/src/types/step-execution-data.ts @@ -36,9 +36,10 @@ export interface ConditionStepExecutionData extends BaseStepExecutionData { // -- Shared -- +// Persisted refs hold the technical name only; displayName is re-derived from the schema at read +// time (see hydrate-step-execution-data.ts). export interface FieldRef { name: string; - displayName: string; } export type FieldWithValue = FieldRef & { value: unknown }; @@ -79,14 +80,12 @@ export interface UpdateRecordStepExecutionData export interface ActionRef { name: string; - displayName: string; } // Intentionally separate from ActionRef/FieldRef: expected to gain relation-specific // fields (e.g. relationType) in a future iteration. export interface RelationRef { name: string; - displayName: string; } export interface TriggerRecordActionStepExecutionData @@ -176,3 +175,57 @@ export type StepExecutionData = | GuidanceStepExecutionData; export type ExecutedStepExecutionData = StepExecutionData; + +// -- Hydrated view (displayName re-derived from the schema at read time) -- + +export interface DisplayedFieldRef extends FieldRef { + displayName: string; +} + +export interface DisplayedActionRef extends ActionRef { + displayName: string; +} + +export interface DisplayedRelationRef extends RelationRef { + displayName: string; +} + +export type DisplayedFieldReadResult = FieldReadResult & { displayName: string }; + +export interface DisplayedReadRecordStepExecutionData + extends Omit { + executionParams: { fields: DisplayedFieldRef[] }; + executionResult: { fields: DisplayedFieldReadResult[] }; +} + +export interface DisplayedUpdateRecordStepExecutionData + extends Omit { + executionParams?: DisplayedFieldRef & { value: unknown }; + pendingData?: DisplayedFieldRef & { value: unknown }; +} + +export interface DisplayedTriggerRecordActionStepExecutionData + extends Omit { + executionParams?: DisplayedActionRef; + pendingData?: DisplayedActionRef; +} + +export interface DisplayedLoadRelatedRecordStepExecutionData + extends Omit< + LoadRelatedRecordStepExecutionData, + 'executionParams' | 'executionResult' | 'pendingData' + > { + pendingData?: LoadRelatedRecordPendingData & { displayName: string }; + executionParams?: DisplayedRelationRef; + executionResult?: { relation: DisplayedRelationRef; record: RecordRef } | { skipped: true }; +} + +export type HydratedStepExecutionData = + | ConditionStepExecutionData + | DisplayedReadRecordStepExecutionData + | DisplayedUpdateRecordStepExecutionData + | DisplayedTriggerRecordActionStepExecutionData + | RecordStepExecutionData + | DisplayedLoadRelatedRecordStepExecutionData + | McpStepExecutionData + | GuidanceStepExecutionData; diff --git a/packages/workflow-executor/test/adapters/agent-client-agent-port.test.ts b/packages/workflow-executor/test/adapters/agent-client-agent-port.test.ts index aacaff0652..981cd41db7 100644 --- a/packages/workflow-executor/test/adapters/agent-client-agent-port.test.ts +++ b/packages/workflow-executor/test/adapters/agent-client-agent-port.test.ts @@ -46,7 +46,7 @@ describe('AgentClientAgentPort', () => { mockedCreateRemoteAgentClient.mockReturnValue(mocks.client as any); const schemaCache = new SchemaCache(); - schemaCache.set('users', { + schemaCache.set(1, 'users', { collectionName: 'users', collectionDisplayName: 'Users', primaryKeyFields: ['id'], @@ -59,7 +59,7 @@ describe('AgentClientAgentPort', () => { { name: 'archive', displayName: 'Archive', endpoint: '/forest/actions/archive' }, ], }); - schemaCache.set('orders', { + schemaCache.set(1, 'orders', { collectionName: 'orders', collectionDisplayName: 'Orders', primaryKeyFields: ['tenantId', 'orderId'], @@ -69,7 +69,7 @@ describe('AgentClientAgentPort', () => { ], actions: [], }); - schemaCache.set('posts', { + schemaCache.set(1, 'posts', { collectionName: 'posts', collectionDisplayName: 'Posts', primaryKeyFields: ['id'], @@ -369,7 +369,7 @@ describe('AgentClientAgentPort', () => { it('should restore snake_case field names in recordId and values when agent returns camelCase keys', async () => { const cache = new SchemaCache(); - cache.set('users', { + cache.set(1, 'users', { collectionName: 'users', collectionDisplayName: 'Users', primaryKeyFields: ['id'], @@ -383,7 +383,7 @@ describe('AgentClientAgentPort', () => { ], actions: [], }); - cache.set('posts', { + cache.set(1, 'posts', { collectionName: 'posts', collectionDisplayName: 'Posts', primaryKeyFields: ['post_id'], @@ -487,7 +487,7 @@ describe('AgentClientAgentPort', () => { describe('buildActionEndpoints', () => { it('passes fields and hooks from schema to agent-client (supports Ruby agent fallback)', async () => { const schemaCache = new SchemaCache(); - schemaCache.set('users', { + schemaCache.set(1, 'users', { collectionName: 'users', collectionDisplayName: 'Users', primaryKeyFields: ['id'], diff --git a/packages/workflow-executor/test/executors/load-related-record-step-executor.test.ts b/packages/workflow-executor/test/executors/load-related-record-step-executor.test.ts index f7f7a25883..80721ea69b 100644 --- a/packages/workflow-executor/test/executors/load-related-record-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/load-related-record-step-executor.test.ts @@ -164,7 +164,6 @@ function makePendingExecution( type: 'load-related-record', stepIndex: 0, pendingData: { - displayName: 'Order', name: 'order', selectedRecordId: [99], suggestedFields: ['status', 'amount'], @@ -200,7 +199,7 @@ describe('LoadRelatedRecordStepExecutor', () => { expect.objectContaining({ type: 'load-related-record', stepIndex: 0, - executionParams: { displayName: 'Order', name: 'order' }, + executionParams: { name: 'order' }, executionResult: expect.objectContaining({ record: expect.objectContaining({ collectionName: 'orders', @@ -611,7 +610,6 @@ describe('LoadRelatedRecordStepExecutor', () => { type: 'load-related-record', stepIndex: 0, pendingData: { - displayName: 'Order', name: 'order', selectedRecordId: [99], suggestedFields: [], @@ -683,7 +681,6 @@ describe('LoadRelatedRecordStepExecutor', () => { 'run-1', expect.objectContaining({ pendingData: { - displayName: 'Order', name: 'order', selectedRecordId: [2], // record at index 1 suggestedFields: ['status'], @@ -762,7 +759,6 @@ describe('LoadRelatedRecordStepExecutor', () => { const agentPort = makeMockAgentPort(); const execution = makePendingExecution({ pendingData: { - displayName: 'Order', name: 'order', suggestedFields: ['status', 'amount'], selectedRecordId: [99], @@ -783,12 +779,11 @@ describe('LoadRelatedRecordStepExecutor', () => { 'run-1', expect.objectContaining({ type: 'load-related-record', - executionParams: { displayName: 'Order', name: 'order' }, + executionParams: { name: 'order' }, executionResult: expect.objectContaining({ record: expect.objectContaining({ collectionName: 'orders', recordId: [99] }), }), pendingData: expect.objectContaining({ - displayName: 'Order', name: 'order', selectedRecordId: [99], }), @@ -800,7 +795,6 @@ describe('LoadRelatedRecordStepExecutor', () => { const agentPort = makeMockAgentPort(); const execution = makePendingExecution({ pendingData: { - displayName: 'Order', name: 'order', suggestedFields: ['status', 'amount'], selectedRecordId: [42], @@ -833,7 +827,6 @@ describe('LoadRelatedRecordStepExecutor', () => { // Persisted state: AI suggested record [99], awaiting confirmation. const execution = makePendingExecution({ pendingData: { - displayName: 'Order', name: 'order', selectedRecordId: [99], suggestedFields: ['status', 'amount'], @@ -862,7 +855,6 @@ describe('LoadRelatedRecordStepExecutor', () => { expect.objectContaining({ type: 'load-related-record', pendingData: expect.objectContaining({ - displayName: 'Order', name: 'order', selectedRecordId: [99], // AI suggestion preserved }), @@ -879,7 +871,6 @@ describe('LoadRelatedRecordStepExecutor', () => { // AI suggested "order" (→ orders collection). User switches to "address" (→ addresses). const execution = makePendingExecution({ pendingData: { - displayName: 'Order', name: 'order', selectedRecordId: [99], suggestedFields: [], @@ -907,13 +898,12 @@ describe('LoadRelatedRecordStepExecutor', () => { // AI suggestion preserved on pendingData pendingData: expect.objectContaining({ name: 'order', - displayName: 'Order', selectedRecordId: [99], }), // User-overridden relation resolves to the addresses collection - executionParams: { name: 'address', displayName: 'Address' }, + executionParams: { name: 'address' }, executionResult: expect.objectContaining({ - relation: { name: 'address', displayName: 'Address' }, + relation: { name: 'address' }, record: expect.objectContaining({ collectionName: 'addresses', recordId: [7] }), }), }), @@ -936,7 +926,6 @@ describe('LoadRelatedRecordStepExecutor', () => { }); const execution = makePendingExecution({ pendingData: { - displayName: 'Order', name: 'order', suggestedFields: [], selectedRecordId: [99], @@ -977,7 +966,6 @@ describe('LoadRelatedRecordStepExecutor', () => { }); const execution = makePendingExecution({ pendingData: { - displayName: 'Order', name: 'order', suggestedFields: [], selectedRecordId: [99], @@ -1022,7 +1010,6 @@ describe('LoadRelatedRecordStepExecutor', () => { // User overrode AI's suggestion of 'order' to 'address' via PATCH const execution = makePendingExecution({ pendingData: { - displayName: 'Address', name: 'address', suggestedFields: [], selectedRecordId: [77], @@ -1053,7 +1040,6 @@ describe('LoadRelatedRecordStepExecutor', () => { const execution = makePendingExecution({ selectedRecordRef: { collectionName: 'customers', recordId: [42], stepIndex: 0 }, pendingData: { - displayName: 'Order', name: 'order', suggestedFields: [], selectedRecordId: [99], @@ -1081,7 +1067,6 @@ describe('LoadRelatedRecordStepExecutor', () => { const agentPort = makeMockAgentPort(); const execution = makePendingExecution({ pendingData: { - displayName: 'Order', name: 'order', suggestedFields: ['status', 'amount'], selectedRecordId: [99], @@ -1102,7 +1087,7 @@ describe('LoadRelatedRecordStepExecutor', () => { 'run-1', expect.objectContaining({ executionResult: { skipped: true }, - pendingData: expect.objectContaining({ displayName: 'Order', name: 'order' }), + pendingData: expect.objectContaining({ name: 'order' }), }), ); }); @@ -1269,7 +1254,6 @@ describe('LoadRelatedRecordStepExecutor', () => { it('returns error outcome when saveStepExecution fails after load (Branch A confirmed)', async () => { const execution = makePendingExecution({ pendingData: { - displayName: 'Order', name: 'order', suggestedFields: ['status', 'amount'], selectedRecordId: [99], @@ -1489,7 +1473,7 @@ describe('LoadRelatedRecordStepExecutor', () => { type: 'load-related-record', stepIndex: 2, executionResult: { - relation: { name: 'order', displayName: 'Order' }, + relation: { name: 'order' }, record: relatedRecord, }, selectedRecordRef: makeRecordRef(), @@ -1518,7 +1502,6 @@ describe('LoadRelatedRecordStepExecutor', () => { 'run-1', expect.objectContaining({ pendingData: expect.objectContaining({ - displayName: 'Invoice', name: 'invoice', selectedRecordId: [55], }), @@ -1639,7 +1622,6 @@ describe('LoadRelatedRecordStepExecutor', () => { it('returns error outcome when saveStepExecution fails on user reject (Branch A)', async () => { const execution = makePendingExecution({ pendingData: { - displayName: 'Order', name: 'order', suggestedFields: ['status', 'amount'], selectedRecordId: [99], @@ -1722,7 +1704,6 @@ describe('LoadRelatedRecordStepExecutor', () => { stepIndex: 3, selectedRecordRef: makeRecordRef(), pendingData: { - displayName: 'Invoice', name: 'invoice', selectedRecordId: [55], }, @@ -1772,7 +1753,7 @@ describe('LoadRelatedRecordStepExecutor', () => { type: 'load-related-record', stepIndex: 2, executionResult: { - relation: { name: 'order', displayName: 'Order' }, + relation: { name: 'order' }, record: completedRecord, }, selectedRecordRef: makeRecordRef(), diff --git a/packages/workflow-executor/test/executors/read-record-step-executor.test.ts b/packages/workflow-executor/test/executors/read-record-step-executor.test.ts index d483ce2d37..fe70b6ef36 100644 --- a/packages/workflow-executor/test/executors/read-record-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/read-record-step-executor.test.ts @@ -158,9 +158,9 @@ describe('ReadRecordStepExecutor', () => { expect.objectContaining({ type: 'read-record', stepIndex: 0, - executionParams: { fields: [{ name: 'email', displayName: 'Email' }] }, + executionParams: { fields: [{ name: 'email' }] }, executionResult: { - fields: [{ value: 'john@example.com', name: 'email', displayName: 'Email' }], + fields: [{ value: 'john@example.com', name: 'email' }], }, }), ); @@ -181,15 +181,12 @@ describe('ReadRecordStepExecutor', () => { 'run-1', expect.objectContaining({ executionParams: { - fields: [ - { name: 'email', displayName: 'Email' }, - { name: 'name', displayName: 'Full Name' }, - ], + fields: [{ name: 'email' }, { name: 'name' }], }, executionResult: { fields: [ - { value: 'john@example.com', name: 'email', displayName: 'Email' }, - { value: 'John Doe', name: 'name', displayName: 'Full Name' }, + { value: 'john@example.com', name: 'email' }, + { value: 'John Doe', name: 'name' }, ], }, }), @@ -210,9 +207,9 @@ describe('ReadRecordStepExecutor', () => { expect(runStore.saveStepExecution).toHaveBeenCalledWith( 'run-1', expect.objectContaining({ - executionParams: { fields: [{ name: 'name', displayName: 'Full Name' }] }, + executionParams: { fields: [{ name: 'name' }] }, executionResult: { - fields: [{ value: 'John Doe', name: 'name', displayName: 'Full Name' }], + fields: [{ value: 'John Doe', name: 'name' }], }, }), ); @@ -283,11 +280,10 @@ describe('ReadRecordStepExecutor', () => { expect.objectContaining({ executionResult: { fields: [ - { value: 'john@example.com', name: 'email', displayName: 'Email' }, + { value: 'john@example.com', name: 'email' }, { error: 'Field not found: nonexistent', name: 'nonexistent', - displayName: 'nonexistent', }, ], }, @@ -397,7 +393,7 @@ describe('ReadRecordStepExecutor', () => { type: 'load-related-record', stepIndex: 2, executionResult: { - relation: { name: 'order', displayName: 'Order' }, + relation: { name: 'order' }, record: relatedRecord, }, selectedRecordRef: makeRecordRef(), @@ -434,7 +430,7 @@ describe('ReadRecordStepExecutor', () => { 'run-1', expect.objectContaining({ executionResult: { - fields: [{ value: 'john@example.com', name: 'email', displayName: 'Email' }], + fields: [{ value: 'john@example.com', name: 'email' }], }, selectedRecordRef: expect.objectContaining({ recordId: [42], @@ -487,7 +483,7 @@ describe('ReadRecordStepExecutor', () => { type: 'load-related-record', stepIndex: 2, executionResult: { - relation: { name: 'order', displayName: 'Order' }, + relation: { name: 'order' }, record: relatedRecord, }, selectedRecordRef: makeRecordRef(), @@ -511,7 +507,7 @@ describe('ReadRecordStepExecutor', () => { 'run-1', expect.objectContaining({ executionResult: { - fields: [{ value: 150, name: 'total', displayName: 'Total' }], + fields: [{ value: 150, name: 'total' }], }, selectedRecordRef: expect.objectContaining({ recordId: [99], @@ -564,7 +560,7 @@ describe('ReadRecordStepExecutor', () => { type: 'load-related-record', stepIndex: 5, executionResult: { - relation: { name: 'order', displayName: 'Order' }, + relation: { name: 'order' }, record: relatedRecord, }, selectedRecordRef: makeRecordRef(), @@ -620,7 +616,7 @@ describe('ReadRecordStepExecutor', () => { type: 'load-related-record', stepIndex: 1, executionResult: { - relation: { name: 'order', displayName: 'Order' }, + relation: { name: 'order' }, record: relatedRecord, }, selectedRecordRef: makeRecordRef(), @@ -867,15 +863,12 @@ describe('ReadRecordStepExecutor', () => { type: 'read-record', stepIndex: 3, executionParams: { - fields: [ - { name: 'email', displayName: 'Email' }, - { name: 'name', displayName: 'Full Name' }, - ], + fields: [{ name: 'email' }, { name: 'name' }], }, executionResult: { fields: [ - { value: 'john@example.com', name: 'email', displayName: 'Email' }, - { value: 'John Doe', name: 'name', displayName: 'Full Name' }, + { value: 'john@example.com', name: 'email' }, + { value: 'John Doe', name: 'name' }, ], }, selectedRecordRef: { @@ -908,7 +901,7 @@ describe('ReadRecordStepExecutor', () => { 'run-1', expect.objectContaining({ executionResult: { - fields: [{ value: 'john@example.com', name: 'email', displayName: 'Email' }], + fields: [{ value: 'john@example.com', name: 'email' }], }, }), ); diff --git a/packages/workflow-executor/test/executors/step-execution-formatters.test.ts b/packages/workflow-executor/test/executors/step-execution-formatters.test.ts index 9a79847e02..b2d7fcc54b 100644 --- a/packages/workflow-executor/test/executors/step-execution-formatters.test.ts +++ b/packages/workflow-executor/test/executors/step-execution-formatters.test.ts @@ -1,4 +1,4 @@ -import type { StepExecutionData } from '../../src/types/step-execution-data'; +import type { HydratedStepExecutionData } from '../../src/types/step-execution-data'; import StepExecutionFormatters from '../../src/executors/summary/step-execution-formatters'; @@ -6,7 +6,7 @@ describe('StepExecutionFormatters', () => { describe('format', () => { describe('load-related-record', () => { it('returns the full Loaded: line for a completed execution', () => { - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'load-related-record', stepIndex: 1, selectedRecordRef: { collectionName: 'customers', recordId: [42], stepIndex: 0 }, @@ -22,7 +22,7 @@ describe('StepExecutionFormatters', () => { }); it('returns null for a skipped execution', () => { - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'load-related-record', stepIndex: 1, selectedRecordRef: { collectionName: 'customers', recordId: [42], stepIndex: 0 }, @@ -33,7 +33,7 @@ describe('StepExecutionFormatters', () => { }); it('returns null when executionResult is absent (pending phase)', () => { - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'load-related-record', stepIndex: 1, selectedRecordRef: { collectionName: 'customers', recordId: [42], stepIndex: 0 }, @@ -48,7 +48,7 @@ describe('StepExecutionFormatters', () => { }); it('formats composite record IDs joined by ", "', () => { - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'load-related-record', stepIndex: 1, selectedRecordRef: { collectionName: 'customers', recordId: [42, 'abc'], stepIndex: 0 }, @@ -66,7 +66,7 @@ describe('StepExecutionFormatters', () => { describe('mcp', () => { it('returns the Result: line when formattedResponse is present', () => { - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'mcp', stepIndex: 2, executionParams: { @@ -85,7 +85,7 @@ describe('StepExecutionFormatters', () => { }); it('returns a generic Executed: line when formattedResponse is absent', () => { - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'mcp', stepIndex: 2, executionParams: { @@ -102,7 +102,7 @@ describe('StepExecutionFormatters', () => { }); it('returns null when executionResult is absent (pending phase)', () => { - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'mcp', stepIndex: 2, pendingData: { name: 'search_records', sourceId: 'mcp-server-1', input: {} }, @@ -112,7 +112,7 @@ describe('StepExecutionFormatters', () => { }); it('returns null for a skipped execution', () => { - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'mcp', stepIndex: 2, executionResult: { skipped: true }, @@ -124,7 +124,7 @@ describe('StepExecutionFormatters', () => { describe('types without a custom formatter', () => { it('returns null for condition type', () => { - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'condition', stepIndex: 0, executionParams: { answer: 'Yes' }, @@ -135,7 +135,7 @@ describe('StepExecutionFormatters', () => { }); it('returns null for record type', () => { - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'record', stepIndex: 0, executionResult: { success: true }, @@ -147,7 +147,7 @@ describe('StepExecutionFormatters', () => { describe('guidance', () => { it('returns the user input line when executionResult is present', () => { - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'guidance', stepIndex: 0, executionResult: { userInput: 'I called the client and confirmed the delivery date.' }, @@ -159,7 +159,7 @@ describe('StepExecutionFormatters', () => { }); it('returns null when executionResult is absent', () => { - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'guidance', stepIndex: 0, }; diff --git a/packages/workflow-executor/test/executors/step-summary-builder.test.ts b/packages/workflow-executor/test/executors/step-summary-builder.test.ts index 10dc86fea6..8abbd17162 100644 --- a/packages/workflow-executor/test/executors/step-summary-builder.test.ts +++ b/packages/workflow-executor/test/executors/step-summary-builder.test.ts @@ -1,4 +1,4 @@ -import type { StepExecutionData } from '../../src/types/step-execution-data'; +import type { HydratedStepExecutionData } from '../../src/types/step-execution-data'; import type { StepDefinition } from '../../src/types/validated/step-definition'; import type { StepOutcome } from '../../src/types/validated/step-outcome'; @@ -27,7 +27,7 @@ describe('StepSummaryBuilder', () => { it('renders header, prompt, Input, and Output for a condition step with execution data', () => { const step = makeConditionStep('Approve?'); const outcome = makeConditionOutcome('cond-1', 0); - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'condition', stepIndex: 0, executionParams: { answer: 'Yes', reasoning: 'Order is valid' }, @@ -54,7 +54,7 @@ describe('StepSummaryBuilder', () => { stepIndex: 0, status: 'success', }; - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'record', stepIndex: 0, executionResult: { success: true }, @@ -136,7 +136,7 @@ describe('StepSummaryBuilder', () => { stepIndex: 0, status: 'success', }; - const execution: StepExecutionData = { type: 'record', stepIndex: 0 }; + const execution: HydratedStepExecutionData = { type: 'record', stepIndex: 0 }; const result = StepSummaryBuilder.build(step, outcome, execution); @@ -158,7 +158,7 @@ describe('StepSummaryBuilder', () => { stepIndex: 0, status: 'awaiting-input', }; - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'update-record', stepIndex: 0, pendingData: { displayName: 'Status', name: 'status', value: 'active' }, @@ -185,7 +185,7 @@ describe('StepSummaryBuilder', () => { stepIndex: 0, status: 'awaiting-input', }; - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'trigger-action', stepIndex: 0, pendingData: { displayName: 'Archive Customer', name: 'archive' }, @@ -212,7 +212,7 @@ describe('StepSummaryBuilder', () => { stepIndex: 1, status: 'success', }; - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'load-related-record', stepIndex: 1, selectedRecordRef: { collectionName: 'customers', recordId: [42], stepIndex: 0 }, @@ -245,7 +245,7 @@ describe('StepSummaryBuilder', () => { stepIndex: 1, status: 'success', }; - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'load-related-record', stepIndex: 1, selectedRecordRef: { collectionName: 'customers', recordId: [42], stepIndex: 0 }, @@ -270,7 +270,7 @@ describe('StepSummaryBuilder', () => { stepIndex: 1, status: 'awaiting-input', }; - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'load-related-record', stepIndex: 1, selectedRecordRef: { collectionName: 'customers', recordId: [42], stepIndex: 0 }, @@ -303,7 +303,7 @@ describe('StepSummaryBuilder', () => { stepIndex: 0, status: 'success', }; - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'update-record', stepIndex: 0, pendingData: { displayName: 'Status', name: 'status', value: 'active' }, @@ -331,7 +331,7 @@ describe('StepSummaryBuilder', () => { stepIndex: 0, status: 'success', }; - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'trigger-action', stepIndex: 0, pendingData: { displayName: 'Archive Customer', name: 'archive' }, @@ -357,7 +357,7 @@ describe('StepSummaryBuilder', () => { stepIndex: 0, status: 'success', }; - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'update-record', stepIndex: 0, idempotencyPhase: 'done', @@ -383,7 +383,7 @@ describe('StepSummaryBuilder', () => { stepIndex: 0, status: 'awaiting-input', }; - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'update-record', stepIndex: 0, pendingData: { displayName: 'Status', name: 'status', value: 'active' }, @@ -408,7 +408,7 @@ describe('StepSummaryBuilder', () => { stepIndex: 0, status: 'success', }; - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'trigger-action', stepIndex: 0, pendingData: { displayName: 'Archive Customer', name: 'archive' }, @@ -433,7 +433,7 @@ describe('StepSummaryBuilder', () => { stepIndex: 0, status: 'success', }; - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'update-record', stepIndex: 0, pendingData: { displayName: 'Status', name: 'status', value: 'active' }, @@ -458,7 +458,7 @@ describe('StepSummaryBuilder', () => { stepIndex: 1, status: 'success', }; - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'load-related-record', stepIndex: 1, selectedRecordRef: { collectionName: 'customers', recordId: [42], stepIndex: 0 }, @@ -482,7 +482,7 @@ describe('StepSummaryBuilder', () => { options: ['A', 'B'], }; const outcome = makeConditionOutcome('cond-1', 0); - const execution: StepExecutionData = { + const execution: HydratedStepExecutionData = { type: 'condition', stepIndex: 0, executionParams: { answer: 'A', reasoning: 'Only option' }, diff --git a/packages/workflow-executor/test/executors/trigger-record-action-step-executor.test.ts b/packages/workflow-executor/test/executors/trigger-record-action-step-executor.test.ts index 59648d2751..5b7d3aba2e 100644 --- a/packages/workflow-executor/test/executors/trigger-record-action-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/trigger-record-action-step-executor.test.ts @@ -175,7 +175,6 @@ describe('TriggerRecordActionStepExecutor', () => { type: 'trigger-action', stepIndex: 0, executionParams: { - displayName: 'Send Welcome Email', name: 'send-welcome-email', }, executionResult: { success: true, actionResult: { message: 'Email sent' } }, @@ -210,7 +209,6 @@ describe('TriggerRecordActionStepExecutor', () => { type: 'trigger-action', stepIndex: 0, pendingData: { - displayName: 'Send Welcome Email', name: 'send-welcome-email', }, selectedRecordRef: expect.objectContaining({ @@ -249,7 +247,6 @@ describe('TriggerRecordActionStepExecutor', () => { type: 'trigger-action', stepIndex: 0, pendingData: { - displayName: 'Send Welcome Email', name: 'send-welcome-email', }, userConfirmation: { @@ -274,7 +271,6 @@ describe('TriggerRecordActionStepExecutor', () => { expect.objectContaining({ type: 'trigger-action', executionParams: { - displayName: 'Send Welcome Email', name: 'send-welcome-email', }, executionResult: { @@ -282,7 +278,6 @@ describe('TriggerRecordActionStepExecutor', () => { actionResult: { success: 'ok', html: '

Email queued

' }, }, pendingData: { - displayName: 'Send Welcome Email', name: 'send-welcome-email', }, }), @@ -295,7 +290,6 @@ describe('TriggerRecordActionStepExecutor', () => { type: 'trigger-action', stepIndex: 0, pendingData: { - displayName: 'Send Welcome Email', name: 'send-welcome-email', }, userConfirmation: { @@ -328,7 +322,6 @@ describe('TriggerRecordActionStepExecutor', () => { type: 'trigger-action', stepIndex: 0, pendingData: { - displayName: 'Send Welcome Email', name: 'send-welcome-email', }, userConfirmation: { userConfirmed: true }, @@ -358,7 +351,6 @@ describe('TriggerRecordActionStepExecutor', () => { type: 'trigger-action', stepIndex: 0, pendingData: { - displayName: 'Send Welcome Email', name: 'send-welcome-email', }, userConfirmation: { userConfirmed: false }, @@ -379,7 +371,6 @@ describe('TriggerRecordActionStepExecutor', () => { expect.objectContaining({ executionResult: { skipped: true }, pendingData: { - displayName: 'Send Welcome Email', name: 'send-welcome-email', }, }), @@ -408,7 +399,7 @@ describe('TriggerRecordActionStepExecutor', () => { { type: 'trigger-action', stepIndex: 5, - pendingData: { displayName: 'Send Welcome Email' }, + pendingData: { name: 'send-welcome-email' }, selectedRecordRef: makeRecordRef(), }, ]), @@ -526,7 +517,7 @@ describe('TriggerRecordActionStepExecutor', () => { 'run-1', expect.objectContaining({ type: 'trigger-action', - pendingData: { displayName: 'Send Welcome Email', name: 'send-welcome-email' }, + pendingData: { name: 'send-welcome-email' }, }), ); }); @@ -758,7 +749,7 @@ describe('TriggerRecordActionStepExecutor', () => { type: 'load-related-record', stepIndex: 2, executionResult: { - relation: { name: 'order', displayName: 'Order' }, + relation: { name: 'order' }, record: relatedRecord, }, selectedRecordRef: makeRecordRef(), @@ -787,7 +778,7 @@ describe('TriggerRecordActionStepExecutor', () => { expect(runStore.saveStepExecution).toHaveBeenCalledWith( 'run-1', expect.objectContaining({ - pendingData: { displayName: 'Cancel Order', name: 'cancel-order' }, + pendingData: { name: 'cancel-order' }, selectedRecordRef: expect.objectContaining({ recordId: [99], collectionName: 'orders', @@ -898,7 +889,6 @@ describe('TriggerRecordActionStepExecutor', () => { type: 'trigger-action', stepIndex: 0, pendingData: { - displayName: 'Send Welcome Email', name: 'send-welcome-email', }, userConfirmation: { userConfirmed: false }, @@ -949,7 +939,6 @@ describe('TriggerRecordActionStepExecutor', () => { type: 'trigger-action', stepIndex: 0, pendingData: { - displayName: 'Send Welcome Email', name: 'send-welcome-email', }, userConfirmation: { @@ -1113,7 +1102,7 @@ describe('TriggerRecordActionStepExecutor', () => { const doneExecution: TriggerRecordActionStepExecutionData = { type: 'trigger-action', stepIndex: 0, - executionParams: { displayName: 'Send Welcome Email', name: 'send-welcome-email' }, + executionParams: { name: 'send-welcome-email' }, executionResult: { success: true, actionResult: undefined }, selectedRecordRef: makeRecordRef(), idempotencyPhase: 'done', diff --git a/packages/workflow-executor/test/executors/update-record-step-executor.test.ts b/packages/workflow-executor/test/executors/update-record-step-executor.test.ts index 48becbae9f..19bb2f8486 100644 --- a/packages/workflow-executor/test/executors/update-record-step-executor.test.ts +++ b/packages/workflow-executor/test/executors/update-record-step-executor.test.ts @@ -170,7 +170,7 @@ describe('UpdateRecordStepExecutor', () => { expect.objectContaining({ type: 'update-record', stepIndex: 0, - executionParams: { displayName: 'Status', name: 'status', value: 'active' }, + executionParams: { name: 'status', value: 'active' }, executionResult: { updatedValues }, selectedRecordRef: expect.objectContaining({ collectionName: 'customers', @@ -201,7 +201,7 @@ describe('UpdateRecordStepExecutor', () => { expect.objectContaining({ type: 'update-record', stepIndex: 0, - pendingData: { displayName: 'Status', name: 'status', value: 'active' }, + pendingData: { name: 'status', value: 'active' }, selectedRecordRef: expect.objectContaining({ collectionName: 'customers', recordId: [42], @@ -219,7 +219,6 @@ describe('UpdateRecordStepExecutor', () => { type: 'update-record', stepIndex: 0, pendingData: { - displayName: 'Status', name: 'status', value: 'active', }, @@ -243,10 +242,9 @@ describe('UpdateRecordStepExecutor', () => { 'run-1', expect.objectContaining({ type: 'update-record', - executionParams: { displayName: 'Status', name: 'status', value: 'active' }, + executionParams: { name: 'status', value: 'active' }, executionResult: { updatedValues }, pendingData: { - displayName: 'Status', name: 'status', value: 'active', }, @@ -262,7 +260,6 @@ describe('UpdateRecordStepExecutor', () => { type: 'update-record', stepIndex: 0, pendingData: { - displayName: 'Status', name: 'status', value: 'inactive', }, @@ -296,11 +293,10 @@ describe('UpdateRecordStepExecutor', () => { expect.objectContaining({ type: 'update-record', pendingData: expect.objectContaining({ - displayName: 'Status', name: 'status', value: 'inactive', // AI suggestion preserved }), - executionParams: { displayName: 'Status', name: 'status', value: 'active' }, + executionParams: { name: 'status', value: 'active' }, executionResult: { updatedValues }, }), ); @@ -312,7 +308,7 @@ describe('UpdateRecordStepExecutor', () => { const execution: UpdateRecordStepExecutionData = { type: 'update-record', stepIndex: 0, - pendingData: { displayName: 'Status', name: 'status', value: 'active' }, + pendingData: { name: 'status', value: 'active' }, selectedRecordRef: makeRecordRef(), }; const updatedValues = { status: 'active' }; @@ -336,7 +332,7 @@ describe('UpdateRecordStepExecutor', () => { const finalSave = (runStore.saveStepExecution as jest.Mock).mock.calls.at(-1)?.[1]; expect(finalSave).toEqual( expect.objectContaining({ - executionParams: { displayName: 'Status', name: 'status', value: 'active' }, + executionParams: { name: 'status', value: 'active' }, userConfirmation: { userConfirmed: true }, }), ); @@ -348,7 +344,7 @@ describe('UpdateRecordStepExecutor', () => { const execution: UpdateRecordStepExecutionData = { type: 'update-record', stepIndex: 0, - pendingData: { displayName: 'Status', name: 'status', value: 'inactive' }, + pendingData: { name: 'status', value: 'inactive' }, selectedRecordRef: makeRecordRef(), }; const agentPort = makeMockAgentPort(); @@ -385,7 +381,6 @@ describe('UpdateRecordStepExecutor', () => { type: 'update-record', stepIndex: 0, pendingData: { - displayName: 'Status', name: 'status', value: 'active', }, @@ -407,7 +402,6 @@ describe('UpdateRecordStepExecutor', () => { expect.objectContaining({ executionResult: { skipped: true }, pendingData: { - displayName: 'Status', name: 'status', value: 'active', }, @@ -437,7 +431,7 @@ describe('UpdateRecordStepExecutor', () => { { type: 'update-record', stepIndex: 5, - pendingData: { displayName: 'Status', name: 'status', value: 'active' }, + pendingData: { name: 'status', value: 'active' }, selectedRecordRef: makeRecordRef(), }, ]), @@ -535,7 +529,7 @@ describe('UpdateRecordStepExecutor', () => { type: 'load-related-record', stepIndex: 2, executionResult: { - relation: { name: 'order', displayName: 'Order' }, + relation: { name: 'order' }, record: relatedRecord, }, selectedRecordRef: makeRecordRef(), @@ -564,7 +558,6 @@ describe('UpdateRecordStepExecutor', () => { 'run-1', expect.objectContaining({ pendingData: { - displayName: 'Order Status', name: 'status', value: 'shipped', }, @@ -861,7 +854,6 @@ describe('UpdateRecordStepExecutor', () => { type: 'update-record', stepIndex: 0, pendingData: { - displayName: 'Status', name: 'status', value: 'active', }, @@ -911,7 +903,6 @@ describe('UpdateRecordStepExecutor', () => { type: 'update-record', stepIndex: 0, pendingData: { - displayName: 'Status', name: 'status', value: 'active', }, @@ -1033,7 +1024,6 @@ describe('UpdateRecordStepExecutor', () => { type: 'update-record', stepIndex: 0, pendingData: { - displayName: 'Status', name: 'status', value: 'active', }, @@ -1546,7 +1536,7 @@ describe('UpdateRecordStepExecutor', () => { { type: 'update-record', stepIndex: 0, - pendingData: { displayName: 'Status', name: 'status', value: 'active' }, + pendingData: { name: 'status', value: 'active' }, selectedRecordRef: makeRecordRef(), }, ]), @@ -1575,7 +1565,7 @@ describe('UpdateRecordStepExecutor', () => { const doneExecution: UpdateRecordStepExecutionData = { type: 'update-record', stepIndex: 0, - executionParams: { displayName: 'Status', name: 'status', value: 'active' }, + executionParams: { name: 'status', value: 'active' }, executionResult: { updatedValues: { status: 'active' } }, selectedRecordRef: makeRecordRef(), idempotencyPhase: 'done', @@ -1669,7 +1659,7 @@ describe('UpdateRecordStepExecutor', () => { const execution: UpdateRecordStepExecutionData = { type: 'update-record', stepIndex: 0, - pendingData: { displayName: field.displayName, name: field.fieldName, value: pendingValue }, + pendingData: { name: field.fieldName, value: pendingValue }, userConfirmation, selectedRecordRef: makeRecordRef(), }; diff --git a/packages/workflow-executor/test/http/executor-http-server.test.ts b/packages/workflow-executor/test/http/executor-http-server.test.ts index c15abf8780..a5ac5e750d 100644 --- a/packages/workflow-executor/test/http/executor-http-server.test.ts +++ b/packages/workflow-executor/test/http/executor-http-server.test.ts @@ -105,7 +105,7 @@ describe('ExecutorHttpServer', () => { it('should accept valid token in Authorization header', async () => { const server = createServer(); - const token = signToken({ id: 1 }); + const token = signToken({ id: 1, renderingId: 7 }); const response = await request(server.callback) .get('/runs/run-1') @@ -116,7 +116,7 @@ describe('ExecutorHttpServer', () => { it('should accept valid token in forest_session_token cookie', async () => { const server = createServer(); - const token = signToken({ id: 1 }); + const token = signToken({ id: 1, renderingId: 7 }); const response = await request(server.callback) .get('/runs/run-1') @@ -217,7 +217,7 @@ describe('ExecutorHttpServer', () => { it('calls hasRunAccess with the correct runId and decoded user', async () => { const workflowPort = createMockWorkflowPort(); const server = createServer({ workflowPort }); - const token = signToken({ id: 1 }); + const token = signToken({ id: 1, renderingId: 7 }); const response = await request(server.callback) .get('/runs/run-42') @@ -233,7 +233,7 @@ describe('ExecutorHttpServer', () => { it('calls hasRunAccess with decoded user from cookie token', async () => { const workflowPort = createMockWorkflowPort(); const server = createServer({ workflowPort }); - const token = signToken({ id: 1 }); + const token = signToken({ id: 1, renderingId: 7 }); const response = await request(server.callback) .get('/runs/run-cookie') @@ -289,7 +289,7 @@ describe('ExecutorHttpServer', () => { }); const server = createServer({ runner }); - const token = signToken({ id: 1 }); + const token = signToken({ id: 1, renderingId: 7 }); const response = await request(server.callback) .get('/runs/run-1') @@ -297,7 +297,8 @@ describe('ExecutorHttpServer', () => { expect(response.status).toBe(200); expect(response.body).toEqual({ steps }); - expect(runner.getRunStepExecutions).toHaveBeenCalledWith('run-1'); + // Schema hydration is scoped to the requesting user's rendering. + expect(runner.getRunStepExecutions).toHaveBeenCalledWith('run-1', 7); }); it('should return 500 when getRunStepExecutions rejects', async () => { @@ -306,7 +307,7 @@ describe('ExecutorHttpServer', () => { }); const server = createServer({ runner }); - const token = signToken({ id: 1 }); + const token = signToken({ id: 1, renderingId: 7 }); const response = await request(server.callback) .get('/runs/run-1') @@ -315,6 +316,20 @@ describe('ExecutorHttpServer', () => { expect(response.status).toBe(500); expect(response.body).toEqual({ error: 'Internal server error' }); }); + + it('should return 400 when the token has no renderingId', async () => { + const runner = createMockRunner(); + const server = createServer({ runner }); + const token = signToken({ id: 1 }); // no renderingId + + const response = await request(server.callback) + .get('/runs/run-1') + .set('Authorization', `Bearer ${token}`); + + expect(response.status).toBe(400); + expect(response.body).toEqual({ error: 'Missing or invalid renderingId in token' }); + expect(runner.getRunStepExecutions).not.toHaveBeenCalled(); + }); }); describe('POST /runs/:runId/trigger', () => { diff --git a/packages/workflow-executor/test/hydrate-step-execution-data.test.ts b/packages/workflow-executor/test/hydrate-step-execution-data.test.ts new file mode 100644 index 0000000000..7b6e1ec9f3 --- /dev/null +++ b/packages/workflow-executor/test/hydrate-step-execution-data.test.ts @@ -0,0 +1,495 @@ +import type { SchemaGetter } from '../src/hydrate-step-execution-data'; +import type { Logger } from '../src/ports/logger-port'; +import type { WorkflowPort } from '../src/ports/workflow-port'; +import type { StepExecutionData } from '../src/types/step-execution-data'; +import type { CollectionSchema, RecordRef } from '../src/types/validated/collection'; + +import hydrateStepExecutionData, { makeSchemaGetter } from '../src/hydrate-step-execution-data'; +import SchemaCache from '../src/schema-cache'; + +function makeLogger(): jest.Mocked { + return { error: jest.fn(), warn: jest.fn(), info: jest.fn() }; +} + +function makeSchema(overrides: Partial = {}): CollectionSchema { + return { + collectionName: 'customers', + collectionDisplayName: 'Customers', + primaryKeyFields: ['id'], + fields: [ + { fieldName: 'status', displayName: 'Status', isRelationship: false }, + { fieldName: 'email', displayName: 'Email', isRelationship: false }, + { + fieldName: 'orders', + displayName: 'Orders', + isRelationship: true, + relationType: 'HasMany', + relatedCollectionName: 'orders', + }, + ], + actions: [{ name: 'send-email', displayName: 'Send Email', endpoint: '/forest/send' }], + ...overrides, + }; +} + +const recordRef: RecordRef = { collectionName: 'customers', recordId: [42], stepIndex: 0 }; + +// A getter that always returns the same schema, recording which collections were requested. +function makeGetter(schema: CollectionSchema = makeSchema()): SchemaGetter & jest.Mock { + return jest.fn().mockResolvedValue(schema) as SchemaGetter & jest.Mock; +} + +describe('hydrateStepExecutionData', () => { + it('re-derives the field displayName for update-record params and pending data', async () => { + const execution: StepExecutionData = { + type: 'update-record', + stepIndex: 1, + selectedRecordRef: recordRef, + executionParams: { name: 'status', value: 'active' }, + pendingData: { name: 'status', value: 'active' }, + }; + + const result = await hydrateStepExecutionData(execution, makeGetter()); + + expect(result).toMatchObject({ + type: 'update-record', + executionParams: { name: 'status', displayName: 'Status', value: 'active' }, + pendingData: { name: 'status', displayName: 'Status', value: 'active' }, + }); + }); + + it('re-derives the action displayName for trigger-action', async () => { + const execution: StepExecutionData = { + type: 'trigger-action', + stepIndex: 2, + selectedRecordRef: recordRef, + executionParams: { name: 'send-email' }, + pendingData: { name: 'send-email' }, + }; + + const result = await hydrateStepExecutionData(execution, makeGetter()); + + expect(result).toMatchObject({ + executionParams: { name: 'send-email', displayName: 'Send Email' }, + pendingData: { name: 'send-email', displayName: 'Send Email' }, + }); + }); + + it('re-derives field displayNames for every read-record field (success and error)', async () => { + const execution: StepExecutionData = { + type: 'read-record', + stepIndex: 0, + selectedRecordRef: recordRef, + executionParams: { fields: [{ name: 'email' }, { name: 'ghost' }] }, + executionResult: { + fields: [ + { name: 'email', value: 'a@b.c' }, + { name: 'ghost', error: 'Field not found: ghost' }, + ], + }, + }; + + const result = await hydrateStepExecutionData(execution, makeGetter()); + + expect(result).toMatchObject({ + executionParams: { + fields: [ + { name: 'email', displayName: 'Email' }, + // Unknown field falls back to its technical name. + { name: 'ghost', displayName: 'ghost' }, + ], + }, + executionResult: { + fields: [ + { name: 'email', displayName: 'Email', value: 'a@b.c' }, + { name: 'ghost', displayName: 'ghost', error: 'Field not found: ghost' }, + ], + }, + }); + }); + + it('re-derives the relation displayName for load-related-record params, pending and result', async () => { + const execution: StepExecutionData = { + type: 'load-related-record', + stepIndex: 3, + selectedRecordRef: recordRef, + pendingData: { name: 'orders', selectedRecordId: [7] }, + executionParams: { name: 'orders' }, + executionResult: { + relation: { name: 'orders' }, + record: { collectionName: 'orders', recordId: [7], stepIndex: 3 }, + }, + }; + + const result = await hydrateStepExecutionData(execution, makeGetter()); + + expect(result).toMatchObject({ + pendingData: { name: 'orders', displayName: 'Orders', selectedRecordId: [7] }, + executionParams: { name: 'orders', displayName: 'Orders' }, + executionResult: { + relation: { name: 'orders', displayName: 'Orders' }, + record: { collectionName: 'orders', recordId: [7] }, + }, + }); + }); + + it('reflects a CHANGED schema rather than any previously persisted label', async () => { + const execution: StepExecutionData = { + type: 'update-record', + stepIndex: 1, + selectedRecordRef: recordRef, + // Simulate an old row that still carries a (now stale) displayName alongside the name. + executionParams: { name: 'status', value: 'active', displayName: 'OLD LABEL' } as never, + }; + const renamed = makeSchema({ + fields: [{ fieldName: 'status', displayName: 'Lifecycle Stage', isRelationship: false }], + }); + + const result = await hydrateStepExecutionData(execution, makeGetter(renamed)); + + // The stale 'OLD LABEL' is ignored; the label is rebuilt from the (renamed) schema. + expect(result).toMatchObject({ + type: 'update-record', + executionParams: { name: 'status', displayName: 'Lifecycle Stage' }, + }); + }); + + it('falls back to technical names when the schema cannot be fetched', async () => { + const execution: StepExecutionData = { + type: 'trigger-action', + stepIndex: 2, + selectedRecordRef: recordRef, + executionParams: { name: 'send-email' }, + }; + const failingGetter: SchemaGetter = jest.fn().mockRejectedValue(new Error('boom')); + + const result = await hydrateStepExecutionData(execution, failingGetter); + + expect(result).toMatchObject({ + executionParams: { name: 'send-email', displayName: 'send-email' }, + }); + }); + + it('returns condition / mcp / guidance / record executions unchanged without fetching a schema', async () => { + const getter = makeGetter(); + const condition: StepExecutionData = { + type: 'condition', + stepIndex: 0, + executionParams: { answer: 'yes' }, + }; + + const result = await hydrateStepExecutionData(condition, getter); + + expect(result).toEqual(condition); + expect(getter).not.toHaveBeenCalled(); + }); + + it('leaves a skipped load-related result untouched', async () => { + const execution: StepExecutionData = { + type: 'load-related-record', + stepIndex: 3, + selectedRecordRef: recordRef, + executionResult: { skipped: true }, + }; + + const result = await hydrateStepExecutionData(execution, makeGetter()); + + expect(result).toMatchObject({ executionResult: { skipped: true } }); + }); + + // --- optional-field phases: only the present fields get a displayName, absent ones stay absent + + it('hydrates only pendingData when update-record is awaiting input (no executionParams)', async () => { + const execution: StepExecutionData = { + type: 'update-record', + stepIndex: 1, + selectedRecordRef: recordRef, + pendingData: { name: 'status', value: 'active' }, + }; + + const result = await hydrateStepExecutionData(execution, makeGetter()); + + expect(result).toMatchObject({ + pendingData: { name: 'status', displayName: 'Status', value: 'active' }, + }); + expect((result as { executionParams?: unknown }).executionParams).toBeUndefined(); + }); + + it('hydrates only executionParams when update-record is done (no pendingData)', async () => { + const execution: StepExecutionData = { + type: 'update-record', + stepIndex: 1, + selectedRecordRef: recordRef, + executionParams: { name: 'status', value: 'active' }, + executionResult: { updatedValues: { status: 'active' } }, + }; + + const result = await hydrateStepExecutionData(execution, makeGetter()); + + expect(result).toMatchObject({ + executionParams: { name: 'status', displayName: 'Status', value: 'active' }, + executionResult: { updatedValues: { status: 'active' } }, + }); + expect((result as { pendingData?: unknown }).pendingData).toBeUndefined(); + }); + + it('leaves both fields undefined for the update-record executing phase', async () => { + const execution: StepExecutionData = { + type: 'update-record', + stepIndex: 1, + selectedRecordRef: recordRef, + idempotencyPhase: 'executing', + }; + + const result = await hydrateStepExecutionData(execution, makeGetter()); + + expect(result).toMatchObject({ type: 'update-record', idempotencyPhase: 'executing' }); + expect((result as { executionParams?: unknown }).executionParams).toBeUndefined(); + expect((result as { pendingData?: unknown }).pendingData).toBeUndefined(); + }); + + it('leaves both fields undefined for the trigger-action executing phase', async () => { + const execution: StepExecutionData = { + type: 'trigger-action', + stepIndex: 2, + selectedRecordRef: recordRef, + idempotencyPhase: 'executing', + }; + + const result = await hydrateStepExecutionData(execution, makeGetter()); + + expect((result as { executionParams?: unknown }).executionParams).toBeUndefined(); + expect((result as { pendingData?: unknown }).pendingData).toBeUndefined(); + }); + + it('hydrates load-related automatic execution (executionParams + result, no pendingData)', async () => { + const execution: StepExecutionData = { + type: 'load-related-record', + stepIndex: 3, + selectedRecordRef: recordRef, + executionParams: { name: 'orders' }, + executionResult: { + relation: { name: 'orders' }, + record: { collectionName: 'orders', recordId: [7], stepIndex: 3 }, + }, + }; + + const result = await hydrateStepExecutionData(execution, makeGetter()); + + expect(result).toMatchObject({ + executionParams: { name: 'orders', displayName: 'Orders' }, + executionResult: { relation: { name: 'orders', displayName: 'Orders' } }, + }); + expect((result as { pendingData?: unknown }).pendingData).toBeUndefined(); + }); + + it('hydrates load-related pendingData while it is still awaiting input (no executionResult)', async () => { + const execution: StepExecutionData = { + type: 'load-related-record', + stepIndex: 3, + selectedRecordRef: recordRef, + pendingData: { name: 'orders', selectedRecordId: [7] }, + }; + + const result = await hydrateStepExecutionData(execution, makeGetter()); + + expect(result).toMatchObject({ + pendingData: { name: 'orders', displayName: 'Orders', selectedRecordId: [7] }, + }); + // hydrateRelationResult passes an absent executionResult straight through. + expect((result as { executionResult?: unknown }).executionResult).toBeUndefined(); + expect((result as { executionParams?: unknown }).executionParams).toBeUndefined(); + }); + + it('resolves the action displayName from schema.actions, not schema.fields', async () => { + const execution: StepExecutionData = { + type: 'trigger-action', + stepIndex: 2, + selectedRecordRef: recordRef, + // A field shares the name of no action; ensure the action lookup is used (and a matching + // field name would NOT leak its label into an action ref). + executionParams: { name: 'send-email' }, + }; + + const result = await hydrateStepExecutionData(execution, makeGetter()); + + expect(result).toMatchObject({ + executionParams: { name: 'send-email', displayName: 'Send Email' }, + }); + }); + + it('returns an unrecognized step type unchanged after fetching the schema', async () => { + const getter = makeGetter(); + const future = { + type: 'future-step', + stepIndex: 9, + selectedRecordRef: recordRef, + } as unknown as StepExecutionData; + + const result = await hydrateStepExecutionData(future, getter); + + expect(result).toBe(future); + expect(getter).toHaveBeenCalledWith('customers'); + }); + + // --- resilience & observability + + it('warns (but does not throw) when the schema fetch fails', async () => { + const logger = makeLogger(); + const failingGetter: SchemaGetter = jest.fn().mockRejectedValue(new Error('endpoint down')); + const execution: StepExecutionData = { + type: 'trigger-action', + stepIndex: 2, + selectedRecordRef: recordRef, + executionParams: { name: 'send-email' }, + }; + + const result = await hydrateStepExecutionData(execution, failingGetter, logger); + + expect(result).toMatchObject({ + executionParams: { name: 'send-email', displayName: 'send-email' }, + }); + expect(logger.warn).toHaveBeenCalledWith( + expect.stringContaining('Failed to fetch collection schema'), + expect.objectContaining({ collection: 'customers', stepIndex: 2 }), + ); + }); + + it('returns the raw execution and logs an error when a malformed row cannot be hydrated', async () => { + const logger = makeLogger(); + // A corrupted/legacy read-record row missing executionParams would throw inside hydration. + const malformed = { + type: 'read-record', + stepIndex: 4, + selectedRecordRef: recordRef, + } as unknown as StepExecutionData; + + const result = await hydrateStepExecutionData(malformed, makeGetter(), logger); + + expect(result).toBe(malformed); + expect(logger.error).toHaveBeenCalledWith( + expect.stringContaining('Failed to hydrate'), + expect.objectContaining({ type: 'read-record', stepIndex: 4 }), + ); + }); +}); + +describe('makeSchemaGetter', () => { + it('serves the workflowPort schema on a miss and the cache on a hit', async () => { + const schema = makeSchema(); + const cache = new SchemaCache(); + const workflowPort = { + getCollectionSchema: jest.fn().mockResolvedValue(schema), + } as unknown as WorkflowPort; + + const getSchema = makeSchemaGetter(cache, workflowPort, 'run-1', 1); + + await expect(getSchema('customers')).resolves.toBe(schema); + await expect(getSchema('customers')).resolves.toBe(schema); + + // Second call is served from the cache — the port is hit exactly once. + expect(workflowPort.getCollectionSchema).toHaveBeenCalledTimes(1); + expect(workflowPort.getCollectionSchema).toHaveBeenCalledWith('customers', 'run-1'); + }); + + it('scopes the cache by rendering: two renderings of one collection do not share', async () => { + const cache = new SchemaCache(); + const getCollectionSchema = jest + .fn() + .mockResolvedValueOnce(makeSchema({ collectionDisplayName: 'Clients' })) + .mockResolvedValueOnce(makeSchema({ collectionDisplayName: 'Accounts' })); + const workflowPort = { getCollectionSchema } as unknown as WorkflowPort; + + const r1 = await makeSchemaGetter(cache, workflowPort, 'run-1', 1)('customers'); + const r2 = await makeSchemaGetter(cache, workflowPort, 'run-2', 2)('customers'); + + // Each rendering fetched and cached independently — no cross-rendering hit. + expect(getCollectionSchema).toHaveBeenCalledTimes(2); + expect(r1.collectionDisplayName).toBe('Clients'); + expect(r2.collectionDisplayName).toBe('Accounts'); + expect(cache.get(1, 'customers')?.collectionDisplayName).toBe('Clients'); + expect(cache.get(2, 'customers')?.collectionDisplayName).toBe('Accounts'); + }); + + it('de-duplicates concurrent fetches for the same collection (no stampede)', async () => { + const cache = new SchemaCache(); + let resolveFetch: (s: CollectionSchema) => void = () => undefined; + const deferred = new Promise(resolve => { + resolveFetch = resolve; + }); + const getCollectionSchema = jest.fn().mockReturnValue(deferred); + const getSchema = makeSchemaGetter( + cache, + { getCollectionSchema } as unknown as WorkflowPort, + 'run-1', + 1, + ); + + // Five concurrent misses while the first fetch is still in flight. + const all = Promise.all([ + getSchema('customers'), + getSchema('customers'), + getSchema('customers'), + getSchema('customers'), + getSchema('customers'), + ]); + resolveFetch(makeSchema()); + const results = await all; + + expect(getCollectionSchema).toHaveBeenCalledTimes(1); + results.forEach(r => expect(r).toMatchObject({ collectionName: 'customers' })); + }); + + it('de-duplicates across independent getters sharing one cache (global, no stampede)', async () => { + // Two separate getters (e.g. two concurrent reads / two runs of one rendering) over the SAME + // cache must still collapse to a single fetch while one is in flight. + const cache = new SchemaCache(); + let resolveFetch: (s: CollectionSchema) => void = () => undefined; + const deferred = new Promise(resolve => { + resolveFetch = resolve; + }); + const getCollectionSchema = jest.fn().mockReturnValue(deferred); + const workflowPort = { getCollectionSchema } as unknown as WorkflowPort; + + const getterA = makeSchemaGetter(cache, workflowPort, 'run-1', 1); + const getterB = makeSchemaGetter(cache, workflowPort, 'run-2', 1); // same rendering, different getter + + const all = Promise.all([getterA('customers'), getterB('customers')]); + resolveFetch(makeSchema()); + await all; + + expect(getCollectionSchema).toHaveBeenCalledTimes(1); + }); + + it('clears the in-flight entry so a failed fetch can be retried', async () => { + const cache = new SchemaCache(); + const getCollectionSchema = jest + .fn() + .mockRejectedValueOnce(new Error('transient')) + .mockResolvedValueOnce(makeSchema()); + const getSchema = makeSchemaGetter( + cache, + { getCollectionSchema } as unknown as WorkflowPort, + 'run-1', + 1, + ); + + await expect(getSchema('customers')).rejects.toThrow('transient'); + await expect(getSchema('customers')).resolves.toMatchObject({ collectionName: 'customers' }); + expect(getCollectionSchema).toHaveBeenCalledTimes(2); + }); + + it('does not cache a failed fetch', async () => { + const cache = new SchemaCache(); + const getCollectionSchema = jest.fn().mockRejectedValue(new Error('down')); + const getSchema = makeSchemaGetter( + cache, + { getCollectionSchema } as unknown as WorkflowPort, + 'run-1', + 1, + ); + + await expect(getSchema('customers')).rejects.toThrow('down'); + expect(cache.get(1, 'customers')).toBeUndefined(); + }); +}); diff --git a/packages/workflow-executor/test/integration/workflow-execution.test.ts b/packages/workflow-executor/test/integration/workflow-execution.test.ts index 9e57a2dabc..d8da788def 100644 --- a/packages/workflow-executor/test/integration/workflow-execution.test.ts +++ b/packages/workflow-executor/test/integration/workflow-execution.test.ts @@ -293,7 +293,7 @@ describe('workflow execution (integration)', () => { type: 'read-record', stepIndex: 0, executionResult: { - fields: [{ value: 'john@example.com', name: 'email', displayName: 'Email' }], + fields: [{ value: 'john@example.com', name: 'email' }], }, }), ); @@ -546,7 +546,7 @@ describe('workflow execution (integration)', () => { expect.objectContaining({ type: 'load-related-record', executionResult: { - relation: { name: 'order', displayName: 'Order' }, + relation: { name: 'order' }, record: { collectionName: 'orders', recordId: [99], stepIndex: 0 }, }, }), @@ -696,7 +696,7 @@ describe('workflow execution (integration)', () => { const { server, runStore } = createIntegrationSetup({ workflowPort }); await runStore.init(); - const token = signToken({ id: STEP_USER.id }); + const token = signToken({ id: STEP_USER.id, renderingId: STEP_USER.renderingId }); // Trigger the step first await request(server.callback) diff --git a/packages/workflow-executor/test/runner.test.ts b/packages/workflow-executor/test/runner.test.ts index c86f1fd5cb..b359a1db64 100644 --- a/packages/workflow-executor/test/runner.test.ts +++ b/packages/workflow-executor/test/runner.test.ts @@ -2048,11 +2048,154 @@ describe('getRunStepExecutions', () => { }); runner = new Runner(createRunnerConfig({ runStore })); - const result = await runner.getRunStepExecutions('run-1'); + const result = await runner.getRunStepExecutions('run-1', 1); expect(result).toEqual(steps); expect(runStore.getStepExecutions).toHaveBeenCalledWith('run-1'); }); + + it('re-derives displayName from the current schema (not from persisted data)', async () => { + // Persisted execution carries ONLY the technical fieldName. + const persisted = [ + { + type: 'update-record' as const, + stepIndex: 0, + selectedRecordRef: { collectionName: 'customers', recordId: [1], stepIndex: 0 }, + executionParams: { name: 'status', value: 'active' }, + }, + ]; + const runStore = createMockRunStore({ + getStepExecutions: jest.fn().mockResolvedValue(persisted), + }); + const workflowPort = createMockWorkflowPort(); + workflowPort.getCollectionSchema.mockResolvedValue({ + collectionName: 'customers', + collectionDisplayName: 'Customers', + primaryKeyFields: ['id'], + fields: [{ fieldName: 'status', displayName: 'Lifecycle Stage', isRelationship: false }], + actions: [], + }); + runner = new Runner(createRunnerConfig({ runStore, workflowPort })); + + const result = await runner.getRunStepExecutions('run-1', 1); + + expect(result[0]).toMatchObject({ + type: 'update-record', + executionParams: { name: 'status', displayName: 'Lifecycle Stage', value: 'active' }, + }); + expect(workflowPort.getCollectionSchema).toHaveBeenCalledWith('customers', 'run-1'); + }); + + it('fetches the schema once for many steps sharing a collection (no stampede)', async () => { + const ref = { collectionName: 'customers', recordId: [1], stepIndex: 0 }; + const persisted = Array.from({ length: 6 }, (_, i) => ({ + type: 'update-record' as const, + stepIndex: i, + selectedRecordRef: { ...ref, stepIndex: i }, + executionParams: { name: 'status', value: 'active' }, + })); + const runStore = createMockRunStore({ + getStepExecutions: jest.fn().mockResolvedValue(persisted), + }); + const workflowPort = createMockWorkflowPort(); + workflowPort.getCollectionSchema.mockResolvedValue({ + collectionName: 'customers', + collectionDisplayName: 'Customers', + primaryKeyFields: ['id'], + fields: [{ fieldName: 'status', displayName: 'Status', isRelationship: false }], + actions: [], + }); + runner = new Runner(createRunnerConfig({ runStore, workflowPort })); + + const result = await runner.getRunStepExecutions('run-1', 1); + + expect(result).toHaveLength(6); + expect(workflowPort.getCollectionSchema).toHaveBeenCalledTimes(1); + }); + + it('does not fail the whole read when one persisted row is malformed', async () => { + const ref = { collectionName: 'customers', recordId: [1], stepIndex: 0 }; + const persisted = [ + { + type: 'update-record' as const, + stepIndex: 0, + selectedRecordRef: ref, + executionParams: { name: 'status', value: 'active' }, + }, + // Corrupted/legacy read-record row missing executionParams — would throw during hydration. + { type: 'read-record' as const, stepIndex: 1, selectedRecordRef: ref }, + ]; + const runStore = createMockRunStore({ + getStepExecutions: jest.fn().mockResolvedValue(persisted), + }); + const workflowPort = createMockWorkflowPort(); + workflowPort.getCollectionSchema.mockResolvedValue({ + collectionName: 'customers', + collectionDisplayName: 'Customers', + primaryKeyFields: ['id'], + fields: [{ fieldName: 'status', displayName: 'Status', isRelationship: false }], + actions: [], + }); + const logger = createMockLogger(); + runner = new Runner(createRunnerConfig({ runStore, workflowPort, logger })); + + const result = await runner.getRunStepExecutions('run-1', 1); + + // The good row is hydrated; the malformed row is returned raw rather than 500-ing the read. + expect(result).toHaveLength(2); + expect(result[0]).toMatchObject({ + executionParams: { name: 'status', displayName: 'Status', value: 'active' }, + }); + expect(result[1]).toMatchObject({ type: 'read-record', stepIndex: 1 }); + expect(logger.error).toHaveBeenCalledWith( + expect.stringContaining('Failed to hydrate'), + expect.objectContaining({ type: 'read-record', stepIndex: 1 }), + ); + }); + + it('does not contaminate labels across renderings sharing one executor cache', async () => { + const ref = { collectionName: 'customers', recordId: [1], stepIndex: 0 }; + const execution = { + type: 'update-record' as const, + stepIndex: 0, + selectedRecordRef: ref, + executionParams: { name: 'status', value: 'active' }, + }; + const runStore = createMockRunStore({ + getStepExecutions: jest.fn().mockResolvedValue([execution]), + }); + const workflowPort = createMockWorkflowPort(); + // The server resolves the schema from the run's rendering (via runId) — different label per run. + workflowPort.getCollectionSchema.mockImplementation((_collection: string, runId: string) => + Promise.resolve({ + collectionName: 'customers', + collectionDisplayName: 'Customers', + primaryKeyFields: ['id'], + fields: [ + { + fieldName: 'status', + displayName: runId === 'run-A' ? 'Lifecycle Stage' : 'État', + isRelationship: false, + }, + ], + actions: [], + }), + ); + // One Runner == one shared SchemaCache, exercised by two renderings of the same environment. + runner = new Runner(createRunnerConfig({ runStore, workflowPort })); + + const resultA = await runner.getRunStepExecutions('run-A', 1); + const resultB = await runner.getRunStepExecutions('run-B', 2); + + expect(resultA[0]).toMatchObject({ + executionParams: { name: 'status', displayName: 'Lifecycle Stage' }, + }); + expect(resultB[0]).toMatchObject({ + executionParams: { name: 'status', displayName: 'État' }, + }); + // Each rendering triggered its own fetch — rendering 2 did NOT read rendering 1's cached entry. + expect(workflowPort.getCollectionSchema).toHaveBeenCalledTimes(2); + }); }); // --------------------------------------------------------------------------- diff --git a/packages/workflow-executor/test/schema-cache.test.ts b/packages/workflow-executor/test/schema-cache.test.ts index 216721640d..1ee803346c 100644 --- a/packages/workflow-executor/test/schema-cache.test.ts +++ b/packages/workflow-executor/test/schema-cache.test.ts @@ -2,10 +2,13 @@ import type { CollectionSchema } from '../src/types/validated/collection'; import SchemaCache from '../src/schema-cache'; -function makeSchema(collectionName: string): CollectionSchema { +const R1 = 1; +const R2 = 2; + +function makeSchema(collectionName: string, displayName = collectionName): CollectionSchema { return { collectionName, - collectionDisplayName: collectionName, + collectionDisplayName: displayName, primaryKeyFields: ['id'], fields: [], actions: [], @@ -17,16 +20,16 @@ describe('SchemaCache', () => { it('returns undefined for unknown keys', () => { const cache = new SchemaCache(); - expect(cache.get('unknown')).toBeUndefined(); + expect(cache.get(R1, 'unknown')).toBeUndefined(); }); it('returns the schema after set', () => { const cache = new SchemaCache(); const schema = makeSchema('customers'); - cache.set('customers', schema); + cache.set(R1, 'customers', schema); - expect(cache.get('customers')).toBe(schema); + expect(cache.get(R1, 'customers')).toBe(schema); }); it('overwrites existing entry on set', () => { @@ -34,10 +37,33 @@ describe('SchemaCache', () => { const old = makeSchema('customers'); const updated = { ...makeSchema('customers'), primaryKeyFields: ['uid'] }; - cache.set('customers', old); - cache.set('customers', updated); + cache.set(R1, 'customers', old); + cache.set(R1, 'customers', updated); - expect(cache.get('customers')).toBe(updated); + expect(cache.get(R1, 'customers')).toBe(updated); + }); + }); + + describe('rendering isolation', () => { + it('does not share entries for the same collection across renderings', () => { + const cache = new SchemaCache(); + const r1Schema = makeSchema('customers', 'Clients'); + const r2Schema = makeSchema('customers', 'Accounts'); + + cache.set(R1, 'customers', r1Schema); + cache.set(R2, 'customers', r2Schema); + + // Each rendering sees its own labels; no cross-contamination. + expect(cache.get(R1, 'customers')).toBe(r1Schema); + expect(cache.get(R2, 'customers')).toBe(r2Schema); + }); + + it('a miss in one rendering is unaffected by another rendering populating the same collection', () => { + const cache = new SchemaCache(); + + cache.set(R1, 'customers', makeSchema('customers')); + + expect(cache.get(R2, 'customers')).toBeUndefined(); }); }); @@ -47,85 +73,162 @@ describe('SchemaCache', () => { const cache = new SchemaCache(1000, () => time); const schema = makeSchema('customers'); - cache.set('customers', schema); + cache.set(R1, 'customers', schema); time = 999; - expect(cache.get('customers')).toBe(schema); + expect(cache.get(R1, 'customers')).toBe(schema); }); it('returns undefined after TTL expires', () => { let time = 0; const cache = new SchemaCache(1000, () => time); - cache.set('customers', makeSchema('customers')); + cache.set(R1, 'customers', makeSchema('customers')); time = 1001; - expect(cache.get('customers')).toBeUndefined(); + expect(cache.get(R1, 'customers')).toBeUndefined(); }); it('deletes the expired entry from the store on get', () => { let time = 0; const cache = new SchemaCache(1000, () => time); - cache.set('customers', makeSchema('customers')); + cache.set(R1, 'customers', makeSchema('customers')); time = 1001; - cache.get('customers'); // triggers delete + cache.get(R1, 'customers'); // triggers delete time = 0; // rewind — entry should still be gone - expect(cache.get('customers')).toBeUndefined(); + expect(cache.get(R1, 'customers')).toBeUndefined(); }); it('refreshes TTL when entry is re-set', () => { let time = 0; const cache = new SchemaCache(1000, () => time); - cache.set('customers', makeSchema('customers')); + cache.set(R1, 'customers', makeSchema('customers')); time = 800; - cache.set('customers', makeSchema('customers')); // re-set refreshes + cache.set(R1, 'customers', makeSchema('customers')); // re-set refreshes time = 1500; // 700ms since re-set, within TTL - expect(cache.get('customers')).toBeDefined(); + expect(cache.get(R1, 'customers')).toBeDefined(); }); }); - describe('iterator', () => { - it('yields all non-expired entries', () => { + describe('entriesForRendering', () => { + it('yields all non-expired entries for the given rendering only', () => { const cache = new SchemaCache(); - cache.set('customers', makeSchema('customers')); - cache.set('orders', makeSchema('orders')); + cache.set(R1, 'customers', makeSchema('customers')); + cache.set(R1, 'orders', makeSchema('orders')); + cache.set(R2, 'customers', makeSchema('customers')); // other rendering — excluded - const entries = [...cache]; + const entries = [...cache.entriesForRendering(R1)]; - expect(entries).toHaveLength(2); - expect(entries[0][0]).toBe('customers'); - expect(entries[1][0]).toBe('orders'); + expect(entries.map(([name]) => name)).toEqual(['customers', 'orders']); }); it('skips and deletes expired entries', () => { let time = 0; const cache = new SchemaCache(1000, () => time); - cache.set('fresh', makeSchema('fresh')); + cache.set(R1, 'fresh', makeSchema('fresh')); time = 500; - cache.set('also-fresh', makeSchema('also-fresh')); + cache.set(R1, 'also-fresh', makeSchema('also-fresh')); time = 1100; // 'fresh' expired, 'also-fresh' still valid - const entries = [...cache]; + const entries = [...cache.entriesForRendering(R1)]; expect(entries).toHaveLength(1); expect(entries[0][0]).toBe('also-fresh'); // expired entry was cleaned up time = 0; - expect(cache.get('fresh')).toBeUndefined(); + expect(cache.get(R1, 'fresh')).toBeUndefined(); }); - it('returns empty for a fresh cache', () => { + it('returns empty for a rendering with no entries', () => { const cache = new SchemaCache(); - expect([...cache]).toHaveLength(0); + expect([...cache.entriesForRendering(R1)]).toHaveLength(0); + }); + }); + + describe('getOrFetch', () => { + it('returns the cached schema without fetching on a hit', async () => { + const cache = new SchemaCache(); + const cached = makeSchema('customers'); + cache.set(R1, 'customers', cached); + const fetch = jest.fn(); + + await expect(cache.getOrFetch(R1, 'customers', fetch)).resolves.toBe(cached); + expect(fetch).not.toHaveBeenCalled(); + }); + + it('fetches and caches on a miss', async () => { + const cache = new SchemaCache(); + const fetched = makeSchema('customers'); + const fetch = jest.fn().mockResolvedValue(fetched); + + await expect(cache.getOrFetch(R1, 'customers', fetch)).resolves.toBe(fetched); + expect(cache.get(R1, 'customers')).toBe(fetched); + // Second call is now a cache hit — no second fetch. + await cache.getOrFetch(R1, 'customers', fetch); + expect(fetch).toHaveBeenCalledTimes(1); + }); + + it('shares one in-flight fetch among concurrent callers', async () => { + const cache = new SchemaCache(); + let resolveFetch: (s: CollectionSchema) => void = () => undefined; + const fetch = jest.fn().mockReturnValue( + new Promise(r => { + resolveFetch = r; + }), + ); + + const all = Promise.all([ + cache.getOrFetch(R1, 'customers', fetch), + cache.getOrFetch(R1, 'customers', fetch), + cache.getOrFetch(R1, 'customers', fetch), + ]); + resolveFetch(makeSchema('customers')); + await all; + + expect(fetch).toHaveBeenCalledTimes(1); + }); + + it('keeps fetches for the same collection in different renderings separate', async () => { + const cache = new SchemaCache(); + const fetch = jest + .fn() + .mockResolvedValueOnce(makeSchema('customers', 'Clients')) + .mockResolvedValueOnce(makeSchema('customers', 'Accounts')); + + const [a, b] = await Promise.all([ + cache.getOrFetch(R1, 'customers', fetch), + cache.getOrFetch(R2, 'customers', fetch), + ]); + + expect(fetch).toHaveBeenCalledTimes(2); + expect(a.collectionDisplayName).toBe('Clients'); + expect(b.collectionDisplayName).toBe('Accounts'); + }); + + it('does not cache a rejected fetch and clears the in-flight slot for retry', async () => { + const cache = new SchemaCache(); + const fetch = jest + .fn() + .mockRejectedValueOnce(new Error('transient')) + .mockResolvedValueOnce(makeSchema('customers')); + + await expect(cache.getOrFetch(R1, 'customers', fetch)).rejects.toThrow('transient'); + expect(cache.get(R1, 'customers')).toBeUndefined(); + + // In-flight slot was cleared — a retry triggers a fresh fetch and succeeds. + await expect(cache.getOrFetch(R1, 'customers', fetch)).resolves.toMatchObject({ + collectionName: 'customers', + }); + expect(fetch).toHaveBeenCalledTimes(2); }); }); }); diff --git a/packages/workflow-executor/test/stores/database-store.test.ts b/packages/workflow-executor/test/stores/database-store.test.ts index 1f3878a793..3d04897f8f 100644 --- a/packages/workflow-executor/test/stores/database-store.test.ts +++ b/packages/workflow-executor/test/stores/database-store.test.ts @@ -92,9 +92,9 @@ describe('DatabaseStore (SQLite)', () => { const step: StepExecutionData = { type: 'update-record', stepIndex: 0, - executionParams: { displayName: 'Status', name: 'status', value: 'active' }, + executionParams: { name: 'status', value: 'active' }, executionResult: { updatedValues: { status: 'active', nested: { deep: true } } }, - pendingData: { displayName: 'Status', name: 'status', value: 'active' }, + pendingData: { name: 'status', value: 'active' }, selectedRecordRef: { collectionName: 'users', recordId: ['42'], stepIndex: 0 }, };