Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
f865ed5
feat(tables): paginated background row-delete jobs via table_jobs
TheodoreSpeaks Jun 9, 2026
3866cad
fix(tables): address review on async row-delete (filtered count, scop…
TheodoreSpeaks Jun 9, 2026
b324ac0
Merge remote-tracking branch 'origin/staging' into improvement/table-…
TheodoreSpeaks Jun 9, 2026
03c98b0
improvement(tables): filter-aware select-all runs, delete-job read ma…
TheodoreSpeaks Jun 9, 2026
3f8a67a
feat(tables): run import/delete/export/backfill jobs on trigger.dev w…
TheodoreSpeaks Jun 9, 2026
e46c5b5
improvement(tables): raise delete page to 10k and export batch to 5k
TheodoreSpeaks Jun 9, 2026
e2dafbc
improvement(tables): raise CSV import batch to 5k rows (param-cap bou…
TheodoreSpeaks Jun 9, 2026
f8a2aee
feat(tables): surface export jobs in the header tray with progress, c…
TheodoreSpeaks Jun 9, 2026
1ea5871
improvement(tables): surface exports as derived tables-scoped toasts …
TheodoreSpeaks Jun 9, 2026
cdbf43f
Revert "improvement(tables): surface exports as derived tables-scoped…
TheodoreSpeaks Jun 10, 2026
a1465d7
fix(tables): preserve export storage key (NoSuchKey) and unify jobs i…
TheodoreSpeaks Jun 10, 2026
e9cae45
improvement(tables): jobs tray icon reflects aggregate state (spinner…
TheodoreSpeaks Jun 10, 2026
ef14a38
fix(tables): restore jobs tray on the tables list (dropped in staging…
TheodoreSpeaks Jun 10, 2026
6a9d409
improvement(tables): keyset-paginate export row reads (offset paging …
TheodoreSpeaks Jun 10, 2026
d9d688c
perf(tables): keyset pagination for grid infinite scroll
TheodoreSpeaks Jun 10, 2026
a0be974
fix(tables): show export in job tray immediately on kickoff
TheodoreSpeaks Jun 10, 2026
194a939
fix(tables): surface real row/column write errors in toasts
TheodoreSpeaks Jun 10, 2026
f9d8e90
Merge remote-tracking branch 'origin/staging' into improvement/table-…
TheodoreSpeaks Jun 10, 2026
38f9470
perf(tables): tenant-bound filtered row counts (12.7s -> 0.6s)
TheodoreSpeaks Jun 10, 2026
1fd7368
perf(tables): tenant-bound Cmd+F search and stream its window (75s ->…
TheodoreSpeaks Jun 10, 2026
5464ce7
perf(tables): tenant-bound sorted pages and filtered write selections
TheodoreSpeaks Jun 10, 2026
30b950d
perf(tables): tenant-scoped containment index (migration 0232)
TheodoreSpeaks Jun 10, 2026
2f44ac0
perf(tables): tenant-bound unique-constraint checks (3.5s -> <1s per …
TheodoreSpeaks Jun 10, 2026
d0abf3e
perf(tables): tenant-bound upsert conflict lookup
TheodoreSpeaks Jun 10, 2026
d339c0e
refactor(tables): consolidate executor types onto planner exports
TheodoreSpeaks Jun 10, 2026
6849408
fix(tests): drop narrow schema mock override in process-contents test
TheodoreSpeaks Jun 10, 2026
fb3c6c6
fix(tables): scope cancels and counts to the filtered selection (review)
TheodoreSpeaks Jun 10, 2026
93ed482
chore: retrigger CI (Actions dropped the previous push events)
TheodoreSpeaks Jun 10, 2026
3a41a03
Merge remote-tracking branch 'origin/staging' into improvement/table-…
TheodoreSpeaks Jun 10, 2026
3a75b9c
chore: bump api-validation route baseline to 807 (staging route + merge)
TheodoreSpeaks Jun 10, 2026
4954807
fix(tables): release job claim when trigger.dev dispatch fails
TheodoreSpeaks Jun 10, 2026
11c4c4d
chore(db): squash 0232 into 0231 (one migration for the PR)
TheodoreSpeaks Jun 10, 2026
c95349d
fix(tables): context-menu delete label shows the true select-all count
TheodoreSpeaks Jun 10, 2026
fab4ab4
fix(tables): context-menu bulk actions act on the full select-all scope
TheodoreSpeaks Jun 10, 2026
3546723
feat(tables): exclusion set for select-all runs and stops
TheodoreSpeaks Jun 10, 2026
c4b2891
fix(tables): spare excluded-row dispatches on Stop; no orphan placeho…
TheodoreSpeaks Jun 11, 2026
eb146b1
Merge remote-tracking branch 'origin/staging' into improvement/table-…
TheodoreSpeaks Jun 11, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 28 additions & 18 deletions apps/sim/app/api/cron/cleanup-stale-executions/route.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { asyncJobs, db } from '@sim/db'
import { userTableDefinitions, workflowExecutionLogs } from '@sim/db/schema'
import { tableJobs, workflowExecutionLogs } from '@sim/db/schema'
import { createLogger } from '@sim/logger'
import { toError } from '@sim/utils/errors'
import { and, eq, inArray, lt, sql } from 'drizzle-orm'
Expand All @@ -14,6 +14,8 @@ const logger = createLogger('CleanupStaleExecutions')
const STALE_THRESHOLD_MS = getMaxExecutionTimeout() + 5 * 60 * 1000
const STALE_THRESHOLD_MINUTES = Math.ceil(STALE_THRESHOLD_MS / 60000)
const MAX_INT32 = 2_147_483_647
/** Terminal table-jobs older than this are pruned; only the latest job per table is ever read. */
const TABLE_JOB_RETENTION_HOURS = 24

export const GET = withRouteHandler(async (request: NextRequest) => {
try {
Expand Down Expand Up @@ -110,33 +112,41 @@ export const GET = withRouteHandler(async (request: NextRequest) => {
})
}

// Mark stale table imports as failed. Imports run detached on the web container and
// are lost if the pod is killed mid-load. `updatedAt` is bumped by progress updates, so
// an `importing` table with no recent update has stalled (not merely slow). Rows are
// left in place (no rollback); the user re-imports.
// Mark stale table jobs (import or delete) as failed. Jobs run detached on the web container
// and are lost if the pod is killed mid-run. `updated_at` is bumped by progress updates, so a
// `running` job with no recent update has stalled (not merely slow). Committed work is left in
// place (no rollback); the user retries. Also prune long-settled terminal jobs so the table
// doesn't grow unbounded (the latest job per table is what list/detail reads surface).
let staleImportsMarkedFailed = 0
try {
const now = new Date()
const staleImports = await db
.update(userTableDefinitions)
.update(tableJobs)
.set({
importStatus: 'failed',
importError: `Import terminated: no progress for more than ${STALE_THRESHOLD_MINUTES} minutes (worker timeout or crash)`,
updatedAt: new Date(),
status: 'failed',
error: `Job terminated: no progress for more than ${STALE_THRESHOLD_MINUTES} minutes (worker timeout or crash)`,
completedAt: now,
updatedAt: now,
})
.where(
and(
eq(userTableDefinitions.importStatus, 'importing'),
lt(userTableDefinitions.updatedAt, staleThreshold)
)
)
.returning({ id: userTableDefinitions.id })
.where(and(eq(tableJobs.status, 'running'), lt(tableJobs.updatedAt, staleThreshold)))
Comment thread
cursor[bot] marked this conversation as resolved.
.returning({ id: tableJobs.id })

staleImportsMarkedFailed = staleImports.length
if (staleImportsMarkedFailed > 0) {
logger.info(`Marked ${staleImportsMarkedFailed} stale table imports as failed`)
logger.info(`Marked ${staleImportsMarkedFailed} stale table jobs as failed`)
}

const terminalRetention = new Date(Date.now() - TABLE_JOB_RETENTION_HOURS * 60 * 60 * 1000)
await db
.delete(tableJobs)
.where(
and(
inArray(tableJobs.status, ['ready', 'failed', 'canceled']),
lt(tableJobs.updatedAt, terminalRetention)
)
)
} catch (error) {
logger.error('Failed to clean up stale table imports:', {
logger.error('Failed to clean up stale table jobs:', {
error: toError(error).message,
})
}
Expand Down
165 changes: 165 additions & 0 deletions apps/sim/app/api/table/[tableId]/delete-async/route.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/**
* @vitest-environment node
*/
import { hybridAuthMockFns } from '@sim/testing'
import { NextRequest } from 'next/server'
import { beforeEach, describe, expect, it, vi } from 'vitest'
import type { TableDefinition } from '@/lib/table'

const {
mockCheckAccess,
mockMarkTableJobRunning,
mockRunTableDelete,
mockBuildFilterClause,
TableQueryValidationError,
} = vi.hoisted(() => ({
mockCheckAccess: vi.fn(),
mockMarkTableJobRunning: vi.fn(),
mockRunTableDelete: vi.fn(),
mockBuildFilterClause: vi.fn(),
TableQueryValidationError: class extends Error {},
}))

vi.mock('@sim/utils/id', () => ({
generateId: vi.fn().mockReturnValue('job-id-xyz'),
generateShortId: vi.fn().mockReturnValue('short-id'),
}))
vi.mock('@/lib/table/service', () => ({ markTableJobRunning: mockMarkTableJobRunning }))
vi.mock('@/lib/table/delete-runner', () => ({ runTableDelete: mockRunTableDelete }))
vi.mock('@/lib/table/sql', () => ({
buildFilterClause: mockBuildFilterClause,
TableQueryValidationError,
}))
vi.mock('@/lib/core/utils/background', () => ({
runDetached: (_label: string, work: () => Promise<unknown>) => {
void work()
},
}))
vi.mock('@/app/api/table/utils', async () => {
const { NextResponse } = await import('next/server')
return {
checkAccess: mockCheckAccess,
accessError: (result: { status: number }) =>
NextResponse.json({ error: 'denied' }, { status: result.status }),
}
})

import { POST } from '@/app/api/table/[tableId]/delete-async/route'

function buildTable(overrides: Partial<TableDefinition> = {}): TableDefinition {
return {
id: 'tbl_1',
name: 'People',
description: null,
schema: { columns: [{ name: 'status', type: 'string' }] },
metadata: null,
rowCount: 1000,
maxRows: 1_000_000,
workspaceId: 'workspace-1',
createdBy: 'user-1',
archivedAt: null,
createdAt: new Date(),
updatedAt: new Date(),
...overrides,
}
}

function makeRequest(body: unknown, tableId = 'tbl_1') {
const req = new NextRequest(`http://localhost:3000/api/table/${tableId}/delete-async`, {
method: 'POST',
headers: { 'content-type': 'application/json' },
body: JSON.stringify(body),
})
return POST(req, { params: Promise.resolve({ tableId }) })
}

const validBody = {
workspaceId: 'workspace-1',
filter: { status: 'archived' },
excludeRowIds: ['row_keep'],
}

describe('POST /api/table/[tableId]/delete-async', () => {
beforeEach(() => {
vi.clearAllMocks()
hybridAuthMockFns.mockCheckSessionOrInternalAuth.mockResolvedValue({
success: true,
userId: 'user-1',
authType: 'session',
})
mockCheckAccess.mockResolvedValue({ ok: true, table: buildTable() })
mockMarkTableJobRunning.mockResolvedValue(true)
mockRunTableDelete.mockResolvedValue(undefined)
mockBuildFilterClause.mockReturnValue({})
})

it('claims the job slot and kicks off the delete worker with filter + exclusions', async () => {
const response = await makeRequest(validBody)
const data = await response.json()

expect(response.status).toBe(200)
expect(data.data).toEqual({ tableId: 'tbl_1', jobId: 'job-id-xyz' })
expect(mockMarkTableJobRunning).toHaveBeenCalledWith('tbl_1', 'job-id-xyz', 'delete')
expect(mockRunTableDelete).toHaveBeenCalledWith(
expect.objectContaining({
jobId: 'job-id-xyz',
tableId: 'tbl_1',
workspaceId: 'workspace-1',
filter: { status: 'archived' },
excludeRowIds: ['row_keep'],
cutoff: expect.any(Date),
})
)
})

it('allows a whole-table delete with no filter', async () => {
const response = await makeRequest({ workspaceId: 'workspace-1' })
expect(response.status).toBe(200)
expect(mockRunTableDelete).toHaveBeenCalledWith(
expect.objectContaining({ filter: undefined, cutoff: expect.any(Date) })
)
})

it('returns 409 when a job is already in progress (claim lost)', async () => {
mockMarkTableJobRunning.mockResolvedValue(false)
const response = await makeRequest(validBody)
expect(response.status).toBe(409)
expect(mockRunTableDelete).not.toHaveBeenCalled()
})

it('returns 400 on an invalid filter without claiming the slot', async () => {
mockBuildFilterClause.mockImplementation(() => {
throw new TableQueryValidationError('bad field')
})
const response = await makeRequest(validBody)
expect(response.status).toBe(400)
expect(mockMarkTableJobRunning).not.toHaveBeenCalled()
expect(mockRunTableDelete).not.toHaveBeenCalled()
})

it('returns 401 when unauthenticated', async () => {
hybridAuthMockFns.mockCheckSessionOrInternalAuth.mockResolvedValue({ success: false })
const response = await makeRequest(validBody)
expect(response.status).toBe(401)
expect(mockMarkTableJobRunning).not.toHaveBeenCalled()
})

it('returns the access error status when access is denied', async () => {
mockCheckAccess.mockResolvedValue({ ok: false, status: 403 })
const response = await makeRequest(validBody)
expect(response.status).toBe(403)
expect(mockRunTableDelete).not.toHaveBeenCalled()
})

it('returns 400 when the table is archived', async () => {
mockCheckAccess.mockResolvedValue({ ok: true, table: buildTable({ archivedAt: new Date() }) })
const response = await makeRequest(validBody)
expect(response.status).toBe(400)
expect(mockRunTableDelete).not.toHaveBeenCalled()
})

it('returns 400 on workspace mismatch', async () => {
const response = await makeRequest({ ...validBody, workspaceId: 'other-ws' })
expect(response.status).toBe(400)
})
})
103 changes: 103 additions & 0 deletions apps/sim/app/api/table/[tableId]/delete-async/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import { createLogger } from '@sim/logger'
import { generateId } from '@sim/utils/id'
import { type NextRequest, NextResponse } from 'next/server'
import { deleteTableRowsAsyncContract } from '@/lib/api/contracts/tables'
import { parseRequest } from '@/lib/api/server'
import { checkSessionOrInternalAuth } from '@/lib/auth/hybrid'
import { runDetached } from '@/lib/core/utils/background'
import { generateRequestId } from '@/lib/core/utils/request'
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
import { USER_TABLE_ROWS_SQL_NAME } from '@/lib/table/constants'
import { runTableDelete } from '@/lib/table/delete-runner'
import { markTableJobRunning } from '@/lib/table/service'
import { buildFilterClause, TableQueryValidationError } from '@/lib/table/sql'
import type { Filter } from '@/lib/table/types'
import { accessError, checkAccess } from '@/app/api/table/utils'

const logger = createLogger('TableDeleteAsync')

export const runtime = 'nodejs'
export const dynamic = 'force-dynamic'

interface RouteParams {
params: Promise<{ tableId: string }>
}

/**
* POST /api/table/[tableId]/delete-async
*
* Kicks off a background "select all" delete: the client sends the active filter (and an optional
* exclusion set) instead of every row id. Claims the table's single job slot (mutually exclusive
* with imports), captures a `created_at` cutoff so rows inserted while the job runs survive, then
* runs the paginated delete worker detached.
*/
export const POST = withRouteHandler(async (request: NextRequest, { params }: RouteParams) => {
const requestId = generateRequestId()

const authResult = await checkSessionOrInternalAuth(request, { requireWorkflowId: false })
if (!authResult.success || !authResult.userId) {
return NextResponse.json({ error: 'Authentication required' }, { status: 401 })
}
const userId = authResult.userId

const parsed = await parseRequest(deleteTableRowsAsyncContract, request, { params })
if (!parsed.success) return parsed.response
const { tableId } = parsed.data.params
const { workspaceId, filter, excludeRowIds } = parsed.data.body

const access = await checkAccess(tableId, userId, 'write')
if (!access.ok) return accessError(access, requestId, tableId)
const { table } = access

if (table.workspaceId !== workspaceId) {
return NextResponse.json({ error: 'Invalid workspace ID' }, { status: 400 })
}
if (table.archivedAt) {
return NextResponse.json({ error: 'Cannot delete from an archived table' }, { status: 400 })
}

// Validate the filter up front so the caller gets immediate feedback (the worker reuses it).
if (filter) {
try {
buildFilterClause(filter as Filter, USER_TABLE_ROWS_SQL_NAME, table.schema.columns)
} catch (error) {
if (error instanceof TableQueryValidationError) {
return NextResponse.json({ error: error.message }, { status: 400 })
}
throw error
}
}

// Rows inserted after this instant are spared (created_at <= cutoff in the worker).
const cutoff = new Date()

// Atomically claim the job slot — one background job per table, so this also blocks while an
// import is in flight (and vice versa).
const jobId = generateId()
const claimed = await markTableJobRunning(tableId, jobId, 'delete')
if (!claimed) {
return NextResponse.json(
{ error: 'A job is already in progress for this table' },
{ status: 409 }
)
}

runDetached('table-delete', () =>
runTableDelete({
jobId,
tableId,
workspaceId,
filter: filter as Filter | undefined,
excludeRowIds,
cutoff,
})
)

logger.info(`[${requestId}] Async row delete started`, {
tableId,
jobId,
hasFilter: Boolean(filter),
excluded: excludeRowIds?.length ?? 0,
})
return NextResponse.json({ success: true, data: { tableId, jobId } })
})
4 changes: 2 additions & 2 deletions apps/sim/app/api/table/[tableId]/import-async/route.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ vi.mock('@sim/utils/id', () => ({
generateId: vi.fn().mockReturnValue('import-id-xyz'),
generateShortId: vi.fn().mockReturnValue('short-id'),
}))
vi.mock('@/lib/table/service', () => ({ markTableImporting: mockMarkTableImporting }))
vi.mock('@/lib/table/service', () => ({ markTableJobRunning: mockMarkTableImporting }))
vi.mock('@/lib/table/import-runner', () => ({ runTableImport: mockRunTableImport }))
vi.mock('@/lib/core/utils/background', () => ({
runDetached: (_label: string, work: () => Promise<unknown>) => {
Expand Down Expand Up @@ -92,7 +92,7 @@ describe('POST /api/table/[tableId]/import-async', () => {

expect(response.status).toBe(200)
expect(data.data).toEqual({ tableId: 'tbl_1', importId: 'import-id-xyz' })
expect(mockMarkTableImporting).toHaveBeenCalledWith('tbl_1', 'import-id-xyz')
expect(mockMarkTableImporting).toHaveBeenCalledWith('tbl_1', 'import-id-xyz', 'import')
expect(mockRunTableImport).toHaveBeenCalledWith(
expect.objectContaining({
tableId: 'tbl_1',
Expand Down
10 changes: 5 additions & 5 deletions apps/sim/app/api/table/[tableId]/import-async/route.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { runDetached } from '@/lib/core/utils/background'
import { generateRequestId } from '@/lib/core/utils/request'
import { withRouteHandler } from '@/lib/core/utils/with-route-handler'
import { runTableImport } from '@/lib/table/import-runner'
import { markTableImporting } from '@/lib/table/service'
import { markTableJobRunning } from '@/lib/table/service'
import { accessError, checkAccess } from '@/app/api/table/utils'

const logger = createLogger('TableImportIntoAsync')
Expand Down Expand Up @@ -56,13 +56,13 @@ export const POST = withRouteHandler(async (request: NextRequest, { params }: Ro
}
const delimiter = ext === 'tsv' ? '\t' : ','

// Atomically claim the table — the single concurrency gate. If another import already holds it,
// this returns false (no overlapping workers writing colliding row positions).
// Atomically claim the table's job slot — the single concurrency gate. If another job (import
// or delete) already holds it, this returns false (no overlapping workers).
const importId = generateId()
const claimed = await markTableImporting(tableId, importId)
const claimed = await markTableJobRunning(tableId, importId, 'import')
if (!claimed) {
return NextResponse.json(
{ error: 'An import is already in progress for this table' },
{ error: 'A job is already in progress for this table' },
{ status: 409 }
)
}
Expand Down
Loading
Loading