Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .server-changes/batch-fast-fail-queue-size-limit.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
area: webapp
type: fix
---

Batch items that hit the environment queue size limit now fast-fail without
retries and without creating pre-failed TaskRuns.
5 changes: 3 additions & 2 deletions apps/webapp/app/runEngine/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ import type {
TriggerTaskRequest,
TriggerTaskValidator,
} from "../types";
import { ServiceValidationError } from "~/v3/services/common.server";
import { QueueSizeLimitExceededError, ServiceValidationError } from "~/v3/services/common.server";

class NoopTriggerRacepointSystem implements TriggerRacepointSystem {
async waitForRacepoint(options: { racepoint: TriggerRacepoints; id: string }): Promise<void> {
Expand Down Expand Up @@ -271,8 +271,9 @@ export class RunEngineTriggerTaskService {
);

if (!queueSizeGuard.ok) {
throw new ServiceValidationError(
throw new QueueSizeLimitExceededError(
`Cannot trigger ${taskId} as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}`,
queueSizeGuard.maximumSize ?? 0,
undefined,
"warn"
);
Expand Down
98 changes: 85 additions & 13 deletions apps/webapp/app/v3/runEngineHandlers.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import { logger } from "~/services/logger.server";
import { updateMetadataService } from "~/services/metadata/updateMetadataInstance.server";
import { reportInvocationUsage } from "~/services/platform.v3.server";
import { MetadataTooLargeError } from "~/utils/packets";
import { QueueSizeLimitExceededError } from "~/v3/services/common.server";
import { TriggerTaskService } from "~/v3/services/triggerTask.server";
import { tracer } from "~/v3/tracer.server";
import { createExceptionPropertiesFromError } from "./eventRepository/common.server";
Expand Down Expand Up @@ -637,6 +638,15 @@ export function registerRunEngineEventBusHandlers() {
});
}

/**
 * errorCode returned by the batch process-item callback when the trigger was
 * rejected because the environment's queue is at its maximum size. The
 * BatchQueue (via `skipRetries`) short-circuits retries for this code, and the
 * batch completion callback collapses per-item errors into a single aggregate
 * `BatchTaskRunError` row instead of writing one per item.
 *
 * NOTE: this string is persisted on BatchTaskRunError rows and matched by the
 * completion callback — treat it as part of the stored data contract and do
 * not rename it without a migration plan.
 */
const QUEUE_SIZE_LIMIT_EXCEEDED_ERROR_CODE = "QUEUE_SIZE_LIMIT_EXCEEDED";

/**
* Set up the BatchQueue processing callbacks.
* These handle creating runs from batch items and completing batches.
Expand Down Expand Up @@ -808,6 +818,37 @@ export function setupBatchQueueCallbacks() {
} catch (error) {
const errorMessage = error instanceof Error ? error.message : String(error);

// Queue-size-limit rejections are a customer-overload scenario (the
// env's queue is at its configured max). Retrying is pointless — the
// same item will fail again — and creating pre-failed TaskRuns for
// every item of every retried batch is exactly what chews through
// DB capacity when a noisy tenant fills their queue. Signal the
// BatchQueue to skip retries and skip pre-failed run creation, and
// let the completion callback collapse the per-item errors into a
// single summary row.
if (error instanceof QueueSizeLimitExceededError) {
logger.warn("[BatchQueue] Batch item rejected: queue size limit reached", {
batchId,
friendlyId,
itemIndex,
task: item.task,
environmentId: meta.environmentId,
maximumSize: error.maximumSize,
});

span.setAttribute("batch.result.error", errorMessage);
span.setAttribute("batch.result.errorCode", QUEUE_SIZE_LIMIT_EXCEEDED_ERROR_CODE);
span.setAttribute("batch.result.skipRetries", true);
span.end();

return {
success: false as const,
error: errorMessage,
errorCode: QUEUE_SIZE_LIMIT_EXCEEDED_ERROR_CODE,
skipRetries: true,
};
}

logger.error("[BatchQueue] Failed to trigger batch item", {
batchId,
friendlyId,
Expand Down Expand Up @@ -889,20 +930,51 @@ export function setupBatchQueueCallbacks() {
},
});

// Create error records if there were failures
// Create error records if there were failures.
//
// Fast-path for queue-size-limit overload: when every failure is the
// same QUEUE_SIZE_LIMIT_EXCEEDED error, collapse them into a single
// aggregate row instead of writing one per item. This keeps the DB
// write volume bounded to O(batches) instead of O(items) when a noisy
// tenant fills their queue and all of their batches start bouncing.
if (failures.length > 0) {
await tx.batchTaskRunError.createMany({
data: failures.map((failure) => ({
batchTaskRunId: batchId,
index: failure.index,
taskIdentifier: failure.taskIdentifier,
payload: failure.payload,
options: failure.options as Prisma.InputJsonValue | undefined,
error: failure.error,
errorCode: failure.errorCode,
})),
skipDuplicates: true,
});
const allQueueSizeLimit = failures.every(
(f) => f.errorCode === QUEUE_SIZE_LIMIT_EXCEEDED_ERROR_CODE
);

if (allQueueSizeLimit) {
const sample = failures[0]!;
await tx.batchTaskRunError.createMany({
data: [
{
batchTaskRunId: batchId,
// Use the first item's index as a stable anchor for the
// (batchTaskRunId, index) unique constraint so callback
// retries remain idempotent.
index: sample.index,
taskIdentifier: sample.taskIdentifier,
payload: sample.payload,
options: sample.options as Prisma.InputJsonValue | undefined,
error: `${sample.error} (${failures.length} items in this batch failed with the same error)`,
errorCode: sample.errorCode,
},
],
skipDuplicates: true,
});
} else {
await tx.batchTaskRunError.createMany({
data: failures.map((failure) => ({
batchTaskRunId: batchId,
index: failure.index,
taskIdentifier: failure.taskIdentifier,
payload: failure.payload,
options: failure.options as Prisma.InputJsonValue | undefined,
error: failure.error,
errorCode: failure.errorCode,
})),
skipDuplicates: true,
});
}
}
});

Expand Down
19 changes: 19 additions & 0 deletions apps/webapp/app/v3/services/common.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,22 @@ export class ServiceValidationError extends Error {
this.name = "ServiceValidationError";
}
}

/**
 * Signals that a trigger was rejected because the environment's queue has
 * reached its configured maximum size. Kept as a distinct subclass of
 * `ServiceValidationError` so the batch queue worker can recognize this
 * overload case, skip retries, and avoid creating pre-failed runs — see the
 * batch process-item callback in `runEngineHandlers.server.ts`.
 */
export class QueueSizeLimitExceededError extends ServiceValidationError {
  // The environment's configured maximum queue size that was hit.
  public maximumSize: number;

  constructor(
    message: string,
    maximumSize: number,
    status?: number,
    logLevel?: ServiceValidationErrorLevel
  ) {
    super(message, status, logLevel);
    this.maximumSize = maximumSize;
    this.name = "QueueSizeLimitExceededError";
  }
}
14 changes: 11 additions & 3 deletions internal-packages/run-engine/src/batch-queue/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -865,8 +865,16 @@ export class BatchQueue {
span?.setAttribute("batch.errorCode", result.errorCode);
}

// If retries are available, use FairQueue retry scheduling
if (!isFinalAttempt) {
const skipRetries = result.skipRetries === true;
if (skipRetries) {
span?.setAttribute("batch.skipRetries", true);
}

// If retries are available AND the callback didn't opt out, use
// FairQueue retry scheduling. `skipRetries` short-circuits this
// regardless of attempt number so the batch can finalize quickly
// when the error is known to be non-recoverable on retry.
if (!isFinalAttempt && !skipRetries) {
span?.setAttribute("batch.retry", true);
span?.setAttribute("batch.attempt", attempt);

Expand All @@ -890,7 +898,7 @@ export class BatchQueue {
return;
}

// Final attempt exhausted - record permanent failure
// Final attempt exhausted (or retries skipped) - record permanent failure
const payloadStr = await this.#startSpan(
"BatchQueue.serializePayload",
async (innerSpan) => {
Expand Down
180 changes: 180 additions & 0 deletions internal-packages/run-engine/src/batch-queue/tests/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -776,4 +776,184 @@ describe("BatchQueue", () => {
}
);
});

describe("skipRetries on failed items", () => {
/**
 * Builds a BatchQueue wired to the test Redis container with a fast,
 * deterministic retry ladder so retry-related assertions finish quickly.
 */
function createBatchQueueWithRetry(
  redisContainer: { getHost: () => string; getPort: () => number },
  maxAttempts: number
) {
  // Tiny, non-randomized backoff: if a regression makes items retry the
  // full N times, the run still completes inside the waitFor timeout and
  // the test fails on the attempt-count assertion instead of timing out.
  const retry = {
    maxAttempts,
    minTimeoutInMs: 20,
    maxTimeoutInMs: 100,
    factor: 2,
    randomize: false,
  };

  return new BatchQueue({
    redis: {
      host: redisContainer.getHost(),
      port: redisContainer.getPort(),
      keyPrefix: "test:",
    },
    drr: { quantum: 5, maxDeficit: 50 },
    consumerCount: 1,
    consumerIntervalMs: 50,
    startConsumers: true,
    retry,
  });
}

redisTest(
  "should not retry when callback returns skipRetries: true",
  async ({ redisContainer }) => {
    const queue = createBatchQueueWithRetry(redisContainer, 6);
    // Tracks how many times each item's process callback ran.
    const attemptsByIndex = new Map<number, number>();
    let completion: CompleteBatchResult | null = null;

    try {
      queue.onProcessItem(async ({ itemIndex }) => {
        attemptsByIndex.set(itemIndex, (attemptsByIndex.get(itemIndex) ?? 0) + 1);
        return {
          success: false as const,
          error: "Queue at maximum size",
          errorCode: "QUEUE_SIZE_LIMIT_EXCEEDED",
          skipRetries: true,
        };
      });

      queue.onBatchComplete(async (result) => {
        completion = result;
      });

      await queue.initializeBatch(createInitOptions("batch1", "env1", 3));
      await enqueueItems(queue, "batch1", "env1", createBatchItems(3));

      await vi.waitFor(
        () => {
          expect(completion).not.toBeNull();
        },
        { timeout: 5000 }
      );

      // skipRetries must short-circuit the 6-attempt ladder on the very
      // first attempt: exactly one invocation per item.
      for (const index of [0, 1, 2]) {
        expect(attemptsByIndex.get(index)).toBe(1);
      }

      expect(completion!.successfulRunCount).toBe(0);
      expect(completion!.failedRunCount).toBe(3);
      expect(completion!.failures).toHaveLength(3);
      completion!.failures.forEach((failure) => {
        expect(failure.errorCode).toBe("QUEUE_SIZE_LIMIT_EXCEEDED");
      });
    } finally {
      await queue.close();
    }
  }
);

redisTest(
  "should still retry up to maxAttempts when skipRetries is not set (regression guard)",
  async ({ redisContainer }) => {
    const maxAttempts = 3;
    const queue = createBatchQueueWithRetry(redisContainer, maxAttempts);
    // Tracks how many times each item's process callback ran.
    const attemptsByIndex = new Map<number, number>();
    let completion: CompleteBatchResult | null = null;

    try {
      queue.onProcessItem(async ({ itemIndex }) => {
        attemptsByIndex.set(itemIndex, (attemptsByIndex.get(itemIndex) ?? 0) + 1);
        // Intentionally NOT setting skipRetries — the existing
        // exponential-backoff retry path should still be honored.
        return {
          success: false as const,
          error: "Transient error",
          errorCode: "TRIGGER_ERROR",
        };
      });

      queue.onBatchComplete(async (result) => {
        completion = result;
      });

      await queue.initializeBatch(createInitOptions("batch1", "env1", 2));
      await enqueueItems(queue, "batch1", "env1", createBatchItems(2));

      await vi.waitFor(
        () => {
          expect(completion).not.toBeNull();
        },
        { timeout: 5000 }
      );

      // Without skipRetries every item exhausts the full retry ladder.
      for (const index of [0, 1]) {
        expect(attemptsByIndex.get(index)).toBe(maxAttempts);
      }

      expect(completion!.failedRunCount).toBe(2);
    } finally {
      await queue.close();
    }
  }
);

redisTest(
"should honor skipRetries on a per-item basis within the same batch",
async ({ redisContainer }) => {
const maxAttempts = 4;
const queue = createBatchQueueWithRetry(redisContainer, maxAttempts);
const itemAttempts = new Map<number, number>();
let completionResult: CompleteBatchResult | null = null;

try {
queue.onProcessItem(async ({ itemIndex }) => {
itemAttempts.set(itemIndex, (itemAttempts.get(itemIndex) ?? 0) + 1);
// Even items fast-fail (queue-size-limit style),
// odd items retry the full ladder.
if (itemIndex % 2 === 0) {
return {
success: false as const,
error: "Queue at maximum size",
errorCode: "QUEUE_SIZE_LIMIT_EXCEEDED",
skipRetries: true,
};
}
return {
success: false as const,
error: "Transient error",
errorCode: "TRIGGER_ERROR",
};
});

queue.onBatchComplete(async (result) => {
completionResult = result;
});

await queue.initializeBatch(createInitOptions("batch1", "env1", 4));
await enqueueItems(queue, "batch1", "env1", createBatchItems(4));

await vi.waitFor(
() => {
expect(completionResult).not.toBeNull();
},
{ timeout: 5000 }
);

// Even-indexed items should fast-fail (1 attempt each)
expect(itemAttempts.get(0)).toBe(1);
expect(itemAttempts.get(2)).toBe(1);
// Odd-indexed items should exhaust the retry ladder
expect(itemAttempts.get(1)).toBe(maxAttempts);
expect(itemAttempts.get(3)).toBe(maxAttempts);

expect(completionResult!.failedRunCount).toBe(4);
} finally {
await queue.close();
}
}
);
});
});
Loading
Loading