Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { sqlGranularityMap } from "@/lib/planetscale/granularity";
import { conn } from "@/lib/planetscale/connection";
import { sqlGranularityMap } from "@/lib/planetscale/granularity";
import { InvoiceStatus } from "@dub/prisma/client";
import { ACME_PROGRAM_ID } from "@dub/utils";
import { format } from "date-fns";
Expand Down
66 changes: 66 additions & 0 deletions apps/web/app/(ee)/api/cron/cleanup/orphaned-discounts/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
import { withCron } from "@/lib/cron/with-cron";
import { prisma } from "@dub/prisma";
import { subMinutes } from "date-fns";
import { logAndRespond } from "../../utils";

export const dynamic = "force-dynamic";

// The discounts/process cron normally hard-deletes once enrollments are cleared, but a newer
// discount change for the same group can bump the version and skip stale jobs
// (e.g. delete then create), leaving the old row behind. This job is a safety net for those orphans.

// POST /api/cron/cleanup/orphaned-discounts
export const POST = withCron(async () => {
const discounts = await prisma.discount.findMany({
where: {
programId: null,
updatedAt: {
lt: subMinutes(new Date(), 30), // only look for discounts older than 30 minutes ago
},
},
select: {
id: true,
_count: {
select: {
discountCodes: true,
programEnrollments: true,
},
},
},
orderBy: {
updatedAt: "asc",
},
take: 100,
});

if (discounts.length === 0) {
return logAndRespond("No orphaned discounts found.");
}

const discountsToDelete = discounts.filter((discount) => {
return (
discount._count.programEnrollments === 0 &&
discount._count.discountCodes === 0
);
});

console.log(
`Found ${discountsToDelete.length} discounts to delete out of ${discounts.length} discounts (some of them are still referenced by program enrollments or discount codes).`,
);

if (discountsToDelete.length === 0) {
return logAndRespond("No discounts to delete, skipping...");
}

const deletedDiscounts = await prisma.discount.deleteMany({
where: {
id: {
in: discountsToDelete.map((discount) => discount.id),
},
},
});

return logAndRespond(
`Finished deleting orphaned discounts (${deletedDiscounts.count} discounts deleted).`,
);
});
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ export const POST = withCron(async ({ rawBody }) => {
},
});

if (!discount) {
if (!discount || !discount.program) {
return logAndRespond(`Discount ${discountId} not found. Skipping...`);
}

Expand Down
64 changes: 64 additions & 0 deletions apps/web/app/(ee)/api/cron/discount-codes/delete/queue/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
import { withCron } from "@/lib/cron/with-cron";
import { deleteDiscountCodes } from "@/lib/discounts/delete-discount-code";
import { prisma } from "@dub/prisma";
import { DiscountProvider } from "@dub/prisma/client";
import * as z from "zod/v4";
import { logAndRespond } from "../../../utils";

export const dynamic = "force-dynamic";

export const maxDuration = 600;

const inputSchema = z.object({
discountId: z.string(),
provider: z.enum(DiscountProvider),
});

// POST /api/cron/discount-codes/delete/queue
export const POST = withCron(async ({ rawBody }) => {
const { discountId, provider } = inputSchema.parse(JSON.parse(rawBody));

let startingAfter: string | undefined;

while (true) {
const discountCodes = await prisma.discountCode.findMany({
where: {
discountId,
},
select: {
id: true,
code: true,
programId: true,
},
...(startingAfter && {
skip: 1,
cursor: {
id: startingAfter,
},
}),
orderBy: {
id: "asc",
},
take: 500,
});

if (discountCodes.length === 0) {
break;
}

startingAfter = discountCodes[discountCodes.length - 1].id;

await deleteDiscountCodes(
discountCodes.map((discountCode) => ({
...discountCode,
discount: {
provider,
},
})),
);
}

return logAndRespond(
`Finished queuing discount codes for discount ${discountId}.`,
);
});
158 changes: 158 additions & 0 deletions apps/web/app/(ee)/api/cron/discounts/process/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
import { isStaleDiscountVersion } from "@/lib/api/discounts/discount-version";
import {
discountJobSchema,
queueDiscountProcessing,
} from "@/lib/api/discounts/queue-discount-processing";
import { withCron } from "@/lib/cron/with-cron";
import { INACTIVE_ENROLLMENT_STATUSES } from "@/lib/zod/schemas/partners";
import { prisma } from "@dub/prisma";
import { Prisma } from "@dub/prisma/client";
import { logAndRespond } from "../../utils";

export const dynamic = "force-dynamic";

// POST /api/cron/discounts/process
export const POST = withCron(async ({ rawBody }) => {
const input = discountJobSchema.parse(JSON.parse(rawBody));

const {
event,
groupId,
version,
batchNumber,
discountSnapshot,
startAfterProgramEnrollmentId,
} = input;

const { id: discountId } = discountSnapshot;

const discount = await prisma.discount.findUnique({
where: {
id: discountId,
},
select: {
id: true,
},
});

if (!discount) {
return logAndRespond(`Discount ${discountId} not found. Skipping...`);
}

const group = await prisma.partnerGroup.findUnique({
where: {
id: groupId,
},
select: {
id: true,
},
});

if (!group) {
return logAndRespond(`Group ${groupId} not found. Skipping...`);
}

const isStaleVersion = await isStaleDiscountVersion({
version,
groupId,
});

if (isStaleVersion) {
return logAndRespond(
"Discount changed while processing. Skipping stale discount evaluation.",
);
}

let data: Prisma.ProgramEnrollmentUpdateManyArgs["data"] | undefined =
undefined;

switch (event) {
case "discount-created":
data = { discountId: discount.id };
break;

case "discount-deleted":
data = { discountId: null };
break;
}

const programEnrollments = await prisma.programEnrollment.findMany({
where: {
groupId: group.id,
status: {
notIn: INACTIVE_ENROLLMENT_STATUSES,
},
...(startAfterProgramEnrollmentId && {
id: {
gt: startAfterProgramEnrollmentId,
},
}),
},
select: {
id: true,
},
orderBy: {
id: "asc",
},
take: 300,
});

if (programEnrollments.length > 0) {
await prisma.programEnrollment.updateMany({
where: {
id: {
in: programEnrollments.map(({ id }) => id),
},
},
data: {
...data,
},
});

const startingAfter = programEnrollments[programEnrollments.length - 1].id;

await queueDiscountProcessing({
...input,
startAfterProgramEnrollmentId: startingAfter,
batchNumber: batchNumber + 1,
});

return logAndRespond(
`Enqueued next batch (${batchNumber + 1}) for discount ${discountId} for the group ${groupId}.`,
);
}

// No more program enrollments found, hard delete the discount
if (event === "discount-deleted") {
const discountCodes = await prisma.discountCode.count({
where: {
discountId: discount.id,
},
});

if (discountCodes > 0) {
return logAndRespond(
`Found ${discountCodes} discount codes for discount ${discountId}. Skipping hard delete...`,
);
}

try {
await prisma.discount.delete({
where: {
id: discount.id,
},
});
} catch (error) {
// Treat already-deleted discount as success so retries can complete
if (!(error.code === "P2025")) {
throw new Error(
`Failed to hard delete discount ${discount.id}: ${error instanceof Error ? error.message : String(error)}`,
);
}
}
Comment thread
devkiran marked this conversation as resolved.
}

return logAndRespond(
`Finished processing discount ${discountId} for the group ${groupId}.`,
);
});
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@ export default function WebhookLogsPageClient({
);

const [detailsSheetState, setDetailsSheetState] = useState<
| { open: false; eventId: string | null }
| { open: true; eventId: string }
{ open: false; eventId: string | null } | { open: true; eventId: string }
>({ open: false, eventId: null });

const currentEvent = useMemo(
Expand All @@ -61,7 +60,9 @@ export default function WebhookLogsPageClient({

return [
currentIndex > 0 ? events[currentIndex - 1].event_id : null,
currentIndex < events.length - 1 ? events[currentIndex + 1].event_id : null,
currentIndex < events.length - 1
? events[currentIndex + 1].event_id
: null,
];
}, [events, detailsSheetState.eventId]);

Expand Down
6 changes: 5 additions & 1 deletion apps/web/app/providers.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,11 @@ export default function RootProviders({ children }: { children: ReactNode }) {
<TooltipProvider>
<PlausibleProvider enabled>
<KeyboardShortcutProvider>
<Toaster className="pointer-events-auto" closeButton duration={3000} />
<Toaster
className="pointer-events-auto"
closeButton
duration={3000}
/>
{children}
</KeyboardShortcutProvider>
</PlausibleProvider>
Expand Down
29 changes: 9 additions & 20 deletions apps/web/lib/actions/partners/create-discount.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import { recordAuditLog } from "@/lib/api/audit-logs/record-audit-log";
import { createId } from "@/lib/api/create-id";
import { queueDiscountProcessing } from "@/lib/api/discounts/queue-discount-processing";
import { getGroupOrThrow } from "@/lib/api/groups/get-group-or-throw";
import { getDefaultProgramIdOrThrow } from "@/lib/api/programs/get-default-program-id-or-throw";
import { qstash } from "@/lib/cron";
Expand Down Expand Up @@ -97,29 +98,17 @@ export const createDiscountAction = authActionClient
},
});

await tx.programEnrollment.updateMany({
where: {
groupId,
},
data: {
discountId: discount.id,
},
});

await tx.discountCode.updateMany({
where: {
programEnrollment: {
groupId,
},
},
data: {
discountId: discount.id,
},
});

return discount;
});

await queueDiscountProcessing({
event: "discount-created",
groupId,
discountSnapshot: {
id: discount.id,
},
});

waitUntil(
Promise.allSettled([
qstash.publishJSON({
Expand Down
Loading
Loading