Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .server-changes/stop-creating-taskruntag-records.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
area: webapp
type: improvement
---

Stop creating TaskRunTag records and _TaskRunToTaskRunTag join table entries during task triggering. The denormalized runTags string array on TaskRun already stores tag names, making the M2M relation redundant write overhead.
107 changes: 0 additions & 107 deletions apps/webapp/app/models/taskRunTag.server.ts
Comment thread
ericallam marked this conversation as resolved.
Original file line number Diff line number Diff line change
@@ -1,108 +1 @@
import { Prisma } from "@trigger.dev/database";
import { prisma } from "~/db.server";
import { generateFriendlyId } from "~/v3/friendlyIdentifiers";
import { PrismaClientOrTransaction } from "@trigger.dev/database";

export const MAX_TAGS_PER_RUN = 10;
const MAX_RETRIES = 3;

export async function createTag(
{ tag, projectId }: { tag: string; projectId: string },
prismaClient: PrismaClientOrTransaction = prisma
) {
if (tag.trim().length === 0) return;

let attempts = 0;
const friendlyId = generateFriendlyId("runtag");

while (attempts < MAX_RETRIES) {
try {
return await prisma.taskRunTag.upsert({
where: {
projectId_name: {
projectId,
name: tag,
},
},
create: {
friendlyId,
name: tag,
projectId,
},
update: {},
});
} catch (error) {
if (error instanceof Prisma.PrismaClientKnownRequestError && error.code === "P2002") {
// Handle unique constraint violation (conflict)
attempts++;
if (attempts >= MAX_RETRIES) {
throw new Error(`Failed to create tag after ${MAX_RETRIES} attempts due to conflicts.`);
}
} else {
throw error; // Re-throw other errors
}
}
}
}

export type TagRecord = {
id: string;
name: string;
};

export async function createTags(
{
tags,
projectId,
}: {
tags: string | string[] | undefined;
projectId: string;
},
prismaClient: PrismaClientOrTransaction = prisma
): Promise<TagRecord[]> {
if (!tags) {
return [];
}

const tagsArray = typeof tags === "string" ? [tags] : tags;

if (tagsArray.length === 0) {
return [];
}

const tagRecords: TagRecord[] = [];
for (const tag of tagsArray) {
const tagRecord = await createTag(
{
tag,
projectId,
},
prismaClient
);
if (tagRecord) {
tagRecords.push({ id: tagRecord.id, name: tagRecord.name });
}
}

return tagRecords;
}

export async function getTagsForRunId({
friendlyId,
environmentId,
}: {
friendlyId: string;
environmentId: string;
}) {
const run = await prisma.taskRun.findFirst({
where: {
friendlyId,
runtimeEnvironmentId: environmentId,
},
select: {
tags: true,
},
});

return run?.tags ?? undefined;
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@ const commonRunSelect = {
metadata: true,
metadataType: true,
ttl: true,
tags: true,
costInCents: true,
baseCostInCents: true,
usageDurationMs: true,
Expand Down Expand Up @@ -459,9 +458,7 @@ async function createCommonRunStructure(run: CommonRelatedRun, apiVersion: API_V
durationMs: run.usageDurationMs,
isTest: run.isTest,
depth: run.depth,
tags: run.tags
.map((t: { name: string }) => t.name)
.sort((a: string, b: string) => a.localeCompare(b)),
tags: [...(run.runTags ?? [])].sort((a: string, b: string) => a.localeCompare(b)),
...ApiRetrieveRunPresenter.apiBooleanHelpersFromTaskRunStatus(run.status, apiVersion),
triggerFunction: resolveTriggerFunction(run),
batchId: run.batch?.friendlyId,
Expand Down
35 changes: 12 additions & 23 deletions apps/webapp/app/routes/api.v1.runs.$runId.tags.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { type ActionFunctionArgs, json } from "@remix-run/server-runtime";
import { AddTagsRequestBody } from "@trigger.dev/core/v3";
import { z } from "zod";
import { prisma } from "~/db.server";
import { createTag, getTagsForRunId, MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server";
import { MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server";
import { authenticateApiRequest } from "~/services/apiAuth.server";

const ParamsSchema = z.object({
Expand Down Expand Up @@ -37,17 +37,23 @@ export async function action({ request, params }: ActionFunctionArgs) {
return json({ error: "Invalid request body", issues: body.error.issues }, { status: 400 });
}

const existingTags =
(await getTagsForRunId({
const run = await prisma.taskRun.findFirst({
where: {
friendlyId: parsedParams.data.runId,
environmentId: authenticationResult.environment.id,
})) ?? [];
runtimeEnvironmentId: authenticationResult.environment.id,
},
select: {
runTags: true,
},
});

const existingTags = run?.runTags ?? [];

//remove duplicate tags from the new tags
const bodyTags = typeof body.data.tags === "string" ? [body.data.tags] : body.data.tags;
const newTags = bodyTags.filter((tag) => {
if (tag.trim().length === 0) return false;
return !existingTags.map((t) => t.name).includes(tag);
return !existingTags.includes(tag);
});

if (existingTags.length + newTags.length > MAX_TAGS_PER_RUN) {
Expand All @@ -65,29 +71,12 @@ export async function action({ request, params }: ActionFunctionArgs) {
return json({ message: "No new tags to add" }, { status: 200 });
}

//create tags
let tagIds: string[] = existingTags.map((t) => t.id);
if (newTags.length > 0) {
for (const tag of newTags) {
const tagRecord = await createTag({
tag,
projectId: authenticationResult.environment.projectId,
});
if (tagRecord) {
tagIds.push(tagRecord.id);
}
}
}

await prisma.taskRun.update({
where: {
friendlyId: parsedParams.data.runId,
runtimeEnvironmentId: authenticationResult.environment.id,
},
data: {
tags: {
connect: tagIds.map((id) => ({ id })),
},
runTags: {
push: newTags,
},
Comment thread
ericallam marked this conversation as resolved.
Expand Down
10 changes: 3 additions & 7 deletions apps/webapp/app/routes/resources.runs.$runParam.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,7 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
taskIdentifier: true,
friendlyId: true,
isTest: true,
tags: {
select: {
name: true,
},
},
runTags: true,
machinePreset: true,
lockedToVersion: {
select: {
Expand Down Expand Up @@ -178,7 +174,7 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
run: {
id: run.friendlyId,
createdAt: run.createdAt,
tags: run.tags.map((tag) => tag.name),
tags: run.runTags ?? [],
isTest: run.isTest,
idempotencyKey: run.idempotencyKey ?? undefined,
startedAt: run.startedAt ?? run.createdAt,
Expand Down Expand Up @@ -244,7 +240,7 @@ export const loader = async ({ request, params }: LoaderFunctionArgs) => {
isCustomQueue: !run.queue.startsWith("task/"),
concurrencyKey: run.concurrencyKey,
},
tags: run.tags.map((tag) => tag.name),
tags: run.runTags ?? [],
baseCostInCents: run.baseCostInCents,
costInCents: run.costInCents,
totalCostInCents: run.costInCents + run.baseCostInCents,
Expand Down
16 changes: 7 additions & 9 deletions apps/webapp/app/runEngine/services/triggerTask.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import {
stringifyDuration,
} from "@trigger.dev/core/v3/isomorphic";
import type { PrismaClientOrTransaction } from "@trigger.dev/database";
import { createTags } from "~/models/taskRunTag.server";
import type { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { logger } from "~/services/logger.server";
import { parseDelay } from "~/utils/delays";
Expand Down Expand Up @@ -287,14 +286,13 @@ export class RunEngineTriggerTaskService {
)
: undefined;

//upsert tags
const tags = await createTags(
{
tags: body.options?.tags,
projectId: environment.projectId,
},
this.prisma
);
const tags = (
body.options?.tags
? typeof body.options.tags === "string"
? [body.options.tags]
: body.options.tags
: []
).filter((tag) => tag.trim().length > 0);

const depth = parentRun ? parentRun.depth + 1 : 0;

Expand Down
5 changes: 2 additions & 3 deletions apps/webapp/app/v3/marqs/sharedQueueConsumer.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -750,7 +750,6 @@ export class SharedQueueConsumer {
take: 1,
orderBy: { number: "desc" },
},
tags: true,
checkpoints: {
take: 1,
orderBy: {
Expand Down Expand Up @@ -1648,7 +1647,7 @@ export const AttemptForExecutionGetPayload = {
costInCents: true,
baseCostInCents: true,
maxDurationInSeconds: true,
tags: true,
runTags: true,
taskEventStore: true,
},
},
Expand Down Expand Up @@ -1725,7 +1724,7 @@ class SharedQueueTasks {
context: taskRun.context,
createdAt: taskRun.createdAt,
startedAt: taskRun.startedAt ?? taskRun.createdAt,
tags: taskRun.tags.map((tag) => tag.name),
tags: taskRun.runTags ?? [],
isTest: taskRun.isTest,
idempotencyKey: taskRun.idempotencyKey ?? undefined,
durationMs: taskRun.usageDurationMs,
Expand Down
3 changes: 1 addition & 2 deletions apps/webapp/app/v3/services/createTaskRunAttempt.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@ export class CreateTaskRunAttemptService extends BaseService {
runtimeEnvironmentId: environment.id,
},
include: {
tags: true,
attempts: {
take: 1,
orderBy: {
Expand Down Expand Up @@ -209,7 +208,7 @@ export class CreateTaskRunAttemptService extends BaseService {
payloadType: taskRun.payloadType,
context: taskRun.context,
createdAt: taskRun.createdAt,
tags: taskRun.tags.map((tag) => tag.name),
tags: taskRun.runTags ?? [],
isTest: taskRun.isTest,
idempotencyKey: taskRun.idempotencyKey ?? undefined,
startedAt: taskRun.startedAt ?? taskRun.createdAt,
Expand Down
21 changes: 1 addition & 20 deletions apps/webapp/app/v3/services/triggerTaskV1.server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import {
import { Prisma } from "@trigger.dev/database";
import { z } from "zod";
import { env } from "~/env.server";
import { createTag, MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server";
import { MAX_TAGS_PER_RUN } from "~/models/taskRunTag.server";
import { AuthenticatedEnvironment } from "~/services/apiAuth.server";
import { autoIncrementCounter } from "~/services/autoIncrementCounter.server";
import { logger } from "~/services/logger.server";
Expand Down Expand Up @@ -345,21 +345,8 @@ export class TriggerTaskServiceV1 extends BaseService {

span.setAttribute("queueName", queueName);

//upsert tags
let tagIds: string[] = [];
const bodyTags =
typeof body.options?.tags === "string" ? [body.options.tags] : body.options?.tags;
if (bodyTags && bodyTags.length > 0) {
for (const tag of bodyTags) {
const tagRecord = await createTag({
tag,
projectId: environment.projectId,
});
if (tagRecord) {
tagIds.push(tagRecord.id);
}
}
}

const depth = dependentAttempt
? dependentAttempt.taskRun.depth + 1
Expand Down Expand Up @@ -409,12 +396,6 @@ export class TriggerTaskServiceV1 extends BaseService {
maxAttempts: body.options?.maxAttempts,
taskEventStore: store,
ttl,
tags:
tagIds.length === 0
? undefined
: {
connect: tagIds.map((id) => ({ id })),
},
parentTaskRunId:
dependentAttempt?.taskRun.id ??
parentAttempt?.taskRun.id ??
Expand Down
8 changes: 1 addition & 7 deletions internal-packages/run-engine/src/engine/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -637,13 +637,7 @@ export class RunEngine {
priorityMs,
queueTimestamp: queueTimestamp ?? delayUntil ?? new Date(),
ttl: resolvedTtl,
tags:
tags.length === 0
? undefined
: {
connect: tags,
},
runTags: tags.length === 0 ? undefined : tags.map((tag) => tag.name),
runTags: tags.length === 0 ? undefined : tags,
Comment thread
coderabbitai[bot] marked this conversation as resolved.
oneTimeUseToken,
parentTaskRunId,
rootTaskRunId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ export type DebounceOptions = {
payloadType: string;
metadata?: string;
metadataType?: string;
tags?: { id: string; name: string }[];
tags?: string[];
maxAttempts?: number;
maxDurationInSeconds?: number;
machine?: string;
Expand Down Expand Up @@ -876,10 +876,7 @@ return 0

// Handle tags update - replace existing tags
if (updateData.tags !== undefined) {
updatePayload.runTags = updateData.tags.map((t) => t.name);
updatePayload.tags = {
set: updateData.tags.map((t) => ({ id: t.id })),
};
updatePayload.runTags = updateData.tags;
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

const updatedRun = await prisma.taskRun.update({
Expand Down
2 changes: 1 addition & 1 deletion internal-packages/run-engine/src/engine/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ export type TriggerParams = {
priorityMs?: number;
queueTimestamp?: Date;
ttl?: string;
tags: { id: string; name: string }[];
tags: string[];
parentTaskRunId?: string;
rootTaskRunId?: string;
replayedFromTaskRunFriendlyId?: string;
Expand Down
Loading