diff --git a/.server-changes/stop-creating-taskruntag-records.md b/.server-changes/stop-creating-taskruntag-records.md new file mode 100644 index 00000000000..0b068d3c3ac --- /dev/null +++ b/.server-changes/stop-creating-taskruntag-records.md @@ -0,0 +1,6 @@ +--- +area: webapp +type: improvement +--- + +Stop creating TaskRunTag records and _TaskRunToTaskRunTag join table entries during task triggering. The denormalized runTags string array on TaskRun already stores tag names, making the M2M relation redundant write overhead. diff --git a/apps/webapp/app/models/taskRunTag.server.ts b/apps/webapp/app/models/taskRunTag.server.ts index 812d1c86109..29bd43a7ea9 100644 --- a/apps/webapp/app/models/taskRunTag.server.ts +++ b/apps/webapp/app/models/taskRunTag.server.ts @@ -1,108 +1 @@ -import { Prisma } from "@trigger.dev/database"; -import { prisma } from "~/db.server"; -import { generateFriendlyId } from "~/v3/friendlyIdentifiers"; -import { PrismaClientOrTransaction } from "@trigger.dev/database"; - export const MAX_TAGS_PER_RUN = 10; -const MAX_RETRIES = 3; - -export async function createTag( - { tag, projectId }: { tag: string; projectId: string }, - prismaClient: PrismaClientOrTransaction = prisma -) { - if (tag.trim().length === 0) return; - - let attempts = 0; - const friendlyId = generateFriendlyId("runtag"); - - while (attempts < MAX_RETRIES) { - try { - return await prisma.taskRunTag.upsert({ - where: { - projectId_name: { - projectId, - name: tag, - }, - }, - create: { - friendlyId, - name: tag, - projectId, - }, - update: {}, - }); - } catch (error) { - if (error instanceof Prisma.PrismaClientKnownRequestError && error.code === "P2002") { - // Handle unique constraint violation (conflict) - attempts++; - if (attempts >= MAX_RETRIES) { - throw new Error(`Failed to create tag after ${MAX_RETRIES} attempts due to conflicts.`); - } - } else { - throw error; // Re-throw other errors - } - } - } -} - -export type TagRecord = { - id: string; - name: string; -}; - -export async function createTags( - { - tags, - projectId, - }: { - tags: string | string[] | undefined; - projectId: string; - }, - prismaClient: PrismaClientOrTransaction = prisma -): Promise { - if (!tags) { - return []; - } - - const tagsArray = typeof tags === "string" ? [tags] : tags; - - if (tagsArray.length === 0) { - return []; - } - - const tagRecords: TagRecord[] = []; - for (const tag of tagsArray) { - const tagRecord = await createTag( - { - tag, - projectId, - }, - prismaClient - ); - if (tagRecord) { - tagRecords.push({ id: tagRecord.id, name: tagRecord.name }); - } - } - - return tagRecords; -} - -export async function getTagsForRunId({ - friendlyId, - environmentId, -}: { - friendlyId: string; - environmentId: string; -}) { - const run = await prisma.taskRun.findFirst({ - where: { - friendlyId, - runtimeEnvironmentId: environmentId, - }, - select: { - tags: true, - }, - }); - - return run?.tags ?? undefined; -} diff --git a/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts b/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts index dc19457cdd1..ebac8e089f5 100644 --- a/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts +++ b/apps/webapp/app/presenters/v3/ApiRetrieveRunPresenter.server.ts @@ -34,7 +34,6 @@ const commonRunSelect = { metadata: true, metadataType: true, ttl: true, - tags: true, costInCents: true, baseCostInCents: true, usageDurationMs: true, @@ -459,9 +458,7 @@ async function createCommonRunStructure(run: CommonRelatedRun, apiVersion: API_V durationMs: run.usageDurationMs, isTest: run.isTest, depth: run.depth, - tags: run.tags - .map((t: { name: string }) => t.name) - .sort((a: string, b: string) => a.localeCompare(b)), + tags: [...(run.runTags ?? [])].sort((a: string, b: string) => a.localeCompare(b)), ...ApiRetrieveRunPresenter.apiBooleanHelpersFromTaskRunStatus(run.status, apiVersion), triggerFunction: resolveTriggerFunction(run), batchId: run.batch?.friendlyId, diff --git a/apps/webapp/app/routes/api.v1.runs.$runId.tags.ts b/apps/webapp/app/routes/api.v1.runs.$runId.tags.ts index a02e3ecefa9..2a48ded529e 100644 --- a/apps/webapp/app/routes/api.v1.runs.$runId.tags.ts +++ b/apps/webapp/app/routes/api.v1.runs.$runId.tags.ts @@ -2,7 +2,7 @@ import { type ActionFunctionArgs, json } from "@remix-run/server-runtime"; import { AddTagsRequestBody } from "@trigger.dev/core/v3"; import { z } from "zod"; import { prisma } from "~/db.server"; -import { createTag, getTagsForRunId, MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server"; +import { MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server"; import { authenticateApiRequest } from "~/services/apiAuth.server"; const ParamsSchema = z.object({ @@ -37,17 +37,23 @@ export async function action({ request, params }: ActionFunctionArgs) { return json({ error: "Invalid request body", issues: body.error.issues }, { status: 400 }); } - const existingTags = - (await getTagsForRunId({ + const run = await prisma.taskRun.findFirst({ + where: { friendlyId: parsedParams.data.runId, - environmentId: authenticationResult.environment.id, - })) ?? []; + runtimeEnvironmentId: authenticationResult.environment.id, + }, + select: { + runTags: true, + }, + }); + + const existingTags = run?.runTags ?? []; //remove duplicate tags from the new tags const bodyTags = typeof body.data.tags === "string" ? [body.data.tags] : body.data.tags; const newTags = bodyTags.filter((tag) => { if (tag.trim().length === 0) return false; - return !existingTags.map((t) => t.name).includes(tag); + return !existingTags.includes(tag); }); if (existingTags.length + newTags.length > MAX_TAGS_PER_RUN) { @@ -65,29 +71,12 @@ export async function action({ request, params }: ActionFunctionArgs) { return json({ message: "No new tags to add" }, { status: 200 }); } - //create tags - let tagIds: string[] = existingTags.map((t) => t.id); - if (newTags.length > 0) { - for (const tag of newTags) { - const tagRecord = await createTag({ - tag, - projectId: authenticationResult.environment.projectId, - }); - if (tagRecord) { - tagIds.push(tagRecord.id); - } - } - } - await prisma.taskRun.update({ where: { friendlyId: parsedParams.data.runId, runtimeEnvironmentId: authenticationResult.environment.id, }, data: { - tags: { - connect: tagIds.map((id) => ({ id })), - }, runTags: { push: newTags, }, diff --git a/apps/webapp/app/routes/resources.runs.$runParam.ts b/apps/webapp/app/routes/resources.runs.$runParam.ts index 7b116b31c3c..c5e467533a3 100644 --- a/apps/webapp/app/routes/resources.runs.$runParam.ts +++ b/apps/webapp/app/routes/resources.runs.$runParam.ts @@ -23,11 +23,7 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { taskIdentifier: true, friendlyId: true, isTest: true, - tags: { - select: { - name: true, - }, - }, + runTags: true, machinePreset: true, lockedToVersion: { select: { @@ -178,7 +174,7 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { run: { id: run.friendlyId, createdAt: run.createdAt, - tags: run.tags.map((tag) => tag.name), + tags: run.runTags ?? [], isTest: run.isTest, idempotencyKey: run.idempotencyKey ?? undefined, startedAt: run.startedAt ?? run.createdAt, @@ -244,7 +240,7 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => { isCustomQueue: !run.queue.startsWith("task/"), concurrencyKey: run.concurrencyKey, }, - tags: run.tags.map((tag) => tag.name), + tags: run.runTags ?? [], baseCostInCents: run.baseCostInCents, costInCents: run.costInCents, totalCostInCents: run.costInCents + run.baseCostInCents, diff --git a/apps/webapp/app/runEngine/services/triggerTask.server.ts b/apps/webapp/app/runEngine/services/triggerTask.server.ts index c8a6dca2c93..55f169de517 100644 --- a/apps/webapp/app/runEngine/services/triggerTask.server.ts +++ b/apps/webapp/app/runEngine/services/triggerTask.server.ts @@ -20,7 +20,6 @@ import { stringifyDuration, } from "@trigger.dev/core/v3/isomorphic"; import type { PrismaClientOrTransaction } from "@trigger.dev/database"; -import { createTags } from "~/models/taskRunTag.server"; import type { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { logger } from "~/services/logger.server"; import { parseDelay } from "~/utils/delays"; @@ -287,14 +286,13 @@ export class RunEngineTriggerTaskService { ) : undefined; - //upsert tags - const tags = await createTags( - { - tags: body.options?.tags, - projectId: environment.projectId, - }, - this.prisma - ); + const tags = ( + body.options?.tags + ? typeof body.options.tags === "string" + ? [body.options.tags] + : body.options.tags + : [] + ).filter((tag) => tag.trim().length > 0); const depth = parentRun ? parentRun.depth + 1 : 0; diff --git a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts index 8cc10fd5c08..0d6327f0640 100644 --- a/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts +++ b/apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts @@ -750,7 +750,6 @@ export class SharedQueueConsumer { take: 1, orderBy: { number: "desc" }, }, - tags: true, checkpoints: { take: 1, orderBy: { @@ -1648,7 +1647,7 @@ export const AttemptForExecutionGetPayload = { costInCents: true, baseCostInCents: true, maxDurationInSeconds: true, - tags: true, + runTags: true, taskEventStore: true, }, }, @@ -1725,7 +1724,7 @@ class SharedQueueTasks { context: taskRun.context, createdAt: taskRun.createdAt, startedAt: taskRun.startedAt ?? taskRun.createdAt, - tags: taskRun.tags.map((tag) => tag.name), + tags: taskRun.runTags ?? [], isTest: taskRun.isTest, idempotencyKey: taskRun.idempotencyKey ?? undefined, durationMs: taskRun.usageDurationMs, diff --git a/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts b/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts index df5e4e2b744..7e0b40dd826 100644 --- a/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts +++ b/apps/webapp/app/v3/services/createTaskRunAttempt.server.ts @@ -52,7 +52,6 @@ export class CreateTaskRunAttemptService extends BaseService { runtimeEnvironmentId: environment.id, }, include: { - tags: true, attempts: { take: 1, orderBy: { @@ -209,7 +208,7 @@ export class CreateTaskRunAttemptService extends BaseService { payloadType: taskRun.payloadType, context: taskRun.context, createdAt: taskRun.createdAt, - tags: taskRun.tags.map((tag) => tag.name), + tags: taskRun.runTags ?? [], isTest: taskRun.isTest, idempotencyKey: taskRun.idempotencyKey ?? undefined, startedAt: taskRun.startedAt ?? taskRun.createdAt, diff --git a/apps/webapp/app/v3/services/triggerTaskV1.server.ts b/apps/webapp/app/v3/services/triggerTaskV1.server.ts index e9e1f291e2d..6a927765bed 100644 --- a/apps/webapp/app/v3/services/triggerTaskV1.server.ts +++ b/apps/webapp/app/v3/services/triggerTaskV1.server.ts @@ -13,7 +13,7 @@ import { import { Prisma } from "@trigger.dev/database"; import { z } from "zod"; import { env } from "~/env.server"; -import { createTag, MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server"; +import { MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server"; import { AuthenticatedEnvironment } from "~/services/apiAuth.server"; import { autoIncrementCounter } from "~/services/autoIncrementCounter.server"; import { logger } from "~/services/logger.server"; @@ -345,21 +345,8 @@ export class TriggerTaskServiceV1 extends BaseService { span.setAttribute("queueName", queueName); - //upsert tags - let tagIds: string[] = []; const bodyTags = typeof body.options?.tags === "string" ? [body.options.tags] : body.options?.tags; - if (bodyTags && bodyTags.length > 0) { - for (const tag of bodyTags) { - const tagRecord = await createTag({ - tag, - projectId: environment.projectId, - }); - if (tagRecord) { - tagIds.push(tagRecord.id); - } - } - } const depth = dependentAttempt ? dependentAttempt.taskRun.depth + 1 @@ -409,12 +396,6 @@ export class TriggerTaskServiceV1 extends BaseService { maxAttempts: body.options?.maxAttempts, taskEventStore: store, ttl, - tags: - tagIds.length === 0 - ? undefined - : { - connect: tagIds.map((id) => ({ id })), - }, parentTaskRunId: dependentAttempt?.taskRun.id ?? parentAttempt?.taskRun.id ?? diff --git a/internal-packages/run-engine/src/engine/index.ts b/internal-packages/run-engine/src/engine/index.ts index a1446d54b20..6757894fbbe 100644 --- a/internal-packages/run-engine/src/engine/index.ts +++ b/internal-packages/run-engine/src/engine/index.ts @@ -637,13 +637,7 @@ export class RunEngine { priorityMs, queueTimestamp: queueTimestamp ?? delayUntil ?? new Date(), ttl: resolvedTtl, - tags: - tags.length === 0 - ? undefined - : { - connect: tags, - }, - runTags: tags.length === 0 ? undefined : tags.map((tag) => tag.name), + runTags: tags.length === 0 ? undefined : tags, oneTimeUseToken, parentTaskRunId, rootTaskRunId, diff --git a/internal-packages/run-engine/src/engine/systems/debounceSystem.ts b/internal-packages/run-engine/src/engine/systems/debounceSystem.ts index 8cd06d07732..ef711a19577 100644 --- a/internal-packages/run-engine/src/engine/systems/debounceSystem.ts +++ b/internal-packages/run-engine/src/engine/systems/debounceSystem.ts @@ -32,7 +32,7 @@ export type DebounceOptions = { payloadType: string; metadata?: string; metadataType?: string; - tags?: { id: string; name: string }[]; + tags?: string[]; maxAttempts?: number; maxDurationInSeconds?: number; machine?: string; @@ -876,10 +876,7 @@ return 0 // Handle tags update - replace existing tags if (updateData.tags !== undefined) { - updatePayload.runTags = updateData.tags.map((t) => t.name); - updatePayload.tags = { - set: updateData.tags.map((t) => ({ id: t.id })), - }; + updatePayload.runTags = updateData.tags; } const updatedRun = await prisma.taskRun.update({ diff --git a/internal-packages/run-engine/src/engine/types.ts b/internal-packages/run-engine/src/engine/types.ts index 6ecb726e3af..cd90e0b8ac4 100644 --- a/internal-packages/run-engine/src/engine/types.ts +++ b/internal-packages/run-engine/src/engine/types.ts @@ -194,7 +194,7 @@ export type TriggerParams = { priorityMs?: number; queueTimestamp?: Date; ttl?: string; - tags: { id: string; name: string }[]; + tags: string[]; parentTaskRunId?: string; rootTaskRunId?: string; replayedFromTaskRunFriendlyId?: string;