Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ export default class AgentClientAgentPort implements AgentPort {
async getRecord({ collection, id, fields }: GetRecordQuery, user: StepUser): Promise<RecordData> {
return this.callAgent('getRecord', async () => {
const client = this.createClient(user);
const schema = this.resolveSchema(collection);
const schema = this.resolveSchema(collection, user.renderingId);
const records = await client.collection(collection).list<Record<string, unknown>>({
filters: buildPkFilter(schema.primaryKeyFields, id),
pagination: { size: 1, number: 1 },
Expand Down Expand Up @@ -116,10 +116,10 @@ export default class AgentClientAgentPort implements AgentPort {
): Promise<RecordData[]> {
return this.callAgent('getRelatedData', async () => {
const client = this.createClient(user);
const parentSchema = this.resolveSchema(collection);
const parentSchema = this.resolveSchema(collection, user.renderingId);
const relationField = parentSchema.fields.find(f => f.fieldName === relation);
const relatedCollectionName = relationField?.relatedCollectionName ?? relation;
const relatedSchema = this.resolveSchema(relatedCollectionName);
const relatedSchema = this.resolveSchema(relatedCollectionName, user.renderingId);

const records = await client
.collection(collection)
Expand Down Expand Up @@ -186,7 +186,7 @@ export default class AgentClientAgentPort implements AgentPort {
return createRemoteAgentClient({
url: this.agentUrl,
token,
actionEndpoints: this.buildActionEndpoints(),
actionEndpoints: this.buildActionEndpoints(user.renderingId),
});
}

Expand All @@ -212,10 +212,10 @@ export default class AgentClientAgentPort implements AgentPort {
}
}

private buildActionEndpoints(): ActionEndpointsByCollection {
private buildActionEndpoints(renderingId: number): ActionEndpointsByCollection {
const endpoints: ActionEndpointsByCollection = {};

for (const [collectionName, schema] of this.schemaCache) {
for (const [collectionName, schema] of this.schemaCache.entriesForRendering(renderingId)) {
endpoints[collectionName] = {};

for (const action of schema.actions) {
Expand All @@ -238,8 +238,8 @@ export default class AgentClientAgentPort implements AgentPort {
return endpoints;
}

private resolveSchema(collectionName: string): CollectionSchema {
const cached = this.schemaCache.get(collectionName);
private resolveSchema(collectionName: string, renderingId: number): CollectionSchema {
const cached = this.schemaCache.get(renderingId, collectionName);

if (!cached) {
// eslint-disable-next-line no-console
Expand Down
22 changes: 16 additions & 6 deletions packages/workflow-executor/src/executors/base-step-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import {
extractErrorMessage,
} from '../errors';
import patchBodySchemas from '../http/pending-data-validators';
import hydrateStepExecutionData, { makeSchemaGetter } from '../hydrate-step-execution-data';
import StepSummaryBuilder from './summary/step-summary-builder';

export default abstract class BaseStepExecutor<TStep extends StepDefinition = StepDefinition>
Expand Down Expand Up @@ -281,15 +282,24 @@ export default abstract class BaseStepExecutor<TStep extends StepDefinition = St
if (!this.context.previousSteps.length) return [];

const allStepExecutions = await this.context.runStore.getStepExecutions(this.context.runId);
const summary = this.context.previousSteps
.map(({ stepDefinition, stepOutcome }) => {
const getSchema = makeSchemaGetter(
this.context.schemaCache,
this.context.workflowPort,
this.context.runId,
this.context.user.renderingId,
);
const summaries = await Promise.all(
this.context.previousSteps.map(async ({ stepDefinition, stepOutcome }) => {
const execution = allStepExecutions.find(e => e.stepIndex === stepOutcome.stepIndex);
const hydrated = execution
? await hydrateStepExecutionData(execution, getSchema, this.context.logger)
: undefined;

return StepSummaryBuilder.build(stepDefinition, stepOutcome, execution);
})
.join('\n\n');
return StepSummaryBuilder.build(stepDefinition, stepOutcome, hydrated);
}),
);

return [new SystemMessage(summary)];
return [new SystemMessage(summaries.join('\n\n'))];
}

private static mergeLeadingSystemMessages(messages: BaseMessage[]): BaseMessage[] {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,6 @@ export default class LoadRelatedRecordStepExecutor extends RecordStepExecutor<Lo

return {
selectedRecordRef,
displayName: field.displayName,
name: field.fieldName,
relationType: field.relationType,
};
Expand All @@ -110,7 +109,7 @@ export default class LoadRelatedRecordStepExecutor extends RecordStepExecutor<Lo
// Branch C: AI suggests the best candidate, then awaits user confirmation. Save errors
// propagate directly — the relation-load hasn't run yet, so the step can be safely retried.
private async saveAndAwaitInput(target: RelationTarget): Promise<StepExecutionResult> {
const { selectedRecordRef, name, displayName } = target;
const { selectedRecordRef, name } = target;

const { relatedData, bestIndex, suggestedFields } = await this.selectBestFromRelatedData(
target,
Expand All @@ -122,7 +121,7 @@ export default class LoadRelatedRecordStepExecutor extends RecordStepExecutor<Lo
await this.context.runStore.saveStepExecution(this.context.runId, {
type: 'load-related-record',
stepIndex: this.context.stepIndex,
pendingData: { displayName, name, suggestedFields, selectedRecordId },
pendingData: { name, suggestedFields, selectedRecordId },
selectedRecordRef,
});

Expand Down Expand Up @@ -152,7 +151,7 @@ export default class LoadRelatedRecordStepExecutor extends RecordStepExecutor<Lo
const name = userConfirmation?.name ?? pendingData.name;
const selectedRecordId = userConfirmation?.selectedRecordId ?? pendingData.selectedRecordId;

// Re-derive relatedCollectionName and displayName because the user may have swapped the relation.
// Re-derive relatedCollectionName because the user may have swapped the relation.
const schema = await this.getCollectionSchema(selectedRecordRef.collectionName);
const field = schema.fields.find(f => f.fieldName === name);

Expand All @@ -162,15 +161,15 @@ export default class LoadRelatedRecordStepExecutor extends RecordStepExecutor<Lo
);
}

const { displayName, relatedCollectionName } = field;
const { relatedCollectionName } = field;

const record: RecordRef = {
collectionName: relatedCollectionName,
recordId: selectedRecordId,
stepIndex: this.context.stepIndex,
};

return this.persistAndReturn(record, { selectedRecordRef, name, displayName }, execution);
return this.persistAndReturn(record, { selectedRecordRef, name }, execution);
}

private async selectBestFromRelatedData(
Expand Down Expand Up @@ -269,17 +268,17 @@ export default class LoadRelatedRecordStepExecutor extends RecordStepExecutor<Lo
/** Persists the loaded record ref and returns a success outcome. */
private async persistAndReturn(
record: RecordRef,
target: Pick<RelationTarget, 'selectedRecordRef' | 'name' | 'displayName'>,
target: Pick<RelationTarget, 'selectedRecordRef' | 'name'>,
existingExecution: LoadRelatedRecordStepExecutionData | undefined,
): Promise<StepExecutionResult> {
const { selectedRecordRef, name, displayName } = target;
const { selectedRecordRef, name } = target;

await this.context.runStore.saveStepExecution(this.context.runId, {
...existingExecution,
type: 'load-related-record',
stepIndex: this.context.stepIndex,
executionParams: { displayName, name },
executionResult: { relation: { name, displayName }, record },
executionParams: { name },
executionResult: { relation: { name }, record },
selectedRecordRef,
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ export default class ReadRecordStepExecutor extends RecordStepExecutor<ReadRecor
type: 'read-record',
stepIndex: this.context.stepIndex,
executionParams: {
fields: fieldResults.map(({ name, displayName }) => ({ name, displayName })),
fields: fieldResults.map(({ name }) => ({ name })),
},
executionResult: { fields: fieldResults },
selectedRecordRef,
Expand Down Expand Up @@ -129,12 +129,11 @@ export default class ReadRecordStepExecutor extends RecordStepExecutor<ReadRecor
return fieldDisplayNames.map(name => {
const field = this.findField(schema, name);

if (!field) return { error: `Field not found: ${name}`, name, displayName: name };
if (!field) return { error: `Field not found: ${name}`, name };

return {
value: values[field.fieldName],
name: field.fieldName,
displayName: field.displayName,
};
});
}
Expand Down
16 changes: 9 additions & 7 deletions packages/workflow-executor/src/executors/record-step-executor.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import type { SchemaGetter } from '../hydrate-step-execution-data';
import type { StepExecutionResult } from '../types/execution-context';
import type { CollectionSchema, FieldSchema, RecordRef } from '../types/validated/collection';
import type { StepDefinition } from '../types/validated/step-definition';
Expand All @@ -7,11 +8,14 @@ import { DynamicStructuredTool, HumanMessage, SystemMessage } from '@forestadmin
import { z } from 'zod';

import { InvalidAIResponseError, InvalidPreRecordedArgsError, NoRecordsError } from '../errors';
import { makeSchemaGetter } from '../hydrate-step-execution-data';
import BaseStepExecutor from './base-step-executor';

export default abstract class RecordStepExecutor<
TStep extends StepDefinition = StepDefinition,
> extends BaseStepExecutor<TStep> {
private schemaGetter?: SchemaGetter;

protected buildOutcomeResult(outcome: {
status: RecordStepStatus;
error?: string;
Expand Down Expand Up @@ -64,16 +68,14 @@ export default abstract class RecordStepExecutor<
}

protected async getCollectionSchema(collectionName: string): Promise<CollectionSchema> {
const cached = this.context.schemaCache.get(collectionName);
if (cached) return cached;

const schema = await this.context.workflowPort.getCollectionSchema(
collectionName,
this.schemaGetter ??= makeSchemaGetter(
this.context.schemaCache,
this.context.workflowPort,
this.context.runId,
this.context.user.renderingId,
);
this.context.schemaCache.set(collectionName, schema);

return schema;
return this.schemaGetter(collectionName);
}

protected findField(schema: CollectionSchema, name: string): FieldSchema | undefined {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
import type {
DisplayedLoadRelatedRecordStepExecutionData,
GuidanceStepExecutionData,
LoadRelatedRecordStepExecutionData,
HydratedStepExecutionData,
McpStepExecutionData,
StepExecutionData,
} from '../../types/step-execution-data';

export default class StepExecutionFormatters {
// Returns null when no custom format is defined for the step type or when execution data
// doesn't satisfy formatter preconditions — caller falls back to generic Input:/Output:.
static format(execution: StepExecutionData): string | null {
static format(execution: HydratedStepExecutionData): string | null {
switch (execution.type) {
case 'load-related-record':
return StepExecutionFormatters.formatLoadRelatedRecord(execution);
Expand Down Expand Up @@ -42,7 +42,7 @@ export default class StepExecutionFormatters {
}

private static formatLoadRelatedRecord(
execution: LoadRelatedRecordStepExecutionData,
execution: DisplayedLoadRelatedRecordStepExecutionData,
): string | null {
const { executionResult } = execution;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { StepExecutionData } from '../../types/step-execution-data';
import type { HydratedStepExecutionData } from '../../types/step-execution-data';
import type { StepDefinition } from '../../types/validated/step-definition';
import type { StepOutcome } from '../../types/validated/step-outcome';

Expand All @@ -8,7 +8,7 @@ export default class StepSummaryBuilder {
static build(
step: StepDefinition,
stepOutcome: StepOutcome,
execution: StepExecutionData | undefined,
execution: HydratedStepExecutionData | undefined,
): string {
const prompt = step.prompt ?? '(no prompt)';
const header = `Step "${stepOutcome.stepId}" (index ${stepOutcome.stepIndex}):`;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,6 @@ export default class TriggerRecordActionStepExecutor extends RecordStepExecutor<

const target: ActionTarget = {
selectedRecordRef,
displayName: pendingData.displayName,
name: pendingData.name,
};

Expand Down Expand Up @@ -112,7 +111,7 @@ export default class TriggerRecordActionStepExecutor extends RecordStepExecutor<
? { actionName: preRecordedArgs.actionDisplayName }
: await this.selectAction(schema, step.prompt);
const name = this.resolveActionName(schema, args.actionName);
const target: ActionTarget = { selectedRecordRef, displayName: args.actionName, name };
const target: ActionTarget = { selectedRecordRef, name };

// Branch B -- fully automated: executor runs the action itself, so it cannot
// handle forms (no UI to fill them). Reject form-bearing actions here. When the
Expand All @@ -126,7 +125,7 @@ export default class TriggerRecordActionStepExecutor extends RecordStepExecutor<
},
this.context.user,
);
if (hasForm) throw new UnsupportedActionFormError(target.displayName);
if (hasForm) throw new UnsupportedActionFormError(args.actionName);

return this.executeOnExecutor(target);
}
Expand All @@ -135,7 +134,7 @@ export default class TriggerRecordActionStepExecutor extends RecordStepExecutor<
await this.context.runStore.saveStepExecution(this.context.runId, {
type: 'trigger-action',
stepIndex: this.context.stepIndex,
pendingData: { displayName: target.displayName, name: target.name },
pendingData: { name: target.name },
selectedRecordRef: target.selectedRecordRef,
});

Expand All @@ -144,7 +143,7 @@ export default class TriggerRecordActionStepExecutor extends RecordStepExecutor<

/** Branch B — executor runs the action via agentPort, then persists the result. */
private async executeOnExecutor(target: ActionTarget): Promise<StepExecutionResult> {
const { selectedRecordRef, displayName, name } = target;
const { selectedRecordRef, name } = target;

await this.context.runStore.saveStepExecution(this.context.runId, {
type: 'trigger-action',
Expand All @@ -165,7 +164,7 @@ export default class TriggerRecordActionStepExecutor extends RecordStepExecutor<
await this.context.runStore.saveStepExecution(this.context.runId, {
type: 'trigger-action',
stepIndex: this.context.stepIndex,
executionParams: { displayName, name },
executionParams: { name },
executionResult: { success: true, actionResult },
selectedRecordRef,
idempotencyPhase: 'done',
Expand All @@ -180,13 +179,13 @@ export default class TriggerRecordActionStepExecutor extends RecordStepExecutor<
actionResult: unknown,
existingExecution: TriggerRecordActionStepExecutionData,
): Promise<StepExecutionResult> {
const { selectedRecordRef, displayName, name } = target;
const { selectedRecordRef, name } = target;

await this.context.runStore.saveStepExecution(this.context.runId, {
...existingExecution,
type: 'trigger-action',
stepIndex: this.context.stepIndex,
executionParams: { displayName, name },
executionParams: { name },
executionResult: { success: true, actionResult },
selectedRecordRef,
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,11 @@
// A user override of `null` (clearing the field) must win over the AI suggestion, so
// distinguish "no override" (undefined) from "override to null".
const rawValue =
userConfirmation?.value !== undefined ? userConfirmation.value : pendingData!.value;

Check warning on line 168 in packages/workflow-executor/src/executors/update-record-step-executor.ts

View workflow job for this annotation

GitHub Actions / Linting & Testing (workflow-executor)

Forbidden non-null assertion

const target: UpdateTarget = {
selectedRecordRef,
...pendingData!,

Check warning on line 172 in packages/workflow-executor/src/executors/update-record-step-executor.ts

View workflow job for this annotation

GitHub Actions / Linting & Testing (workflow-executor)

Forbidden non-null assertion
// The value comes from an `unknown` HTTP value (may be a boolean or array), so coerce
// it to the field's native type before updating. Idempotent on already-typed values.
value: await this.coerceOverride(selectedRecordRef, pendingData, rawValue),
Expand All @@ -189,9 +189,7 @@
value: unknown,
): Promise<unknown> {
const schema = await this.getCollectionSchema(selectedRecordRef.collectionName);
const fieldSchema =
this.findField(schema, pendingData?.name ?? '') ??
this.findField(schema, pendingData?.displayName ?? '');
const fieldSchema = this.findField(schema, pendingData?.name ?? '');

return coerceFieldValue(fieldSchema, value, selectedRecordRef.collectionName);
}
Expand Down Expand Up @@ -227,7 +225,6 @@
const name = this.resolveFieldName(schema, args.fieldName);
const target: UpdateTarget = {
selectedRecordRef,
displayName: args.fieldName,
name,
value: args.value,
};
Expand All @@ -242,7 +239,6 @@
type: 'update-record',
stepIndex: this.context.stepIndex,
pendingData: {
displayName: target.displayName,
name: target.name,
value: target.value,
},
Expand All @@ -257,7 +253,7 @@
target: UpdateTarget,
existingExecution?: UpdateRecordStepExecutionData,
): Promise<StepExecutionResult> {
const { selectedRecordRef, displayName, name, value } = target;
const { selectedRecordRef, name, value } = target;

await this.context.runStore.saveStepExecution(this.context.runId, {
...existingExecution,
Expand All @@ -280,7 +276,7 @@
...existingExecution,
type: 'update-record',
stepIndex: this.context.stepIndex,
executionParams: { displayName, name, value },
executionParams: { name, value },
executionResult: { updatedValues: updated.values },
selectedRecordRef,
idempotencyPhase: 'done',
Expand Down
Loading
Loading