Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion packages/workflow-executor/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ src/
├── stores/ # RunStore implementations
│ ├── in-memory-store.ts # InMemoryStore — Map-based, for tests
│ ├── database-store.ts # DatabaseStore — Sequelize + umzug migrations
│ ├── mcp-oauth-credentials-store.ts # McpOAuthCredentialsStore — ai_mcp_oauth_credentials (002 migration)
│ └── build-run-store.ts # Factory functions: buildDatabaseRunStore, buildInMemoryRunStore
├── adapters/ # Port implementations
│ ├── agent-client-agent-port.ts # AgentPort via @forestadmin/agent-client
Expand All @@ -72,7 +73,9 @@ src/
│ ├── load-related-record-step-executor.ts # AI-powered relation loading step (with confirmation flow)
│ └── guidance-step-executor.ts # Manual guidance step (saves user input, no AI)
├── http/ # HTTP server (optional, for frontend data access)
│ └── executor-http-server.ts # Koa server: GET /runs/:runId, POST /runs/:runId/trigger
│ └── executor-http-server.ts # Koa server: GET /runs/:runId, POST /runs/:runId/trigger, POST+DELETE /mcp-oauth-credentials
├── crypto/ # At-rest encryption
│ └── credential-encryption.ts # CredentialEncryption — HKDF (FOREST_EXECUTOR_ENCRYPTION_KEY) + AES-GCM, lazy key, fail-closed
└── index.ts # Barrel exports
```

Expand Down
4 changes: 4 additions & 0 deletions packages/workflow-executor/src/build-workflow-executor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import ConsoleLogger from './adapters/console-logger';
import ForestServerWorkflowPort from './adapters/forest-server-workflow-port';
import ForestadminClientActivityLogPortFactory from './adapters/forestadmin-client-activity-log-port-factory';
import ServerAiAdapter from './adapters/server-ai-adapter';
import CredentialEncryption from './crypto/credential-encryption';
import {
DEFAULT_FOREST_SERVER_URL,
DEFAULT_POLLING_INTERVAL_MS,
Expand All @@ -23,6 +24,7 @@ import Runner from './runner';
import SchemaCache from './schema-cache';
import DatabaseStore from './stores/database-store';
import InMemoryStore from './stores/in-memory-store';
import McpOAuthCredentialsStore from './stores/mcp-oauth-credentials-store';

const FORCE_EXIT_DELAY_MS = 5000;

Expand Down Expand Up @@ -222,6 +224,8 @@ export function buildDatabaseExecutor(options: DatabaseExecutorOptions): Workflo
authSecret: options.authSecret,
workflowPort: deps.workflowPort,
logger: deps.logger,
mcpOAuthCredentialsStore: new McpOAuthCredentialsStore({ sequelize }),
credentialEncryption: new CredentialEncryption(),
});

return createWorkflowExecutor(runner, server, deps.logger);
Expand Down
91 changes: 91 additions & 0 deletions packages/workflow-executor/src/crypto/credential-encryption.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import { createCipheriv, createDecipheriv, hkdfSync, randomFillSync } from 'crypto';

import { ExecutorEncryptionKeyMissingError } from '../errors';

const ENV_KEY = 'FOREST_EXECUTOR_ENCRYPTION_KEY';
// Fixed context label bound into the HKDF derivation — domain-separates this key from any other
// use of the same secret. Changing it would make every existing row undecryptable.
const HKDF_INFO = 'forest-executor:mcp-oauth-credentials';
const HKDF_DIGEST = 'sha256';
const KEY_BYTES = 32; // AES-256
const IV_BYTES = 12; // GCM standard nonce length
const AUTH_TAG_BYTES = 16;
const ALGORITHM = 'aes-256-gcm';
const CURRENT_ENC_KEY_VERSION = 1;

export interface EncryptedValue {
// Packed layout: iv | authTag | ciphertext — stored as a single BLOB column.
ciphertext: Buffer;
encKeyVersion: number;
}

// Concatenate byte arrays without going through Buffer.concat — keeps everything in the concrete
// Uint8Array<ArrayBuffer> domain the Node crypto types expect.
function concatBytes(parts: Uint8Array[]): Uint8Array {
const total = parts.reduce((length, part) => length + part.length, 0);
const out = new Uint8Array(total);
let offset = 0;

for (const part of parts) {
out.set(part, offset);
offset += part.length;
}

return out;
}

// At-rest encryption for OAuth credentials. The key is derived in-process via HKDF from
// FOREST_EXECUTOR_ENCRYPTION_KEY and is read lazily — an executor with no OAuth in use boots and
// runs without the key ever being required. Fails closed: a missing key throws rather than
// persisting or returning an unprotected value.
export default class CredentialEncryption {
private readonly encKeyVersion: number;

constructor(encKeyVersion: number = CURRENT_ENC_KEY_VERSION) {
this.encKeyVersion = encKeyVersion;
}

encrypt(plaintext: string): EncryptedValue {
const iv = randomFillSync(new Uint8Array(IV_BYTES));
const cipher = createCipheriv(ALGORITHM, this.deriveKey(), iv);
const encrypted = concatBytes([
new Uint8Array(cipher.update(plaintext, 'utf8')),
new Uint8Array(cipher.final()),
]);
const authTag = new Uint8Array(cipher.getAuthTag());

return {
ciphertext: Buffer.from(concatBytes([iv, authTag, encrypted])),
encKeyVersion: this.encKeyVersion,
};
}

decrypt(value: Buffer): string {
const bytes = new Uint8Array(value);
const iv = bytes.subarray(0, IV_BYTES);
const authTag = bytes.subarray(IV_BYTES, IV_BYTES + AUTH_TAG_BYTES);
const encrypted = bytes.subarray(IV_BYTES + AUTH_TAG_BYTES);

const decipher = createDecipheriv(ALGORITHM, this.deriveKey(), iv);
decipher.setAuthTag(authTag);

const decrypted = concatBytes([
new Uint8Array(decipher.update(encrypted)),
new Uint8Array(decipher.final()),
]);

return Buffer.from(decrypted).toString('utf8');
}

private deriveKey(): Uint8Array {
const secret = process.env[ENV_KEY];

if (!secret) throw new ExecutorEncryptionKeyMissingError();

// Empty salt is intentional: the fixed HKDF_INFO label provides domain separation and the
// input is a single high-entropy secret, so a salt would add no security here.
// hkdfSync returns an ArrayBuffer; wrap it as a concrete Uint8Array<ArrayBuffer> so it
// satisfies the crypto CipherKey type (Buffer's generic ArrayBufferLike backing does not).
return new Uint8Array(hkdfSync(HKDF_DIGEST, secret, new Uint8Array(0), HKDF_INFO, KEY_BYTES));
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

claude-opus-4-8 — [Preferential] HKDF uses an empty salt. This is acceptable (the fixed info label provides domain separation and the input is a high-entropy secret), but a one-line comment noting the empty salt is intentional would preempt the security-review question.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

claude-opus-4-8 — Agreed. Added a comment noting the empty salt is intentional (domain separation comes from the fixed info label; the input is a single high-entropy secret). Pending in the next push.

}
}
11 changes: 11 additions & 0 deletions packages/workflow-executor/src/errors.ts
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,17 @@ export class ConfigurationError extends Error {
}
}

// Boundary error — the deposit endpoint translates it into a typed HTTP response so the frontend
// can tell an operator to provision the key, rather than treating it as a generic / re-consent failure.
export class ExecutorEncryptionKeyMissingError extends Error {
readonly code = 'executor_encryption_key_missing';

constructor() {
super('FOREST_EXECUTOR_ENCRYPTION_KEY is not set');
this.name = 'ExecutorEncryptionKeyMissingError';
}
}

export class RunNotFoundError extends Error {
cause?: unknown;

Expand Down
121 changes: 118 additions & 3 deletions packages/workflow-executor/src/http/executor-http-server.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import type CredentialEncryption from '../crypto/credential-encryption';
import type { Logger } from '../ports/logger-port';
import type { WorkflowPort } from '../ports/workflow-port';
import type Runner from '../runner';
import type McpOAuthCredentialsStore from '../stores/mcp-oauth-credentials-store';
import type { StepUser } from '../types/execution-context';
import type { Server } from 'http';

Expand All @@ -12,6 +14,7 @@ import koaJwt from 'koa-jwt';

import ConsoleLogger from '../adapters/console-logger';
import {
ExecutorEncryptionKeyMissingError,
RunNotFoundError,
UserMismatchError,
WorkflowExecutorError,
Expand All @@ -24,17 +27,32 @@ export interface ExecutorHttpServerOptions {
authSecret: string;
workflowPort: WorkflowPort;
logger?: Logger;
mcpOAuthCredentialsStore?: McpOAuthCredentialsStore;
credentialEncryption?: CredentialEncryption;
}

interface DepositCredentialsBody {
mcpServerId?: string;
refreshToken?: string;
clientId?: string;
clientSecret?: string;
clientSecretExpiresAt?: string;
tokenEndpoint?: string;
tokenEndpointAuthMethod?: string;
scopes?: string;
}

export default class ExecutorHttpServer {
private readonly app: Koa;
private readonly options: ExecutorHttpServerOptions;
private readonly logger: Logger;
private readonly mcpOAuthCredentialsStore?: McpOAuthCredentialsStore;
private server: Server | null = null;

constructor(options: ExecutorHttpServerOptions) {
this.options = options;
this.logger = options.logger ?? new ConsoleLogger();
this.mcpOAuthCredentialsStore = options.mcpOAuthCredentialsStore;
this.app = new Koa();

// Error middleware — catches all errors (including JWT 401) and returns structured JSON
Expand Down Expand Up @@ -94,11 +112,29 @@ export default class ExecutorHttpServer {
);
router.post('/runs/:runId/trigger', this.handleTrigger.bind(this));

// Registered only when both dependencies are wired (a real executor with a database) — keeps
// the OAuth deposit surface absent (and dormant) on in-memory / OAuth-less deployments.
const credentialsStore = this.options.mcpOAuthCredentialsStore;
const { credentialEncryption } = this.options;

if (credentialsStore && credentialEncryption) {
router.post('/mcp-oauth-credentials', ctx =>
this.handleDepositCredentials(ctx, credentialsStore, credentialEncryption),
);
router.delete('/mcp-oauth-credentials/:mcpServerId', ctx =>
this.handleDeleteCredentials(ctx, credentialsStore),
);
}

this.app.use(router.routes());
this.app.use(router.allowedMethods());
}

async start(): Promise<void> {
if (this.mcpOAuthCredentialsStore) {
await this.mcpOAuthCredentialsStore.init(this.logger);
}

return new Promise((resolve, reject) => {
this.server = http.createServer(this.app.callback());
this.server.once('error', reject);
Expand Down Expand Up @@ -165,10 +201,9 @@ export default class ExecutorHttpServer {

private async handleTrigger(ctx: Koa.Context): Promise<void> {
const { runId } = ctx.params;
const rawId = (ctx.state.user as { id?: unknown })?.id;
const bearerUserId = typeof rawId === 'number' ? rawId : Number(rawId);
const bearerUserId = this.getBearerUserId(ctx);

if (!Number.isFinite(bearerUserId)) {
if (bearerUserId === null) {
ctx.status = 400;
ctx.body = { error: 'Missing or invalid user id in token' };

Expand Down Expand Up @@ -217,4 +252,84 @@ export default class ExecutorHttpServer {
ctx.status = 200;
ctx.body = { triggered: true };
}

private getBearerUserId(ctx: Koa.Context): number | null {
const rawId = (ctx.state.user as { id?: unknown })?.id;
const userId = typeof rawId === 'number' ? rawId : Number(rawId);

return Number.isFinite(userId) ? userId : null;
}

private async handleDepositCredentials(
ctx: Koa.Context,
store: McpOAuthCredentialsStore,
encryption: CredentialEncryption,
): Promise<void> {
const userId = this.getBearerUserId(ctx);

if (userId === null) {
ctx.status = 400;
ctx.body = { error: 'Missing or invalid user id in token' };

return;
}

const body = (ctx.request.body ?? {}) as DepositCredentialsBody;

if (!body.mcpServerId || !body.refreshToken) {
ctx.status = 400;
ctx.body = { error: 'mcpServerId and refreshToken are required' };

return;
}

try {
const refreshToken = encryption.encrypt(body.refreshToken);
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

claude-opus-4-8 — [Preferential] No explicit upper bound on refreshToken/clientSecret length before encryption. bodyParser caps the JSON body (~1mb) and the columns are BLOB, so it is bounded in practice; an explicit length guard would be defensive on this authenticated external surface.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

claude-opus-4-8 — Pushing back; low probability and already bounded: bodyParser caps the JSON body (~1mb) and these are BLOB columns. An explicit length guard is validation surface the spec does not call for, so declining for now.

const clientSecret = body.clientSecret ? encryption.encrypt(body.clientSecret) : null;

await store.upsert({
userId,
mcpServerId: body.mcpServerId,
refreshTokenEnc: refreshToken.ciphertext,
clientId: body.clientId ?? null,
clientSecretEnc: clientSecret?.ciphertext ?? null,
clientSecretExpiresAt: body.clientSecretExpiresAt
? new Date(body.clientSecretExpiresAt)
: null,
tokenEndpoint: body.tokenEndpoint ?? null,
tokenEndpointAuthMethod: body.tokenEndpointAuthMethod ?? null,
scopes: body.scopes ?? null,
encKeyVersion: refreshToken.encKeyVersion,
});
} catch (err) {
if (err instanceof ExecutorEncryptionKeyMissingError) {
ctx.status = 503;
ctx.body = { code: err.code };

return;
}

throw err;
}

ctx.status = 200;
ctx.body = { stored: true };
}

private async handleDeleteCredentials(
ctx: Koa.Context,
store: McpOAuthCredentialsStore,
): Promise<void> {
const userId = this.getBearerUserId(ctx);

if (userId === null) {
ctx.status = 400;
ctx.body = { error: 'Missing or invalid user id in token' };

return;
}

await store.delete(userId, ctx.params.mcpServerId);
ctx.status = 204;
Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

claude-opus-4-8 — [Preferential] DELETE returns 204 no-body while POST returns 200 { stored: true }. 204 is correct REST, but since the frontend will consume this, confirm it expects a no-body delete rather than a JSON envelope (the success-body convention is currently an it.todo in the route test).

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

claude-opus-4-8 — Intentional: 204 No Content is correct REST for a delete. The frontend consumer is a later PR and will be written against this endpoint, so there is no existing contract to break; the success-body shape stays an it.todo until that PR pins it.

}
}
9 changes: 9 additions & 0 deletions packages/workflow-executor/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ export {
AiModelPortError,
AgentProbeError,
ConfigurationError,
ExecutorEncryptionKeyMissingError,
InvalidPreRecordedArgsError,
UnsupportedStepTypeError,
UnsupportedActionFormError,
Expand All @@ -125,6 +126,14 @@ export { default as SchemaCache } from './schema-cache';
export { default as InMemoryStore } from './stores/in-memory-store';
export { default as DatabaseStore } from './stores/database-store';
export type { DatabaseStoreOptions } from './stores/database-store';
export { default as McpOAuthCredentialsStore } from './stores/mcp-oauth-credentials-store';
export type {
McpOAuthCredentialInput,
StoredMcpOAuthCredential,
McpOAuthCredentialsStoreOptions,
} from './stores/mcp-oauth-credentials-store';
export { default as CredentialEncryption } from './crypto/credential-encryption';
export type { EncryptedValue } from './crypto/credential-encryption';
export { buildDatabaseRunStore, buildInMemoryRunStore } from './stores/build-run-store';
export { buildInMemoryExecutor, buildDatabaseExecutor } from './build-workflow-executor';
export { runCli } from './cli-core';
Expand Down
Loading
Loading