From cc048464fa23351f4952a9ff84566422920e5413 Mon Sep 17 00:00:00 2001 From: Matt Miller Date: Mon, 15 Jun 2026 14:36:26 -0700 Subject: [PATCH 01/17] ci: add team-gated Cursor review (thin caller for github-workflows) Calls the reusable Comfy-Org/github-workflows cursor-review.yml (single source of truth for panel, judge, prompts, scripts) instead of a standalone copy. Label-gated to the team; secret-bearing jobs skip fork PRs. Judge overridden to Opus 4.8. --- .github/workflows/pr-cursor-review.yaml | 58 +++++++++++++++++++++++++ 1 file changed, 58 insertions(+) create mode 100644 .github/workflows/pr-cursor-review.yaml diff --git a/.github/workflows/pr-cursor-review.yaml b/.github/workflows/pr-cursor-review.yaml new file mode 100644 index 00000000000..76d2c5b85bf --- /dev/null +++ b/.github/workflows/pr-cursor-review.yaml @@ -0,0 +1,58 @@ +# Description: Team-gated multi-model Cursor review — a thin caller for the +# reusable workflow in Comfy-Org/github-workflows, which is the single source of +# truth for the panel, judge, prompts, and scripts. Triggered by the +# 'cursor-review' label. +# +# Access control (team-only, two layers): +# 1. Only users with triage permission or higher can apply a label in a public +# repo, so the public cannot trigger this. +# 2. The reusable workflow's secret-bearing jobs do not run on fork PRs (forks +# get no secrets), so CURSOR_API_KEY is reachable only on internal branches. +name: 'PR: Cursor Review' + +on: + pull_request: + types: [labeled, unlabeled] + +permissions: + contents: read + pull-requests: write + +concurrency: + # Re-labeling cancels an in-flight run for the same PR + label. + group: cursor-review-pr-${{ github.event.pull_request.number }}-${{ github.event.label.name }} + cancel-in-progress: true + +jobs: + cursor-review: + # Pinned to github-workflows main HEAD: the reusable cursor-review workflow + # postdates the v1 tag, so there is no tagged release to pin to yet. + uses: Comfy-Org/github-workflows/.github/workflows/cursor-review.yml@41d1201821487c80b7752c5a692a53ca0396066b # main + with: + # Judge on Opus 4.8. The panel's Opus leg is fixed in the reusable matrix at + # the pinned SHA; once Comfy-Org/github-workflows#9 merges, bump the two SHAs + # here to pick up the 4.8 panel leg and drop this override (it will then + # match the new reusable default). + judge_model: claude-opus-4-8-thinking-xhigh + # Overriding replaces the reusable default wholesale, so this restates the + # generated/vendored defaults and adds this repo's heavy paths (Playwright + # snapshots, generated manager types). + diff_excludes: >- + :!**/package-lock.json + :!**/yarn.lock + :!**/pnpm-lock.yaml + :!**/node_modules/** + :!**/.claude/** + :!**/dist/** + :!**/vendor/** + :!**/*.generated.* + :!**/*.min.js + :!**/*.min.css + :!**/*-snapshots/** + :!src/workbench/extensions/manager/types/generatedManagerTypes.ts + # Load the prompts/scripts from the same ref as `uses:`. + workflows_ref: 41d1201821487c80b7752c5a692a53ca0396066b + secrets: + CURSOR_API_KEY: ${{ secrets.CURSOR_API_KEY }} + # Optional — enables start/complete Slack DMs to the triggerer. + SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }} From 4cadbb8af975b53ea7c960815ce0aab2826722a6 Mon Sep 17 00:00:00 2001 From: Matt Miller Date: Mon, 15 Jun 2026 15:42:26 -0700 Subject: [PATCH 02/17] fix: resolve review feedback --- .github/workflows/pr-cursor-review.yaml | 1 + 1 file changed, 1 insertion(+) diff --git a/.github/workflows/pr-cursor-review.yaml b/.github/workflows/pr-cursor-review.yaml index 76d2c5b85bf..8e9ad3b337a 100644 --- a/.github/workflows/pr-cursor-review.yaml +++ b/.github/workflows/pr-cursor-review.yaml @@ -25,6 +25,7 @@ concurrency: jobs: cursor-review: + if: github.event.action == 'labeled' && github.event.label.name == 'cursor-review' # Pinned to github-workflows main HEAD: the reusable cursor-review workflow # postdates the v1 tag, so there is no tagged release to pin to yet. uses: Comfy-Org/github-workflows/.github/workflows/cursor-review.yml@41d1201821487c80b7752c5a692a53ca0396066b # main From e498c4ae0d30183585568107fa789ec6f5572ddc Mon Sep 17 00:00:00 2001 From: Matt Miller Date: Wed, 17 Jun 2026 11:17:58 -0700 Subject: [PATCH 03/17] ci: bump cursor-review SHA to github-workflows#9, drop judge_model override --- .github/workflows/pr-cursor-review.yaml | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/.github/workflows/pr-cursor-review.yaml b/.github/workflows/pr-cursor-review.yaml index 8e9ad3b337a..471b4a0f008 100644 --- a/.github/workflows/pr-cursor-review.yaml +++ b/.github/workflows/pr-cursor-review.yaml @@ -1,7 +1,7 @@ # Description: Team-gated multi-model Cursor review — a thin caller for the # reusable workflow in Comfy-Org/github-workflows, which is the single source of # truth for the panel, judge, prompts, and scripts. Triggered by the -# 'cursor-review' label. +#'cursor-review' label. # # Access control (team-only, two layers): # 1. Only users with triage permission or higher can apply a label in a public @@ -26,18 +26,14 @@ concurrency: jobs: cursor-review: if: github.event.action == 'labeled' && github.event.label.name == 'cursor-review' - # Pinned to github-workflows main HEAD: the reusable cursor-review workflow - # postdates the v1 tag, so there is no tagged release to pin to yet. - uses: Comfy-Org/github-workflows/.github/workflows/cursor-review.yml@41d1201821487c80b7752c5a692a53ca0396066b # main + # SHA-pinned per zizmor `unpinned-uses: hash-pin`. Bump this SHA to pick up + # upstream changes; keep `workflows_ref` matching so prompts/scripts load + # from the same commit as the workflow definition. + uses: Comfy-Org/github-workflows/.github/workflows/cursor-review.yml@047ca48febe3a6647608ed2e0c4331b491cb9d6a # github-workflows#9 with: - # Judge on Opus 4.8. The panel's Opus leg is fixed in the reusable matrix at - # the pinned SHA; once Comfy-Org/github-workflows#9 merges, bump the two SHAs - # here to pick up the 4.8 panel leg and drop this override (it will then - # match the new reusable default). - judge_model: claude-opus-4-8-thinking-xhigh - # Overriding replaces the reusable default wholesale, so this restates the - # generated/vendored defaults and adds this repo's heavy paths (Playwright - # snapshots, generated manager types). + # Overriding diff_excludes replaces the reusable default wholesale, so + # this restates the generated/vendored defaults and adds this repo's heavy + # paths (Playwright snapshots, generated manager types). diff_excludes: >- :!**/package-lock.json :!**/yarn.lock @@ -52,8 +48,9 @@ jobs: :!**/*-snapshots/** :!src/workbench/extensions/manager/types/generatedManagerTypes.ts # Load the prompts/scripts from the same ref as `uses:`. - workflows_ref: 41d1201821487c80b7752c5a692a53ca0396066b + workflows_ref: 047ca48febe3a6647608ed2e0c4331b491cb9d6a secrets: CURSOR_API_KEY: ${{ secrets.CURSOR_API_KEY }} # Optional — enables start/complete Slack DMs to the triggerer. SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }} + From 880582ab5d7d0c582c0cd12ea78f41705515ab6d Mon Sep 17 00:00:00 2001 From: GitHub Action Date: Wed, 17 Jun 2026 18:21:48 +0000 Subject: [PATCH 04/17] [automated] Apply ESLint and Oxfmt fixes --- .github/workflows/pr-cursor-review.yaml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/pr-cursor-review.yaml b/.github/workflows/pr-cursor-review.yaml index 471b4a0f008..6a6ff6b4d4e 100644 --- a/.github/workflows/pr-cursor-review.yaml +++ b/.github/workflows/pr-cursor-review.yaml @@ -53,4 +53,3 @@ jobs: CURSOR_API_KEY: ${{ secrets.CURSOR_API_KEY }} # Optional — enables start/complete Slack DMs to the triggerer. SLACK_BOT_TOKEN: ${{ secrets.SLACK_BOT_TOKEN }} - From 8d23daa33d52ab1345cf517a0f8a26964c16cde7 Mon Sep 17 00:00:00 2001 From: Matt Miller Date: Wed, 17 Jun 2026 11:25:19 -0700 Subject: [PATCH 05/17] fix: resolve review feedback --- .github/workflows/pr-cursor-review.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/pr-cursor-review.yaml b/.github/workflows/pr-cursor-review.yaml index 6a6ff6b4d4e..3a9f4a729b2 100644 --- a/.github/workflows/pr-cursor-review.yaml +++ b/.github/workflows/pr-cursor-review.yaml @@ -1,7 +1,7 @@ # Description: Team-gated multi-model Cursor review — a thin caller for the # reusable workflow in Comfy-Org/github-workflows, which is the single source of # truth for the panel, judge, prompts, and scripts. Triggered by the -#'cursor-review' label. +# 'cursor-review' label. # # Access control (team-only, two layers): # 1. Only users with triage permission or higher can apply a label in a public From 5ef89c73e6e3766951afaaf1e4af2015b6e779fe Mon Sep 17 00:00:00 2001 From: Matt Miller Date: Wed, 10 Jun 2026 12:03:49 -0700 Subject: [PATCH 06/17] feat(assets): adopt cursor pagination in the Generated tab jobs walk The Generated tab walks GET /jobs with offset paging, which drifts when jobs complete mid-scroll, and every status event resets the loaded list back to page one. Core and cloud now share a keyset cursor contract on /jobs (after / next_cursor), so: - fetchHistoryPage accepts { after } and sends it instead of offset; next_cursor is surfaced from the pagination response - the assets store walks history by cursor, bootstraps into cursor mode from the first offset page (the server mints a cursor either way), and keeps offset as the fallback for backends that don't mint cursors - a rejected cursor (e.g. 400 INVALID_CURSOR after a server restart) drops the cursor and resumes once via the offset fallback - fetchJobsRaw now throws on failure instead of fabricating an empty hasMore:false page, so a failed fetch is distinguishable from the end of the timeline and infinite scroll stays resumable after errors - job-completion refreshes call refreshHistoryHead, which merge-prepends the head page instead of resetting scroll state; it replaces the list when the page spans the whole timeline (pruning server-side deletions) and restarts the walk when more than a page of new jobs would leave an unfillable gap - in-flight page fetches are invalidated by an epoch counter when the list resets, so stale continuations can't pollute a fresh walk hasMoreHistory is now server-authoritative instead of inferred from batch length. --- src/components/queue/QueueProgressOverlay.vue | 2 +- .../missingMediaAssetResolver.test.ts | 4 +- .../missingMedia/missingMediaAssetResolver.ts | 2 +- .../missingMedia/missingMediaScan.test.ts | 6 +- .../remote/comfyui/jobs/fetchJobs.test.ts | 95 ++- src/platform/remote/comfyui/jobs/fetchJobs.ts | 75 +-- src/platform/remote/comfyui/jobs/jobTypes.ts | 3 +- src/stores/assetsStore.test.ts | 602 ++++++++++++++++-- src/stores/assetsStore.ts | 173 +++-- src/views/GraphView.test.ts | 5 +- src/views/GraphView.vue | 4 +- 11 files changed, 823 insertions(+), 148 deletions(-) diff --git a/src/components/queue/QueueProgressOverlay.vue b/src/components/queue/QueueProgressOverlay.vue index 38cfae3146b..8747cde2e18 100644 --- a/src/components/queue/QueueProgressOverlay.vue +++ b/src/components/queue/QueueProgressOverlay.vue @@ -251,7 +251,7 @@ const focusAssetInSidebar = async (item: JobListItem) => { const assetId = String(jobId) openAssetsSidebar() await nextTick() - await assetsStore.updateHistory() + await assetsStore.refreshHistoryHead() const asset = assetsStore.historyAssets.find( (existingAsset) => existingAsset.id === assetId ) diff --git a/src/platform/missingMedia/missingMediaAssetResolver.test.ts b/src/platform/missingMedia/missingMediaAssetResolver.test.ts index 632320759c1..b8f092cf081 100644 --- a/src/platform/missingMedia/missingMediaAssetResolver.test.ts +++ b/src/platform/missingMedia/missingMediaAssetResolver.test.ts @@ -255,13 +255,13 @@ describe('resolveMissingMediaAssetSources', () => { 1, expect.any(Function), 200, - 0 + { offset: 0 } ) expect(mockFetchHistoryPage).toHaveBeenNthCalledWith( 2, expect.any(Function), 200, - 200 + { offset: 200 } ) }) diff --git a/src/platform/missingMedia/missingMediaAssetResolver.ts b/src/platform/missingMedia/missingMediaAssetResolver.ts index 20f803c51ab..861fe8439d4 100644 --- a/src/platform/missingMedia/missingMediaAssetResolver.ts +++ b/src/platform/missingMedia/missingMediaAssetResolver.ts @@ -176,7 +176,7 @@ async function fetchGeneratedHistoryAssets( const historyPage = await fetchHistoryPage( api.fetchApi.bind(api), HISTORY_MEDIA_ASSETS_PAGE_SIZE, - requestedOffset + { offset: requestedOffset } ) signal?.throwIfAborted() diff --git a/src/platform/missingMedia/missingMediaScan.test.ts b/src/platform/missingMedia/missingMediaScan.test.ts index 6e2236533eb..d90ea77a42c 100644 --- a/src/platform/missingMedia/missingMediaScan.test.ts +++ b/src/platform/missingMedia/missingMediaScan.test.ts @@ -709,7 +709,7 @@ describe('verifyMediaCandidates', () => { expect(mockFetchHistoryPage).toHaveBeenCalledWith( expect.any(Function), 200, - 0 + { offset: 0 } ) expect(candidates[0]).toMatchObject({ name: 'subfolder/photo.png [output]', @@ -843,13 +843,13 @@ describe('verifyMediaCandidates', () => { 1, expect.any(Function), 200, - 0 + { offset: 0 } ) expect(mockFetchHistoryPage).toHaveBeenNthCalledWith( 2, expect.any(Function), 200, - 200 + { offset: 200 } ) expect(candidates[0].isMissing).toBe(false) }) diff --git a/src/platform/remote/comfyui/jobs/fetchJobs.test.ts b/src/platform/remote/comfyui/jobs/fetchJobs.test.ts index 53ad431f84c..076c2bb2320 100644 --- a/src/platform/remote/comfyui/jobs/fetchJobs.test.ts +++ b/src/platform/remote/comfyui/jobs/fetchJobs.test.ts @@ -39,7 +39,8 @@ function createMockResponse( offset: pagination.offset ?? 0, limit: pagination.limit ?? 200, total, - has_more: pagination.has_more ?? false + has_more: pagination.has_more ?? false, + next_cursor: pagination.next_cursor } } } @@ -135,23 +136,19 @@ describe('fetchJobs', () => { expect(result[0].priority).toBe(999) }) - it('returns empty array on error', async () => { + it('propagates fetch errors', async () => { const mockFetch = vi.fn().mockRejectedValue(new Error('Network error')) - const result = await fetchHistory(mockFetch) - - expect(result).toEqual([]) + await expect(fetchHistory(mockFetch)).rejects.toThrow('Network error') }) - it('returns empty array on non-ok response', async () => { + it('throws on non-ok response', async () => { const mockFetch = vi.fn().mockResolvedValue({ ok: false, - status: 500 + status: 400 }) - const result = await fetchHistory(mockFetch) - - expect(result).toEqual([]) + await expect(fetchHistory(mockFetch)).rejects.toThrow('400') }) it('parses batch containing text-only preview outputs', async () => { @@ -205,7 +202,7 @@ describe('fetchJobs', () => { ) }) - const result = await fetchHistoryPage(mockFetch, 2, 5) + const result = await fetchHistoryPage(mockFetch, 2, { offset: 5 }) expect(mockFetch).toHaveBeenCalledWith( '/jobs?status=completed,failed,cancelled&limit=2&offset=5' @@ -218,6 +215,76 @@ describe('fetchJobs', () => { expect(result.jobs[0].priority).toBe(5) expect(result.jobs[1].priority).toBe(4) }) + + it('sends the cursor instead of offset and returns next_cursor', async () => { + const mockFetch = vi.fn().mockResolvedValue({ + ok: true, + json: () => + Promise.resolve( + createMockResponse([createMockJob('job1', 'completed')], 10, { + has_more: true, + next_cursor: 'cursor-page-2' + }) + ) + }) + + const result = await fetchHistoryPage(mockFetch, 200, { + after: 'cursor-page-1' + }) + + expect(mockFetch).toHaveBeenCalledWith( + '/jobs?status=completed,failed,cancelled&limit=200&after=cursor-page-1' + ) + expect(result.nextCursor).toBe('cursor-page-2') + expect(result.hasMore).toBe(true) + }) + + it('uri-encodes the cursor', async () => { + const mockFetch = vi.fn().mockResolvedValue({ + ok: true, + json: () => Promise.resolve(createMockResponse([])) + }) + + await fetchHistoryPage(mockFetch, 200, { after: 'a+b/c=' }) + + expect(mockFetch).toHaveBeenCalledWith( + '/jobs?status=completed,failed,cancelled&limit=200&after=a%2Bb%2Fc%3D' + ) + }) + + it('returns next_cursor from offset-mode responses for cursor bootstrap', async () => { + const mockFetch = vi.fn().mockResolvedValue({ + ok: true, + json: () => + Promise.resolve( + createMockResponse([createMockJob('job1', 'completed')], 10, { + has_more: true, + next_cursor: 'minted-in-offset-mode' + }) + ) + }) + + const result = await fetchHistoryPage(mockFetch, 200, { offset: 0 }) + + expect(mockFetch).toHaveBeenCalledWith( + '/jobs?status=completed,failed,cancelled&limit=200&offset=0' + ) + expect(result.nextCursor).toBe('minted-in-offset-mode') + }) + + it('omits nextCursor when the server does not mint one', async () => { + const mockFetch = vi.fn().mockResolvedValue({ + ok: true, + json: () => + Promise.resolve( + createMockResponse([createMockJob('job1', 'completed')]) + ) + }) + + const result = await fetchHistoryPage(mockFetch, 200, { offset: 0 }) + + expect(result.nextCursor).toBeUndefined() + }) }) describe('fetchQueue', () => { @@ -268,12 +335,10 @@ describe('fetchJobs', () => { ) }) - it('returns empty arrays on error', async () => { + it('propagates fetch errors', async () => { const mockFetch = vi.fn().mockRejectedValue(new Error('Network error')) - const result = await fetchQueue(mockFetch) - - expect(result).toEqual({ Running: [], Pending: [] }) + await expect(fetchQueue(mockFetch)).rejects.toThrow('Network error') }) }) diff --git a/src/platform/remote/comfyui/jobs/fetchJobs.ts b/src/platform/remote/comfyui/jobs/fetchJobs.ts index 25790a5ecd8..b41c0211434 100644 --- a/src/platform/remote/comfyui/jobs/fetchJobs.ts +++ b/src/platform/remote/comfyui/jobs/fetchJobs.ts @@ -18,12 +18,24 @@ import type { } from './jobTypes' import { zJobDetail, zJobsListResponse, zWorkflowContainer } from './jobTypes' +/** + * Position of the page to fetch. `after` is an opaque keyset cursor from a + * prior response's `nextCursor` and takes precedence over `offset`; `offset` + * remains as the fallback for random access and for backends that don't mint + * cursors. + */ +export interface JobsPageRequest { + offset?: number + after?: string +} + interface FetchJobsRawResult { jobs: RawJobListItem[] total: number offset: number limit: number hasMore: boolean + nextCursor?: string } export interface FetchHistoryPageResult { @@ -32,43 +44,38 @@ export interface FetchHistoryPageResult { offset: number limit: number hasMore: boolean + nextCursor?: string } /** - * Fetches raw jobs from /jobs endpoint + * Fetches raw jobs from /jobs endpoint. + * Throws on failure so callers can tell a failed page apart from an empty + * last page (e.g. a stale cursor rejected with 400 INVALID_CURSOR). * @internal */ async function fetchJobsRaw( fetchApi: (url: string) => Promise, statuses: JobStatus[], maxItems: number = 200, - offset: number = 0 + page: JobsPageRequest = {} ): Promise { const statusParam = statuses.join(',') - const url = `/jobs?status=${statusParam}&limit=${maxItems}&offset=${offset}` - try { - const res = await fetchApi(url) - if (!res.ok) { - console.error(`[Jobs API] Failed to fetch jobs: ${res.status}`) - return { - jobs: [], - total: 0, - offset, - limit: maxItems, - hasMore: false - } - } - const data = zJobsListResponse.parse(await res.json()) - return { - jobs: data.jobs, - total: data.pagination.total, - offset: data.pagination.offset, - limit: data.pagination.limit, - hasMore: data.pagination.has_more - } - } catch (error) { - console.error('[Jobs API] Error fetching jobs:', error) - return { jobs: [], total: 0, offset, limit: maxItems, hasMore: false } + const pageParam = page.after + ? `after=${encodeURIComponent(page.after)}` + : `offset=${page.offset ?? 0}` + const url = `/jobs?status=${statusParam}&limit=${maxItems}&${pageParam}` + const res = await fetchApi(url) + if (!res.ok) { + throw new Error(`[Jobs API] Failed to fetch jobs: ${res.status}`) + } + const data = zJobsListResponse.parse(await res.json()) + return { + jobs: data.jobs, + total: data.pagination.total, + offset: data.pagination.offset, + limit: data.pagination.limit, + hasMore: data.pagination.has_more, + nextCursor: data.pagination.next_cursor } } @@ -98,7 +105,7 @@ export async function fetchHistory( maxItems: number = 200, offset: number = 0 ): Promise { - const { jobs } = await fetchHistoryPage(fetchApi, maxItems, offset) + const { jobs } = await fetchHistoryPage(fetchApi, maxItems, { offset }) return jobs } @@ -108,13 +115,13 @@ export async function fetchHistory( export async function fetchHistoryPage( fetchApi: (url: string) => Promise, maxItems: number = 200, - offset: number = 0 + page: JobsPageRequest = {} ): Promise { const result = await fetchJobsRaw( fetchApi, ['completed', 'failed', 'cancelled'], maxItems, - offset + page ) // History gets priority based on total count (lower than queue) @@ -123,7 +130,8 @@ export async function fetchHistoryPage( total: result.total, offset: result.offset, limit: result.limit, - hasMore: result.hasMore + hasMore: result.hasMore, + nextCursor: result.nextCursor } } @@ -134,12 +142,7 @@ export async function fetchHistoryPage( export async function fetchQueue( fetchApi: (url: string) => Promise ): Promise<{ Running: JobListItem[]; Pending: JobListItem[] }> { - const { jobs } = await fetchJobsRaw( - fetchApi, - ['in_progress', 'pending'], - 200, - 0 - ) + const { jobs } = await fetchJobsRaw(fetchApi, ['in_progress', 'pending']) const running = jobs.filter((j) => j.status === 'in_progress') const pending = jobs.filter((j) => j.status === 'pending') diff --git a/src/platform/remote/comfyui/jobs/jobTypes.ts b/src/platform/remote/comfyui/jobs/jobTypes.ts index 5704fba491d..6ea88b13643 100644 --- a/src/platform/remote/comfyui/jobs/jobTypes.ts +++ b/src/platform/remote/comfyui/jobs/jobTypes.ts @@ -87,7 +87,8 @@ const zPaginationInfo = z.object({ offset: z.number(), limit: z.number(), total: z.number(), - has_more: z.boolean() + has_more: z.boolean(), + next_cursor: z.string().optional() }) export const zJobsListResponse = z.object({ diff --git a/src/stores/assetsStore.test.ts b/src/stores/assetsStore.test.ts index dacaadfdc93..d85bd90625c 100644 --- a/src/stores/assetsStore.test.ts +++ b/src/stores/assetsStore.test.ts @@ -4,15 +4,16 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' import { nextTick, watch } from 'vue' import { useAssetsStore } from '@/stores/assetsStore' -import { api } from '@/scripts/api' import type { AssetItem } from '@/platform/assets/schemas/assetSchema' +import { fetchHistoryPage } from '@/platform/remote/comfyui/jobs/fetchJobs' +import type { FetchHistoryPageResult } from '@/platform/remote/comfyui/jobs/fetchJobs' import type { JobListItem } from '@/platform/remote/comfyui/jobs/jobTypes' import { assetService } from '@/platform/assets/services/assetService' // Mock the api module vi.mock('@/scripts/api', () => ({ api: { - getHistory: vi.fn(), + fetchApi: vi.fn(), internalURL: vi.fn((path) => `http://localhost:3000${path}`), apiURL: vi.fn((path) => `http://localhost:3000/api${path}`), addEventListener: vi.fn(), @@ -21,6 +22,30 @@ vi.mock('@/scripts/api', () => ({ } })) +// Mock the jobs API fetcher used for history pagination +vi.mock('@/platform/remote/comfyui/jobs/fetchJobs', () => ({ + fetchHistoryPage: vi.fn() +})) + +// Helper to build a server pagination page around a set of jobs +const mockHistoryPage = ( + jobs: JobListItem[], + { + hasMore = false, + nextCursor, + offset = 0, + total = jobs.length, + limit = 200 + }: Partial> = {} +): FetchHistoryPageResult => ({ + jobs, + total, + offset, + limit, + hasMore, + nextCursor +}) + // Mock the asset service vi.mock('@/platform/assets/services/assetService', () => ({ assetService: { @@ -208,32 +233,38 @@ describe('assetsStore - Refactored (Option A)', () => { const mockHistory = Array.from({ length: 10 }, (_, i) => createMockJobItem(i) ) - vi.mocked(api.getHistory).mockResolvedValue(mockHistory) + vi.mocked(fetchHistoryPage).mockResolvedValue( + mockHistoryPage(mockHistory) + ) await store.updateHistory() - expect(api.getHistory).toHaveBeenCalledWith(200, { offset: 0 }) + expect(fetchHistoryPage).toHaveBeenCalledWith(expect.any(Function), 200, { + offset: 0 + }) expect(store.historyAssets).toHaveLength(10) - expect(store.hasMoreHistory).toBe(false) // Less than BATCH_SIZE + expect(store.hasMoreHistory).toBe(false) // Server reported no more pages expect(store.historyLoading).toBe(false) expect(store.historyError).toBe(null) }) - it('should set hasMoreHistory to true when batch is full', async () => { + it('should set hasMoreHistory to true when server reports more pages', async () => { const mockHistory = Array.from({ length: 200 }, (_, i) => createMockJobItem(i) ) - vi.mocked(api.getHistory).mockResolvedValue(mockHistory) + vi.mocked(fetchHistoryPage).mockResolvedValue( + mockHistoryPage(mockHistory, { hasMore: true }) + ) await store.updateHistory() expect(store.historyAssets).toHaveLength(200) - expect(store.hasMoreHistory).toBe(true) // Exactly BATCH_SIZE + expect(store.hasMoreHistory).toBe(true) }) it('should handle errors during initial load', async () => { const error = new Error('Failed to fetch') - vi.mocked(api.getHistory).mockRejectedValue(error) + vi.mocked(fetchHistoryPage).mockRejectedValue(error) await store.updateHistory() @@ -258,7 +289,9 @@ describe('assetsStore - Refactored (Option A)', () => { }, createMockJobItem(2) ] - vi.mocked(api.getHistory).mockResolvedValue(mockHistory) + vi.mocked(fetchHistoryPage).mockResolvedValue( + mockHistoryPage(mockHistory) + ) await store.updateHistory() @@ -276,7 +309,9 @@ describe('assetsStore - Refactored (Option A)', () => { const firstBatch = Array.from({ length: 200 }, (_, i) => createMockJobItem(i) ) - vi.mocked(api.getHistory).mockResolvedValueOnce(firstBatch) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(firstBatch, { hasMore: true }) + ) await store.updateHistory() expect(store.historyAssets).toHaveLength(200) @@ -286,11 +321,16 @@ describe('assetsStore - Refactored (Option A)', () => { const secondBatch = Array.from({ length: 200 }, (_, i) => createMockJobItem(200 + i) ) - vi.mocked(api.getHistory).mockResolvedValueOnce(secondBatch) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(secondBatch, { hasMore: true }) + ) await store.loadMoreHistory() - expect(api.getHistory).toHaveBeenCalledWith(200, { offset: 200 }) + // Offset fallback advances by the number of jobs the page returned + expect(fetchHistoryPage).toHaveBeenCalledWith(expect.any(Function), 200, { + offset: 200 + }) expect(store.historyAssets).toHaveLength(400) // Accumulated expect(store.hasMoreHistory).toBe(true) }) @@ -300,7 +340,9 @@ describe('assetsStore - Refactored (Option A)', () => { const firstBatch = Array.from({ length: 200 }, (_, i) => createMockJobItem(i) ) - vi.mocked(api.getHistory).mockResolvedValueOnce(firstBatch) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(firstBatch, { hasMore: true }) + ) await store.updateHistory() expect(store.historyAssets).toHaveLength(200) @@ -311,7 +353,9 @@ describe('assetsStore - Refactored (Option A)', () => { createMockJobItem(5), // Duplicate ...Array.from({ length: 198 }, (_, i) => createMockJobItem(200 + i)) // New ] - vi.mocked(api.getHistory).mockResolvedValueOnce(secondBatch) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(secondBatch) + ) await store.loadMoreHistory() @@ -325,11 +369,13 @@ describe('assetsStore - Refactored (Option A)', () => { }) it('should stop loading when no more items', async () => { - // First batch - less than BATCH_SIZE + // Server reports no further pages const firstBatch = Array.from({ length: 50 }, (_, i) => createMockJobItem(i) ) - vi.mocked(api.getHistory).mockResolvedValueOnce(firstBatch) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(firstBatch, { hasMore: false }) + ) await store.updateHistory() expect(store.hasMoreHistory).toBe(false) @@ -338,7 +384,7 @@ describe('assetsStore - Refactored (Option A)', () => { await store.loadMoreHistory() // Should only have been called once (initial load) - expect(api.getHistory).toHaveBeenCalledTimes(1) + expect(fetchHistoryPage).toHaveBeenCalledTimes(1) }) it('should handle race conditions with concurrent loads', async () => { @@ -346,19 +392,21 @@ describe('assetsStore - Refactored (Option A)', () => { const initialBatch = Array.from({ length: 200 }, (_, i) => createMockJobItem(i) ) - vi.mocked(api.getHistory).mockResolvedValueOnce(initialBatch) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(initialBatch, { hasMore: true }) + ) await store.updateHistory() expect(store.hasMoreHistory).toBe(true) // Clear mock to count only loadMore calls - vi.mocked(api.getHistory).mockClear() + vi.mocked(fetchHistoryPage).mockClear() // Setup slow API response - let resolveLoadMore: (value: JobListItem[]) => void - const loadMorePromise = new Promise((resolve) => { + let resolveLoadMore: (value: FetchHistoryPageResult) => void + const loadMorePromise = new Promise((resolve) => { resolveLoadMore = resolve }) - vi.mocked(api.getHistory).mockReturnValueOnce(loadMorePromise) + vi.mocked(fetchHistoryPage).mockReturnValueOnce(loadMorePromise) // Start first loadMore const firstLoad = store.loadMoreHistory() @@ -370,12 +418,12 @@ describe('assetsStore - Refactored (Option A)', () => { const secondBatch = Array.from({ length: 200 }, (_, i) => createMockJobItem(200 + i) ) - resolveLoadMore!(secondBatch) + resolveLoadMore!(mockHistoryPage(secondBatch, { hasMore: true })) await Promise.all([firstLoad, secondLoad]) // Only one API call - expect(api.getHistory).toHaveBeenCalledTimes(1) + expect(fetchHistoryPage).toHaveBeenCalledTimes(1) }) it('should respect MAX_HISTORY_ITEMS limit', async () => { @@ -385,7 +433,9 @@ describe('assetsStore - Refactored (Option A)', () => { const firstBatch = Array.from({ length: 200 }, (_, i) => createMockJobItem(i) ) - vi.mocked(api.getHistory).mockResolvedValueOnce(firstBatch) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(firstBatch, { hasMore: true }) + ) await store.updateHistory() // Load additional batches @@ -393,7 +443,9 @@ describe('assetsStore - Refactored (Option A)', () => { const items = Array.from({ length: 200 }, (_, i) => createMockJobItem(batch * 200 + i) ) - vi.mocked(api.getHistory).mockResolvedValueOnce(items) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(items, { hasMore: true }) + ) await store.loadMoreHistory() } @@ -402,13 +454,437 @@ describe('assetsStore - Refactored (Option A)', () => { }) }) + describe('Cursor pagination', () => { + it('uses { after } for loadMore when the first page returned a cursor', async () => { + const firstBatch = Array.from({ length: 10 }, (_, i) => + createMockJobItem(i) + ) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(firstBatch, { hasMore: true, nextCursor: 'cursor-1' }) + ) + await store.updateHistory() + + const secondBatch = Array.from({ length: 10 }, (_, i) => + createMockJobItem(10 + i) + ) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(secondBatch) + ) + await store.loadMoreHistory() + + expect(fetchHistoryPage).toHaveBeenNthCalledWith( + 2, + expect.any(Function), + 200, + { after: 'cursor-1' } + ) + }) + + it('bootstraps from offset 0 then walks successive cursors', async () => { + const pageJobs = (start: number) => + Array.from({ length: 10 }, (_, i) => createMockJobItem(start + i)) + vi.mocked(fetchHistoryPage) + .mockResolvedValueOnce( + mockHistoryPage(pageJobs(0), { + hasMore: true, + nextCursor: 'cursor-1' + }) + ) + .mockResolvedValueOnce( + mockHistoryPage(pageJobs(10), { + hasMore: true, + nextCursor: 'cursor-2' + }) + ) + .mockResolvedValueOnce(mockHistoryPage(pageJobs(20))) + + await store.updateHistory() + await store.loadMoreHistory() + await store.loadMoreHistory() + + expect(fetchHistoryPage).toHaveBeenNthCalledWith( + 1, + expect.any(Function), + 200, + { offset: 0 } + ) + expect(fetchHistoryPage).toHaveBeenNthCalledWith( + 2, + expect.any(Function), + 200, + { after: 'cursor-1' } + ) + expect(fetchHistoryPage).toHaveBeenNthCalledWith( + 3, + expect.any(Function), + 200, + { after: 'cursor-2' } + ) + expect(store.historyAssets).toHaveLength(30) + }) + + it('falls back to offset paging advanced by returned job count when no cursor is minted', async () => { + const firstBatch = Array.from({ length: 200 }, (_, i) => + createMockJobItem(i) + ) + const secondBatch = Array.from({ length: 150 }, (_, i) => + createMockJobItem(200 + i) + ) + const thirdBatch = Array.from({ length: 10 }, (_, i) => + createMockJobItem(350 + i) + ) + vi.mocked(fetchHistoryPage) + .mockResolvedValueOnce(mockHistoryPage(firstBatch, { hasMore: true })) + .mockResolvedValueOnce(mockHistoryPage(secondBatch, { hasMore: true })) + .mockResolvedValueOnce(mockHistoryPage(thirdBatch)) + + await store.updateHistory() + await store.loadMoreHistory() + await store.loadMoreHistory() + + expect(fetchHistoryPage).toHaveBeenNthCalledWith( + 2, + expect.any(Function), + 200, + { offset: 200 } + ) + expect(fetchHistoryPage).toHaveBeenNthCalledWith( + 3, + expect.any(Function), + 200, + { offset: 350 } + ) + }) + + it('stops loadMore when the server reports hasMore false despite a full batch', async () => { + const fullBatch = Array.from({ length: 200 }, (_, i) => + createMockJobItem(i) + ) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(fullBatch, { hasMore: false }) + ) + + await store.updateHistory() + expect(store.hasMoreHistory).toBe(false) + + await store.loadMoreHistory() + + expect(fetchHistoryPage).toHaveBeenCalledTimes(1) + }) + + it('resets to { offset: 0 } on a full reload after a cursor walk', async () => { + const pageJobs = (start: number) => + Array.from({ length: 10 }, (_, i) => createMockJobItem(start + i)) + vi.mocked(fetchHistoryPage) + .mockResolvedValueOnce( + mockHistoryPage(pageJobs(0), { + hasMore: true, + nextCursor: 'cursor-1' + }) + ) + .mockResolvedValueOnce( + mockHistoryPage(pageJobs(10), { + hasMore: true, + nextCursor: 'cursor-2' + }) + ) + .mockResolvedValueOnce(mockHistoryPage(pageJobs(0))) + + await store.updateHistory() + await store.loadMoreHistory() + await store.updateHistory() + + expect(fetchHistoryPage).toHaveBeenNthCalledWith( + 3, + expect.any(Function), + 200, + { offset: 0 } + ) + }) + + it('recovers from a rejected cursor page by retrying via offset', async () => { + const firstBatch = Array.from({ length: 10 }, (_, i) => + createMockJobItem(i) + ) + vi.mocked(fetchHistoryPage) + .mockResolvedValueOnce( + mockHistoryPage(firstBatch, { + hasMore: true, + nextCursor: 'cursor-stale' + }) + ) + .mockRejectedValueOnce(new Error('400 INVALID_CURSOR')) + .mockResolvedValueOnce( + mockHistoryPage( + Array.from({ length: 10 }, (_, i) => createMockJobItem(10 + i)), + { hasMore: true, nextCursor: 'cursor-fresh' } + ) + ) + + await store.updateHistory() + await store.loadMoreHistory() + + expect(fetchHistoryPage).toHaveBeenNthCalledWith( + 2, + expect.any(Function), + 200, + { after: 'cursor-stale' } + ) + expect(fetchHistoryPage).toHaveBeenNthCalledWith( + 3, + expect.any(Function), + 200, + { offset: 10 } + ) + expect(store.historyAssets).toHaveLength(20) + expect(store.historyError).toBe(null) + + // The recovered page minted a fresh cursor, so the walk resumes in cursor mode + vi.mocked(fetchHistoryPage).mockResolvedValueOnce(mockHistoryPage([])) + await store.loadMoreHistory() + expect(fetchHistoryPage).toHaveBeenNthCalledWith( + 4, + expect.any(Function), + 200, + { after: 'cursor-fresh' } + ) + }) + + it('keeps pagination resumable when the offset retry also fails', async () => { + const firstBatch = Array.from({ length: 10 }, (_, i) => + createMockJobItem(i) + ) + vi.mocked(fetchHistoryPage) + .mockResolvedValueOnce( + mockHistoryPage(firstBatch, { + hasMore: true, + nextCursor: 'cursor-stale' + }) + ) + .mockRejectedValueOnce(new Error('400 INVALID_CURSOR')) + .mockRejectedValueOnce(new Error('network down')) + + await store.updateHistory() + await store.loadMoreHistory() + + expect(store.historyError).toBeInstanceOf(Error) + expect(store.hasMoreHistory).toBe(true) + expect(store.historyAssets).toHaveLength(10) + + // The dropped cursor means the next attempt resumes via offset + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage( + Array.from({ length: 5 }, (_, i) => createMockJobItem(10 + i)) + ) + ) + await store.loadMoreHistory() + expect(fetchHistoryPage).toHaveBeenNthCalledWith( + 4, + expect.any(Function), + 200, + { offset: 10 } + ) + expect(store.historyAssets).toHaveLength(15) + }) + + it('discards a stale loadMore continuation that resolves after a reset', async () => { + const firstBatch = Array.from({ length: 10 }, (_, i) => + createMockJobItem(i) + ) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(firstBatch, { hasMore: true, nextCursor: 'cursor-1' }) + ) + await store.updateHistory() + + let resolveStale: (page: FetchHistoryPageResult) => void + vi.mocked(fetchHistoryPage).mockReturnValueOnce( + new Promise((resolve) => { + resolveStale = resolve + }) + ) + const staleLoad = store.loadMoreHistory() + + const freshBatch = Array.from({ length: 5 }, (_, i) => + createMockJobItem(100 + i) + ) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(freshBatch, { + hasMore: true, + nextCursor: 'cursor-fresh' + }) + ) + await store.updateHistory() + + resolveStale!( + mockHistoryPage( + Array.from({ length: 10 }, (_, i) => createMockJobItem(10 + i)), + { hasMore: true, nextCursor: 'cursor-stale' } + ) + ) + await staleLoad + + // The stale page neither merged into the fresh list nor moved its cursor + expect(store.historyAssets).toHaveLength(5) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce(mockHistoryPage([])) + await store.loadMoreHistory() + expect(fetchHistoryPage).toHaveBeenLastCalledWith( + expect.any(Function), + 200, + { after: 'cursor-fresh' } + ) + }) + }) + + describe('refreshHistoryHead', () => { + it('performs a full initial load when the list is empty', async () => { + const jobs = Array.from({ length: 5 }, (_, i) => createMockJobItem(i)) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce(mockHistoryPage(jobs)) + + await store.refreshHistoryHead() + + expect(fetchHistoryPage).toHaveBeenCalledWith(expect.any(Function), 200, { + offset: 0 + }) + expect(store.historyAssets).toHaveLength(5) + expect(store.historyLoading).toBe(false) + }) + + it('prepends new completions without resetting the stored cursor', async () => { + const initialJobs = [createMockJobItem(1), createMockJobItem(2)] + vi.mocked(fetchHistoryPage) + .mockResolvedValueOnce( + mockHistoryPage(initialJobs, { + hasMore: true, + nextCursor: 'cursor-1' + }) + ) + .mockResolvedValueOnce( + mockHistoryPage([createMockJobItem(0), createMockJobItem(1)], { + hasMore: true, + nextCursor: 'cursor-head' + }) + ) + .mockResolvedValueOnce(mockHistoryPage([createMockJobItem(3)])) + + await store.updateHistory() + await store.refreshHistoryHead() + + // Head page re-fetched from the top, new job prepended, existing kept + expect(fetchHistoryPage).toHaveBeenNthCalledWith( + 2, + expect.any(Function), + 200, + { offset: 0 } + ) + expect(store.historyAssets.map((a) => a.id)).toEqual([ + 'prompt_0', + 'prompt_1', + 'prompt_2' + ]) + + // Subsequent loadMore still resumes from the pre-refresh cursor + await store.loadMoreHistory() + expect(fetchHistoryPage).toHaveBeenNthCalledWith( + 3, + expect.any(Function), + 200, + { after: 'cursor-1' } + ) + }) + + it('keeps existing items and records the error when the head fetch fails', async () => { + const initialJobs = Array.from({ length: 3 }, (_, i) => + createMockJobItem(i) + ) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(initialJobs, { hasMore: true }) + ) + await store.updateHistory() + + const error = new Error('refresh failed') + vi.mocked(fetchHistoryPage).mockRejectedValueOnce(error) + + await store.refreshHistoryHead() + + expect(store.historyAssets).toHaveLength(3) + expect(store.historyError).toBe(error) + }) + + it('restarts the walk when the head page does not reach the loaded items', async () => { + const overflowPage = () => + Array.from({ length: 200 }, (_, i) => createMockJobItem(1000 + i)) + vi.mocked(fetchHistoryPage) + .mockResolvedValueOnce( + mockHistoryPage( + Array.from({ length: 10 }, (_, i) => createMockJobItem(i)), + { hasMore: true, nextCursor: 'cursor-1' } + ) + ) + .mockResolvedValueOnce( + mockHistoryPage(overflowPage(), { + hasMore: true, + nextCursor: 'cursor-head' + }) + ) + .mockResolvedValueOnce( + mockHistoryPage(overflowPage(), { + hasMore: true, + nextCursor: 'cursor-head-2' + }) + ) + + await store.updateHistory() + await store.refreshHistoryHead() + + // No overlap with loaded items while more rows remain means merging + // would leave an unfillable hole, so the walk restarts from the top + expect(fetchHistoryPage).toHaveBeenCalledTimes(3) + expect(fetchHistoryPage).toHaveBeenNthCalledWith( + 3, + expect.any(Function), + 200, + { offset: 0 } + ) + expect(store.historyAssets).toHaveLength(200) + expect(store.historyAssets.some((asset) => asset.id === 'prompt_0')).toBe( + false + ) + }) + + it('prunes server-side deletions when the head page spans the whole timeline', async () => { + vi.mocked(fetchHistoryPage) + .mockResolvedValueOnce( + mockHistoryPage([ + createMockJobItem(0), + createMockJobItem(1), + createMockJobItem(2) + ]) + ) + .mockResolvedValueOnce( + mockHistoryPage([createMockJobItem(0), createMockJobItem(2)]) + ) + + await store.updateHistory() + expect(store.historyAssets).toHaveLength(3) + + await store.refreshHistoryHead() + + expect(store.historyAssets.map((asset) => asset.id)).toEqual([ + 'prompt_0', + 'prompt_2' + ]) + }) + }) + describe('Sorting', () => { it('should maintain date sorting after pagination', async () => { // First batch const firstBatch = Array.from({ length: 200 }, (_, i) => createMockJobItem(i) ) - vi.mocked(api.getHistory).mockResolvedValueOnce(firstBatch) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(firstBatch, { hasMore: true }) + ) await store.updateHistory() @@ -416,7 +892,9 @@ describe('assetsStore - Refactored (Option A)', () => { const secondBatch = Array.from({ length: 200 }, (_, i) => createMockJobItem(200 + i) ) - vi.mocked(api.getHistory).mockResolvedValueOnce(secondBatch) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(secondBatch) + ) await store.loadMoreHistory() @@ -435,14 +913,16 @@ describe('assetsStore - Refactored (Option A)', () => { const firstBatch = Array.from({ length: 200 }, (_, i) => createMockJobItem(i) ) - vi.mocked(api.getHistory).mockResolvedValueOnce(firstBatch) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(firstBatch, { hasMore: true }) + ) await store.updateHistory() expect(store.historyAssets).toHaveLength(200) // Second load fails const error = new Error('Network error') - vi.mocked(api.getHistory).mockRejectedValueOnce(error) + vi.mocked(fetchHistoryPage).mockRejectedValueOnce(error) await store.loadMoreHistory() @@ -457,13 +937,15 @@ describe('assetsStore - Refactored (Option A)', () => { const firstBatch = Array.from({ length: 200 }, (_, i) => createMockJobItem(i) ) - vi.mocked(api.getHistory).mockResolvedValueOnce(firstBatch) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(firstBatch, { hasMore: true }) + ) await store.updateHistory() // Second load fails const error = new Error('Network error') - vi.mocked(api.getHistory).mockRejectedValueOnce(error) + vi.mocked(fetchHistoryPage).mockRejectedValueOnce(error) await store.loadMoreHistory() expect(store.historyError).toBe(error) @@ -472,7 +954,9 @@ describe('assetsStore - Refactored (Option A)', () => { const thirdBatch = Array.from({ length: 200 }, (_, i) => createMockJobItem(200 + i) ) - vi.mocked(api.getHistory).mockResolvedValueOnce(thirdBatch) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(thirdBatch) + ) await store.loadMoreHistory() @@ -483,7 +967,7 @@ describe('assetsStore - Refactored (Option A)', () => { it('should handle errors with proper loading state', async () => { const error = new Error('API error') - vi.mocked(api.getHistory).mockRejectedValue(error) + vi.mocked(fetchHistoryPage).mockRejectedValue(error) await store.updateHistory() @@ -501,7 +985,9 @@ describe('assetsStore - Refactored (Option A)', () => { const items = Array.from({ length: 200 }, (_, i) => createMockJobItem(batch * 200 + i) ) - vi.mocked(api.getHistory).mockResolvedValueOnce(items) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(items, { hasMore: true }) + ) if (batch === 0) { await store.updateHistory() @@ -525,7 +1011,9 @@ describe('assetsStore - Refactored (Option A)', () => { const items = Array.from({ length: 200 }, (_, i) => createMockJobItem(batch * 200 + i) ) - vi.mocked(api.getHistory).mockResolvedValueOnce(items) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(items, { hasMore: true }) + ) if (batch === 0) { await store.updateHistory() @@ -550,7 +1038,9 @@ describe('assetsStore - Refactored (Option A)', () => { const mockHistory = Array.from({ length: 3 }, (_, i) => createMockJobItem(i) ) - vi.mocked(api.getHistory).mockResolvedValue(mockHistory) + vi.mocked(fetchHistoryPage).mockResolvedValue( + mockHistoryPage(mockHistory) + ) await store.updateHistory() const target = store.historyAssets[1] @@ -569,7 +1059,9 @@ describe('assetsStore - Refactored (Option A)', () => { it('matches by name even when ids differ between APIs', async () => { const mockHistory = [createMockJobItem(0)] - vi.mocked(api.getHistory).mockResolvedValue(mockHistory) + vi.mocked(fetchHistoryPage).mockResolvedValue( + mockHistoryPage(mockHistory) + ) await store.updateHistory() const historyAssetId = store.historyAssets[0].id @@ -587,7 +1079,9 @@ describe('assetsStore - Refactored (Option A)', () => { const mockHistory = Array.from({ length: 2 }, (_, i) => createMockJobItem(i) ) - vi.mocked(api.getHistory).mockResolvedValue(mockHistory) + vi.mocked(fetchHistoryPage).mockResolvedValue( + mockHistoryPage(mockHistory) + ) await store.updateHistory() const before = store.historyAssets.map((a) => ({ ...a })) @@ -600,7 +1094,9 @@ describe('assetsStore - Refactored (Option A)', () => { const mockHistory = Array.from({ length: 3 }, (_, i) => createMockJobItem(i) ) - vi.mocked(api.getHistory).mockResolvedValue(mockHistory) + vi.mocked(fetchHistoryPage).mockResolvedValue( + mockHistoryPage(mockHistory) + ) await store.updateHistory() // Patch using a non-matching prefix; the other assets must stay untouched @@ -613,7 +1109,9 @@ describe('assetsStore - Refactored (Option A)', () => { it('replaces the asset object so reactivity fires for v-for keyed by id', async () => { const mockHistory = [createMockJobItem(0)] - vi.mocked(api.getHistory).mockResolvedValue(mockHistory) + vi.mocked(fetchHistoryPage).mockResolvedValue( + mockHistoryPage(mockHistory) + ) await store.updateHistory() const before = store.historyAssets[0] @@ -632,7 +1130,9 @@ describe('assetsStore - Refactored (Option A)', () => { const mockHistory = Array.from({ length: 5 }, (_, i) => createMockJobItem(i) ) - vi.mocked(api.getHistory).mockResolvedValue(mockHistory) + vi.mocked(fetchHistoryPage).mockResolvedValue( + mockHistoryPage(mockHistory) + ) await store.updateHistory() @@ -676,7 +1176,9 @@ describe('assetsStore - Refactored (Option A)', () => { ] const mockHistory = [createMockJobItem(0)] - vi.mocked(api.getHistory).mockResolvedValue(mockHistory) + vi.mocked(fetchHistoryPage).mockResolvedValue( + mockHistoryPage(mockHistory) + ) await store.updateHistory() @@ -711,7 +1213,9 @@ describe('assetsStore - Refactored (Option A)', () => { ] const mockHistory = [createMockJobItem(0)] - vi.mocked(api.getHistory).mockResolvedValue(mockHistory) + vi.mocked(fetchHistoryPage).mockResolvedValue( + mockHistoryPage(mockHistory) + ) await store.updateHistory() @@ -739,7 +1243,9 @@ describe('assetsStore - Refactored (Option A)', () => { ] const mockHistory = [createMockJobItem(0)] - vi.mocked(api.getHistory).mockResolvedValue(mockHistory) + vi.mocked(fetchHistoryPage).mockResolvedValue( + mockHistoryPage(mockHistory) + ) await store.updateHistory() @@ -760,7 +1266,9 @@ describe('assetsStore - Refactored (Option A)', () => { ] const mockHistory = [createMockJobItem(0)] - vi.mocked(api.getHistory).mockResolvedValue(mockHistory) + vi.mocked(fetchHistoryPage).mockResolvedValue( + mockHistoryPage(mockHistory) + ) await store.updateHistory() diff --git a/src/stores/assetsStore.ts b/src/stores/assetsStore.ts index 3889182465f..fb1174ace10 100644 --- a/src/stores/assetsStore.ts +++ b/src/stores/assetsStore.ts @@ -17,6 +17,11 @@ import { } from '@/platform/assets/services/assetService' import type { PaginationOptions } from '@/platform/assets/services/assetService' import { isCloud } from '@/platform/distribution/types' +import { fetchHistoryPage } from '@/platform/remote/comfyui/jobs/fetchJobs' +import type { + FetchHistoryPageResult, + JobsPageRequest +} from '@/platform/remote/comfyui/jobs/fetchJobs' import type { JobListItem } from '@/platform/remote/comfyui/jobs/jobTypes' import { api } from '@/scripts/api' @@ -114,8 +119,11 @@ export const useAssetsStore = defineStore('assets', () => { return deletingAssetIds.has(assetId) } - // Pagination state + // Pagination state. The cursor walks the history timeline without the + // drift offset paging has when jobs complete mid-scroll; offset remains + // as the fallback for backends that don't mint cursors. const historyOffset = ref(0) + const historyNextCursor = ref(null) const hasMoreHistory = ref(true) const isLoadingMore = ref(false) @@ -146,6 +154,59 @@ export const useAssetsStore = defineStore('assets', () => { return result } + /** + * Insert assets in sorted order (newest first), skipping already-loaded ids + */ + const mergeHistoryAssets = (newAssets: AssetItem[]) => { + for (const asset of newAssets) { + if (loadedIds.has(asset.id)) { + continue + } + loadedIds.add(asset.id) + + const assetTime = new Date(asset.created_at ?? 0).getTime() + const insertIndex = allHistoryItems.value.findIndex( + (item) => new Date(item.created_at ?? 0).getTime() < assetTime + ) + + if (insertIndex === -1) { + allHistoryItems.value.push(asset) + } else { + allHistoryItems.value.splice(insertIndex, 0, asset) + } + } + } + + const trimHistoryToLimit = () => { + if (allHistoryItems.value.length <= MAX_HISTORY_ITEMS) return + + const removed = allHistoryItems.value.slice(MAX_HISTORY_ITEMS) + allHistoryItems.value = allHistoryItems.value.slice(0, MAX_HISTORY_ITEMS) + removed.forEach((item) => loadedIds.delete(item.id)) + } + + const fetchHistoryJobsPage = (page: JobsPageRequest) => + fetchHistoryPage(api.fetchApi.bind(api), BATCH_SIZE, page) + + // Invalidates in-flight history fetches whenever the list is replaced, so + // a stale continuation can't merge into (or move the cursor of) the new walk. + let historyFetchEpoch = 0 + + const fetchHistoryPageWithCursorRecovery = async ( + after: string | null + ): Promise => { + if (!after) return fetchHistoryJobsPage({ offset: historyOffset.value }) + try { + return await fetchHistoryJobsPage({ after }) + } catch (err) { + // A cursor can go stale (e.g. server restart rejects it with 400 + // INVALID_CURSOR); resume from the offset fallback instead. + console.warn('Cursor history page failed, retrying via offset:', err) + historyNextCursor.value = null + return fetchHistoryJobsPage({ offset: historyOffset.value }) + } + } + /** * Fetch history assets with pagination support * @param loadMore - true for pagination (append), false for initial load (replace) @@ -153,59 +214,36 @@ export const useAssetsStore = defineStore('assets', () => { const fetchHistoryAssets = async (loadMore = false): Promise => { // Reset state for initial load if (!loadMore) { + historyFetchEpoch += 1 historyOffset.value = 0 + historyNextCursor.value = null hasMoreHistory.value = true allHistoryItems.value = [] loadedIds.clear() } - // Fetch from server with offset - const history = await api.getHistory(BATCH_SIZE, { - offset: historyOffset.value - }) + const epoch = historyFetchEpoch + const page = await fetchHistoryPageWithCursorRecovery( + loadMore ? historyNextCursor.value : null + ) + if (epoch !== historyFetchEpoch) return allHistoryItems.value - // Convert JobListItems to AssetItems - const newAssets = mapHistoryToAssets(history) + const newAssets = mapHistoryToAssets(page.jobs) if (loadMore) { - // Filter out duplicates and insert in sorted order - for (const asset of newAssets) { - if (loadedIds.has(asset.id)) { - continue // Skip duplicates - } - loadedIds.add(asset.id) - - // Find insertion index to maintain sorted order (newest first) - const assetTime = new Date(asset.created_at ?? 0).getTime() - const insertIndex = allHistoryItems.value.findIndex( - (item) => new Date(item.created_at ?? 0).getTime() < assetTime - ) - - if (insertIndex === -1) { - // Asset is oldest, append to end - allHistoryItems.value.push(asset) - } else { - // Insert at the correct position - allHistoryItems.value.splice(insertIndex, 0, asset) - } - } + mergeHistoryAssets(newAssets) } else { - // Initial load: replace all allHistoryItems.value = newAssets newAssets.forEach((asset) => loadedIds.add(asset.id)) } - // Update pagination state - historyOffset.value += BATCH_SIZE - hasMoreHistory.value = history.length === BATCH_SIZE + // The server mints next_cursor even on offset-mode requests, so the walk + // upgrades to keyset paging after the first page that returns one. + historyOffset.value += page.jobs.length + historyNextCursor.value = page.nextCursor ?? null + hasMoreHistory.value = page.hasMore - if (allHistoryItems.value.length > MAX_HISTORY_ITEMS) { - const removed = allHistoryItems.value.slice(MAX_HISTORY_ITEMS) - allHistoryItems.value = allHistoryItems.value.slice(0, MAX_HISTORY_ITEMS) - - // Clean up Set - removed.forEach((item) => loadedIds.delete(item.id)) - } + trimHistoryToLimit() return allHistoryItems.value } @@ -260,6 +298,62 @@ export const useAssetsStore = defineStore('assets', () => { } } + /** + * A head page with no further rows spans the whole timeline, so replacing + * local state with it also prunes jobs deleted server-side (e.g. after the + * queue history is cleared from another surface). + */ + const replaceHistoryWithHeadPage = (page: FetchHistoryPageResult) => { + historyFetchEpoch += 1 + const newAssets = mapHistoryToAssets(page.jobs) + allHistoryItems.value = newAssets + loadedIds.clear() + newAssets.forEach((asset) => loadedIds.add(asset.id)) + historyOffset.value = page.jobs.length + historyNextCursor.value = page.nextCursor ?? null + hasMoreHistory.value = page.hasMore + } + + /** + * Merge newly completed jobs into the top of the list without resetting + * pagination state, so items loaded via infinite scroll survive the + * refresh. Cursors only walk toward older items, so new completions are + * picked up by re-fetching the head page and deduplicating. + */ + const refreshHistoryHead = async () => { + if (!allHistoryItems.value.length) { + await updateHistory() + return + } + + historyError.value = null + try { + const epoch = historyFetchEpoch + const page = await fetchHistoryJobsPage({ offset: 0 }) + if (epoch !== historyFetchEpoch) return + + const reachesLoadedItems = page.jobs.some((job) => loadedIds.has(job.id)) + if (page.hasMore && !reachesLoadedItems) { + // More than a page of new jobs arrived since the last refresh; + // merging would leave a hole the cursor walk can never reach back + // to fill, so restart the walk from the top instead. + await updateHistory() + return + } + + if (page.hasMore) { + mergeHistoryAssets(mapHistoryToAssets(page.jobs)) + trimHistoryToLimit() + } else { + replaceHistoryWithHeadPage(page) + } + historyAssets.value = allHistoryItems.value + } catch (err) { + console.error('Error refreshing history:', err) + historyError.value = err + } + } + const flatOutputAssets = ref([]) const flatOutputLoading = ref(false) const flatOutputError = ref(null) @@ -873,6 +967,7 @@ export const useAssetsStore = defineStore('assets', () => { updateInputs, updateHistory, loadMoreHistory, + refreshHistoryHead, setAssetPreview, // Flat output assets (cloud-only, tag-based) diff --git a/src/views/GraphView.test.ts b/src/views/GraphView.test.ts index a1fc5242c21..3df5fcd0d51 100644 --- a/src/views/GraphView.test.ts +++ b/src/views/GraphView.test.ts @@ -100,7 +100,10 @@ vi.mock('@/composables/useAppMode', () => ({ useAppMode: () => ({ isBuilderMode: ref(false) }) })) vi.mock('@/stores/assetsStore', () => ({ - useAssetsStore: () => ({ updateHistory: vi.fn() }) + useAssetsStore: () => ({ + updateHistory: vi.fn(), + refreshHistoryHead: vi.fn() + }) })) vi.mock('@/stores/commandStore', () => ({ useCommandStore: () => ({ registerCommands: vi.fn() }) diff --git a/src/views/GraphView.vue b/src/views/GraphView.vue index 24c7eaedc04..e6a0eeeec56 100644 --- a/src/views/GraphView.vue +++ b/src/views/GraphView.vue @@ -238,7 +238,7 @@ const onStatus = async (e: CustomEvent) => { // Only update assets if the assets sidebar is currently open // When sidebar is closed, AssetsSidebarTab.vue will refresh on mount if (sidebarTabStore.activeSidebarTabId === 'assets' || linearMode.value) { - await assetsStore.updateHistory() + await assetsStore.refreshHistoryHead() } } @@ -247,7 +247,7 @@ const onExecutionSuccess = async () => { // Only update assets if the assets sidebar is currently open // When sidebar is closed, AssetsSidebarTab.vue will refresh on mount if (sidebarTabStore.activeSidebarTabId === 'assets' || linearMode.value) { - await assetsStore.updateHistory() + await assetsStore.refreshHistoryHead() } } From 1f810a1373e4564386a4abbd8917e788ef36e48c Mon Sep 17 00:00:00 2001 From: Matt Miller Date: Wed, 10 Jun 2026 12:43:51 -0700 Subject: [PATCH 07/17] fix(assets): harden cursor pagination against review findings MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Applies verified findings from multi-model review of #12769: - next_cursor accepts null (z.string().nullish()) so a backend serializing absent-as-null can't crash the whole history fetch at the zod boundary - JobsApiError carries the HTTP status and response body; the store's cursor recovery now drops the cursor for the offset fallback only on a 400 (stale/rejected cursor) and only while its walk is still current — transient failures and superseded continuations propagate so a valid cursor is never discarded - an empty page without a cursor is terminal (offset paging cannot advance by zero), while an empty page that still mints a cursor keeps walking - refreshHistoryHead coalesces event bursts into the in-flight refresh plus exactly one trailing refresh, so a job completing mid-refresh is picked up instead of being satisfied by a response dispatched before it existed --- .../remote/comfyui/jobs/fetchJobs.test.ts | 29 ++- src/platform/remote/comfyui/jobs/fetchJobs.ts | 18 +- src/platform/remote/comfyui/jobs/jobTypes.ts | 2 +- src/stores/assetsStore.test.ts | 180 +++++++++++++++++- src/stores/assetsStore.ts | 57 +++++- 5 files changed, 265 insertions(+), 21 deletions(-) diff --git a/src/platform/remote/comfyui/jobs/fetchJobs.test.ts b/src/platform/remote/comfyui/jobs/fetchJobs.test.ts index 076c2bb2320..d587f5f9a96 100644 --- a/src/platform/remote/comfyui/jobs/fetchJobs.test.ts +++ b/src/platform/remote/comfyui/jobs/fetchJobs.test.ts @@ -1,6 +1,7 @@ import { describe, expect, it, vi } from 'vitest' import { + JobsApiError, extractWorkflow, fetchHistory, fetchHistoryPage, @@ -142,13 +143,35 @@ describe('fetchJobs', () => { await expect(fetchHistory(mockFetch)).rejects.toThrow('Network error') }) - it('throws on non-ok response', async () => { + it('throws a JobsApiError carrying status and body on non-ok response', async () => { const mockFetch = vi.fn().mockResolvedValue({ ok: false, - status: 400 + status: 400, + text: () => + Promise.resolve('{"error":"Invalid cursor","code":"INVALID_CURSOR"}') }) - await expect(fetchHistory(mockFetch)).rejects.toThrow('400') + await expect(fetchHistory(mockFetch)).rejects.toBeInstanceOf(JobsApiError) + await expect(fetchHistory(mockFetch)).rejects.toMatchObject({ + status: 400, + message: expect.stringContaining('INVALID_CURSOR') + }) + }) + + it('parses a null next_cursor as absent', async () => { + const mockFetch = vi.fn().mockResolvedValue({ + ok: true, + json: () => + Promise.resolve( + createMockResponse([createMockJob('job1', 'completed')], 1, { + next_cursor: null + }) + ) + }) + + const result = await fetchHistoryPage(mockFetch, 200, { offset: 0 }) + + expect(result.nextCursor).toBeUndefined() }) it('parses batch containing text-only preview outputs', async () => { diff --git a/src/platform/remote/comfyui/jobs/fetchJobs.ts b/src/platform/remote/comfyui/jobs/fetchJobs.ts index b41c0211434..cbccf3e4147 100644 --- a/src/platform/remote/comfyui/jobs/fetchJobs.ts +++ b/src/platform/remote/comfyui/jobs/fetchJobs.ts @@ -29,6 +29,20 @@ export interface JobsPageRequest { after?: string } +/** + * Non-ok response from the jobs API. Carries the HTTP status so callers can + * tell a rejected cursor (400 INVALID_CURSOR) apart from transient failures. + */ +export class JobsApiError extends Error { + constructor( + readonly status: number, + body: string + ) { + super(`[Jobs API] Failed to fetch jobs: ${status} ${body}`.trim()) + this.name = 'JobsApiError' + } +} + interface FetchJobsRawResult { jobs: RawJobListItem[] total: number @@ -66,7 +80,7 @@ async function fetchJobsRaw( const url = `/jobs?status=${statusParam}&limit=${maxItems}&${pageParam}` const res = await fetchApi(url) if (!res.ok) { - throw new Error(`[Jobs API] Failed to fetch jobs: ${res.status}`) + throw new JobsApiError(res.status, await res.text().catch(() => '')) } const data = zJobsListResponse.parse(await res.json()) return { @@ -75,7 +89,7 @@ async function fetchJobsRaw( offset: data.pagination.offset, limit: data.pagination.limit, hasMore: data.pagination.has_more, - nextCursor: data.pagination.next_cursor + nextCursor: data.pagination.next_cursor ?? undefined } } diff --git a/src/platform/remote/comfyui/jobs/jobTypes.ts b/src/platform/remote/comfyui/jobs/jobTypes.ts index 6ea88b13643..c0ffdab4982 100644 --- a/src/platform/remote/comfyui/jobs/jobTypes.ts +++ b/src/platform/remote/comfyui/jobs/jobTypes.ts @@ -88,7 +88,7 @@ const zPaginationInfo = z.object({ limit: z.number(), total: z.number(), has_more: z.boolean(), - next_cursor: z.string().optional() + next_cursor: z.string().nullish() }) export const zJobsListResponse = z.object({ diff --git a/src/stores/assetsStore.test.ts b/src/stores/assetsStore.test.ts index d85bd90625c..e7b49a318c2 100644 --- a/src/stores/assetsStore.test.ts +++ b/src/stores/assetsStore.test.ts @@ -5,7 +5,11 @@ import { nextTick, watch } from 'vue' import { useAssetsStore } from '@/stores/assetsStore' import type { AssetItem } from '@/platform/assets/schemas/assetSchema' -import { fetchHistoryPage } from '@/platform/remote/comfyui/jobs/fetchJobs' +import { + JobsApiError, + fetchHistoryPage +} from '@/platform/remote/comfyui/jobs/fetchJobs' +import type * as fetchJobsModule from '@/platform/remote/comfyui/jobs/fetchJobs' import type { FetchHistoryPageResult } from '@/platform/remote/comfyui/jobs/fetchJobs' import type { JobListItem } from '@/platform/remote/comfyui/jobs/jobTypes' import { assetService } from '@/platform/assets/services/assetService' @@ -22,8 +26,10 @@ vi.mock('@/scripts/api', () => ({ } })) -// Mock the jobs API fetcher used for history pagination -vi.mock('@/platform/remote/comfyui/jobs/fetchJobs', () => ({ +// Mock the jobs API fetcher used for history pagination, keeping the real +// JobsApiError class so the store's instanceof narrowing stays meaningful +vi.mock('@/platform/remote/comfyui/jobs/fetchJobs', async (importOriginal) => ({ + ...(await importOriginal()), fetchHistoryPage: vi.fn() })) @@ -613,7 +619,7 @@ describe('assetsStore - Refactored (Option A)', () => { nextCursor: 'cursor-stale' }) ) - .mockRejectedValueOnce(new Error('400 INVALID_CURSOR')) + .mockRejectedValueOnce(new JobsApiError(400, 'INVALID_CURSOR')) .mockResolvedValueOnce( mockHistoryPage( Array.from({ length: 10 }, (_, i) => createMockJobItem(10 + i)), @@ -661,7 +667,7 @@ describe('assetsStore - Refactored (Option A)', () => { nextCursor: 'cursor-stale' }) ) - .mockRejectedValueOnce(new Error('400 INVALID_CURSOR')) + .mockRejectedValueOnce(new JobsApiError(400, 'INVALID_CURSOR')) .mockRejectedValueOnce(new Error('network down')) await store.updateHistory() @@ -687,6 +693,121 @@ describe('assetsStore - Refactored (Option A)', () => { expect(store.historyAssets).toHaveLength(15) }) + it('preserves the cursor when a transient error rejects the page', async () => { + vi.mocked(fetchHistoryPage) + .mockResolvedValueOnce( + mockHistoryPage( + Array.from({ length: 10 }, (_, i) => createMockJobItem(i)), + { hasMore: true, nextCursor: 'cursor-1' } + ) + ) + .mockRejectedValueOnce(new JobsApiError(500, 'server error')) + + await store.updateHistory() + await store.loadMoreHistory() + + // No offset fallback for transient failures, just a recorded error + expect(fetchHistoryPage).toHaveBeenCalledTimes(2) + expect(store.historyError).toBeInstanceOf(Error) + expect(store.hasMoreHistory).toBe(true) + + // The still-valid cursor is retried on the next attempt + vi.mocked(fetchHistoryPage).mockResolvedValueOnce(mockHistoryPage([])) + await store.loadMoreHistory() + expect(fetchHistoryPage).toHaveBeenLastCalledWith( + expect.any(Function), + 200, + { after: 'cursor-1' } + ) + }) + + it('treats a cursorless empty page as terminal even if the server claims more', async () => { + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage([], { hasMore: true }) + ) + + await store.updateHistory() + + expect(store.hasMoreHistory).toBe(false) + await store.loadMoreHistory() + expect(fetchHistoryPage).toHaveBeenCalledTimes(1) + }) + + it('keeps paging when an empty page still mints a cursor', async () => { + vi.mocked(fetchHistoryPage) + .mockResolvedValueOnce( + mockHistoryPage([], { hasMore: true, nextCursor: 'cursor-skip' }) + ) + .mockResolvedValueOnce(mockHistoryPage([createMockJobItem(0)])) + + await store.updateHistory() + expect(store.hasMoreHistory).toBe(true) + + await store.loadMoreHistory() + expect(fetchHistoryPage).toHaveBeenNthCalledWith( + 2, + expect.any(Function), + 200, + { after: 'cursor-skip' } + ) + }) + + it('keeps paging when a page of jobs maps to no displayable assets', async () => { + const failedJobs = Array.from({ length: 3 }, (_, i) => ({ + ...createMockJobItem(i), + status: 'failed' as const, + preview_output: null + })) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(failedJobs, { hasMore: true }) + ) + + await store.updateHistory() + + expect(store.historyAssets).toHaveLength(0) + expect(store.hasMoreHistory).toBe(true) + }) + + it('does not let a stale rejected continuation drop the new walk cursor', async () => { + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage( + Array.from({ length: 10 }, (_, i) => createMockJobItem(i)), + { hasMore: true, nextCursor: 'cursor-old' } + ) + ) + await store.updateHistory() + + let rejectStale: (err: Error) => void + vi.mocked(fetchHistoryPage).mockReturnValueOnce( + new Promise((_, reject) => { + rejectStale = reject + }) + ) + const staleLoad = store.loadMoreHistory() + + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage([createMockJobItem(100)], { + hasMore: true, + nextCursor: 'cursor-fresh' + }) + ) + await store.updateHistory() + + rejectStale!(new JobsApiError(400, 'INVALID_CURSOR')) + await staleLoad + + // The superseded walk neither nulled the fresh cursor nor fired an + // offset retry against the new walk + expect(fetchHistoryPage).toHaveBeenCalledTimes(3) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce(mockHistoryPage([])) + await store.loadMoreHistory() + expect(fetchHistoryPage).toHaveBeenLastCalledWith( + expect.any(Function), + 200, + { after: 'cursor-fresh' } + ) + }) + it('discards a stale loadMore continuation that resolves after a reset', async () => { const firstBatch = Array.from({ length: 10 }, (_, i) => createMockJobItem(i) @@ -851,6 +972,55 @@ describe('assetsStore - Refactored (Option A)', () => { ) }) + it('coalesces a burst into a leading fetch plus one trailing refresh', async () => { + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage([createMockJobItem(0)], { + hasMore: true, + nextCursor: 'cursor-1' + }) + ) + await store.updateHistory() + + let resolveLeading: (page: FetchHistoryPageResult) => void + vi.mocked(fetchHistoryPage).mockReturnValueOnce( + new Promise((resolve) => { + resolveLeading = resolve + }) + ) + + const first = store.refreshHistoryHead() + const second = store.refreshHistoryHead() + const third = store.refreshHistoryHead() + + // The trailing refresh re-fetches the head and sees the job the + // leading response was dispatched too early to include + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage([createMockJobItem(0), createMockJobItem(1)], { + hasMore: true + }) + ) + resolveLeading!( + mockHistoryPage([createMockJobItem(0)], { hasMore: true }) + ) + await Promise.all([first, second, third]) + + // Initial load + leading head fetch + exactly one trailing head fetch + expect(fetchHistoryPage).toHaveBeenCalledTimes(3) + expect(store.historyAssets).toHaveLength(2) + }) + + it('runs a fresh fetch for sequential refreshes', async () => { + vi.mocked(fetchHistoryPage).mockResolvedValue( + mockHistoryPage([createMockJobItem(0)], { hasMore: true }) + ) + await store.updateHistory() + + await store.refreshHistoryHead() + await store.refreshHistoryHead() + + expect(fetchHistoryPage).toHaveBeenCalledTimes(3) + }) + it('prunes server-side deletions when the head page spans the whole timeline', async () => { vi.mocked(fetchHistoryPage) .mockResolvedValueOnce( diff --git a/src/stores/assetsStore.ts b/src/stores/assetsStore.ts index fb1174ace10..f8fe3686dcf 100644 --- a/src/stores/assetsStore.ts +++ b/src/stores/assetsStore.ts @@ -17,7 +17,10 @@ import { } from '@/platform/assets/services/assetService' import type { PaginationOptions } from '@/platform/assets/services/assetService' import { isCloud } from '@/platform/distribution/types' -import { fetchHistoryPage } from '@/platform/remote/comfyui/jobs/fetchJobs' +import { + JobsApiError, + fetchHistoryPage +} from '@/platform/remote/comfyui/jobs/fetchJobs' import type { FetchHistoryPageResult, JobsPageRequest @@ -192,16 +195,24 @@ export const useAssetsStore = defineStore('assets', () => { // a stale continuation can't merge into (or move the cursor of) the new walk. let historyFetchEpoch = 0 + const isRejectedCursorError = (err: unknown): boolean => + err instanceof JobsApiError && err.status === 400 + const fetchHistoryPageWithCursorRecovery = async ( - after: string | null + after: string | null, + epoch: number ): Promise => { if (!after) return fetchHistoryJobsPage({ offset: historyOffset.value }) try { return await fetchHistoryJobsPage({ after }) } catch (err) { - // A cursor can go stale (e.g. server restart rejects it with 400 - // INVALID_CURSOR); resume from the offset fallback instead. - console.warn('Cursor history page failed, retrying via offset:', err) + // Only a rejected cursor (e.g. gone stale across a server restart) + // warrants dropping it for the offset fallback; transient failures + // propagate so the still-valid cursor is retried on the next attempt. + // A continuation from a superseded walk must also propagate — its + // recovery would null the cursor the newer walk just minted. + if (!isRejectedCursorError(err) || epoch !== historyFetchEpoch) throw err + console.warn('Stale history cursor rejected, resuming via offset:', err) historyNextCursor.value = null return fetchHistoryJobsPage({ offset: historyOffset.value }) } @@ -224,7 +235,8 @@ export const useAssetsStore = defineStore('assets', () => { const epoch = historyFetchEpoch const page = await fetchHistoryPageWithCursorRecovery( - loadMore ? historyNextCursor.value : null + loadMore ? historyNextCursor.value : null, + epoch ) if (epoch !== historyFetchEpoch) return allHistoryItems.value @@ -238,10 +250,14 @@ export const useAssetsStore = defineStore('assets', () => { } // The server mints next_cursor even on offset-mode requests, so the walk - // upgrades to keyset paging after the first page that returns one. + // upgrades to keyset paging after the first page that returns one. An + // empty page without a cursor is terminal regardless of has_more — + // offset paging advances by jobs.length, so it would refetch the same + // page forever; a minted cursor still makes progress. historyOffset.value += page.jobs.length historyNextCursor.value = page.nextCursor ?? null - hasMoreHistory.value = page.hasMore + hasMoreHistory.value = + page.hasMore && (page.jobs.length > 0 || page.nextCursor != null) trimHistoryToLimit() @@ -314,13 +330,34 @@ export const useAssetsStore = defineStore('assets', () => { hasMoreHistory.value = page.hasMore } + let headRefreshInFlight: Promise | null = null + let headRefreshTrailing: Promise | null = null + /** * Merge newly completed jobs into the top of the list without resetting * pagination state, so items loaded via infinite scroll survive the * refresh. Cursors only walk toward older items, so new completions are - * picked up by re-fetching the head page and deduplicating. + * picked up by re-fetching the head page and deduplicating. Bursts of + * status events share the in-flight refresh, and a call arriving + * mid-flight schedules exactly one trailing refresh — the shared response + * was dispatched before that caller's event, so it could miss the very + * completion the caller is reacting to. */ - const refreshHistoryHead = async () => { + const refreshHistoryHead = (): Promise => { + if (!headRefreshInFlight) { + headRefreshInFlight = doRefreshHistoryHead().finally(() => { + headRefreshInFlight = null + }) + return headRefreshInFlight + } + headRefreshTrailing ??= headRefreshInFlight.then(() => { + headRefreshTrailing = null + return refreshHistoryHead() + }) + return headRefreshTrailing + } + + const doRefreshHistoryHead = async () => { if (!allHistoryItems.value.length) { await updateHistory() return From 69a4d78cba6a39f7f9226bd393fb5e489724f496 Mon Sep 17 00:00:00 2001 From: Matt Miller Date: Tue, 16 Jun 2026 13:33:08 -0700 Subject: [PATCH 08/17] fix(assets): guard the history jobs walk against a non-advancing cursor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Mirror the flat-output walk's stuck-cursor guard so a backend that returns the same next_cursor it was given (with has_more:true and a non-empty page) can't loop the history walk forever. Without this guard, mergeHistoryAssets dedupes every already-seen row, historyNextCursor stays set to the stuck value, and hasMoreHistory remains true because jobs.length > 0 — causing every subsequent loadMoreHistory to re-fetch the identical page indefinitely. Fix: capture requestedAfter before the fetch, detect cursorStuck when page.nextCursor equals it, and in that case force hasMoreHistory=false and drop the cursor. The initial (!loadMore) path sets requestedAfter to null, so cursorStuck is always false there — no regression to the happy path. Add a regression test that simulates the stuck-cursor shape and asserts no further fetch is issued after the guard fires. --- src/stores/assetsStore.test.ts | 32 ++++++++++++++++++++++++++++++++ src/stores/assetsStore.ts | 19 +++++++++++++------ 2 files changed, 45 insertions(+), 6 deletions(-) diff --git a/src/stores/assetsStore.test.ts b/src/stores/assetsStore.test.ts index e7b49a318c2..b31864f9345 100644 --- a/src/stores/assetsStore.test.ts +++ b/src/stores/assetsStore.test.ts @@ -854,6 +854,38 @@ describe('assetsStore - Refactored (Option A)', () => { { after: 'cursor-fresh' } ) }) + + it('terminates the walk when the backend returns the same cursor it was given (stuck cursor)', async () => { + // Page 1: initial load mints cursor-1 + const firstBatch = Array.from({ length: 10 }, (_, i) => + createMockJobItem(i) + ) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(firstBatch, { hasMore: true, nextCursor: 'cursor-1' }) + ) + await store.updateHistory() + expect(store.hasMoreHistory).toBe(true) + + // Page 2: backend echoes back cursor-1 (same as the requested after), + // with has_more:true and a non-empty page — the stuck-cursor shape. + // Without the guard, mergeHistoryAssets dedupes every row and + // hasMoreHistory stays true, causing an infinite spin. + const stuckPage = Array.from( + { length: 10 }, + (_, i) => createMockJobItem(i) // same ids as page 1 → all deduped + ) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(stuckPage, { hasMore: true, nextCursor: 'cursor-1' }) + ) + await store.loadMoreHistory() + + // Guard must have fired: hasMoreHistory forced off, cursor dropped + expect(store.hasMoreHistory).toBe(false) + + // A subsequent loadMoreHistory must not issue another fetch + await store.loadMoreHistory() + expect(fetchHistoryPage).toHaveBeenCalledTimes(2) + }) }) describe('refreshHistoryHead', () => { diff --git a/src/stores/assetsStore.ts b/src/stores/assetsStore.ts index f8fe3686dcf..27e6123972e 100644 --- a/src/stores/assetsStore.ts +++ b/src/stores/assetsStore.ts @@ -234,10 +234,8 @@ export const useAssetsStore = defineStore('assets', () => { } const epoch = historyFetchEpoch - const page = await fetchHistoryPageWithCursorRecovery( - loadMore ? historyNextCursor.value : null, - epoch - ) + const requestedAfter = loadMore ? historyNextCursor.value : null + const page = await fetchHistoryPageWithCursorRecovery(requestedAfter, epoch) if (epoch !== historyFetchEpoch) return allHistoryItems.value const newAssets = mapHistoryToAssets(page.jobs) @@ -254,10 +252,19 @@ export const useAssetsStore = defineStore('assets', () => { // empty page without a cursor is terminal regardless of has_more — // offset paging advances by jobs.length, so it would refetch the same // page forever; a minted cursor still makes progress. + // + // Guard against a non-advancing cursor: if the backend echoes back the + // same cursor it was given (with has_more:true and a non-empty page), + // deduplication would discard every row and the walk would spin forever. + // Treat a stuck cursor as terminal — drop it and force hasMoreHistory off. + const cursorStuck = + page.nextCursor != null && page.nextCursor === requestedAfter historyOffset.value += page.jobs.length - historyNextCursor.value = page.nextCursor ?? null + historyNextCursor.value = cursorStuck ? null : (page.nextCursor ?? null) hasMoreHistory.value = - page.hasMore && (page.jobs.length > 0 || page.nextCursor != null) + page.hasMore && + !cursorStuck && + (page.jobs.length > 0 || page.nextCursor != null) trimHistoryToLimit() From f318718521e8088cafd6c9575dc215e5dba87a48 Mon Sep 17 00:00:00 2001 From: Matt Miller Date: Wed, 17 Jun 2026 10:36:18 -0700 Subject: [PATCH 09/17] docs(assets): add JSDoc to assetsStore per review --- src/stores/assetsStore.ts | 29 ++++++++++++++++------------- 1 file changed, 16 insertions(+), 13 deletions(-) diff --git a/src/stores/assetsStore.ts b/src/stores/assetsStore.ts index 27e6123972e..d3f13f6f715 100644 --- a/src/stores/assetsStore.ts +++ b/src/stores/assetsStore.ts @@ -219,11 +219,24 @@ export const useAssetsStore = defineStore('assets', () => { } /** - * Fetch history assets with pagination support - * @param loadMore - true for pagination (append), false for initial load (replace) + * Fetch one page of history assets and update reactive state. + * + * Pagination model: the server starts in offset mode and mints a + * `next_cursor` on any page that has one; subsequent requests pass that + * cursor (keyset mode). The walk upgrades automatically — offset paging is + * only used until the first cursor is received. + * + * An empty page with no cursor is treated as terminal regardless of + * `has_more`, because offset paging would refetch the same page forever. + * A cursor that hasn't advanced (the server echoed back the value it was + * given) is also treated as terminal to prevent an infinite dedup loop. + * + * @param loadMore - When `true`, appends the next page to the existing list + * (infinite-scroll continuation). When `false` (default), resets all + * pagination state and replaces the list with the first page. + * @returns The current accumulated list of history asset items. */ const fetchHistoryAssets = async (loadMore = false): Promise => { - // Reset state for initial load if (!loadMore) { historyFetchEpoch += 1 historyOffset.value = 0 @@ -247,16 +260,6 @@ export const useAssetsStore = defineStore('assets', () => { newAssets.forEach((asset) => loadedIds.add(asset.id)) } - // The server mints next_cursor even on offset-mode requests, so the walk - // upgrades to keyset paging after the first page that returns one. An - // empty page without a cursor is terminal regardless of has_more — - // offset paging advances by jobs.length, so it would refetch the same - // page forever; a minted cursor still makes progress. - // - // Guard against a non-advancing cursor: if the backend echoes back the - // same cursor it was given (with has_more:true and a non-empty page), - // deduplication would discard every row and the walk would spin forever. - // Treat a stuck cursor as terminal — drop it and force hasMoreHistory off. const cursorStuck = page.nextCursor != null && page.nextCursor === requestedAfter historyOffset.value += page.jobs.length From 753b0b4c9b23beb1f48813da9069ff9e8f098f69 Mon Sep 17 00:00:00 2001 From: mattmiller Date: Thu, 18 Jun 2026 19:36:22 -0700 Subject: [PATCH 10/17] fix(assets): harden epoch guards and truncate JobsApiError body - Hoist epoch snapshot to before the try block in doRefreshHistoryHead so the catch can check it and suppress stale failures - Same pattern in loadMoreHistory: captures epoch before the await so a superseded loadMore that throws doesn't overwrite the healthy walk's error state - Truncate JobsApiError body at 200 chars to avoid unbounded memory reads and leaking raw backend error detail into logs/error trackers - Replace truthy cursor check (page.after ? ...) with != null in both fetchJobsRaw and fetchHistoryPageWithCursorRecovery so an empty-string cursor doesn't silently fall back to offset mode --- src/platform/remote/comfyui/jobs/fetchJobs.ts | 15 +++++++++++---- src/stores/assetsStore.ts | 11 +++++------ 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/src/platform/remote/comfyui/jobs/fetchJobs.ts b/src/platform/remote/comfyui/jobs/fetchJobs.ts index cbccf3e4147..16cd1d749ba 100644 --- a/src/platform/remote/comfyui/jobs/fetchJobs.ts +++ b/src/platform/remote/comfyui/jobs/fetchJobs.ts @@ -33,12 +33,18 @@ export interface JobsPageRequest { * Non-ok response from the jobs API. Carries the HTTP status so callers can * tell a rejected cursor (400 INVALID_CURSOR) apart from transient failures. */ +const MAX_ERROR_BODY_LENGTH = 200 + export class JobsApiError extends Error { constructor( readonly status: number, body: string ) { - super(`[Jobs API] Failed to fetch jobs: ${status} ${body}`.trim()) + const truncated = + body.length > MAX_ERROR_BODY_LENGTH + ? `${body.slice(0, MAX_ERROR_BODY_LENGTH)}…` + : body + super(`[Jobs API] Failed to fetch jobs: ${status} ${truncated}`.trim()) this.name = 'JobsApiError' } } @@ -74,9 +80,10 @@ async function fetchJobsRaw( page: JobsPageRequest = {} ): Promise { const statusParam = statuses.join(',') - const pageParam = page.after - ? `after=${encodeURIComponent(page.after)}` - : `offset=${page.offset ?? 0}` + const pageParam = + page.after != null + ? `after=${encodeURIComponent(page.after)}` + : `offset=${page.offset ?? 0}` const url = `/jobs?status=${statusParam}&limit=${maxItems}&${pageParam}` const res = await fetchApi(url) if (!res.ok) { diff --git a/src/stores/assetsStore.ts b/src/stores/assetsStore.ts index d3f13f6f715..12358df3d4f 100644 --- a/src/stores/assetsStore.ts +++ b/src/stores/assetsStore.ts @@ -202,7 +202,7 @@ export const useAssetsStore = defineStore('assets', () => { after: string | null, epoch: number ): Promise => { - if (!after) return fetchHistoryJobsPage({ offset: historyOffset.value }) + if (after == null) return fetchHistoryJobsPage({ offset: historyOffset.value }) try { return await fetchHistoryJobsPage({ after }) } catch (err) { @@ -309,13 +309,14 @@ export const useAssetsStore = defineStore('assets', () => { isLoadingMore.value = true historyError.value = null + const epoch = historyFetchEpoch try { await fetchHistoryAssets(true) historyAssets.value = allHistoryItems.value } catch (err) { + if (epoch !== historyFetchEpoch) return console.error('Error loading more history:', err) historyError.value = err - // Keep existing data when error occurs (consistent with updateHistory) if (!historyAssets.value.length) { historyAssets.value = [] } @@ -374,16 +375,13 @@ export const useAssetsStore = defineStore('assets', () => { } historyError.value = null + const epoch = historyFetchEpoch try { - const epoch = historyFetchEpoch const page = await fetchHistoryJobsPage({ offset: 0 }) if (epoch !== historyFetchEpoch) return const reachesLoadedItems = page.jobs.some((job) => loadedIds.has(job.id)) if (page.hasMore && !reachesLoadedItems) { - // More than a page of new jobs arrived since the last refresh; - // merging would leave a hole the cursor walk can never reach back - // to fill, so restart the walk from the top instead. await updateHistory() return } @@ -396,6 +394,7 @@ export const useAssetsStore = defineStore('assets', () => { } historyAssets.value = allHistoryItems.value } catch (err) { + if (epoch !== historyFetchEpoch) return console.error('Error refreshing history:', err) historyError.value = err } From 4290619af0f342a9b79b48291b1a7e968cb296a0 Mon Sep 17 00:00:00 2001 From: GitHub Action Date: Fri, 19 Jun 2026 02:40:07 +0000 Subject: [PATCH 11/17] [automated] Apply ESLint and Oxfmt fixes --- src/stores/assetsStore.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/stores/assetsStore.ts b/src/stores/assetsStore.ts index 12358df3d4f..dc901a4cc38 100644 --- a/src/stores/assetsStore.ts +++ b/src/stores/assetsStore.ts @@ -202,7 +202,8 @@ export const useAssetsStore = defineStore('assets', () => { after: string | null, epoch: number ): Promise => { - if (after == null) return fetchHistoryJobsPage({ offset: historyOffset.value }) + if (after == null) + return fetchHistoryJobsPage({ offset: historyOffset.value }) try { return await fetchHistoryJobsPage({ after }) } catch (err) { From 89fdbcd9137d359c5a87ab08fac7622535cfe370 Mon Sep 17 00:00:00 2001 From: Matt Miller Date: Mon, 22 Jun 2026 13:22:16 -0700 Subject: [PATCH 12/17] Update src/stores/assetsStore.ts Co-authored-by: Alexander Brown --- src/stores/assetsStore.ts | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/src/stores/assetsStore.ts b/src/stores/assetsStore.ts index dc901a4cc38..75c94153ca8 100644 --- a/src/stores/assetsStore.ts +++ b/src/stores/assetsStore.ts @@ -207,11 +207,9 @@ export const useAssetsStore = defineStore('assets', () => { try { return await fetchHistoryJobsPage({ after }) } catch (err) { - // Only a rejected cursor (e.g. gone stale across a server restart) - // warrants dropping it for the offset fallback; transient failures - // propagate so the still-valid cursor is retried on the next attempt. - // A continuation from a superseded walk must also propagate — its - // recovery would null the cursor the newer walk just minted. + // Drop only a rejected cursor (e.g. stale across a restart) to the + // offset fallback; transient failures and superseded-walk + // continuations must propagate so a valid/newer cursor isn't lost. if (!isRejectedCursorError(err) || epoch !== historyFetchEpoch) throw err console.warn('Stale history cursor rejected, resuming via offset:', err) historyNextCursor.value = null From c23814551ac7ca0bc9508de23b8943c222d3d761 Mon Sep 17 00:00:00 2001 From: Matt Miller Date: Mon, 22 Jun 2026 13:22:34 -0700 Subject: [PATCH 13/17] Update src/platform/remote/comfyui/jobs/jobTypes.ts Co-authored-by: Alexander Brown --- src/platform/remote/comfyui/jobs/jobTypes.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/platform/remote/comfyui/jobs/jobTypes.ts b/src/platform/remote/comfyui/jobs/jobTypes.ts index c0ffdab4982..a5662752c3f 100644 --- a/src/platform/remote/comfyui/jobs/jobTypes.ts +++ b/src/platform/remote/comfyui/jobs/jobTypes.ts @@ -88,7 +88,7 @@ const zPaginationInfo = z.object({ limit: z.number(), total: z.number(), has_more: z.boolean(), - next_cursor: z.string().nullish() + next_cursor: z.string().min(1).nullish() }) export const zJobsListResponse = z.object({ From 6f7686b9525115a830c088867f4e36675584f660 Mon Sep 17 00:00:00 2001 From: Matt Miller Date: Mon, 22 Jun 2026 13:22:49 -0700 Subject: [PATCH 14/17] Update src/platform/remote/comfyui/jobs/fetchJobs.ts Co-authored-by: Alexander Brown --- src/platform/remote/comfyui/jobs/fetchJobs.ts | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/platform/remote/comfyui/jobs/fetchJobs.ts b/src/platform/remote/comfyui/jobs/fetchJobs.ts index 16cd1d749ba..5dae6a2efc1 100644 --- a/src/platform/remote/comfyui/jobs/fetchJobs.ts +++ b/src/platform/remote/comfyui/jobs/fetchJobs.ts @@ -24,10 +24,9 @@ import { zJobDetail, zJobsListResponse, zWorkflowContainer } from './jobTypes' * remains as the fallback for random access and for backends that don't mint * cursors. */ -export interface JobsPageRequest { - offset?: number - after?: string -} +export type JobsPageRequest = + | { after: string; offset?: never } + | { offset?: number; after?: never } /** * Non-ok response from the jobs API. Carries the HTTP status so callers can From 6c3ead5b81f2368eae6c315c7db8ad9c9eeb5c14 Mon Sep 17 00:00:00 2001 From: Matt Miller Date: Mon, 22 Jun 2026 13:23:01 -0700 Subject: [PATCH 15/17] Update src/stores/assetsStore.ts Co-authored-by: Alexander Brown --- src/stores/assetsStore.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/stores/assetsStore.ts b/src/stores/assetsStore.ts index 75c94153ca8..e1a82f6fbbc 100644 --- a/src/stores/assetsStore.ts +++ b/src/stores/assetsStore.ts @@ -122,9 +122,7 @@ export const useAssetsStore = defineStore('assets', () => { return deletingAssetIds.has(assetId) } - // Pagination state. The cursor walks the history timeline without the - // drift offset paging has when jobs complete mid-scroll; offset remains - // as the fallback for backends that don't mint cursors. + // History pagination state const historyOffset = ref(0) const historyNextCursor = ref(null) const hasMoreHistory = ref(true) From ea0f8a90408d8ab3b530ef05702973e74b3a7f1d Mon Sep 17 00:00:00 2001 From: Matt Miller Date: Mon, 22 Jun 2026 13:34:51 -0700 Subject: [PATCH 16/17] test: add body-truncation coverage and historyError-null assertion for superseded loadMore --- .../remote/comfyui/jobs/fetchJobs.test.ts | 37 ++++++++++++++----- src/stores/assetsStore.test.ts | 3 +- 2 files changed, 30 insertions(+), 10 deletions(-) diff --git a/src/platform/remote/comfyui/jobs/fetchJobs.test.ts b/src/platform/remote/comfyui/jobs/fetchJobs.test.ts index d587f5f9a96..e44e65d1ed5 100644 --- a/src/platform/remote/comfyui/jobs/fetchJobs.test.ts +++ b/src/platform/remote/comfyui/jobs/fetchJobs.test.ts @@ -158,6 +158,22 @@ describe('fetchJobs', () => { }) }) + it('truncates oversized error bodies to 200 chars in the thrown message', async () => { + const oversized = 'x'.repeat(500) + const mockFetch = vi.fn().mockResolvedValue({ + ok: false, + status: 500, + text: () => Promise.resolve(oversized) + }) + + const err = await fetchHistory(mockFetch).catch((e) => e) + expect(err).toBeInstanceOf(JobsApiError) + expect(err.message.length).toBeLessThanOrEqual( + '[Jobs API] Failed to fetch jobs: 500 '.length + 200 + 1 // +1 for the ellipsis + ) + expect(err.message).toContain('…') + }) + it('parses a null next_cursor as absent', async () => { const mockFetch = vi.fn().mockResolvedValue({ ok: true, @@ -240,16 +256,19 @@ describe('fetchJobs', () => { }) it('sends the cursor instead of offset and returns next_cursor', async () => { - const mockFetch = vi.fn().mockResolvedValue({ - ok: true, - json: () => - Promise.resolve( - createMockResponse([createMockJob('job1', 'completed')], 10, { - has_more: true, - next_cursor: 'cursor-page-2' - }) + const mockFetch = vi + .fn<(url: string) => Promise>() + .mockResolvedValue( + new Response( + JSON.stringify( + createMockResponse([createMockJob('job1', 'completed')], 10, { + has_more: true, + next_cursor: 'cursor-page-2' + }) + ), + { status: 200 } ) - }) + ) const result = await fetchHistoryPage(mockFetch, 200, { after: 'cursor-page-1' diff --git a/src/stores/assetsStore.test.ts b/src/stores/assetsStore.test.ts index b31864f9345..4e775848052 100644 --- a/src/stores/assetsStore.test.ts +++ b/src/stores/assetsStore.test.ts @@ -797,7 +797,8 @@ describe('assetsStore - Refactored (Option A)', () => { await staleLoad // The superseded walk neither nulled the fresh cursor nor fired an - // offset retry against the new walk + // offset retry against the new walk, and must not surface a spurious error + expect(store.historyError).toBeNull() expect(fetchHistoryPage).toHaveBeenCalledTimes(3) vi.mocked(fetchHistoryPage).mockResolvedValueOnce(mockHistoryPage([])) await store.loadMoreHistory() From 8b40f6a161c9ca27d2cbdb42f3a10097a0f8e8fe Mon Sep 17 00:00:00 2001 From: Matt Miller Date: Mon, 22 Jun 2026 13:50:34 -0700 Subject: [PATCH 17/17] fix: reset offset to 0 on cursor-recovery fallback to prevent page drift When a pagination cursor is rejected, fall back to offset 0 and replace the loaded list rather than using the stale client-side offset. The previous offset drifted when items were deleted server-side, causing the recovery page to skip or duplicate rows. Acceptable tradeoff: this rare fallback resets scroll position. --- src/stores/assetsStore.test.ts | 60 +++++++++++++++++++++++++++++----- src/stores/assetsStore.ts | 5 ++- 2 files changed, 56 insertions(+), 9 deletions(-) diff --git a/src/stores/assetsStore.test.ts b/src/stores/assetsStore.test.ts index 4e775848052..866931a46c1 100644 --- a/src/stores/assetsStore.test.ts +++ b/src/stores/assetsStore.test.ts @@ -608,7 +608,7 @@ describe('assetsStore - Refactored (Option A)', () => { ) }) - it('recovers from a rejected cursor page by retrying via offset', async () => { + it('recovers from a rejected cursor by restarting from offset 0 (no drift)', async () => { const firstBatch = Array.from({ length: 10 }, (_, i) => createMockJobItem(i) ) @@ -640,9 +640,10 @@ describe('assetsStore - Refactored (Option A)', () => { 3, expect.any(Function), 200, - { offset: 10 } + { offset: 0 } ) - expect(store.historyAssets).toHaveLength(20) + // List is replaced (not merged) so there are no duplicates from the reset + expect(store.historyAssets).toHaveLength(10) expect(store.historyError).toBe(null) // The recovered page minted a fresh cursor, so the walk resumes in cursor mode @@ -656,7 +657,7 @@ describe('assetsStore - Refactored (Option A)', () => { ) }) - it('keeps pagination resumable when the offset retry also fails', async () => { + it('keeps pagination resumable when the offset-0 retry also fails', async () => { const firstBatch = Array.from({ length: 10 }, (_, i) => createMockJobItem(i) ) @@ -675,12 +676,13 @@ describe('assetsStore - Refactored (Option A)', () => { expect(store.historyError).toBeInstanceOf(Error) expect(store.hasMoreHistory).toBe(true) + // historyAssets retains the last successful display state across a failed retry expect(store.historyAssets).toHaveLength(10) - // The dropped cursor means the next attempt resumes via offset + // The dropped cursor + reset offset means the next attempt restarts from 0 vi.mocked(fetchHistoryPage).mockResolvedValueOnce( mockHistoryPage( - Array.from({ length: 5 }, (_, i) => createMockJobItem(10 + i)) + Array.from({ length: 5 }, (_, i) => createMockJobItem(i)) ) ) await store.loadMoreHistory() @@ -688,9 +690,51 @@ describe('assetsStore - Refactored (Option A)', () => { 4, expect.any(Function), 200, - { offset: 10 } + { offset: 0 } + ) + expect(store.historyAssets).toHaveLength(5) + }) + + it('does not skip or duplicate rows when items are deleted server-side before cursor recovery', async () => { + // Client loaded jobs 0-9 (10 items), then some were deleted server-side. + // When the cursor is rejected, falling back to { offset: 10 } would skip + // rows because the server now has fewer items before that position. + // The fix resets to offset 0 and replaces the list. + const firstBatch = Array.from({ length: 10 }, (_, i) => + createMockJobItem(i) + ) + // Server-side: jobs 0, 3, 7 were deleted; remaining are 1,2,4,5,6,8,9 (7 items) + // Cursor is rejected; fallback at offset 0 returns the current server state + const serverStateAfterDeletions = [1, 2, 4, 5, 6, 8, 9].map((i) => + createMockJobItem(i) + ) + vi.mocked(fetchHistoryPage) + .mockResolvedValueOnce( + mockHistoryPage(firstBatch, { + hasMore: true, + nextCursor: 'cursor-stale' + }) + ) + .mockRejectedValueOnce(new JobsApiError(400, 'INVALID_CURSOR')) + .mockResolvedValueOnce(mockHistoryPage(serverStateAfterDeletions)) + + await store.updateHistory() + await store.loadMoreHistory() + + // Fallback must restart from offset 0, not the stale client offset (10) + expect(fetchHistoryPage).toHaveBeenNthCalledWith( + 3, + expect.any(Function), + 200, + { offset: 0 } ) - expect(store.historyAssets).toHaveLength(15) + // List is replaced with the fresh server state — no skipped rows, no duplicates + expect(store.historyAssets).toHaveLength(7) + const ids = store.historyAssets.map((a) => a.id) + expect(ids).not.toContain('prompt_0') + expect(ids).not.toContain('prompt_3') + expect(ids).not.toContain('prompt_7') + expect(new Set(ids).size).toBe(ids.length) }) it('preserves the cursor when a transient error rejects the page', async () => { diff --git a/src/stores/assetsStore.ts b/src/stores/assetsStore.ts index e1a82f6fbbc..f9be9b5e544 100644 --- a/src/stores/assetsStore.ts +++ b/src/stores/assetsStore.ts @@ -211,7 +211,10 @@ export const useAssetsStore = defineStore('assets', () => { if (!isRejectedCursorError(err) || epoch !== historyFetchEpoch) throw err console.warn('Stale history cursor rejected, resuming via offset:', err) historyNextCursor.value = null - return fetchHistoryJobsPage({ offset: historyOffset.value }) + historyOffset.value = 0 + allHistoryItems.value = [] + loadedIds.clear() + return fetchHistoryJobsPage({ offset: 0 }) } }