diff --git a/.server-changes/batch-fast-fail-queue-size-limit.md b/.server-changes/batch-fast-fail-queue-size-limit.md new file mode 100644 index 00000000000..77b926a5a80 --- /dev/null +++ b/.server-changes/batch-fast-fail-queue-size-limit.md @@ -0,0 +1,7 @@ +--- +area: webapp +type: fix +--- + +Batch items that hit the environment queue size limit now fast-fail without +retries and without creating pre-failed TaskRuns. diff --git a/apps/webapp/app/runEngine/services/triggerTask.server.ts b/apps/webapp/app/runEngine/services/triggerTask.server.ts index c8a6dca2c93..8e183d43235 100644 --- a/apps/webapp/app/runEngine/services/triggerTask.server.ts +++ b/apps/webapp/app/runEngine/services/triggerTask.server.ts @@ -41,7 +41,7 @@ import type { TriggerTaskRequest, TriggerTaskValidator, } from "../types"; -import { ServiceValidationError } from "~/v3/services/common.server"; +import { QueueSizeLimitExceededError, ServiceValidationError } from "~/v3/services/common.server"; class NoopTriggerRacepointSystem implements TriggerRacepointSystem { async waitForRacepoint(options: { racepoint: TriggerRacepoints; id: string }): Promise { @@ -271,8 +271,9 @@ export class RunEngineTriggerTaskService { ); if (!queueSizeGuard.ok) { - throw new ServiceValidationError( + throw new QueueSizeLimitExceededError( `Cannot trigger ${taskId} as the queue size limit for this environment has been reached. The maximum size is ${queueSizeGuard.maximumSize}`, + queueSizeGuard.maximumSize ?? 0, undefined, "warn" ); diff --git a/apps/webapp/app/v3/runEngineHandlers.server.ts b/apps/webapp/app/v3/runEngineHandlers.server.ts index 411f91ff75d..e728160f00f 100644 --- a/apps/webapp/app/v3/runEngineHandlers.server.ts +++ b/apps/webapp/app/v3/runEngineHandlers.server.ts @@ -13,6 +13,7 @@ import { logger } from "~/services/logger.server"; import { updateMetadataService } from "~/services/metadata/updateMetadataInstance.server"; import { reportInvocationUsage } from "~/services/platform.v3.server"; import { MetadataTooLargeError } from "~/utils/packets"; +import { QueueSizeLimitExceededError } from "~/v3/services/common.server"; import { TriggerTaskService } from "~/v3/services/triggerTask.server"; import { tracer } from "~/v3/tracer.server"; import { createExceptionPropertiesFromError } from "./eventRepository/common.server"; @@ -637,6 +638,15 @@ export function registerRunEngineEventBusHandlers() { }); } +/** + * errorCode returned by the batch process-item callback when the trigger was + * rejected because the environment's queue is at its maximum size. The + * BatchQueue (via `skipRetries`) short-circuits retries for this code, and the + * batch completion callback collapses per-item errors into a single aggregate + * `BatchTaskRunError` row instead of writing one per item. + */ +const QUEUE_SIZE_LIMIT_EXCEEDED_ERROR_CODE = "QUEUE_SIZE_LIMIT_EXCEEDED"; + /** * Set up the BatchQueue processing callbacks. * These handle creating runs from batch items and completing batches. @@ -808,6 +818,37 @@ export function setupBatchQueueCallbacks() { } catch (error) { const errorMessage = error instanceof Error ? error.message : String(error); + // Queue-size-limit rejections are a customer-overload scenario (the + // env's queue is at its configured max). Retrying is pointless — the + // same item will fail again — and creating pre-failed TaskRuns for + // every item of every retried batch is exactly what chews through + // DB capacity when a noisy tenant fills their queue. Signal the + // BatchQueue to skip retries and skip pre-failed run creation, and + // let the completion callback collapse the per-item errors into a + // single summary row. + if (error instanceof QueueSizeLimitExceededError) { + logger.warn("[BatchQueue] Batch item rejected: queue size limit reached", { + batchId, + friendlyId, + itemIndex, + task: item.task, + environmentId: meta.environmentId, + maximumSize: error.maximumSize, + }); + + span.setAttribute("batch.result.error", errorMessage); + span.setAttribute("batch.result.errorCode", QUEUE_SIZE_LIMIT_EXCEEDED_ERROR_CODE); + span.setAttribute("batch.result.skipRetries", true); + span.end(); + + return { + success: false as const, + error: errorMessage, + errorCode: QUEUE_SIZE_LIMIT_EXCEEDED_ERROR_CODE, + skipRetries: true, + }; + } + logger.error("[BatchQueue] Failed to trigger batch item", { batchId, friendlyId, @@ -889,20 +930,51 @@ export function setupBatchQueueCallbacks() { }, }); - // Create error records if there were failures + // Create error records if there were failures. + // + // Fast-path for queue-size-limit overload: when every failure is the + // same QUEUE_SIZE_LIMIT_EXCEEDED error, collapse them into a single + // aggregate row instead of writing one per item. This keeps the DB + // write volume bounded to O(batches) instead of O(items) when a noisy + // tenant fills their queue and all of their batches start bouncing. if (failures.length > 0) { - await tx.batchTaskRunError.createMany({ - data: failures.map((failure) => ({ - batchTaskRunId: batchId, - index: failure.index, - taskIdentifier: failure.taskIdentifier, - payload: failure.payload, - options: failure.options as Prisma.InputJsonValue | undefined, - error: failure.error, - errorCode: failure.errorCode, - })), - skipDuplicates: true, - }); + const allQueueSizeLimit = failures.every( + (f) => f.errorCode === QUEUE_SIZE_LIMIT_EXCEEDED_ERROR_CODE + ); + + if (allQueueSizeLimit) { + const sample = failures[0]!; + await tx.batchTaskRunError.createMany({ + data: [ + { + batchTaskRunId: batchId, + // Use the first item's index as a stable anchor for the + // (batchTaskRunId, index) unique constraint so callback + // retries remain idempotent. + index: sample.index, + taskIdentifier: sample.taskIdentifier, + payload: sample.payload, + options: sample.options as Prisma.InputJsonValue | undefined, + error: `${sample.error} (${failures.length} items in this batch failed with the same error)`, + errorCode: sample.errorCode, + }, + ], + skipDuplicates: true, + }); + } else { + await tx.batchTaskRunError.createMany({ + data: failures.map((failure) => ({ + batchTaskRunId: batchId, + index: failure.index, + taskIdentifier: failure.taskIdentifier, + payload: failure.payload, + options: failure.options as Prisma.InputJsonValue | undefined, + error: failure.error, + errorCode: failure.errorCode, + })), + skipDuplicates: true, + }); + } } }); diff --git a/apps/webapp/app/v3/services/common.server.ts b/apps/webapp/app/v3/services/common.server.ts index 015e8e23d53..e12d2cc3b5e 100644 --- a/apps/webapp/app/v3/services/common.server.ts +++ b/apps/webapp/app/v3/services/common.server.ts @@ -10,3 +10,22 @@ export class ServiceValidationError extends Error { this.name = "ServiceValidationError"; } } + +/** + * Thrown when a trigger is rejected because the environment's queue is at its + * maximum size. This is identified separately from other validation errors so + * the batch queue worker can short-circuit retries and skip pre-failed run + * creation for this specific overload scenario — see the batch process item + * callback in `runEngineHandlers.server.ts`. + */ +export class QueueSizeLimitExceededError extends ServiceValidationError { + constructor( + message: string, + public maximumSize: number, + status?: number, + logLevel?: ServiceValidationErrorLevel + ) { + super(message, status, logLevel); + this.name = "QueueSizeLimitExceededError"; + } +} diff --git a/internal-packages/run-engine/src/batch-queue/index.ts b/internal-packages/run-engine/src/batch-queue/index.ts index 96a827b53ce..6b49795da52 100644 --- a/internal-packages/run-engine/src/batch-queue/index.ts +++ b/internal-packages/run-engine/src/batch-queue/index.ts @@ -865,8 +865,16 @@ export class BatchQueue { span?.setAttribute("batch.errorCode", result.errorCode); } - // If retries are available, use FairQueue retry scheduling - if (!isFinalAttempt) { + const skipRetries = result.skipRetries === true; + if (skipRetries) { + span?.setAttribute("batch.skipRetries", true); + } + + // If retries are available AND the callback didn't opt out, use + // FairQueue retry scheduling. `skipRetries` short-circuits this + // regardless of attempt number so the batch can finalize quickly + // when the error is known to be non-recoverable on retry. + if (!isFinalAttempt && !skipRetries) { span?.setAttribute("batch.retry", true); span?.setAttribute("batch.attempt", attempt); @@ -890,7 +898,7 @@ export class BatchQueue { return; } - // Final attempt exhausted - record permanent failure + // Final attempt exhausted (or retries skipped) - record permanent failure const payloadStr = await this.#startSpan( "BatchQueue.serializePayload", async (innerSpan) => { diff --git a/internal-packages/run-engine/src/batch-queue/tests/index.test.ts b/internal-packages/run-engine/src/batch-queue/tests/index.test.ts index a4d4e338089..ac44dae7083 100644 --- a/internal-packages/run-engine/src/batch-queue/tests/index.test.ts +++ b/internal-packages/run-engine/src/batch-queue/tests/index.test.ts @@ -776,4 +776,184 @@ describe("BatchQueue", () => { } ); }); + + describe("skipRetries on failed items", () => { + function createBatchQueueWithRetry( + redisContainer: { getHost: () => string; getPort: () => number }, + maxAttempts: number + ) { + return new BatchQueue({ + redis: { + host: redisContainer.getHost(), + port: redisContainer.getPort(), + keyPrefix: "test:", + }, + drr: { quantum: 5, maxDeficit: 50 }, + consumerCount: 1, + consumerIntervalMs: 50, + startConsumers: true, + retry: { + maxAttempts, + // Keep the ladder tiny so a regression (items retrying N times) + // still finishes inside the waitFor timeout and the test surfaces + // the problem as a failed attempt-count assertion rather than a + // timeout. + minTimeoutInMs: 20, + maxTimeoutInMs: 100, + factor: 2, + randomize: false, + }, + }); + } + + redisTest( + "should not retry when callback returns skipRetries: true", + async ({ redisContainer }) => { + const queue = createBatchQueueWithRetry(redisContainer, 6); + const itemAttempts = new Map(); + let completionResult: CompleteBatchResult | null = null; + + try { + queue.onProcessItem(async ({ itemIndex }) => { + itemAttempts.set(itemIndex, (itemAttempts.get(itemIndex) ?? 0) + 1); + return { + success: false as const, + error: "Queue at maximum size", + errorCode: "QUEUE_SIZE_LIMIT_EXCEEDED", + skipRetries: true, + }; + }); + + queue.onBatchComplete(async (result) => { + completionResult = result; + }); + + await queue.initializeBatch(createInitOptions("batch1", "env1", 3)); + await enqueueItems(queue, "batch1", "env1", createBatchItems(3)); + + await vi.waitFor( + () => { + expect(completionResult).not.toBeNull(); + }, + { timeout: 5000 } + ); + + // Every item should have been called exactly once — skipRetries + // must bypass the 6-attempt retry ladder on the very first attempt. + expect(itemAttempts.get(0)).toBe(1); + expect(itemAttempts.get(1)).toBe(1); + expect(itemAttempts.get(2)).toBe(1); + + expect(completionResult!.successfulRunCount).toBe(0); + expect(completionResult!.failedRunCount).toBe(3); + expect(completionResult!.failures).toHaveLength(3); + for (const failure of completionResult!.failures) { + expect(failure.errorCode).toBe("QUEUE_SIZE_LIMIT_EXCEEDED"); + } + } finally { + await queue.close(); + } + } + ); + + redisTest( + "should still retry up to maxAttempts when skipRetries is not set (regression guard)", + async ({ redisContainer }) => { + const maxAttempts = 3; + const queue = createBatchQueueWithRetry(redisContainer, maxAttempts); + const itemAttempts = new Map(); + let completionResult: CompleteBatchResult | null = null; + + try { + queue.onProcessItem(async ({ itemIndex }) => { + itemAttempts.set(itemIndex, (itemAttempts.get(itemIndex) ?? 0) + 1); + return { + success: false as const, + error: "Transient error", + errorCode: "TRIGGER_ERROR", + // Intentionally NOT setting skipRetries — the existing + // exponential-backoff retry path should still be honored. + }; + }); + + queue.onBatchComplete(async (result) => { + completionResult = result; + }); + + await queue.initializeBatch(createInitOptions("batch1", "env1", 2)); + await enqueueItems(queue, "batch1", "env1", createBatchItems(2)); + + await vi.waitFor( + () => { + expect(completionResult).not.toBeNull(); + }, + { timeout: 5000 } + ); + + expect(itemAttempts.get(0)).toBe(maxAttempts); + expect(itemAttempts.get(1)).toBe(maxAttempts); + + expect(completionResult!.failedRunCount).toBe(2); + } finally { + await queue.close(); + } + } + ); + + redisTest( + "should honor skipRetries on a per-item basis within the same batch", + async ({ redisContainer }) => { + const maxAttempts = 4; + const queue = createBatchQueueWithRetry(redisContainer, maxAttempts); + const itemAttempts = new Map(); + let completionResult: CompleteBatchResult | null = null; + + try { + queue.onProcessItem(async ({ itemIndex }) => { + itemAttempts.set(itemIndex, (itemAttempts.get(itemIndex) ?? 0) + 1); + // Even items fast-fail (queue-size-limit style), + // odd items retry the full ladder. + if (itemIndex % 2 === 0) { + return { + success: false as const, + error: "Queue at maximum size", + errorCode: "QUEUE_SIZE_LIMIT_EXCEEDED", + skipRetries: true, + }; + } + return { + success: false as const, + error: "Transient error", + errorCode: "TRIGGER_ERROR", + }; + }); + + queue.onBatchComplete(async (result) => { + completionResult = result; + }); + + await queue.initializeBatch(createInitOptions("batch1", "env1", 4)); + await enqueueItems(queue, "batch1", "env1", createBatchItems(4)); + + await vi.waitFor( + () => { + expect(completionResult).not.toBeNull(); + }, + { timeout: 5000 } + ); + + // Even-indexed items should fast-fail (1 attempt each) + expect(itemAttempts.get(0)).toBe(1); + expect(itemAttempts.get(2)).toBe(1); + // Odd-indexed items should exhaust the retry ladder + expect(itemAttempts.get(1)).toBe(maxAttempts); + expect(itemAttempts.get(3)).toBe(maxAttempts); + + expect(completionResult!.failedRunCount).toBe(4); + } finally { + await queue.close(); + } + } + ); + }); }); diff --git a/internal-packages/run-engine/src/batch-queue/types.ts b/internal-packages/run-engine/src/batch-queue/types.ts index 3695faf120e..dfb89c3eb2d 100644 --- a/internal-packages/run-engine/src/batch-queue/types.ts +++ b/internal-packages/run-engine/src/batch-queue/types.ts @@ -269,7 +269,20 @@ export type ProcessBatchItemCallback = (params: { /** Whether this is the final attempt (no more retries after this). */ isFinalAttempt: boolean; }) => Promise< - { success: true; runId: string } | { success: false; error: string; errorCode?: string } + | { success: true; runId: string } + | { + success: false; + error: string; + errorCode?: string; + /** + * When true, the BatchQueue will skip any remaining retries for this item + * and record the failure immediately, regardless of the current attempt + * number. Use this for errors that will deterministically fail again on + * retry (e.g. the environment queue is at its size limit), so the batch + * can finalize quickly without burning through the retry ladder. + */ + skipRetries?: boolean; + } >; /**