diff --git a/src/components/queue/QueueProgressOverlay.vue b/src/components/queue/QueueProgressOverlay.vue index 3e3b4c14460..416e0b490b8 100644 --- a/src/components/queue/QueueProgressOverlay.vue +++ b/src/components/queue/QueueProgressOverlay.vue @@ -245,7 +245,7 @@ const focusAssetInSidebar = async (item: JobListItem) => { const assetId = String(jobId) openAssetsSidebar() await nextTick() - await assetsStore.updateHistory() + await assetsStore.refreshHistoryHead() const asset = assetsStore.historyAssets.find( (existingAsset) => existingAsset.id === assetId ) diff --git a/src/platform/missingMedia/missingMediaAssetResolver.test.ts b/src/platform/missingMedia/missingMediaAssetResolver.test.ts index 632320759c1..b8f092cf081 100644 --- a/src/platform/missingMedia/missingMediaAssetResolver.test.ts +++ b/src/platform/missingMedia/missingMediaAssetResolver.test.ts @@ -255,13 +255,13 @@ describe('resolveMissingMediaAssetSources', () => { 1, expect.any(Function), 200, - 0 + { offset: 0 } ) expect(mockFetchHistoryPage).toHaveBeenNthCalledWith( 2, expect.any(Function), 200, - 200 + { offset: 200 } ) }) diff --git a/src/platform/missingMedia/missingMediaAssetResolver.ts b/src/platform/missingMedia/missingMediaAssetResolver.ts index 20f803c51ab..861fe8439d4 100644 --- a/src/platform/missingMedia/missingMediaAssetResolver.ts +++ b/src/platform/missingMedia/missingMediaAssetResolver.ts @@ -176,7 +176,7 @@ async function fetchGeneratedHistoryAssets( const historyPage = await fetchHistoryPage( api.fetchApi.bind(api), HISTORY_MEDIA_ASSETS_PAGE_SIZE, - requestedOffset + { offset: requestedOffset } ) signal?.throwIfAborted() diff --git a/src/platform/missingMedia/missingMediaScan.test.ts b/src/platform/missingMedia/missingMediaScan.test.ts index 6e2236533eb..d90ea77a42c 100644 --- a/src/platform/missingMedia/missingMediaScan.test.ts +++ b/src/platform/missingMedia/missingMediaScan.test.ts @@ -709,7 +709,7 @@ describe('verifyMediaCandidates', () => { expect(mockFetchHistoryPage).toHaveBeenCalledWith( expect.any(Function), 200, - 0 + { offset: 0 } ) expect(candidates[0]).toMatchObject({ name: 'subfolder/photo.png [output]', @@ -843,13 +843,13 @@ describe('verifyMediaCandidates', () => { 1, expect.any(Function), 200, - 0 + { offset: 0 } ) expect(mockFetchHistoryPage).toHaveBeenNthCalledWith( 2, expect.any(Function), 200, - 200 + { offset: 200 } ) expect(candidates[0].isMissing).toBe(false) }) diff --git a/src/platform/remote/comfyui/jobs/fetchJobs.test.ts b/src/platform/remote/comfyui/jobs/fetchJobs.test.ts index 53ad431f84c..e44e65d1ed5 100644 --- a/src/platform/remote/comfyui/jobs/fetchJobs.test.ts +++ b/src/platform/remote/comfyui/jobs/fetchJobs.test.ts @@ -1,6 +1,7 @@ import { describe, expect, it, vi } from 'vitest' import { + JobsApiError, extractWorkflow, fetchHistory, fetchHistoryPage, @@ -39,7 +40,8 @@ function createMockResponse( offset: pagination.offset ?? 0, limit: pagination.limit ?? 200, total, - has_more: pagination.has_more ?? false + has_more: pagination.has_more ?? false, + next_cursor: pagination.next_cursor } } } @@ -135,23 +137,57 @@ describe('fetchJobs', () => { expect(result[0].priority).toBe(999) }) - it('returns empty array on error', async () => { + it('propagates fetch errors', async () => { const mockFetch = vi.fn().mockRejectedValue(new Error('Network error')) - const result = await fetchHistory(mockFetch) + await expect(fetchHistory(mockFetch)).rejects.toThrow('Network error') + }) + + it('throws a JobsApiError carrying status and body on non-ok response', async () => { + const mockFetch = vi.fn().mockResolvedValue({ + ok: false, + status: 400, + text: () => + Promise.resolve('{"error":"Invalid cursor","code":"INVALID_CURSOR"}') + }) - expect(result).toEqual([]) + await expect(fetchHistory(mockFetch)).rejects.toBeInstanceOf(JobsApiError) + await expect(fetchHistory(mockFetch)).rejects.toMatchObject({ + status: 400, + message: expect.stringContaining('INVALID_CURSOR') + }) }) - it('returns empty array on non-ok response', async () => { + it('truncates oversized error bodies to 200 chars in the thrown message', async () => { + const oversized = 'x'.repeat(500) const mockFetch = vi.fn().mockResolvedValue({ ok: false, - status: 500 + status: 500, + text: () => Promise.resolve(oversized) }) - const result = await fetchHistory(mockFetch) + const err = await fetchHistory(mockFetch).catch((e) => e) + expect(err).toBeInstanceOf(JobsApiError) + expect(err.message.length).toBeLessThanOrEqual( + '[Jobs API] Failed to fetch jobs: 500 '.length + 200 + 1 // +1 for the ellipsis + ) + expect(err.message).toContain('…') + }) - expect(result).toEqual([]) + it('parses a null next_cursor as absent', async () => { + const mockFetch = vi.fn().mockResolvedValue({ + ok: true, + json: () => + Promise.resolve( + createMockResponse([createMockJob('job1', 'completed')], 1, { + next_cursor: null + }) + ) + }) + + const result = await fetchHistoryPage(mockFetch, 200, { offset: 0 }) + + expect(result.nextCursor).toBeUndefined() }) it('parses batch containing text-only preview outputs', async () => { @@ -205,7 +241,7 @@ describe('fetchJobs', () => { ) }) - const result = await fetchHistoryPage(mockFetch, 2, 5) + const result = await fetchHistoryPage(mockFetch, 2, { offset: 5 }) expect(mockFetch).toHaveBeenCalledWith( '/jobs?status=completed,failed,cancelled&limit=2&offset=5' @@ -218,6 +254,79 @@ describe('fetchJobs', () => { expect(result.jobs[0].priority).toBe(5) expect(result.jobs[1].priority).toBe(4) }) + + it('sends the cursor instead of offset and returns next_cursor', async () => { + const mockFetch = vi + .fn<(url: string) => Promise>() + .mockResolvedValue( + new Response( + JSON.stringify( + createMockResponse([createMockJob('job1', 'completed')], 10, { + has_more: true, + next_cursor: 'cursor-page-2' + }) + ), + { status: 200 } + ) + ) + + const result = await fetchHistoryPage(mockFetch, 200, { + after: 'cursor-page-1' + }) + + expect(mockFetch).toHaveBeenCalledWith( + '/jobs?status=completed,failed,cancelled&limit=200&after=cursor-page-1' + ) + expect(result.nextCursor).toBe('cursor-page-2') + expect(result.hasMore).toBe(true) + }) + + it('uri-encodes the cursor', async () => { + const mockFetch = vi.fn().mockResolvedValue({ + ok: true, + json: () => Promise.resolve(createMockResponse([])) + }) + + await fetchHistoryPage(mockFetch, 200, { after: 'a+b/c=' }) + + expect(mockFetch).toHaveBeenCalledWith( + '/jobs?status=completed,failed,cancelled&limit=200&after=a%2Bb%2Fc%3D' + ) + }) + + it('returns next_cursor from offset-mode responses for cursor bootstrap', async () => { + const mockFetch = vi.fn().mockResolvedValue({ + ok: true, + json: () => + Promise.resolve( + createMockResponse([createMockJob('job1', 'completed')], 10, { + has_more: true, + next_cursor: 'minted-in-offset-mode' + }) + ) + }) + + const result = await fetchHistoryPage(mockFetch, 200, { offset: 0 }) + + expect(mockFetch).toHaveBeenCalledWith( + '/jobs?status=completed,failed,cancelled&limit=200&offset=0' + ) + expect(result.nextCursor).toBe('minted-in-offset-mode') + }) + + it('omits nextCursor when the server does not mint one', async () => { + const mockFetch = vi.fn().mockResolvedValue({ + ok: true, + json: () => + Promise.resolve( + createMockResponse([createMockJob('job1', 'completed')]) + ) + }) + + const result = await fetchHistoryPage(mockFetch, 200, { offset: 0 }) + + expect(result.nextCursor).toBeUndefined() + }) }) describe('fetchQueue', () => { @@ -268,12 +377,10 @@ describe('fetchJobs', () => { ) }) - it('returns empty arrays on error', async () => { + it('propagates fetch errors', async () => { const mockFetch = vi.fn().mockRejectedValue(new Error('Network error')) - const result = await fetchQueue(mockFetch) - - expect(result).toEqual({ Running: [], Pending: [] }) + await expect(fetchQueue(mockFetch)).rejects.toThrow('Network error') }) }) diff --git a/src/platform/remote/comfyui/jobs/fetchJobs.ts b/src/platform/remote/comfyui/jobs/fetchJobs.ts index 25790a5ecd8..5dae6a2efc1 100644 --- a/src/platform/remote/comfyui/jobs/fetchJobs.ts +++ b/src/platform/remote/comfyui/jobs/fetchJobs.ts @@ -18,12 +18,43 @@ import type { } from './jobTypes' import { zJobDetail, zJobsListResponse, zWorkflowContainer } from './jobTypes' +/** + * Position of the page to fetch. `after` is an opaque keyset cursor from a + * prior response's `nextCursor` and takes precedence over `offset`; `offset` + * remains as the fallback for random access and for backends that don't mint + * cursors. + */ +export type JobsPageRequest = + | { after: string; offset?: never } + | { offset?: number; after?: never } + +/** + * Non-ok response from the jobs API. Carries the HTTP status so callers can + * tell a rejected cursor (400 INVALID_CURSOR) apart from transient failures. + */ +const MAX_ERROR_BODY_LENGTH = 200 + +export class JobsApiError extends Error { + constructor( + readonly status: number, + body: string + ) { + const truncated = + body.length > MAX_ERROR_BODY_LENGTH + ? `${body.slice(0, MAX_ERROR_BODY_LENGTH)}…` + : body + super(`[Jobs API] Failed to fetch jobs: ${status} ${truncated}`.trim()) + this.name = 'JobsApiError' + } +} + interface FetchJobsRawResult { jobs: RawJobListItem[] total: number offset: number limit: number hasMore: boolean + nextCursor?: string } export interface FetchHistoryPageResult { @@ -32,43 +63,39 @@ export interface FetchHistoryPageResult { offset: number limit: number hasMore: boolean + nextCursor?: string } /** - * Fetches raw jobs from /jobs endpoint + * Fetches raw jobs from /jobs endpoint. + * Throws on failure so callers can tell a failed page apart from an empty + * last page (e.g. a stale cursor rejected with 400 INVALID_CURSOR). * @internal */ async function fetchJobsRaw( fetchApi: (url: string) => Promise, statuses: JobStatus[], maxItems: number = 200, - offset: number = 0 + page: JobsPageRequest = {} ): Promise { const statusParam = statuses.join(',') - const url = `/jobs?status=${statusParam}&limit=${maxItems}&offset=${offset}` - try { - const res = await fetchApi(url) - if (!res.ok) { - console.error(`[Jobs API] Failed to fetch jobs: ${res.status}`) - return { - jobs: [], - total: 0, - offset, - limit: maxItems, - hasMore: false - } - } - const data = zJobsListResponse.parse(await res.json()) - return { - jobs: data.jobs, - total: data.pagination.total, - offset: data.pagination.offset, - limit: data.pagination.limit, - hasMore: data.pagination.has_more - } - } catch (error) { - console.error('[Jobs API] Error fetching jobs:', error) - return { jobs: [], total: 0, offset, limit: maxItems, hasMore: false } + const pageParam = + page.after != null + ? `after=${encodeURIComponent(page.after)}` + : `offset=${page.offset ?? 0}` + const url = `/jobs?status=${statusParam}&limit=${maxItems}&${pageParam}` + const res = await fetchApi(url) + if (!res.ok) { + throw new JobsApiError(res.status, await res.text().catch(() => '')) + } + const data = zJobsListResponse.parse(await res.json()) + return { + jobs: data.jobs, + total: data.pagination.total, + offset: data.pagination.offset, + limit: data.pagination.limit, + hasMore: data.pagination.has_more, + nextCursor: data.pagination.next_cursor ?? undefined } } @@ -98,7 +125,7 @@ export async function fetchHistory( maxItems: number = 200, offset: number = 0 ): Promise { - const { jobs } = await fetchHistoryPage(fetchApi, maxItems, offset) + const { jobs } = await fetchHistoryPage(fetchApi, maxItems, { offset }) return jobs } @@ -108,13 +135,13 @@ export async function fetchHistory( export async function fetchHistoryPage( fetchApi: (url: string) => Promise, maxItems: number = 200, - offset: number = 0 + page: JobsPageRequest = {} ): Promise { const result = await fetchJobsRaw( fetchApi, ['completed', 'failed', 'cancelled'], maxItems, - offset + page ) // History gets priority based on total count (lower than queue) @@ -123,7 +150,8 @@ export async function fetchHistoryPage( total: result.total, offset: result.offset, limit: result.limit, - hasMore: result.hasMore + hasMore: result.hasMore, + nextCursor: result.nextCursor } } @@ -134,12 +162,7 @@ export async function fetchHistoryPage( export async function fetchQueue( fetchApi: (url: string) => Promise ): Promise<{ Running: JobListItem[]; Pending: JobListItem[] }> { - const { jobs } = await fetchJobsRaw( - fetchApi, - ['in_progress', 'pending'], - 200, - 0 - ) + const { jobs } = await fetchJobsRaw(fetchApi, ['in_progress', 'pending']) const running = jobs.filter((j) => j.status === 'in_progress') const pending = jobs.filter((j) => j.status === 'pending') diff --git a/src/platform/remote/comfyui/jobs/jobTypes.ts b/src/platform/remote/comfyui/jobs/jobTypes.ts index 5704fba491d..a5662752c3f 100644 --- a/src/platform/remote/comfyui/jobs/jobTypes.ts +++ b/src/platform/remote/comfyui/jobs/jobTypes.ts @@ -87,7 +87,8 @@ const zPaginationInfo = z.object({ offset: z.number(), limit: z.number(), total: z.number(), - has_more: z.boolean() + has_more: z.boolean(), + next_cursor: z.string().min(1).nullish() }) export const zJobsListResponse = z.object({ diff --git a/src/stores/assetsStore.test.ts b/src/stores/assetsStore.test.ts index 98806b3330b..35baf58188f 100644 --- a/src/stores/assetsStore.test.ts +++ b/src/stores/assetsStore.test.ts @@ -4,18 +4,23 @@ import { afterEach, beforeEach, describe, expect, it, vi } from 'vitest' import { nextTick, watch } from 'vue' import { useAssetsStore } from '@/stores/assetsStore' -import { api } from '@/scripts/api' import type { AssetItem, AssetResponse } from '@/platform/assets/schemas/assetSchema' +import { + JobsApiError, + fetchHistoryPage +} from '@/platform/remote/comfyui/jobs/fetchJobs' +import type * as fetchJobsModule from '@/platform/remote/comfyui/jobs/fetchJobs' +import type { FetchHistoryPageResult } from '@/platform/remote/comfyui/jobs/fetchJobs' import type { JobListItem } from '@/platform/remote/comfyui/jobs/jobTypes' import { assetService } from '@/platform/assets/services/assetService' // Mock the api module vi.mock('@/scripts/api', () => ({ api: { - getHistory: vi.fn(), + fetchApi: vi.fn(), internalURL: vi.fn((path) => `http://localhost:3000${path}`), apiURL: vi.fn((path) => `http://localhost:3000/api${path}`), addEventListener: vi.fn(), @@ -24,6 +29,32 @@ vi.mock('@/scripts/api', () => ({ } })) +// Mock the jobs API fetcher used for history pagination, keeping the real +// JobsApiError class so the store's instanceof narrowing stays meaningful +vi.mock('@/platform/remote/comfyui/jobs/fetchJobs', async (importOriginal) => ({ + ...(await importOriginal()), + fetchHistoryPage: vi.fn() +})) + +// Helper to build a server pagination page around a set of jobs +const mockHistoryPage = ( + jobs: JobListItem[], + { + hasMore = false, + nextCursor, + offset = 0, + total = jobs.length, + limit = 200 + }: Partial> = {} +): FetchHistoryPageResult => ({ + jobs, + total, + offset, + limit, + hasMore, + nextCursor +}) + // Mock the asset service vi.mock('@/platform/assets/services/assetService', () => ({ assetService: { @@ -212,32 +243,38 @@ describe('assetsStore - Refactored (Option A)', () => { const mockHistory = Array.from({ length: 10 }, (_, i) => createMockJobItem(i) ) - vi.mocked(api.getHistory).mockResolvedValue(mockHistory) + vi.mocked(fetchHistoryPage).mockResolvedValue( + mockHistoryPage(mockHistory) + ) await store.updateHistory() - expect(api.getHistory).toHaveBeenCalledWith(200, { offset: 0 }) + expect(fetchHistoryPage).toHaveBeenCalledWith(expect.any(Function), 200, { + offset: 0 + }) expect(store.historyAssets).toHaveLength(10) - expect(store.hasMoreHistory).toBe(false) // Less than BATCH_SIZE + expect(store.hasMoreHistory).toBe(false) // Server reported no more pages expect(store.historyLoading).toBe(false) expect(store.historyError).toBe(null) }) - it('should set hasMoreHistory to true when batch is full', async () => { + it('should set hasMoreHistory to true when server reports more pages', async () => { const mockHistory = Array.from({ length: 200 }, (_, i) => createMockJobItem(i) ) - vi.mocked(api.getHistory).mockResolvedValue(mockHistory) + vi.mocked(fetchHistoryPage).mockResolvedValue( + mockHistoryPage(mockHistory, { hasMore: true }) + ) await store.updateHistory() expect(store.historyAssets).toHaveLength(200) - expect(store.hasMoreHistory).toBe(true) // Exactly BATCH_SIZE + expect(store.hasMoreHistory).toBe(true) }) it('should handle errors during initial load', async () => { const error = new Error('Failed to fetch') - vi.mocked(api.getHistory).mockRejectedValue(error) + vi.mocked(fetchHistoryPage).mockRejectedValue(error) await store.updateHistory() @@ -262,7 +299,9 @@ describe('assetsStore - Refactored (Option A)', () => { }, createMockJobItem(2) ] - vi.mocked(api.getHistory).mockResolvedValue(mockHistory) + vi.mocked(fetchHistoryPage).mockResolvedValue( + mockHistoryPage(mockHistory) + ) await store.updateHistory() @@ -280,7 +319,9 @@ describe('assetsStore - Refactored (Option A)', () => { const firstBatch = Array.from({ length: 200 }, (_, i) => createMockJobItem(i) ) - vi.mocked(api.getHistory).mockResolvedValueOnce(firstBatch) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(firstBatch, { hasMore: true }) + ) await store.updateHistory() expect(store.historyAssets).toHaveLength(200) @@ -290,11 +331,16 @@ describe('assetsStore - Refactored (Option A)', () => { const secondBatch = Array.from({ length: 200 }, (_, i) => createMockJobItem(200 + i) ) - vi.mocked(api.getHistory).mockResolvedValueOnce(secondBatch) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(secondBatch, { hasMore: true }) + ) await store.loadMoreHistory() - expect(api.getHistory).toHaveBeenCalledWith(200, { offset: 200 }) + // Offset fallback advances by the number of jobs the page returned + expect(fetchHistoryPage).toHaveBeenCalledWith(expect.any(Function), 200, { + offset: 200 + }) expect(store.historyAssets).toHaveLength(400) // Accumulated expect(store.hasMoreHistory).toBe(true) }) @@ -304,7 +350,9 @@ describe('assetsStore - Refactored (Option A)', () => { const firstBatch = Array.from({ length: 200 }, (_, i) => createMockJobItem(i) ) - vi.mocked(api.getHistory).mockResolvedValueOnce(firstBatch) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(firstBatch, { hasMore: true }) + ) await store.updateHistory() expect(store.historyAssets).toHaveLength(200) @@ -315,7 +363,9 @@ describe('assetsStore - Refactored (Option A)', () => { createMockJobItem(5), // Duplicate ...Array.from({ length: 198 }, (_, i) => createMockJobItem(200 + i)) // New ] - vi.mocked(api.getHistory).mockResolvedValueOnce(secondBatch) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(secondBatch) + ) await store.loadMoreHistory() @@ -329,11 +379,13 @@ describe('assetsStore - Refactored (Option A)', () => { }) it('should stop loading when no more items', async () => { - // First batch - less than BATCH_SIZE + // Server reports no further pages const firstBatch = Array.from({ length: 50 }, (_, i) => createMockJobItem(i) ) - vi.mocked(api.getHistory).mockResolvedValueOnce(firstBatch) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(firstBatch, { hasMore: false }) + ) await store.updateHistory() expect(store.hasMoreHistory).toBe(false) @@ -342,7 +394,7 @@ describe('assetsStore - Refactored (Option A)', () => { await store.loadMoreHistory() // Should only have been called once (initial load) - expect(api.getHistory).toHaveBeenCalledTimes(1) + expect(fetchHistoryPage).toHaveBeenCalledTimes(1) }) it('should handle race conditions with concurrent loads', async () => { @@ -350,19 +402,21 @@ describe('assetsStore - Refactored (Option A)', () => { const initialBatch = Array.from({ length: 200 }, (_, i) => createMockJobItem(i) ) - vi.mocked(api.getHistory).mockResolvedValueOnce(initialBatch) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(initialBatch, { hasMore: true }) + ) await store.updateHistory() expect(store.hasMoreHistory).toBe(true) // Clear mock to count only loadMore calls - vi.mocked(api.getHistory).mockClear() + vi.mocked(fetchHistoryPage).mockClear() // Setup slow API response - let resolveLoadMore: (value: JobListItem[]) => void - const loadMorePromise = new Promise((resolve) => { + let resolveLoadMore: (value: FetchHistoryPageResult) => void + const loadMorePromise = new Promise((resolve) => { resolveLoadMore = resolve }) - vi.mocked(api.getHistory).mockReturnValueOnce(loadMorePromise) + vi.mocked(fetchHistoryPage).mockReturnValueOnce(loadMorePromise) // Start first loadMore const firstLoad = store.loadMoreHistory() @@ -374,12 +428,12 @@ describe('assetsStore - Refactored (Option A)', () => { const secondBatch = Array.from({ length: 200 }, (_, i) => createMockJobItem(200 + i) ) - resolveLoadMore!(secondBatch) + resolveLoadMore!(mockHistoryPage(secondBatch, { hasMore: true })) await Promise.all([firstLoad, secondLoad]) // Only one API call - expect(api.getHistory).toHaveBeenCalledTimes(1) + expect(fetchHistoryPage).toHaveBeenCalledTimes(1) }) it('should respect MAX_HISTORY_ITEMS limit', async () => { @@ -389,7 +443,9 @@ describe('assetsStore - Refactored (Option A)', () => { const firstBatch = Array.from({ length: 200 }, (_, i) => createMockJobItem(i) ) - vi.mocked(api.getHistory).mockResolvedValueOnce(firstBatch) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(firstBatch, { hasMore: true }) + ) await store.updateHistory() // Load additional batches @@ -397,7 +453,9 @@ describe('assetsStore - Refactored (Option A)', () => { const items = Array.from({ length: 200 }, (_, i) => createMockJobItem(batch * 200 + i) ) - vi.mocked(api.getHistory).mockResolvedValueOnce(items) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(items, { hasMore: true }) + ) await store.loadMoreHistory() } @@ -406,13 +464,678 @@ describe('assetsStore - Refactored (Option A)', () => { }) }) + describe('Cursor pagination', () => { + it('uses { after } for loadMore when the first page returned a cursor', async () => { + const firstBatch = Array.from({ length: 10 }, (_, i) => + createMockJobItem(i) + ) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(firstBatch, { hasMore: true, nextCursor: 'cursor-1' }) + ) + await store.updateHistory() + + const secondBatch = Array.from({ length: 10 }, (_, i) => + createMockJobItem(10 + i) + ) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(secondBatch) + ) + await store.loadMoreHistory() + + expect(fetchHistoryPage).toHaveBeenNthCalledWith( + 2, + expect.any(Function), + 200, + { after: 'cursor-1' } + ) + }) + + it('bootstraps from offset 0 then walks successive cursors', async () => { + const pageJobs = (start: number) => + Array.from({ length: 10 }, (_, i) => createMockJobItem(start + i)) + vi.mocked(fetchHistoryPage) + .mockResolvedValueOnce( + mockHistoryPage(pageJobs(0), { + hasMore: true, + nextCursor: 'cursor-1' + }) + ) + .mockResolvedValueOnce( + mockHistoryPage(pageJobs(10), { + hasMore: true, + nextCursor: 'cursor-2' + }) + ) + .mockResolvedValueOnce(mockHistoryPage(pageJobs(20))) + + await store.updateHistory() + await store.loadMoreHistory() + await store.loadMoreHistory() + + expect(fetchHistoryPage).toHaveBeenNthCalledWith( + 1, + expect.any(Function), + 200, + { offset: 0 } + ) + expect(fetchHistoryPage).toHaveBeenNthCalledWith( + 2, + expect.any(Function), + 200, + { after: 'cursor-1' } + ) + expect(fetchHistoryPage).toHaveBeenNthCalledWith( + 3, + expect.any(Function), + 200, + { after: 'cursor-2' } + ) + expect(store.historyAssets).toHaveLength(30) + }) + + it('falls back to offset paging advanced by returned job count when no cursor is minted', async () => { + const firstBatch = Array.from({ length: 200 }, (_, i) => + createMockJobItem(i) + ) + const secondBatch = Array.from({ length: 150 }, (_, i) => + createMockJobItem(200 + i) + ) + const thirdBatch = Array.from({ length: 10 }, (_, i) => + createMockJobItem(350 + i) + ) + vi.mocked(fetchHistoryPage) + .mockResolvedValueOnce(mockHistoryPage(firstBatch, { hasMore: true })) + .mockResolvedValueOnce(mockHistoryPage(secondBatch, { hasMore: true })) + .mockResolvedValueOnce(mockHistoryPage(thirdBatch)) + + await store.updateHistory() + await store.loadMoreHistory() + await store.loadMoreHistory() + + expect(fetchHistoryPage).toHaveBeenNthCalledWith( + 2, + expect.any(Function), + 200, + { offset: 200 } + ) + expect(fetchHistoryPage).toHaveBeenNthCalledWith( + 3, + expect.any(Function), + 200, + { offset: 350 } + ) + }) + + it('stops loadMore when the server reports hasMore false despite a full batch', async () => { + const fullBatch = Array.from({ length: 200 }, (_, i) => + createMockJobItem(i) + ) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(fullBatch, { hasMore: false }) + ) + + await store.updateHistory() + expect(store.hasMoreHistory).toBe(false) + + await store.loadMoreHistory() + + expect(fetchHistoryPage).toHaveBeenCalledTimes(1) + }) + + it('resets to { offset: 0 } on a full reload after a cursor walk', async () => { + const pageJobs = (start: number) => + Array.from({ length: 10 }, (_, i) => createMockJobItem(start + i)) + vi.mocked(fetchHistoryPage) + .mockResolvedValueOnce( + mockHistoryPage(pageJobs(0), { + hasMore: true, + nextCursor: 'cursor-1' + }) + ) + .mockResolvedValueOnce( + mockHistoryPage(pageJobs(10), { + hasMore: true, + nextCursor: 'cursor-2' + }) + ) + .mockResolvedValueOnce(mockHistoryPage(pageJobs(0))) + + await store.updateHistory() + await store.loadMoreHistory() + await store.updateHistory() + + expect(fetchHistoryPage).toHaveBeenNthCalledWith( + 3, + expect.any(Function), + 200, + { offset: 0 } + ) + }) + + it('recovers from a rejected cursor by restarting from offset 0 (no drift)', async () => { + const firstBatch = Array.from({ length: 10 }, (_, i) => + createMockJobItem(i) + ) + vi.mocked(fetchHistoryPage) + .mockResolvedValueOnce( + mockHistoryPage(firstBatch, { + hasMore: true, + nextCursor: 'cursor-stale' + }) + ) + .mockRejectedValueOnce(new JobsApiError(400, 'INVALID_CURSOR')) + .mockResolvedValueOnce( + mockHistoryPage( + Array.from({ length: 10 }, (_, i) => createMockJobItem(10 + i)), + { hasMore: true, nextCursor: 'cursor-fresh' } + ) + ) + + await store.updateHistory() + await store.loadMoreHistory() + + expect(fetchHistoryPage).toHaveBeenNthCalledWith( + 2, + expect.any(Function), + 200, + { after: 'cursor-stale' } + ) + expect(fetchHistoryPage).toHaveBeenNthCalledWith( + 3, + expect.any(Function), + 200, + { offset: 0 } + ) + // List is replaced (not merged) so there are no duplicates from the reset + expect(store.historyAssets).toHaveLength(10) + expect(store.historyError).toBe(null) + + // The recovered page minted a fresh cursor, so the walk resumes in cursor mode + vi.mocked(fetchHistoryPage).mockResolvedValueOnce(mockHistoryPage([])) + await store.loadMoreHistory() + expect(fetchHistoryPage).toHaveBeenNthCalledWith( + 4, + expect.any(Function), + 200, + { after: 'cursor-fresh' } + ) + }) + + it('keeps pagination resumable when the offset-0 retry also fails', async () => { + const firstBatch = Array.from({ length: 10 }, (_, i) => + createMockJobItem(i) + ) + vi.mocked(fetchHistoryPage) + .mockResolvedValueOnce( + mockHistoryPage(firstBatch, { + hasMore: true, + nextCursor: 'cursor-stale' + }) + ) + .mockRejectedValueOnce(new JobsApiError(400, 'INVALID_CURSOR')) + .mockRejectedValueOnce(new Error('network down')) + + await store.updateHistory() + await store.loadMoreHistory() + + expect(store.historyError).toBeInstanceOf(Error) + expect(store.hasMoreHistory).toBe(true) + // historyAssets retains the last successful display state across a failed retry + expect(store.historyAssets).toHaveLength(10) + + // The dropped cursor + reset offset means the next attempt restarts from 0 + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage( + Array.from({ length: 5 }, (_, i) => createMockJobItem(i)) + ) + ) + await store.loadMoreHistory() + expect(fetchHistoryPage).toHaveBeenNthCalledWith( + 4, + expect.any(Function), + 200, + { offset: 0 } + ) + expect(store.historyAssets).toHaveLength(5) + }) + + it('does not skip or duplicate rows when items are deleted server-side before cursor recovery', async () => { + // Client loaded jobs 0-9 (10 items), then some were deleted server-side. + // When the cursor is rejected, falling back to { offset: 10 } would skip + // rows because the server now has fewer items before that position. + // The fix resets to offset 0 and replaces the list. + const firstBatch = Array.from({ length: 10 }, (_, i) => + createMockJobItem(i) + ) + // Server-side: jobs 0, 3, 7 were deleted; remaining are 1,2,4,5,6,8,9 (7 items) + // Cursor is rejected; fallback at offset 0 returns the current server state + const serverStateAfterDeletions = [1, 2, 4, 5, 6, 8, 9].map((i) => + createMockJobItem(i) + ) + vi.mocked(fetchHistoryPage) + .mockResolvedValueOnce( + mockHistoryPage(firstBatch, { + hasMore: true, + nextCursor: 'cursor-stale' + }) + ) + .mockRejectedValueOnce(new JobsApiError(400, 'INVALID_CURSOR')) + .mockResolvedValueOnce(mockHistoryPage(serverStateAfterDeletions)) + + await store.updateHistory() + await store.loadMoreHistory() + + // Fallback must restart from offset 0, not the stale client offset (10) + expect(fetchHistoryPage).toHaveBeenNthCalledWith( + 3, + expect.any(Function), + 200, + { offset: 0 } + ) + // List is replaced with the fresh server state — no skipped rows, no duplicates + expect(store.historyAssets).toHaveLength(7) + const ids = store.historyAssets.map((a) => a.id) + expect(ids).not.toContain('prompt_0') + expect(ids).not.toContain('prompt_3') + expect(ids).not.toContain('prompt_7') + expect(new Set(ids).size).toBe(ids.length) + }) + + it('preserves the cursor when a transient error rejects the page', async () => { + vi.mocked(fetchHistoryPage) + .mockResolvedValueOnce( + mockHistoryPage( + Array.from({ length: 10 }, (_, i) => createMockJobItem(i)), + { hasMore: true, nextCursor: 'cursor-1' } + ) + ) + .mockRejectedValueOnce(new JobsApiError(500, 'server error')) + + await store.updateHistory() + await store.loadMoreHistory() + + // No offset fallback for transient failures, just a recorded error + expect(fetchHistoryPage).toHaveBeenCalledTimes(2) + expect(store.historyError).toBeInstanceOf(Error) + expect(store.hasMoreHistory).toBe(true) + + // The still-valid cursor is retried on the next attempt + vi.mocked(fetchHistoryPage).mockResolvedValueOnce(mockHistoryPage([])) + await store.loadMoreHistory() + expect(fetchHistoryPage).toHaveBeenLastCalledWith( + expect.any(Function), + 200, + { after: 'cursor-1' } + ) + }) + + it('treats a cursorless empty page as terminal even if the server claims more', async () => { + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage([], { hasMore: true }) + ) + + await store.updateHistory() + + expect(store.hasMoreHistory).toBe(false) + await store.loadMoreHistory() + expect(fetchHistoryPage).toHaveBeenCalledTimes(1) + }) + + it('keeps paging when an empty page still mints a cursor', async () => { + vi.mocked(fetchHistoryPage) + .mockResolvedValueOnce( + mockHistoryPage([], { hasMore: true, nextCursor: 'cursor-skip' }) + ) + .mockResolvedValueOnce(mockHistoryPage([createMockJobItem(0)])) + + await store.updateHistory() + expect(store.hasMoreHistory).toBe(true) + + await store.loadMoreHistory() + expect(fetchHistoryPage).toHaveBeenNthCalledWith( + 2, + expect.any(Function), + 200, + { after: 'cursor-skip' } + ) + }) + + it('keeps paging when a page of jobs maps to no displayable assets', async () => { + const failedJobs = Array.from({ length: 3 }, (_, i) => ({ + ...createMockJobItem(i), + status: 'failed' as const, + preview_output: null + })) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(failedJobs, { hasMore: true }) + ) + + await store.updateHistory() + + expect(store.historyAssets).toHaveLength(0) + expect(store.hasMoreHistory).toBe(true) + }) + + it('does not let a stale rejected continuation drop the new walk cursor', async () => { + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage( + Array.from({ length: 10 }, (_, i) => createMockJobItem(i)), + { hasMore: true, nextCursor: 'cursor-old' } + ) + ) + await store.updateHistory() + + let rejectStale: (err: Error) => void + vi.mocked(fetchHistoryPage).mockReturnValueOnce( + new Promise((_, reject) => { + rejectStale = reject + }) + ) + const staleLoad = store.loadMoreHistory() + + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage([createMockJobItem(100)], { + hasMore: true, + nextCursor: 'cursor-fresh' + }) + ) + await store.updateHistory() + + rejectStale!(new JobsApiError(400, 'INVALID_CURSOR')) + await staleLoad + + // The superseded walk neither nulled the fresh cursor nor fired an + // offset retry against the new walk, and must not surface a spurious error + expect(store.historyError).toBeNull() + expect(fetchHistoryPage).toHaveBeenCalledTimes(3) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce(mockHistoryPage([])) + await store.loadMoreHistory() + expect(fetchHistoryPage).toHaveBeenLastCalledWith( + expect.any(Function), + 200, + { after: 'cursor-fresh' } + ) + }) + + it('discards a stale loadMore continuation that resolves after a reset', async () => { + const firstBatch = Array.from({ length: 10 }, (_, i) => + createMockJobItem(i) + ) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(firstBatch, { hasMore: true, nextCursor: 'cursor-1' }) + ) + await store.updateHistory() + + let resolveStale: (page: FetchHistoryPageResult) => void + vi.mocked(fetchHistoryPage).mockReturnValueOnce( + new Promise((resolve) => { + resolveStale = resolve + }) + ) + const staleLoad = store.loadMoreHistory() + + const freshBatch = Array.from({ length: 5 }, (_, i) => + createMockJobItem(100 + i) + ) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(freshBatch, { + hasMore: true, + nextCursor: 'cursor-fresh' + }) + ) + await store.updateHistory() + + resolveStale!( + mockHistoryPage( + Array.from({ length: 10 }, (_, i) => createMockJobItem(10 + i)), + { hasMore: true, nextCursor: 'cursor-stale' } + ) + ) + await staleLoad + + // The stale page neither merged into the fresh list nor moved its cursor + expect(store.historyAssets).toHaveLength(5) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce(mockHistoryPage([])) + await store.loadMoreHistory() + expect(fetchHistoryPage).toHaveBeenLastCalledWith( + expect.any(Function), + 200, + { after: 'cursor-fresh' } + ) + }) + + it('terminates the walk when the backend returns the same cursor it was given (stuck cursor)', async () => { + // Page 1: initial load mints cursor-1 + const firstBatch = Array.from({ length: 10 }, (_, i) => + createMockJobItem(i) + ) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(firstBatch, { hasMore: true, nextCursor: 'cursor-1' }) + ) + await store.updateHistory() + expect(store.hasMoreHistory).toBe(true) + + // Page 2: backend echoes back cursor-1 (same as the requested after), + // with has_more:true and a non-empty page — the stuck-cursor shape. + // Without the guard, mergeHistoryAssets dedupes every row and + // hasMoreHistory stays true, causing an infinite spin. + const stuckPage = Array.from( + { length: 10 }, + (_, i) => createMockJobItem(i) // same ids as page 1 → all deduped + ) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(stuckPage, { hasMore: true, nextCursor: 'cursor-1' }) + ) + await store.loadMoreHistory() + + // Guard must have fired: hasMoreHistory forced off, cursor dropped + expect(store.hasMoreHistory).toBe(false) + + // A subsequent loadMoreHistory must not issue another fetch + await store.loadMoreHistory() + expect(fetchHistoryPage).toHaveBeenCalledTimes(2) + }) + }) + + describe('refreshHistoryHead', () => { + it('performs a full initial load when the list is empty', async () => { + const jobs = Array.from({ length: 5 }, (_, i) => createMockJobItem(i)) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce(mockHistoryPage(jobs)) + + await store.refreshHistoryHead() + + expect(fetchHistoryPage).toHaveBeenCalledWith(expect.any(Function), 200, { + offset: 0 + }) + expect(store.historyAssets).toHaveLength(5) + expect(store.historyLoading).toBe(false) + }) + + it('prepends new completions without resetting the stored cursor', async () => { + const initialJobs = [createMockJobItem(1), createMockJobItem(2)] + vi.mocked(fetchHistoryPage) + .mockResolvedValueOnce( + mockHistoryPage(initialJobs, { + hasMore: true, + nextCursor: 'cursor-1' + }) + ) + .mockResolvedValueOnce( + mockHistoryPage([createMockJobItem(0), createMockJobItem(1)], { + hasMore: true, + nextCursor: 'cursor-head' + }) + ) + .mockResolvedValueOnce(mockHistoryPage([createMockJobItem(3)])) + + await store.updateHistory() + await store.refreshHistoryHead() + + // Head page re-fetched from the top, new job prepended, existing kept + expect(fetchHistoryPage).toHaveBeenNthCalledWith( + 2, + expect.any(Function), + 200, + { offset: 0 } + ) + expect(store.historyAssets.map((a) => a.id)).toEqual([ + 'prompt_0', + 'prompt_1', + 'prompt_2' + ]) + + // Subsequent loadMore still resumes from the pre-refresh cursor + await store.loadMoreHistory() + expect(fetchHistoryPage).toHaveBeenNthCalledWith( + 3, + expect.any(Function), + 200, + { after: 'cursor-1' } + ) + }) + + it('keeps existing items and records the error when the head fetch fails', async () => { + const initialJobs = Array.from({ length: 3 }, (_, i) => + createMockJobItem(i) + ) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(initialJobs, { hasMore: true }) + ) + await store.updateHistory() + + const error = new Error('refresh failed') + vi.mocked(fetchHistoryPage).mockRejectedValueOnce(error) + + await store.refreshHistoryHead() + + expect(store.historyAssets).toHaveLength(3) + expect(store.historyError).toBe(error) + }) + + it('restarts the walk when the head page does not reach the loaded items', async () => { + const overflowPage = () => + Array.from({ length: 200 }, (_, i) => createMockJobItem(1000 + i)) + vi.mocked(fetchHistoryPage) + .mockResolvedValueOnce( + mockHistoryPage( + Array.from({ length: 10 }, (_, i) => createMockJobItem(i)), + { hasMore: true, nextCursor: 'cursor-1' } + ) + ) + .mockResolvedValueOnce( + mockHistoryPage(overflowPage(), { + hasMore: true, + nextCursor: 'cursor-head' + }) + ) + .mockResolvedValueOnce( + mockHistoryPage(overflowPage(), { + hasMore: true, + nextCursor: 'cursor-head-2' + }) + ) + + await store.updateHistory() + await store.refreshHistoryHead() + + // No overlap with loaded items while more rows remain means merging + // would leave an unfillable hole, so the walk restarts from the top + expect(fetchHistoryPage).toHaveBeenCalledTimes(3) + expect(fetchHistoryPage).toHaveBeenNthCalledWith( + 3, + expect.any(Function), + 200, + { offset: 0 } + ) + expect(store.historyAssets).toHaveLength(200) + expect(store.historyAssets.some((asset) => asset.id === 'prompt_0')).toBe( + false + ) + }) + + it('coalesces a burst into a leading fetch plus one trailing refresh', async () => { + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage([createMockJobItem(0)], { + hasMore: true, + nextCursor: 'cursor-1' + }) + ) + await store.updateHistory() + + let resolveLeading: (page: FetchHistoryPageResult) => void + vi.mocked(fetchHistoryPage).mockReturnValueOnce( + new Promise((resolve) => { + resolveLeading = resolve + }) + ) + + const first = store.refreshHistoryHead() + const second = store.refreshHistoryHead() + const third = store.refreshHistoryHead() + + // The trailing refresh re-fetches the head and sees the job the + // leading response was dispatched too early to include + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage([createMockJobItem(0), createMockJobItem(1)], { + hasMore: true + }) + ) + resolveLeading!( + mockHistoryPage([createMockJobItem(0)], { hasMore: true }) + ) + await Promise.all([first, second, third]) + + // Initial load + leading head fetch + exactly one trailing head fetch + expect(fetchHistoryPage).toHaveBeenCalledTimes(3) + expect(store.historyAssets).toHaveLength(2) + }) + + it('runs a fresh fetch for sequential refreshes', async () => { + vi.mocked(fetchHistoryPage).mockResolvedValue( + mockHistoryPage([createMockJobItem(0)], { hasMore: true }) + ) + await store.updateHistory() + + await store.refreshHistoryHead() + await store.refreshHistoryHead() + + expect(fetchHistoryPage).toHaveBeenCalledTimes(3) + }) + + it('prunes server-side deletions when the head page spans the whole timeline', async () => { + vi.mocked(fetchHistoryPage) + .mockResolvedValueOnce( + mockHistoryPage([ + createMockJobItem(0), + createMockJobItem(1), + createMockJobItem(2) + ]) + ) + .mockResolvedValueOnce( + mockHistoryPage([createMockJobItem(0), createMockJobItem(2)]) + ) + + await store.updateHistory() + expect(store.historyAssets).toHaveLength(3) + + await store.refreshHistoryHead() + + expect(store.historyAssets.map((asset) => asset.id)).toEqual([ + 'prompt_0', + 'prompt_2' + ]) + }) + }) + describe('Sorting', () => { it('should maintain date sorting after pagination', async () => { // First batch const firstBatch = Array.from({ length: 200 }, (_, i) => createMockJobItem(i) ) - vi.mocked(api.getHistory).mockResolvedValueOnce(firstBatch) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(firstBatch, { hasMore: true }) + ) await store.updateHistory() @@ -420,7 +1143,9 @@ describe('assetsStore - Refactored (Option A)', () => { const secondBatch = Array.from({ length: 200 }, (_, i) => createMockJobItem(200 + i) ) - vi.mocked(api.getHistory).mockResolvedValueOnce(secondBatch) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(secondBatch) + ) await store.loadMoreHistory() @@ -439,14 +1164,16 @@ describe('assetsStore - Refactored (Option A)', () => { const firstBatch = Array.from({ length: 200 }, (_, i) => createMockJobItem(i) ) - vi.mocked(api.getHistory).mockResolvedValueOnce(firstBatch) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(firstBatch, { hasMore: true }) + ) await store.updateHistory() expect(store.historyAssets).toHaveLength(200) // Second load fails const error = new Error('Network error') - vi.mocked(api.getHistory).mockRejectedValueOnce(error) + vi.mocked(fetchHistoryPage).mockRejectedValueOnce(error) await store.loadMoreHistory() @@ -461,13 +1188,15 @@ describe('assetsStore - Refactored (Option A)', () => { const firstBatch = Array.from({ length: 200 }, (_, i) => createMockJobItem(i) ) - vi.mocked(api.getHistory).mockResolvedValueOnce(firstBatch) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(firstBatch, { hasMore: true }) + ) await store.updateHistory() // Second load fails const error = new Error('Network error') - vi.mocked(api.getHistory).mockRejectedValueOnce(error) + vi.mocked(fetchHistoryPage).mockRejectedValueOnce(error) await store.loadMoreHistory() expect(store.historyError).toBe(error) @@ -476,7 +1205,9 @@ describe('assetsStore - Refactored (Option A)', () => { const thirdBatch = Array.from({ length: 200 }, (_, i) => createMockJobItem(200 + i) ) - vi.mocked(api.getHistory).mockResolvedValueOnce(thirdBatch) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(thirdBatch) + ) await store.loadMoreHistory() @@ -487,7 +1218,7 @@ describe('assetsStore - Refactored (Option A)', () => { it('should handle errors with proper loading state', async () => { const error = new Error('API error') - vi.mocked(api.getHistory).mockRejectedValue(error) + vi.mocked(fetchHistoryPage).mockRejectedValue(error) await store.updateHistory() @@ -505,7 +1236,9 @@ describe('assetsStore - Refactored (Option A)', () => { const items = Array.from({ length: 200 }, (_, i) => createMockJobItem(batch * 200 + i) ) - vi.mocked(api.getHistory).mockResolvedValueOnce(items) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(items, { hasMore: true }) + ) if (batch === 0) { await store.updateHistory() @@ -529,7 +1262,9 @@ describe('assetsStore - Refactored (Option A)', () => { const items = Array.from({ length: 200 }, (_, i) => createMockJobItem(batch * 200 + i) ) - vi.mocked(api.getHistory).mockResolvedValueOnce(items) + vi.mocked(fetchHistoryPage).mockResolvedValueOnce( + mockHistoryPage(items, { hasMore: true }) + ) if (batch === 0) { await store.updateHistory() @@ -554,7 +1289,9 @@ describe('assetsStore - Refactored (Option A)', () => { const mockHistory = Array.from({ length: 3 }, (_, i) => createMockJobItem(i) ) - vi.mocked(api.getHistory).mockResolvedValue(mockHistory) + vi.mocked(fetchHistoryPage).mockResolvedValue( + mockHistoryPage(mockHistory) + ) await store.updateHistory() const target = store.historyAssets[1] @@ -573,7 +1310,9 @@ describe('assetsStore - Refactored (Option A)', () => { it('matches by name even when ids differ between APIs', async () => { const mockHistory = [createMockJobItem(0)] - vi.mocked(api.getHistory).mockResolvedValue(mockHistory) + vi.mocked(fetchHistoryPage).mockResolvedValue( + mockHistoryPage(mockHistory) + ) await store.updateHistory() const historyAssetId = store.historyAssets[0].id @@ -591,7 +1330,9 @@ describe('assetsStore - Refactored (Option A)', () => { const mockHistory = Array.from({ length: 2 }, (_, i) => createMockJobItem(i) ) - vi.mocked(api.getHistory).mockResolvedValue(mockHistory) + vi.mocked(fetchHistoryPage).mockResolvedValue( + mockHistoryPage(mockHistory) + ) await store.updateHistory() const before = store.historyAssets.map((a) => ({ ...a })) @@ -604,7 +1345,9 @@ describe('assetsStore - Refactored (Option A)', () => { const mockHistory = Array.from({ length: 3 }, (_, i) => createMockJobItem(i) ) - vi.mocked(api.getHistory).mockResolvedValue(mockHistory) + vi.mocked(fetchHistoryPage).mockResolvedValue( + mockHistoryPage(mockHistory) + ) await store.updateHistory() // Patch using a non-matching prefix; the other assets must stay untouched @@ -617,7 +1360,9 @@ describe('assetsStore - Refactored (Option A)', () => { it('replaces the asset object so reactivity fires for v-for keyed by id', async () => { const mockHistory = [createMockJobItem(0)] - vi.mocked(api.getHistory).mockResolvedValue(mockHistory) + vi.mocked(fetchHistoryPage).mockResolvedValue( + mockHistoryPage(mockHistory) + ) await store.updateHistory() const before = store.historyAssets[0] @@ -636,7 +1381,9 @@ describe('assetsStore - Refactored (Option A)', () => { const mockHistory = Array.from({ length: 5 }, (_, i) => createMockJobItem(i) ) - vi.mocked(api.getHistory).mockResolvedValue(mockHistory) + vi.mocked(fetchHistoryPage).mockResolvedValue( + mockHistoryPage(mockHistory) + ) await store.updateHistory() @@ -680,7 +1427,9 @@ describe('assetsStore - Refactored (Option A)', () => { ] const mockHistory = [createMockJobItem(0)] - vi.mocked(api.getHistory).mockResolvedValue(mockHistory) + vi.mocked(fetchHistoryPage).mockResolvedValue( + mockHistoryPage(mockHistory) + ) await store.updateHistory() @@ -715,7 +1464,9 @@ describe('assetsStore - Refactored (Option A)', () => { ] const mockHistory = [createMockJobItem(0)] - vi.mocked(api.getHistory).mockResolvedValue(mockHistory) + vi.mocked(fetchHistoryPage).mockResolvedValue( + mockHistoryPage(mockHistory) + ) await store.updateHistory() @@ -743,7 +1494,9 @@ describe('assetsStore - Refactored (Option A)', () => { ] const mockHistory = [createMockJobItem(0)] - vi.mocked(api.getHistory).mockResolvedValue(mockHistory) + vi.mocked(fetchHistoryPage).mockResolvedValue( + mockHistoryPage(mockHistory) + ) await store.updateHistory() @@ -764,7 +1517,9 @@ describe('assetsStore - Refactored (Option A)', () => { ] const mockHistory = [createMockJobItem(0)] - vi.mocked(api.getHistory).mockResolvedValue(mockHistory) + vi.mocked(fetchHistoryPage).mockResolvedValue( + mockHistoryPage(mockHistory) + ) await store.updateHistory() diff --git a/src/stores/assetsStore.ts b/src/stores/assetsStore.ts index 0a8b20e4b5e..6e5b2b5dd62 100644 --- a/src/stores/assetsStore.ts +++ b/src/stores/assetsStore.ts @@ -17,6 +17,14 @@ import { } from '@/platform/assets/services/assetService' import type { PaginationOptions } from '@/platform/assets/services/assetService' import { isCloud } from '@/platform/distribution/types' +import { + JobsApiError, + fetchHistoryPage +} from '@/platform/remote/comfyui/jobs/fetchJobs' +import type { + FetchHistoryPageResult, + JobsPageRequest +} from '@/platform/remote/comfyui/jobs/fetchJobs' import type { JobListItem } from '@/platform/remote/comfyui/jobs/jobTypes' import { api } from '@/scripts/api' @@ -114,8 +122,9 @@ export const useAssetsStore = defineStore('assets', () => { return deletingAssetIds.has(assetId) } - // Pagination state + // History pagination state const historyOffset = ref(0) + const historyNextCursor = ref(null) const hasMoreHistory = ref(true) const isLoadingMore = ref(false) @@ -147,65 +156,120 @@ export const useAssetsStore = defineStore('assets', () => { } /** - * Fetch history assets with pagination support - * @param loadMore - true for pagination (append), false for initial load (replace) + * Insert assets in sorted order (newest first), skipping already-loaded ids + */ + const mergeHistoryAssets = (newAssets: AssetItem[]) => { + for (const asset of newAssets) { + if (loadedIds.has(asset.id)) { + continue + } + loadedIds.add(asset.id) + + const assetTime = new Date(asset.created_at ?? 0).getTime() + const insertIndex = allHistoryItems.value.findIndex( + (item) => new Date(item.created_at ?? 0).getTime() < assetTime + ) + + if (insertIndex === -1) { + allHistoryItems.value.push(asset) + } else { + allHistoryItems.value.splice(insertIndex, 0, asset) + } + } + } + + const trimHistoryToLimit = () => { + if (allHistoryItems.value.length <= MAX_HISTORY_ITEMS) return + + const removed = allHistoryItems.value.slice(MAX_HISTORY_ITEMS) + allHistoryItems.value = allHistoryItems.value.slice(0, MAX_HISTORY_ITEMS) + removed.forEach((item) => loadedIds.delete(item.id)) + } + + const fetchHistoryJobsPage = (page: JobsPageRequest) => + fetchHistoryPage(api.fetchApi.bind(api), BATCH_SIZE, page) + + // Invalidates in-flight history fetches whenever the list is replaced, so + // a stale continuation can't merge into (or move the cursor of) the new walk. + let historyFetchEpoch = 0 + + const isRejectedCursorError = (err: unknown): boolean => + err instanceof JobsApiError && err.status === 400 + + const fetchHistoryPageWithCursorRecovery = async ( + after: string | null, + epoch: number + ): Promise => { + if (after == null) + return fetchHistoryJobsPage({ offset: historyOffset.value }) + try { + return await fetchHistoryJobsPage({ after }) + } catch (err) { + // Drop only a rejected cursor (e.g. stale across a restart) to the + // offset fallback; transient failures and superseded-walk + // continuations must propagate so a valid/newer cursor isn't lost. + if (!isRejectedCursorError(err) || epoch !== historyFetchEpoch) throw err + console.warn('Stale history cursor rejected, resuming via offset:', err) + historyNextCursor.value = null + historyOffset.value = 0 + allHistoryItems.value = [] + loadedIds.clear() + return fetchHistoryJobsPage({ offset: 0 }) + } + } + + /** + * Fetch one page of history assets and update reactive state. + * + * Pagination model: the server starts in offset mode and mints a + * `next_cursor` on any page that has one; subsequent requests pass that + * cursor (keyset mode). The walk upgrades automatically — offset paging is + * only used until the first cursor is received. + * + * An empty page with no cursor is treated as terminal regardless of + * `has_more`, because offset paging would refetch the same page forever. + * A cursor that hasn't advanced (the server echoed back the value it was + * given) is also treated as terminal to prevent an infinite dedup loop. + * + * @param loadMore - When `true`, appends the next page to the existing list + * (infinite-scroll continuation). When `false` (default), resets all + * pagination state and replaces the list with the first page. + * @returns The current accumulated list of history asset items. */ const fetchHistoryAssets = async (loadMore = false): Promise => { - // Reset state for initial load if (!loadMore) { + historyFetchEpoch += 1 historyOffset.value = 0 + historyNextCursor.value = null hasMoreHistory.value = true allHistoryItems.value = [] loadedIds.clear() } - // Fetch from server with offset - const history = await api.getHistory(BATCH_SIZE, { - offset: historyOffset.value - }) + const epoch = historyFetchEpoch + const requestedAfter = loadMore ? historyNextCursor.value : null + const page = await fetchHistoryPageWithCursorRecovery(requestedAfter, epoch) + if (epoch !== historyFetchEpoch) return allHistoryItems.value - // Convert JobListItems to AssetItems - const newAssets = mapHistoryToAssets(history) + const newAssets = mapHistoryToAssets(page.jobs) if (loadMore) { - // Filter out duplicates and insert in sorted order - for (const asset of newAssets) { - if (loadedIds.has(asset.id)) { - continue // Skip duplicates - } - loadedIds.add(asset.id) - - // Find insertion index to maintain sorted order (newest first) - const assetTime = new Date(asset.created_at ?? 0).getTime() - const insertIndex = allHistoryItems.value.findIndex( - (item) => new Date(item.created_at ?? 0).getTime() < assetTime - ) - - if (insertIndex === -1) { - // Asset is oldest, append to end - allHistoryItems.value.push(asset) - } else { - // Insert at the correct position - allHistoryItems.value.splice(insertIndex, 0, asset) - } - } + mergeHistoryAssets(newAssets) } else { - // Initial load: replace all allHistoryItems.value = newAssets newAssets.forEach((asset) => loadedIds.add(asset.id)) } - // Update pagination state - historyOffset.value += BATCH_SIZE - hasMoreHistory.value = history.length === BATCH_SIZE - - if (allHistoryItems.value.length > MAX_HISTORY_ITEMS) { - const removed = allHistoryItems.value.slice(MAX_HISTORY_ITEMS) - allHistoryItems.value = allHistoryItems.value.slice(0, MAX_HISTORY_ITEMS) + const cursorStuck = + page.nextCursor != null && page.nextCursor === requestedAfter + historyOffset.value += page.jobs.length + historyNextCursor.value = cursorStuck ? null : (page.nextCursor ?? null) + hasMoreHistory.value = + page.hasMore && + !cursorStuck && + (page.jobs.length > 0 || page.nextCursor != null) - // Clean up Set - removed.forEach((item) => loadedIds.delete(item.id)) - } + trimHistoryToLimit() return allHistoryItems.value } @@ -245,13 +309,14 @@ export const useAssetsStore = defineStore('assets', () => { isLoadingMore.value = true historyError.value = null + const epoch = historyFetchEpoch try { await fetchHistoryAssets(true) historyAssets.value = allHistoryItems.value } catch (err) { + if (epoch !== historyFetchEpoch) return console.error('Error loading more history:', err) historyError.value = err - // Keep existing data when error occurs (consistent with updateHistory) if (!historyAssets.value.length) { historyAssets.value = [] } @@ -260,6 +325,81 @@ export const useAssetsStore = defineStore('assets', () => { } } + /** + * A head page with no further rows spans the whole timeline, so replacing + * local state with it also prunes jobs deleted server-side (e.g. after the + * queue history is cleared from another surface). + */ + const replaceHistoryWithHeadPage = (page: FetchHistoryPageResult) => { + historyFetchEpoch += 1 + const newAssets = mapHistoryToAssets(page.jobs) + allHistoryItems.value = newAssets + loadedIds.clear() + newAssets.forEach((asset) => loadedIds.add(asset.id)) + historyOffset.value = page.jobs.length + historyNextCursor.value = page.nextCursor ?? null + hasMoreHistory.value = page.hasMore + } + + let headRefreshInFlight: Promise | null = null + let headRefreshTrailing: Promise | null = null + + /** + * Merge newly completed jobs into the top of the list without resetting + * pagination state, so items loaded via infinite scroll survive the + * refresh. Cursors only walk toward older items, so new completions are + * picked up by re-fetching the head page and deduplicating. Bursts of + * status events share the in-flight refresh, and a call arriving + * mid-flight schedules exactly one trailing refresh — the shared response + * was dispatched before that caller's event, so it could miss the very + * completion the caller is reacting to. + */ + const refreshHistoryHead = (): Promise => { + if (!headRefreshInFlight) { + headRefreshInFlight = doRefreshHistoryHead().finally(() => { + headRefreshInFlight = null + }) + return headRefreshInFlight + } + headRefreshTrailing ??= headRefreshInFlight.then(() => { + headRefreshTrailing = null + return refreshHistoryHead() + }) + return headRefreshTrailing + } + + const doRefreshHistoryHead = async () => { + if (!allHistoryItems.value.length) { + await updateHistory() + return + } + + historyError.value = null + const epoch = historyFetchEpoch + try { + const page = await fetchHistoryJobsPage({ offset: 0 }) + if (epoch !== historyFetchEpoch) return + + const reachesLoadedItems = page.jobs.some((job) => loadedIds.has(job.id)) + if (page.hasMore && !reachesLoadedItems) { + await updateHistory() + return + } + + if (page.hasMore) { + mergeHistoryAssets(mapHistoryToAssets(page.jobs)) + trimHistoryToLimit() + } else { + replaceHistoryWithHeadPage(page) + } + historyAssets.value = allHistoryItems.value + } catch (err) { + if (epoch !== historyFetchEpoch) return + console.error('Error refreshing history:', err) + historyError.value = err + } + } + const flatOutputAssets = ref([]) const flatOutputLoading = ref(false) const flatOutputError = ref(null) @@ -884,6 +1024,7 @@ export const useAssetsStore = defineStore('assets', () => { updateInputs, updateHistory, loadMoreHistory, + refreshHistoryHead, setAssetPreview, // Flat output assets (cloud-only, tag-based) diff --git a/src/views/GraphView.test.ts b/src/views/GraphView.test.ts index a1fc5242c21..3df5fcd0d51 100644 --- a/src/views/GraphView.test.ts +++ b/src/views/GraphView.test.ts @@ -100,7 +100,10 @@ vi.mock('@/composables/useAppMode', () => ({ useAppMode: () => ({ isBuilderMode: ref(false) }) })) vi.mock('@/stores/assetsStore', () => ({ - useAssetsStore: () => ({ updateHistory: vi.fn() }) + useAssetsStore: () => ({ + updateHistory: vi.fn(), + refreshHistoryHead: vi.fn() + }) })) vi.mock('@/stores/commandStore', () => ({ useCommandStore: () => ({ registerCommands: vi.fn() }) diff --git a/src/views/GraphView.vue b/src/views/GraphView.vue index bcfda399b1f..c96167123b9 100644 --- a/src/views/GraphView.vue +++ b/src/views/GraphView.vue @@ -238,7 +238,7 @@ const onStatus = async (e: CustomEvent) => { // Only update assets if the assets sidebar is currently open // When sidebar is closed, AssetsSidebarTab.vue will refresh on mount if (sidebarTabStore.activeSidebarTabId === 'assets' || linearMode.value) { - await assetsStore.updateHistory() + await assetsStore.refreshHistoryHead() } } @@ -247,7 +247,7 @@ const onExecutionSuccess = async () => { // Only update assets if the assets sidebar is currently open // When sidebar is closed, AssetsSidebarTab.vue will refresh on mount if (sidebarTabStore.activeSidebarTabId === 'assets' || linearMode.value) { - await assetsStore.updateHistory() + await assetsStore.refreshHistoryHead() } }