From 09143e88a6963a80fce4631c732fe761689e5e72 Mon Sep 17 00:00:00 2001 From: Kiran K Date: Mon, 1 Jun 2026 12:18:02 +0530 Subject: [PATCH 1/2] Add async discount processing with versioned cron jobs and soft delete --- .../admin/payouts/get-payouts-timeseries.ts | 2 +- .../cron/cleanup/orphaned-discounts/route.ts | 66 ++++++++ .../create/queue-batches/route.ts | 2 +- .../cron/discount-codes/delete/queue/route.ts | 64 +++++++ .../(ee)/api/cron/discounts/process/route.ts | 158 ++++++++++++++++++ .../webhooks/[webhookId]/page-client.tsx | 7 +- apps/web/app/providers.tsx | 6 +- .../lib/actions/partners/create-discount.ts | 29 +--- .../lib/actions/partners/delete-discount.ts | 65 +++---- .../lib/actions/partners/update-discount.ts | 6 +- .../web/lib/api/discounts/discount-version.ts | 33 ++++ .../discounts/queue-discount-processing.ts | 72 ++++++++ .../workspaces/billing-trial.spec.ts | 8 +- .../web/ui/partners/partner-link-selector.tsx | 3 +- packages/prisma/schema/discount.prisma | 4 +- 15 files changed, 450 insertions(+), 75 deletions(-) create mode 100644 apps/web/app/(ee)/api/cron/cleanup/orphaned-discounts/route.ts create mode 100644 apps/web/app/(ee)/api/cron/discount-codes/delete/queue/route.ts create mode 100644 apps/web/app/(ee)/api/cron/discounts/process/route.ts create mode 100644 apps/web/lib/api/discounts/discount-version.ts create mode 100644 apps/web/lib/api/discounts/queue-discount-processing.ts diff --git a/apps/web/app/(ee)/api/admin/payouts/get-payouts-timeseries.ts b/apps/web/app/(ee)/api/admin/payouts/get-payouts-timeseries.ts index 5256f4931e9..fa559dc71d2 100644 --- a/apps/web/app/(ee)/api/admin/payouts/get-payouts-timeseries.ts +++ b/apps/web/app/(ee)/api/admin/payouts/get-payouts-timeseries.ts @@ -1,5 +1,5 @@ -import { sqlGranularityMap } from "@/lib/planetscale/granularity"; import { conn } from "@/lib/planetscale/connection"; +import { sqlGranularityMap } from "@/lib/planetscale/granularity"; import { InvoiceStatus } from "@dub/prisma/client"; import { ACME_PROGRAM_ID } from "@dub/utils"; import { format } from "date-fns"; diff --git a/apps/web/app/(ee)/api/cron/cleanup/orphaned-discounts/route.ts b/apps/web/app/(ee)/api/cron/cleanup/orphaned-discounts/route.ts new file mode 100644 index 00000000000..16b1c40d78a --- /dev/null +++ b/apps/web/app/(ee)/api/cron/cleanup/orphaned-discounts/route.ts @@ -0,0 +1,66 @@ +import { withCron } from "@/lib/cron/with-cron"; +import { prisma } from "@dub/prisma"; +import { subMinutes } from "date-fns"; +import { logAndRespond } from "../../utils"; + +export const dynamic = "force-dynamic"; + +// The discounts/process cron normally hard-deletes once enrollments are cleared, but a newer +// discount change for the same group can bump the version and skip stale jobs +// (e.g. delete then create), leaving the old row behind. This job is a safety net for those orphans. + +// POST /api/cron/cleanup/orphaned-discounts +export const POST = withCron(async () => { + const discounts = await prisma.discount.findMany({ + where: { + programId: null, + updatedAt: { + lt: subMinutes(new Date(), 30), // only look for discounts older than 30 minutes ago + }, + }, + select: { + id: true, + _count: { + select: { + discountCodes: true, + programEnrollments: true, + }, + }, + }, + orderBy: { + updatedAt: "asc", + }, + take: 100, + }); + + if (discounts.length === 0) { + return logAndRespond("No orphaned discounts found."); + } + + const discountsToDelete = discounts.filter((discount) => { + return ( + discount._count.programEnrollments === 0 && + discount._count.discountCodes === 0 + ); + }); + + console.log( + `Found ${discountsToDelete.length} discounts to delete out of ${discounts.length} discounts (some of them are still referenced by program enrollments or discount codes).`, + ); + + if (discountsToDelete.length === 0) { + return logAndRespond("No discounts to delete, skipping..."); + } + + const deletedDiscounts = await prisma.discount.deleteMany({ + where: { + id: { + in: discountsToDelete.map((discount) => discount.id), + }, + }, + }); + + return logAndRespond( + `Finished deleting orphaned discounts (${deletedDiscounts.count} discounts deleted).`, + ); +}); diff --git a/apps/web/app/(ee)/api/cron/discount-codes/create/queue-batches/route.ts b/apps/web/app/(ee)/api/cron/discount-codes/create/queue-batches/route.ts index 97dc74dfdd0..66cf174a7d3 100644 --- a/apps/web/app/(ee)/api/cron/discount-codes/create/queue-batches/route.ts +++ b/apps/web/app/(ee)/api/cron/discount-codes/create/queue-batches/route.ts @@ -40,7 +40,7 @@ export const POST = withCron(async ({ rawBody }) => { }, }); - if (!discount) { + if (!discount || !discount.program) { return logAndRespond(`Discount ${discountId} not found. Skipping...`); } diff --git a/apps/web/app/(ee)/api/cron/discount-codes/delete/queue/route.ts b/apps/web/app/(ee)/api/cron/discount-codes/delete/queue/route.ts new file mode 100644 index 00000000000..1a712159b3b --- /dev/null +++ b/apps/web/app/(ee)/api/cron/discount-codes/delete/queue/route.ts @@ -0,0 +1,64 @@ +import { withCron } from "@/lib/cron/with-cron"; +import { deleteDiscountCodes } from "@/lib/discounts/delete-discount-code"; +import { prisma } from "@dub/prisma"; +import { DiscountProvider } from "@dub/prisma/client"; +import * as z from "zod/v4"; +import { logAndRespond } from "../../../utils"; + +export const dynamic = "force-dynamic"; + +export const maxDuration = 600; + +const inputSchema = z.object({ + discountId: z.string(), + provider: z.enum(DiscountProvider), +}); + +// POST /api/cron/discount-codes/delete/queue +export const POST = withCron(async ({ rawBody }) => { + const { discountId, provider } = inputSchema.parse(JSON.parse(rawBody)); + + let startingAfter: string | undefined; + + while (true) { + const discountCodes = await prisma.discountCode.findMany({ + where: { + discountId, + }, + select: { + id: true, + code: true, + programId: true, + }, + ...(startingAfter && { + skip: 1, + cursor: { + id: startingAfter, + }, + }), + orderBy: { + id: "asc", + }, + take: 500, + }); + + if (discountCodes.length === 0) { + break; + } + + startingAfter = discountCodes[discountCodes.length - 1].id; + + await deleteDiscountCodes( + discountCodes.map((discountCode) => ({ + ...discountCode, + discount: { + provider, + }, + })), + ); + } + + return logAndRespond( + `Finished queuing discount codes for discount ${discountId}.`, + ); +}); diff --git a/apps/web/app/(ee)/api/cron/discounts/process/route.ts b/apps/web/app/(ee)/api/cron/discounts/process/route.ts new file mode 100644 index 00000000000..7d0802362bb --- /dev/null +++ b/apps/web/app/(ee)/api/cron/discounts/process/route.ts @@ -0,0 +1,158 @@ +import { isStaleDiscountVersion } from "@/lib/api/discounts/discount-version"; +import { + discountJobSchema, + queueDiscountProcessing, +} from "@/lib/api/discounts/queue-discount-processing"; +import { withCron } from "@/lib/cron/with-cron"; +import { INACTIVE_ENROLLMENT_STATUSES } from "@/lib/zod/schemas/partners"; +import { prisma } from "@dub/prisma"; +import { Prisma } from "@dub/prisma/client"; +import { logAndRespond } from "../../utils"; + +export const dynamic = "force-dynamic"; + +// POST /api/cron/discounts/process +export const POST = withCron(async ({ rawBody }) => { + const input = discountJobSchema.parse(JSON.parse(rawBody)); + + const { + event, + groupId, + version, + batchNumber, + discountSnapshot, + startAfterProgramEnrollmentId, + } = input; + + const { id: discountId } = discountSnapshot; + + const discount = await prisma.discount.findUnique({ + where: { + id: discountId, + }, + select: { + id: true, + }, + }); + + if (!discount) { + return logAndRespond(`Discount ${discountId} not found. Skipping...`); + } + + const group = await prisma.partnerGroup.findUnique({ + where: { + id: groupId, + }, + select: { + id: true, + }, + }); + + if (!group) { + return logAndRespond(`Group ${groupId} not found. Skipping...`); + } + + const isStaleVersion = await isStaleDiscountVersion({ + version, + groupId, + }); + + if (isStaleVersion) { + return logAndRespond( + "Discount changed while processing. Skipping stale discount evaluation.", + ); + } + + let data: Prisma.ProgramEnrollmentUpdateManyArgs["data"] | undefined = + undefined; + + switch (event) { + case "discount-created": + data = { discountId: discount.id }; + break; + + case "discount-deleted": + data = { discountId: null }; + break; + } + + const programEnrollments = await prisma.programEnrollment.findMany({ + where: { + groupId: group.id, + status: { + notIn: INACTIVE_ENROLLMENT_STATUSES, + }, + ...(startAfterProgramEnrollmentId && { + id: { + gt: startAfterProgramEnrollmentId, + }, + }), + }, + select: { + id: true, + }, + orderBy: { + id: "asc", + }, + take: 300, + }); + + if (programEnrollments.length > 0) { + await prisma.programEnrollment.updateMany({ + where: { + id: { + in: programEnrollments.map(({ id }) => id), + }, + }, + data: { + ...data, + }, + }); + + const startingAfter = programEnrollments[programEnrollments.length - 1].id; + + await queueDiscountProcessing({ + ...input, + startAfterProgramEnrollmentId: startingAfter, + batchNumber: batchNumber + 1, + }); + + return logAndRespond( + `Enqueued next batch (${batchNumber + 1}) for discount ${discountId} for the group ${groupId}.`, + ); + } + + // No more program enrollments found, hard delete the discount + if (event === "discount-deleted") { + const discountCodes = await prisma.discountCode.count({ + where: { + discountId: discount.id, + }, + }); + + if (discountCodes > 0) { + return logAndRespond( + `Found ${discountCodes} discount codes for discount ${discountId}. Skipping hard delete...`, + ); + } + + try { + await prisma.discount.delete({ + where: { + id: discount.id, + }, + }); + } catch (error) { + // Treat already-deleted discount as success so retries can complete + if (!(error.code === "P2025")) { + throw new Error( + `Failed to hard delete discount ${discount.id}: ${error instanceof Error ? error.message : String(error)}`, + ); + } + } + } + + return logAndRespond( + `Finished processing discount ${discountId} for the group ${groupId}.`, + ); +}); diff --git a/apps/web/app/app.dub.co/(dashboard)/[slug]/(ee)/settings/webhooks/[webhookId]/page-client.tsx b/apps/web/app/app.dub.co/(dashboard)/[slug]/(ee)/settings/webhooks/[webhookId]/page-client.tsx index fe7a9f3c65c..e7bb7c51cf8 100644 --- a/apps/web/app/app.dub.co/(dashboard)/[slug]/(ee)/settings/webhooks/[webhookId]/page-client.tsx +++ b/apps/web/app/app.dub.co/(dashboard)/[slug]/(ee)/settings/webhooks/[webhookId]/page-client.tsx @@ -39,8 +39,7 @@ export default function WebhookLogsPageClient({ ); const [detailsSheetState, setDetailsSheetState] = useState< - | { open: false; eventId: string | null } - | { open: true; eventId: string } + { open: false; eventId: string | null } | { open: true; eventId: string } >({ open: false, eventId: null }); const currentEvent = useMemo( @@ -61,7 +60,9 @@ export default function WebhookLogsPageClient({ return [ currentIndex > 0 ? events[currentIndex - 1].event_id : null, - currentIndex < events.length - 1 ? events[currentIndex + 1].event_id : null, + currentIndex < events.length - 1 + ? events[currentIndex + 1].event_id + : null, ]; }, [events, detailsSheetState.eventId]); diff --git a/apps/web/app/providers.tsx b/apps/web/app/providers.tsx index 6f084b8ca46..fc4781a794b 100644 --- a/apps/web/app/providers.tsx +++ b/apps/web/app/providers.tsx @@ -10,7 +10,11 @@ export default function RootProviders({ children }: { children: ReactNode }) { - + {children} diff --git a/apps/web/lib/actions/partners/create-discount.ts b/apps/web/lib/actions/partners/create-discount.ts index 61e3000cd8d..289796a5019 100644 --- a/apps/web/lib/actions/partners/create-discount.ts +++ b/apps/web/lib/actions/partners/create-discount.ts @@ -2,6 +2,7 @@ import { recordAuditLog } from "@/lib/api/audit-logs/record-audit-log"; import { createId } from "@/lib/api/create-id"; +import { queueDiscountProcessing } from "@/lib/api/discounts/queue-discount-processing"; import { getGroupOrThrow } from "@/lib/api/groups/get-group-or-throw"; import { getDefaultProgramIdOrThrow } from "@/lib/api/programs/get-default-program-id-or-throw"; import { qstash } from "@/lib/cron"; @@ -97,29 +98,17 @@ export const createDiscountAction = authActionClient }, }); - await tx.programEnrollment.updateMany({ - where: { - groupId, - }, - data: { - discountId: discount.id, - }, - }); - - await tx.discountCode.updateMany({ - where: { - programEnrollment: { - groupId, - }, - }, - data: { - discountId: discount.id, - }, - }); - return discount; }); + await queueDiscountProcessing({ + event: "discount-created", + groupId, + discountSnapshot: { + id: discount.id, + }, + }); + waitUntil( Promise.allSettled([ qstash.publishJSON({ diff --git a/apps/web/lib/actions/partners/delete-discount.ts b/apps/web/lib/actions/partners/delete-discount.ts index b10864b38d1..e64583c435b 100644 --- a/apps/web/lib/actions/partners/delete-discount.ts +++ b/apps/web/lib/actions/partners/delete-discount.ts @@ -1,10 +1,10 @@ "use server"; import { recordAuditLog } from "@/lib/api/audit-logs/record-audit-log"; +import { queueDiscountProcessing } from "@/lib/api/discounts/queue-discount-processing"; import { getDiscountOrThrow } from "@/lib/api/partners/get-discount-or-throw"; import { getDefaultProgramIdOrThrow } from "@/lib/api/programs/get-default-program-id-or-throw"; import { qstash } from "@/lib/cron"; -import { deleteDiscountCodes } from "@/lib/discounts/delete-discount-code"; import { prisma } from "@dub/prisma"; import { APP_DOMAIN_WITH_NGROK } from "@dub/utils"; import { waitUntil } from "@vercel/functions"; @@ -35,58 +35,38 @@ export const deleteDiscountAction = authActionClient discountId, }); - // Cache discount codes to delete them later - const discountCodes = await prisma.discountCode.findMany({ - where: { - discountId: discount.id, - }, - select: { - id: true, - code: true, - programId: true, - discount: { - select: { - provider: true, - }, - }, - }, - }); - - const group = await prisma.$transaction(async (tx) => { - const group = await tx.partnerGroup.update({ + const partnerGroup = await prisma.$transaction(async (tx) => { + const partnerGroup = await tx.partnerGroup.update({ where: { discountId: discount.id, }, data: { discountId: null, }, - }); - - await tx.programEnrollment.updateMany({ - where: { - discountId: discount.id, - }, - data: { - discountId: null, + select: { + id: true, }, }); - await tx.discountCode.updateMany({ + // Soft delete discount, we will hard delete it in the cron job + await tx.discount.update({ where: { - discountId: discount.id, + id: discount.id, }, data: { - discountId: null, + programId: null, }, }); - await tx.discount.delete({ - where: { - id: discount.id, - }, - }); + return partnerGroup; + }); - return group; + await queueDiscountProcessing({ + event: "discount-deleted", + groupId: partnerGroup.id, + discountSnapshot: { + id: discount.id, + }, }); waitUntil( @@ -94,11 +74,18 @@ export const deleteDiscountAction = authActionClient qstash.publishJSON({ url: `${APP_DOMAIN_WITH_NGROK}/api/cron/links/invalidate-for-discounts`, body: { - groupId: group.id, + groupId: partnerGroup.id, }, }), - deleteDiscountCodes(discountCodes), + qstash.publishJSON({ + url: `${APP_DOMAIN_WITH_NGROK}/api/cron/discount-codes/delete/queue`, + method: "POST", + body: { + discountId: discount.id, + provider: discount.provider, + }, + }), recordAuditLog({ workspaceId: workspace.id, diff --git a/apps/web/lib/actions/partners/update-discount.ts b/apps/web/lib/actions/partners/update-discount.ts index c9803c69fd8..a22e7f82106 100644 --- a/apps/web/lib/actions/partners/update-discount.ts +++ b/apps/web/lib/actions/partners/update-discount.ts @@ -68,9 +68,9 @@ export const updateDiscountAction = authActionClient // we only cache default group pages for now so we need to invalidate them ...(partnerGroup?.slug === DEFAULT_PARTNER_GROUP.slug ? [ - revalidatePath(`/partners.dub.co/${program.slug}`), - revalidatePath(`/partners.dub.co/${program.slug}/apply`), - program.addedToMarketplaceAt && + revalidatePath(`/partners.dub.co/${program?.slug}`), + revalidatePath(`/partners.dub.co/${program?.slug}/apply`), + program?.addedToMarketplaceAt && revalidatePath( `/partners.dub.co/programs/marketplace/${program.slug}`, ), diff --git a/apps/web/lib/api/discounts/discount-version.ts b/apps/web/lib/api/discounts/discount-version.ts new file mode 100644 index 00000000000..5044b0bd039 --- /dev/null +++ b/apps/web/lib/api/discounts/discount-version.ts @@ -0,0 +1,33 @@ +import { redis } from "@/lib/upstash"; + +const DISCOUNT_VERSION_TTL_SECONDS = 24 * 60 * 60; // 24 hours + +export function getDiscountVersionKey({ groupId }: { groupId: string }) { + return `discount-version:${groupId}`; +} + +export async function incrementDiscountVersion({ + groupId, +}: { + groupId: string; +}) { + const key = getDiscountVersionKey({ groupId }); + + const version = await redis.incr(key); + await redis.expire(key, DISCOUNT_VERSION_TTL_SECONDS); + return version; +} + +export async function isStaleDiscountVersion({ + version, + groupId, +}: { + version: number; + groupId: string; +}) { + const key = getDiscountVersionKey({ groupId }); + + const currentVersion = await redis.get(key); + + return currentVersion != null && version < Number(currentVersion); +} diff --git a/apps/web/lib/api/discounts/queue-discount-processing.ts b/apps/web/lib/api/discounts/queue-discount-processing.ts new file mode 100644 index 00000000000..d38beffe9ad --- /dev/null +++ b/apps/web/lib/api/discounts/queue-discount-processing.ts @@ -0,0 +1,72 @@ +import { logger } from "@/lib/axiom/server"; +import { qstash } from "@/lib/cron"; +import { APP_DOMAIN_WITH_NGROK } from "@dub/utils"; +import * as z from "zod/v4"; +import { incrementDiscountVersion } from "./discount-version"; + +export const discountJobSchema = z.object({ + event: z.enum(["discount-created", "discount-deleted"]), + groupId: z.string(), + version: z + .number() + .optional() + .default(1) + .describe( + "Incremented by 1 for each new discount change (create/delete) in a group.", + ), + batchNumber: z.number().optional().default(1), + startAfterProgramEnrollmentId: z.string().nullish(), + discountSnapshot: z.object({ + id: z.string(), + }), +}); + +export type DiscountJob = z.input; + +export async function queueDiscountProcessing(params: DiscountJob) { + try { + // If version is provided (recursive cron job), use it, otherwise increment the version + const version = + params.version !== undefined + ? params.version + : await incrementDiscountVersion({ + groupId: params.groupId, + }); + + const response = await qstash.publishJSON({ + url: `${APP_DOMAIN_WITH_NGROK}/api/cron/discounts/process`, + method: "POST", + body: { + ...params, + version, + }, + }); + + if (!response?.messageId) { + throw new Error( + "We couldn't start discount processing right now. Please try again in a few moments.", + ); + } + + return response; + } catch (error) { + logger.error("publishJSON.failed", { + service: "qstash", + event: "publishJSON.failed", + url: `/api/cron/discounts/process`, + errorName: error instanceof Error ? error.name : undefined, + errorStack: error instanceof Error ? error.stack : undefined, + correlation: { + event: params.event, + groupId: params.groupId, + discountId: params.discountSnapshot.id, + }, + }); + + await logger.flush(); + + throw new Error( + "We couldn't start discount processing right now. Please try again in a few moments.", + ); + } +} diff --git a/apps/web/playwright/workspaces/billing-trial.spec.ts b/apps/web/playwright/workspaces/billing-trial.spec.ts index d676405ac0a..cdb689e0fe2 100644 --- a/apps/web/playwright/workspaces/billing-trial.spec.ts +++ b/apps/web/playwright/workspaces/billing-trial.spec.ts @@ -122,7 +122,9 @@ test.describe("Free trial user navigation", () => { } }); - test("billing settings page shows trial banner and CTAs", async ({ page }) => { + test("billing settings page shows trial banner and CTAs", async ({ + page, + }) => { await page.goto(`/${slug}/settings/billing`); await expect(page.getByText(/Trial ends on/)).toBeVisible({ @@ -184,9 +186,7 @@ test.describe("Free trial user navigation", () => { "You'll be charged today and your trial will end.", ), ).toBeVisible(); - await confirmModal - .getByRole("button", { name: "Start paid plan" }) - .click(); + await confirmModal.getByRole("button", { name: "Start paid plan" }).click(); await page.waitForURL((u) => { const url = new URL(u); diff --git a/apps/web/ui/partners/partner-link-selector.tsx b/apps/web/ui/partners/partner-link-selector.tsx index 63cca71a6d2..58a776a55bd 100644 --- a/apps/web/ui/partners/partner-link-selector.tsx +++ b/apps/web/ui/partners/partner-link-selector.tsx @@ -86,7 +86,8 @@ export function PartnerLinkSelector({ }, [selectedLink]); const showLoadingPlaceholder = - (selectedLinkId && !selectedLink) || (!selectedLinkId && isValidating && !links); + (selectedLinkId && !selectedLink) || + (!selectedLinkId && isValidating && !links); return ( <> diff --git a/packages/prisma/schema/discount.prisma b/packages/prisma/schema/discount.prisma index 7704fc60c67..a07defbff69 100644 --- a/packages/prisma/schema/discount.prisma +++ b/packages/prisma/schema/discount.prisma @@ -5,7 +5,7 @@ enum DiscountProvider { model Discount { id String @id @default(cuid()) - programId String + programId String? amount Int @default(0) type RewardStructure @default(percentage) maxDuration Int? // in months (0 -> one-time purchase, 1, 3, 6, 12, 24 -> months, null -> lifetime) @@ -18,7 +18,7 @@ model Discount { updatedAt DateTime @updatedAt programEnrollments ProgramEnrollment[] - program Program @relation("ProgramDiscounts", fields: [programId], references: [id]) + program Program? @relation("ProgramDiscounts", fields: [programId], references: [id]) partnerGroup PartnerGroup? discountCodes DiscountCode[] From fa7eb3ad2a79d99f753516fd812ade0c404c8067 Mon Sep 17 00:00:00 2001 From: Kiran K Date: Mon, 1 Jun 2026 12:27:16 +0530 Subject: [PATCH 2/2] Fix build --- apps/web/lib/discounts/create-discount-code.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/apps/web/lib/discounts/create-discount-code.ts b/apps/web/lib/discounts/create-discount-code.ts index fbc55590275..b9f32ddbbdc 100644 --- a/apps/web/lib/discounts/create-discount-code.ts +++ b/apps/web/lib/discounts/create-discount-code.ts @@ -19,6 +19,10 @@ export async function createDiscountCode({ discount, code, }: CreateDiscountCodeArgs) { + if (!discount.programId) { + throw new Error("Discount does not have a program ID."); + } + let finalCode = code; // Construct the discount code if no code is provided