Skip to content
66 changes: 65 additions & 1 deletion apps/sim/background/cleanup-soft-deletes.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ import { beforeEach, describe, expect, it, vi } from 'vitest'

const {
mockBatchDeleteByWorkspaceAndTimestamp,
mockDeleteDefinition,
mockDeleteFileMetadata,
mockDeleteFiles,
mockDeleteRowsById,
mockDrainRowsByColumn,
mockIsUsingCloudStorage,
mockLimit,
mockOrderBy,
Expand All @@ -26,12 +28,15 @@ const {
leftJoin: vi.fn(() => ({ where: mockWhere })),
}))
const mockSelect = vi.fn(() => ({ from: mockFrom }))
const mockDeleteDefinition = vi.fn(async () => undefined)

return {
mockBatchDeleteByWorkspaceAndTimestamp: vi.fn(async () => ({ deleted: 0, failed: 0 })),
mockDeleteDefinition,
mockDeleteFileMetadata: vi.fn(async () => true),
mockDeleteFiles: vi.fn(async () => ({ deleted: 0, failed: [] as Array<{ key: string }> })),
mockDeleteRowsById: vi.fn(async () => ({ deleted: 0, failed: 0 })),
mockDrainRowsByColumn: vi.fn(async () => ({ deleted: 0, fullyDrained: false })),
mockIsUsingCloudStorage: vi.fn(() => true),
mockLimit,
mockOrderBy,
Expand All @@ -43,7 +48,9 @@ const {
}
})

vi.mock('@sim/db', () => ({ db: { select: mockSelect } }))
vi.mock('@sim/db', () => ({
db: { select: mockSelect, delete: vi.fn(() => ({ where: mockDeleteDefinition })) },
}))

vi.mock('@sim/db/schema', () => {
const table = (cols: string[]) =>
Expand All @@ -58,6 +65,7 @@ vi.mock('@sim/db/schema', () => {
mcpServers: table(softCols),
memory: table(softCols),
userTableDefinitions: table(softCols),
userTableRows: table(['id', 'tableId', 'workspaceId']),
workflow: table(softCols),
workflowFolder: table(softCols),
workflowMcpServer: table(softCols),
Expand Down Expand Up @@ -93,6 +101,7 @@ vi.mock('@/lib/cleanup/batch-delete', () => ({
return chunks
},
deleteRowsById: mockDeleteRowsById,
drainRowsByColumn: mockDrainRowsByColumn,
selectRowsByIdChunks: mockSelectRowsByIdChunks,
}))

Expand Down Expand Up @@ -162,3 +171,58 @@ describe('cleanup soft deletes — orphan KB binding sweep', () => {
expect(mockDeleteFileMetadata).not.toHaveBeenCalled()
})
})

describe('cleanup soft deletes — archived user tables', () => {
beforeEach(() => {
vi.clearAllMocks()
mockIsUsingCloudStorage.mockReturnValue(true)
mockLimit.mockResolvedValue([])
// selectRowsByIdChunks call order: workflows, file legacy, file multi, doomed
// tables. Each test queues the doomed-tables result (4th call) itself.
mockSelectRowsByIdChunks
.mockResolvedValueOnce([])
.mockResolvedValueOnce([])
.mockResolvedValueOnce([])
})

it('drains rows before deleting the table definition', async () => {
mockSelectRowsByIdChunks.mockResolvedValueOnce([{ id: 'tbl-1' }])
mockDrainRowsByColumn.mockResolvedValue({ deleted: 5, fullyDrained: true })

await runCleanupSoftDeletes(basePayload)

expect(mockDrainRowsByColumn).toHaveBeenCalledWith(
expect.objectContaining({ matchValue: 'tbl-1' })
)
expect(mockDeleteDefinition).toHaveBeenCalledTimes(1)
// Rows are drained first, then the definition is deleted.
expect(mockDrainRowsByColumn.mock.invocationCallOrder[0]).toBeLessThan(
mockDeleteDefinition.mock.invocationCallOrder[0]
)
})

it('defers the definition delete when the budget is exhausted', async () => {
mockSelectRowsByIdChunks.mockResolvedValueOnce([{ id: 'tbl-1' }])
// Budget stop consumes the whole budget — deleted equals the per-run cap.
mockDrainRowsByColumn.mockResolvedValue({ deleted: 1_000_000, fullyDrained: false })

await runCleanupSoftDeletes(basePayload)

expect(mockDrainRowsByColumn).toHaveBeenCalledTimes(1)
expect(mockDeleteDefinition).not.toHaveBeenCalled()
})

it('skips a table that errors mid-drain but keeps cleaning the rest', async () => {
mockSelectRowsByIdChunks.mockResolvedValueOnce([{ id: 'tbl-err' }, { id: 'tbl-ok' }])
// First table errors mid-drain (budget remains); second drains fully.
mockDrainRowsByColumn
.mockResolvedValueOnce({ deleted: 2, fullyDrained: false })
.mockResolvedValueOnce({ deleted: 5, fullyDrained: true })

await runCleanupSoftDeletes(basePayload)

expect(mockDrainRowsByColumn).toHaveBeenCalledTimes(2)
// Only the fully-drained table's definition is deleted.
expect(mockDeleteDefinition).toHaveBeenCalledTimes(1)
})
})
83 changes: 77 additions & 6 deletions apps/sim/background/cleanup-soft-deletes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
mcpServers,
memory,
userTableDefinitions,
userTableRows,
workflow,
workflowFolder,
workflowMcpServer,
Expand All @@ -21,6 +22,7 @@ import {
batchDeleteByWorkspaceAndTimestamp,
chunkArray,
deleteRowsById,
drainRowsByColumn,
selectRowsByIdChunks,
} from '@/lib/cleanup/batch-delete'
import { prepareChatCleanup } from '@/lib/cleanup/chat-cleanup'
Expand Down Expand Up @@ -148,12 +150,6 @@ const CLEANUP_TARGETS = [
wsCol: knowledgeBase.workspaceId,
name: 'knowledgeBase',
},
{
table: userTableDefinitions,
softDeleteCol: userTableDefinitions.archivedAt,
wsCol: userTableDefinitions.workspaceId,
name: 'userTableDefinitions',
},
{ table: memory, softDeleteCol: memory.deletedAt, wsCol: memory.workspaceId, name: 'memory' },
{
table: mcpServers,
Expand Down Expand Up @@ -256,6 +252,79 @@ async function cleanupOrphanedKnowledgeBaseBindings(
return stats
}

const TABLE_ROW_DRAIN_BATCH_SIZE = 5_000
/**
* Per-run cap on user-table rows drained before deleting their definitions.
* Each batch is its own transaction, so this only bounds total job duration; a
* table larger than this drains across several cron runs. A definition is only
* deleted once its rows are fully drained, so its ON DELETE CASCADE never fires
* on a large set.
*/
const TABLE_ROW_DRAIN_TOTAL_LIMIT = 1_000_000

/**
* Hard-delete archived user tables. Rows are drained in bounded batches first
* (each batch cascades its `table_row_executions`), then the definition is
* deleted — turning what would be a single multi-million-row cascade into many
* small transactions. Tables whose rows can't be fully drained within this
* run's budget keep their archived definition for the next run.
*/
async function cleanupArchivedUserTables(
workspaceIds: string[],
retentionDate: Date,
label: string
): Promise<number> {
const doomedTables = await selectRowsByIdChunks(workspaceIds, (chunkIds, chunkLimit) =>
db
.select({ id: userTableDefinitions.id })
.from(userTableDefinitions)
.where(
and(
inArray(userTableDefinitions.workspaceId, chunkIds),
isNotNull(userTableDefinitions.archivedAt),
lt(userTableDefinitions.archivedAt, retentionDate)
)
)
.limit(chunkLimit)
)

if (doomedTables.length === 0) return 0

let rowBudget = TABLE_ROW_DRAIN_TOTAL_LIMIT
let definitionsDeleted = 0

for (const { id: tableId } of doomedTables) {
if (rowBudget <= 0) break

const drain = await drainRowsByColumn({
tableDef: userTableRows,
idCol: userTableRows.id,
matchCol: userTableRows.tableId,
matchValue: tableId,
tableName: `${label}/userTableRows`,
batchSize: TABLE_ROW_DRAIN_BATCH_SIZE,
rowBudget,
})
rowBudget -= drain.deleted

if (drain.fullyDrained) {
await db.delete(userTableDefinitions).where(eq(userTableDefinitions.id, tableId))
definitionsDeleted++
Comment thread
cursor[bot] marked this conversation as resolved.
Outdated
continue
}

// Not fully drained: a budget stop consumes the whole budget, so a positive
// remainder means this one table errored mid-drain. Leave its definition for
// a later run and keep cleaning the rest; only stop once the budget is spent.
if (rowBudget <= 0) break
}

logger.info(
`[${label}/userTableDefinitions] Deleted ${definitionsDeleted}/${doomedTables.length} archived tables (row budget left: ${rowBudget})`
)
return definitionsDeleted
}

export async function runCleanupSoftDeletes(payload: CleanupJobPayload): Promise<void> {
const startTime = Date.now()
const { workspaceIds, retentionHours, label } = payload
Expand Down Expand Up @@ -350,6 +419,8 @@ export async function runCleanupSoftDeletes(payload: CleanupJobPayload): Promise
totalDeleted += result.deleted
}

totalDeleted += await cleanupArchivedUserTables(workspaceIds, retentionDate, label)

const orphanBindingStats = await cleanupOrphanedKnowledgeBaseBindings(workspaceIds, label)

logger.info(
Expand Down
69 changes: 69 additions & 0 deletions apps/sim/lib/cleanup/batch-delete.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/**
* @vitest-environment node
*/

import { dbChainMock, dbChainMockFns } from '@sim/testing'
import { beforeEach, describe, expect, it, vi } from 'vitest'

vi.mock('@sim/db', () => dbChainMock)

import { drainRowsByColumn } from '@/lib/cleanup/batch-delete'

const baseOpts = {
tableDef: {} as never,
idCol: 'col.id' as never,
matchCol: 'col.tableId' as never,
matchValue: 'tbl-1',
tableName: 'test/userTableRows',
}

function returnRows(count: number) {
return Array.from({ length: count }, (_, i) => ({ id: `row-${i}` }))
}

describe('drainRowsByColumn', () => {
beforeEach(() => {
vi.clearAllMocks()
})

it('drains in batches until a short batch and reports the set fully drained', async () => {
dbChainMockFns.returning
.mockResolvedValueOnce(returnRows(2))
.mockResolvedValueOnce(returnRows(1))

const result = await drainRowsByColumn({ ...baseOpts, batchSize: 2, rowBudget: 10 })

expect(result).toEqual({ deleted: 3, fullyDrained: true })
expect(dbChainMockFns.returning).toHaveBeenCalledTimes(2)
})

it('stops at the row budget and reports the set not fully drained', async () => {
dbChainMockFns.returning
.mockResolvedValueOnce(returnRows(2))
.mockResolvedValueOnce(returnRows(2))

const result = await drainRowsByColumn({ ...baseOpts, batchSize: 2, rowBudget: 4 })

expect(result).toEqual({ deleted: 4, fullyDrained: false })
expect(dbChainMockFns.returning).toHaveBeenCalledTimes(2)
})

it('reports fully drained immediately when the match set is already empty', async () => {
dbChainMockFns.returning.mockResolvedValueOnce([])

const result = await drainRowsByColumn({ ...baseOpts, batchSize: 2, rowBudget: 10 })

expect(result).toEqual({ deleted: 0, fullyDrained: true })
expect(dbChainMockFns.returning).toHaveBeenCalledTimes(1)
})

it('reports not fully drained on a batch error so the caller defers the cascade', async () => {
dbChainMockFns.returning
.mockResolvedValueOnce(returnRows(2))
.mockRejectedValueOnce(new Error('db down'))

const result = await drainRowsByColumn({ ...baseOpts, batchSize: 2, rowBudget: 10 })

expect(result).toEqual({ deleted: 2, fullyDrained: false })
})
})
74 changes: 73 additions & 1 deletion apps/sim/lib/cleanup/batch-delete.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { db } from '@sim/db'
import { createLogger } from '@sim/logger'
import { and, inArray, isNotNull, lt, sql } from 'drizzle-orm'
import { and, eq, inArray, isNotNull, lt, sql } from 'drizzle-orm'
import type { PgColumn, PgTable } from 'drizzle-orm/pg-core'

const logger = createLogger('BatchDelete')
Expand Down Expand Up @@ -262,3 +262,75 @@ export async function deleteRowsById(
)
return result
}

export interface DrainByColumnOptions {
tableDef: PgTable
/** Single-column primary key used to batch the delete. */
idCol: PgColumn
/** Column matched against `matchValue` to scope the drain (e.g. a parent FK). */
matchCol: PgColumn
matchValue: string
tableName: string
batchSize?: number
/** Max rows to delete in this call across all batches. */
rowBudget: number
}

export interface DrainResult {
deleted: number
/**
* True only when the match set was confirmed empty (a short final batch).
* Budget exhaustion and batch errors both yield `false` — callers must treat
* `false` as "rows may remain" and defer any dependent parent-delete (whose
* ON DELETE CASCADE would otherwise fire on the leftovers) to a later run.
*/
fullyDrained: boolean
}

/**
* Delete every row matching `matchCol = matchValue` in self-bounded batches,
* each its own transaction. Use to empty a large child set before deleting its
* parent so the parent's ON DELETE CASCADE fires on a small (or empty) set
* instead of millions of rows in one statement.
*/
export async function drainRowsByColumn({
tableDef,
idCol,
matchCol,
matchValue,
tableName,
batchSize = DEFAULT_DELETE_CHUNK_SIZE,
rowBudget,
}: DrainByColumnOptions): Promise<DrainResult> {
let deleted = 0
let remaining = rowBudget

while (remaining > 0) {
const limit = Math.min(batchSize, remaining)
const targetIds = db
.select({ id: idCol })
.from(tableDef)
.where(eq(matchCol, matchValue))
.limit(limit)

let batchDeleted: { id: unknown }[]
try {
batchDeleted = await db
.delete(tableDef)
.where(inArray(idCol, targetIds))
.returning({ id: idCol })
} catch (error) {
logger.error(`[${tableName}] Drain batch failed for ${matchValue}:`, { error })
return { deleted, fullyDrained: false }
}
Comment thread
greptile-apps[bot] marked this conversation as resolved.
Comment thread
greptile-apps[bot] marked this conversation as resolved.

deleted += batchDeleted.length
remaining -= batchDeleted.length

// Short batch means the match set is exhausted.
if (batchDeleted.length < limit) return { deleted, fullyDrained: true }
}

// Hit the per-call budget on a full batch — rows may remain.
return { deleted, fullyDrained: false }
Comment thread
cursor[bot] marked this conversation as resolved.
Outdated
}
Loading
Loading