diff --git a/apps/hermes/dashboard/Dockerfile b/apps/hermes/dashboard/Dockerfile index 022dd75c..511d207d 100644 --- a/apps/hermes/dashboard/Dockerfile +++ b/apps/hermes/dashboard/Dockerfile @@ -17,6 +17,7 @@ COPY pnpm-lock.yaml package.json pnpm-workspace.yaml turbo.json ./ COPY packages ./packages COPY apps ./apps +COPY patches ./patches RUN pnpm install --frozen-lockfile diff --git a/apps/mediapulse/agents/data-collection/src/index.test.ts b/apps/mediapulse/agents/data-collection/src/index.test.ts index 4d592f71..d0d180d9 100644 --- a/apps/mediapulse/agents/data-collection/src/index.test.ts +++ b/apps/mediapulse/agents/data-collection/src/index.test.ts @@ -130,7 +130,20 @@ describe("data-collection agent (HTTP)", () => { beforeEach(() => { vi.clearAllMocks(); performWebSearchMock.mockResolvedValue(defaultSearchSuccess); - performWebFetchMock.mockResolvedValue(defaultFetchSuccess); + // performWebFetch now streams each outcome through the onOutcome hook (so + // run.ts persists per URL); mirror that contract instead of just resolving. + performWebFetchMock.mockImplementation( + async ( + _searchResults: unknown, + deps: { onOutcome?: (outcome: unknown) => unknown }, + ) => { + for (const outcome of defaultFetchSuccess) { + await deps?.onOutcome?.(outcome); + } + + return defaultFetchSuccess; + }, + ); existingUrlsCreateMock.mockResolvedValue({ existingUrls: [], hostCounts: {}, diff --git a/apps/mediapulse/agents/data-collection/src/run.test.ts b/apps/mediapulse/agents/data-collection/src/run.test.ts index d41bcd13..96152855 100644 --- a/apps/mediapulse/agents/data-collection/src/run.test.ts +++ b/apps/mediapulse/agents/data-collection/src/run.test.ts @@ -10,8 +10,10 @@ import { } from "./utilities/config-schema"; import type { FetchedWebSearchResult, + WebFetchDeps, WebFetchFailure, WebFetchOutcome, + WebSearchResult, } from "@workspace/agent-ingestion"; const TICKER_ID = "11111111-1111-4111-a111-111111111111"; @@ -151,6 +153,26 @@ const mockFetchFailure = ( failures: [{ provider: "jina", ...failure }], }); +/** + * Mirrors the real performWebFetch streaming contract in tests: invokes the + * `onOutcome` hook for each outcome (so per-URL persistence runs in run.ts) + * before resolving with the full outcome list. + * + * @param outcomes - Fetch outcomes the mocked call should stream and return. + */ +const fetchYielding = + (outcomes: WebFetchOutcome[]) => + async ( + _searchResults: WebSearchResult[], + deps: WebFetchDeps, + ): Promise => { + for (const outcome of outcomes) { + await deps.onOutcome?.(outcome); + } + + return outcomes; + }; + vi.mock("@mediapulse/env/agents-data-collection", () => ({ env: { AGENT_DATA_API_URL: "http://agent-data-api", @@ -251,12 +273,14 @@ describe("runDataCollection", () => { data: searchSuccessPage, }, ]); - vi.mocked(performWebFetch).mockResolvedValue([ - mockFetchSuccess({ - ...searchSuccessPage, - content: validArticleContent, - }), - ]); + vi.mocked(performWebFetch).mockImplementation( + fetchYielding([ + mockFetchSuccess({ + ...searchSuccessPage, + content: validArticleContent, + }), + ]), + ); getMock.mockResolvedValue({ data: [{ id: "sq-1", text: "test query", tickerId: TICKER_ID }], }); @@ -356,7 +380,7 @@ describe("runDataCollection", () => { existingUrls: ["http://example.com"], hostCounts: {}, }); - vi.mocked(performWebFetch).mockResolvedValueOnce([]); + vi.mocked(performWebFetch).mockImplementationOnce(fetchYielding([])); const result = await runDataCollection( createContext({ @@ -390,7 +414,7 @@ describe("runDataCollection", () => { deadUrlsLookupMock.mockResolvedValueOnce({ deadUrls: ["http://example.com/page-0", "http://example.com/page-1"], }); - vi.mocked(performWebFetch).mockResolvedValueOnce([]); + vi.mocked(performWebFetch).mockImplementationOnce(fetchYielding([])); const result = await runDataCollection( createContext({ @@ -425,17 +449,19 @@ describe("runDataCollection", () => { }); it("records 404 fetch failures to the dead-url cache", async () => { - vi.mocked(performWebFetch).mockResolvedValueOnce([ - mockFetchFailure({ - url: "http://failed.com", - queryId: "sq-1", - tickerId: TICKER_ID, - errorCategory: "provider_http_error", - message: "404 Not Found", - retryable: false, - httpStatus: 404, - }), - ]); + vi.mocked(performWebFetch).mockImplementationOnce( + fetchYielding([ + mockFetchFailure({ + url: "http://failed.com", + queryId: "sq-1", + tickerId: TICKER_ID, + errorCategory: "provider_http_error", + message: "404 Not Found", + retryable: false, + httpStatus: 404, + }), + ]), + ); await runDataCollection( createContext({ @@ -488,31 +514,33 @@ describe("runDataCollection", () => { }, ]); - vi.mocked(performWebFetch).mockResolvedValueOnce([ - mockFetchSuccess({ - ...searchSuccessPage, - url: "http://example.com/fresh", - content: validArticleContent, - fetchMetadata: { publishedTime: isoDaysAgo(2) }, - }), - mockFetchSuccess({ - ...searchSuccessPage, - url: "http://example.com/stale", - content: validArticleContent, - fetchMetadata: { publishedTime: isoDaysAgo(30) }, - }), - mockFetchSuccess({ - ...searchSuccessPage, - url: "http://example.com/unknown", - content: validArticleContent, - }), - mockFetchSuccess({ - ...searchSuccessPage, - url: "http://example.com/future", - content: validArticleContent, - fetchMetadata: { publishedTime: isoDaysAhead(5) }, - }), - ]); + vi.mocked(performWebFetch).mockImplementationOnce( + fetchYielding([ + mockFetchSuccess({ + ...searchSuccessPage, + url: "http://example.com/fresh", + content: validArticleContent, + fetchMetadata: { publishedTime: isoDaysAgo(2) }, + }), + mockFetchSuccess({ + ...searchSuccessPage, + url: "http://example.com/stale", + content: validArticleContent, + fetchMetadata: { publishedTime: isoDaysAgo(30) }, + }), + mockFetchSuccess({ + ...searchSuccessPage, + url: "http://example.com/unknown", + content: validArticleContent, + }), + mockFetchSuccess({ + ...searchSuccessPage, + url: "http://example.com/future", + content: validArticleContent, + fetchMetadata: { publishedTime: isoDaysAhead(5) }, + }), + ]), + ); const result = await runDataCollection( createContext({ @@ -548,7 +576,7 @@ describe("runDataCollection", () => { it("does not call dataCollection.create when there are no fetch successes", async () => { // Setup - vi.mocked(performWebFetch).mockResolvedValueOnce([]); + vi.mocked(performWebFetch).mockImplementationOnce(fetchYielding([])); // Act await runDataCollection( @@ -571,17 +599,19 @@ describe("runDataCollection", () => { it("records partial_success and persists fetch failures", async () => { // Setup - vi.mocked(performWebFetch).mockResolvedValueOnce([ - mockFetchFailure({ - url: "http://failed.com", - queryId: "sq-1", - tickerId: TICKER_ID, - errorCategory: "provider_http_error", - message: "404 Not Found", - retryable: false, - httpStatus: 404, - }), - ]); + vi.mocked(performWebFetch).mockImplementationOnce( + fetchYielding([ + mockFetchFailure({ + url: "http://failed.com", + queryId: "sq-1", + tickerId: TICKER_ID, + errorCategory: "provider_http_error", + message: "404 Not Found", + retryable: false, + httpStatus: 404, + }), + ]), + ); // Act const result = await runDataCollection( @@ -621,7 +651,7 @@ describe("runDataCollection", () => { it("returns semantic failure when run policy requires successes but none were collected", async () => { // Setup vi.mocked(performWebSearch).mockResolvedValueOnce([]); - vi.mocked(performWebFetch).mockResolvedValueOnce([]); + vi.mocked(performWebFetch).mockImplementationOnce(fetchYielding([])); // Act const result = await runDataCollection(createContext()); @@ -661,12 +691,14 @@ describe("runDataCollection", () => { data: searchSuccessPage, }, ]); - vi.mocked(performWebFetch).mockResolvedValueOnce([ - mockFetchSuccess({ - ...searchSuccessPage, - content: validArticleContent, - }), - ]); + vi.mocked(performWebFetch).mockImplementationOnce( + fetchYielding([ + mockFetchSuccess({ + ...searchSuccessPage, + content: validArticleContent, + }), + ]), + ); // Act const result = await runDataCollection( @@ -713,7 +745,7 @@ describe("runDataCollection", () => { }, }, ]); - vi.mocked(performWebFetch).mockResolvedValueOnce([]); + vi.mocked(performWebFetch).mockImplementationOnce(fetchYielding([])); // Act const result = await runDataCollection(createContext()); @@ -755,26 +787,28 @@ describe("runDataCollection", () => { }, }, ]); - vi.mocked(performWebFetch).mockResolvedValueOnce([ - mockFetchSuccess({ - ...searchSuccessPage, - url: "https://example.com/clean", - title: validArticleTitle, - content: validArticleContent, - }), - mockFetchSuccess({ - ...searchSuccessPage, - url: "https://example.com/off-topic", - title: "Microsoft earnings headline here", - content: offTopicContent, - }), - mockFetchSuccess({ - ...searchSuccessPage, - url: "https://example.com/paywall", - title: "Premium article headline here", - content: paywallContent, - }), - ]); + vi.mocked(performWebFetch).mockImplementationOnce( + fetchYielding([ + mockFetchSuccess({ + ...searchSuccessPage, + url: "https://example.com/clean", + title: validArticleTitle, + content: validArticleContent, + }), + mockFetchSuccess({ + ...searchSuccessPage, + url: "https://example.com/off-topic", + title: "Microsoft earnings headline here", + content: offTopicContent, + }), + mockFetchSuccess({ + ...searchSuccessPage, + url: "https://example.com/paywall", + title: "Premium article headline here", + content: paywallContent, + }), + ]), + ); // Act const result = await runDataCollection( @@ -850,32 +884,34 @@ describe("runDataCollection", () => { }, }, ]); - vi.mocked(performWebFetch).mockResolvedValueOnce([ - mockFetchSuccess({ - ...searchSuccessPage, - url: "https://example.com/clean", - title: validArticleTitle, - content: validArticleContent, - }), - mockFetchSuccess({ - ...searchSuccessPage, - url: "https://example.com/paywall", - title: "Premium article headline here", - content: paywallContent, - }), - mockFetchSuccess({ - ...searchSuccessPage, - url: "https://example.com/soft-404", - title: "Missing article headline here", - content: soft404Content, - }), - mockFetchSuccess({ - ...searchSuccessPage, - url: "https://example.com/short", - title: "Valid headline for short body", - content: shortContent, - }), - ]); + vi.mocked(performWebFetch).mockImplementationOnce( + fetchYielding([ + mockFetchSuccess({ + ...searchSuccessPage, + url: "https://example.com/clean", + title: validArticleTitle, + content: validArticleContent, + }), + mockFetchSuccess({ + ...searchSuccessPage, + url: "https://example.com/paywall", + title: "Premium article headline here", + content: paywallContent, + }), + mockFetchSuccess({ + ...searchSuccessPage, + url: "https://example.com/soft-404", + title: "Missing article headline here", + content: soft404Content, + }), + mockFetchSuccess({ + ...searchSuccessPage, + url: "https://example.com/short", + title: "Valid headline for short body", + content: shortContent, + }), + ]), + ); // Act const result = await runDataCollection( @@ -923,14 +959,16 @@ describe("runDataCollection", () => { }, }, ]); - vi.mocked(performWebFetch).mockResolvedValueOnce([ - mockFetchSuccess({ - ...searchSuccessPage, - title: "Company profile and key statistics", - url: "https://example.com/stocks", - content: `Financial summary and key statistics with market cap details. ${Array.from({ length: 120 }, (_, index) => `Detail paragraph ${index} covers regional lending trends.`).join(" ")}`, - }), - ]); + vi.mocked(performWebFetch).mockImplementationOnce( + fetchYielding([ + mockFetchSuccess({ + ...searchSuccessPage, + title: "Company profile and key statistics", + url: "https://example.com/stocks", + content: `Financial summary and key statistics with market cap details. ${Array.from({ length: 120 }, (_, index) => `Detail paragraph ${index} covers regional lending trends.`).join(" ")}`, + }), + ]), + ); // Act const result = await runDataCollection(createContext()); @@ -989,18 +1027,22 @@ describe("runDataCollection", () => { }, ]); vi.mocked(performWebFetch) - .mockResolvedValueOnce([ - mockFetchSuccess({ - ...searchSuccessPage, - content: validArticleContent, - }), - ]) - .mockResolvedValueOnce([ - mockFetchSuccess({ - ...searchSuccessPage, - content: validArticleContent, - }), - ]); + .mockImplementationOnce( + fetchYielding([ + mockFetchSuccess({ + ...searchSuccessPage, + content: validArticleContent, + }), + ]), + ) + .mockImplementationOnce( + fetchYielding([ + mockFetchSuccess({ + ...searchSuccessPage, + content: validArticleContent, + }), + ]), + ); // Act const result = await runDataCollection( @@ -1027,7 +1069,7 @@ describe("runDataCollection", () => { it("stops refill when no progress is made in a round", async () => { // Setup vi.mocked(performWebSearch).mockResolvedValue([]); - vi.mocked(performWebFetch).mockResolvedValue([]); + vi.mocked(performWebFetch).mockImplementation(fetchYielding([])); // Act const result = await runDataCollection( @@ -1056,12 +1098,14 @@ describe("runDataCollection", () => { data: searchSuccessPage, }, ]); - vi.mocked(performWebFetch).mockResolvedValue([ - mockFetchSuccess({ - ...searchSuccessPage, - content: validArticleContent, - }), - ]); + vi.mocked(performWebFetch).mockImplementation( + fetchYielding([ + mockFetchSuccess({ + ...searchSuccessPage, + content: validArticleContent, + }), + ]), + ); // Act const result = await runDataCollection( @@ -1103,14 +1147,16 @@ describe("runDataCollection", () => { ); vi.mocked(performWebSearch).mockResolvedValueOnce(searchHits); - vi.mocked(performWebFetch).mockImplementation(async (results) => - results.map((page) => + vi.mocked(performWebFetch).mockImplementation((results, deps) => { + const outcomes = results.map((page) => mockFetchSuccess({ ...page, content: validArticleContent, }), - ), - ); + ); + + return fetchYielding(outcomes)(results, deps); + }); // Act await runDataCollection( diff --git a/apps/mediapulse/agents/data-collection/src/run.ts b/apps/mediapulse/agents/data-collection/src/run.ts index df5bedcc..d84313a6 100644 --- a/apps/mediapulse/agents/data-collection/src/run.ts +++ b/apps/mediapulse/agents/data-collection/src/run.ts @@ -27,6 +27,7 @@ import { HostErrorTracker, hostFromUrl, type QualityDropForDeadUrl, + type FetchedWebSearchResult, buildTickerAliases, buildIndustryAliases, isRelevant, @@ -393,38 +394,20 @@ export async function runDataCollection( ...narrativeFetchStart(subject, searchSuccessesAfterHostBreaker.length), ); - const fetchThrottleStats = { throttleEvents: 0 }; - const fetchAttemptResults = await performWebFetch( - searchSuccessesAfterHostBreaker, - { - config: webFetchConfig, - logger: log, - throttleStats: fetchThrottleStats, - hostErrorTracker, - }, - ); - throttleEvents += fetchThrottleStats.throttleEvents; - const roundFetchSuccesses = fetchAttemptResults - .filter((outcome) => outcome.success !== null) - .map((outcome) => outcome.success!); - const roundFetchFailures = fetchAttemptResults.flatMap( - (outcome) => outcome.failures, - ); - const roundFailedUrlCount = fetchAttemptResults.filter( - (outcome) => outcome.success === null, - ).length; - fetchedCount += roundFetchSuccesses.length; - fetchFailedCount += roundFailedUrlCount; - fetchFailures.push(...roundFetchFailures); - let persistedThisRoundCount = 0; const roundQualityDrops: QualityDropForDeadUrl[] = []; - report(...narrativeSavingSources(subject, roundFetchSuccesses.length)); - for (const page of roundFetchSuccesses) { + + // Persist a single fetched page as soon as its fetch resolves, so each + // source reaches the Agent Data API immediately instead of waiting for the + // whole round's fetch batch to finish. Invoked per URL from performWebFetch + // via the onOutcome hook below. + const persistFetchedPage = async ( + page: FetchedWebSearchResult, + ): Promise => { const urlDecision = classifyNoisyUrl(page.url); if (urlDecision.blocked) { droppedByUrlReason[urlDecision.reason] += 1; - continue; + return; } const contentDecision = runQualityGate( @@ -438,7 +421,7 @@ export async function runDataCollection( url: urlDecision.canonicalUrl, reason: contentDecision.reason, }); - continue; + return; } if (relevanceGateConfig.enabled) { @@ -464,7 +447,7 @@ export async function runDataCollection( }, "dropped page that did not mention the target ticker or industry", ); - continue; + return; } } @@ -490,7 +473,7 @@ export async function runDataCollection( }, "dropped page outside freshness window", ); - continue; + return; } } @@ -510,7 +493,43 @@ export async function runDataCollection( persistedThisRunCount += 1; persistedThisRoundCount += 1; fetchSuccessCount += 1; - } + }; + + report( + ...narrativeSavingSources( + subject, + searchSuccessesAfterHostBreaker.length, + ), + ); + + const fetchThrottleStats = { throttleEvents: 0 }; + const fetchAttemptResults = await performWebFetch( + searchSuccessesAfterHostBreaker, + { + config: webFetchConfig, + logger: log, + throttleStats: fetchThrottleStats, + hostErrorTracker, + onOutcome: async (outcome) => { + if (outcome.success !== null) { + await persistFetchedPage(outcome.success); + } + }, + }, + ); + throttleEvents += fetchThrottleStats.throttleEvents; + const roundFetchSuccesses = fetchAttemptResults + .filter((outcome) => outcome.success !== null) + .map((outcome) => outcome.success!); + const roundFetchFailures = fetchAttemptResults.flatMap( + (outcome) => outcome.failures, + ); + const roundFailedUrlCount = fetchAttemptResults.filter( + (outcome) => outcome.success === null, + ).length; + fetchedCount += roundFetchSuccesses.length; + fetchFailedCount += roundFailedUrlCount; + fetchFailures.push(...roundFetchFailures); log.info( { diff --git a/apps/mediapulse/user-registration/Dockerfile b/apps/mediapulse/user-registration/Dockerfile index 4e968d56..4b35136e 100644 --- a/apps/mediapulse/user-registration/Dockerfile +++ b/apps/mediapulse/user-registration/Dockerfile @@ -18,6 +18,7 @@ COPY pnpm-lock.yaml package.json pnpm-workspace.yaml turbo.json ./ COPY packages ./packages COPY apps ./apps +COPY patches ./patches RUN pnpm install --frozen-lockfile diff --git a/package.json b/package.json index 27d846ee..9e59128f 100644 --- a/package.json +++ b/package.json @@ -62,6 +62,9 @@ "prisma", "sharp", "unix-dgram" - ] + ], + "patchedDependencies": { + "@nicnocquee/dataqueue@1.39.0": "patches/@nicnocquee__dataqueue@1.39.0.patch" + } } } diff --git a/packages/shared/agent-ingestion/src/web-fetch.test.ts b/packages/shared/agent-ingestion/src/web-fetch.test.ts index 7d5e2828..18547c94 100644 --- a/packages/shared/agent-ingestion/src/web-fetch.test.ts +++ b/packages/shared/agent-ingestion/src/web-fetch.test.ts @@ -10,7 +10,7 @@ vi.mock("@workspace/logger", () => ({ }, })); -import type { WebSearchResult } from "./web-fetch"; +import type { WebFetchOutcome, WebSearchResult } from "./web-fetch"; import { performWebFetch } from "./web-fetch"; const jinaProviderConfig = { @@ -301,6 +301,43 @@ describe("performWebFetch", () => { expect(acquireCounts.firecrawl).toBe(2); }); + it("streams each fetch outcome through onOutcome as it resolves", async () => { + // Setup + const postMock = vi.fn().mockReturnValue( + mockGotPostResponse({ + data: { + url: "http://example.com", + title: "Title", + content: "Full content", + }, + }), + ); + const fakeGot = { post: postMock } as unknown as typeof got; + const streamed: WebFetchOutcome[] = []; + + // Act + const result = await performWebFetch( + [ + { ...baseSearchResult, searchQueryId: "q1" }, + { ...baseSearchResult, searchQueryId: "q2" }, + ], + { + config: { providers: [jinaProviderConfig] }, + gotClient: fakeGot, + onOutcome: (outcome) => { + streamed.push(outcome); + }, + }, + ); + + // Assert — the hook fires once per URL with each resolved outcome + expect(streamed).toHaveLength(2); + expect( + streamed.map((outcome) => outcome.success?.searchQueryId).sort(), + ).toEqual(["q1", "q2"]); + expect(result).toHaveLength(2); + }); + it("logs a warning with a truncated URL when fetch fails and the URL is very long", async () => { // Setup const warnMock = vi.fn(); diff --git a/packages/shared/agent-ingestion/src/web-fetch.ts b/packages/shared/agent-ingestion/src/web-fetch.ts index 2eb870ea..fe803cc8 100644 --- a/packages/shared/agent-ingestion/src/web-fetch.ts +++ b/packages/shared/agent-ingestion/src/web-fetch.ts @@ -69,6 +69,14 @@ export interface WebFetchDeps { throttleStats?: StageThrottleStats; /** Optional per-run host error tracker updated on every fetch attempt. */ hostErrorTracker?: HostErrorTracker; + /** + * Optional hook invoked with each fetch outcome the moment it resolves, before the + * rest of the batch finishes. Lets callers stream per-URL side effects (for example, + * persisting a successful page immediately) instead of waiting for every fetch in the + * batch to complete. Awaited within the fetch concurrency slot, so a rejecting hook + * aborts the batch the same way a failing fetch would. + */ + onOutcome?: (outcome: WebFetchOutcome) => void | Promise; } type ProviderChainEntry = { @@ -257,7 +265,13 @@ export async function performWebFetch( searchResults: WebSearchResult[], deps: WebFetchDeps, ): Promise { - const { config, gotClient = got, logger: logOpt, throttleStats } = deps; + const { + config, + gotClient = got, + logger: logOpt, + throttleStats, + onOutcome, + } = deps; const log = logOpt ?? defaultLogger; const providerConfigs = config.providers; @@ -279,7 +293,13 @@ export async function performWebFetch( const results = await pMap( searchResults, - (result) => fetchOneResult(result, chain, sharedDeps), + async (result) => { + const outcome = await fetchOneResult(result, chain, sharedDeps); + if (onOutcome) { + await onOutcome(outcome); + } + return outcome; + }, { concurrency }, ); diff --git a/patches/@nicnocquee__dataqueue@1.39.0.patch b/patches/@nicnocquee__dataqueue@1.39.0.patch new file mode 100644 index 00000000..181f8d5f --- /dev/null +++ b/patches/@nicnocquee__dataqueue@1.39.0.patch @@ -0,0 +1,282 @@ +diff --git a/dist/index.cjs b/dist/index.cjs +index ba6a07535d9fac7b0acbb0bba474ef3cb26eda66..90ea5161c97b42198d0dc55b26a1ec464ea644ce 100644 +--- a/dist/index.cjs ++++ b/dist/index.cjs +@@ -593,8 +593,11 @@ var createProcessor = (backend, handlers, options = {}, onBeforeBatch, emit) => + ); + } + let running = false; +- let intervalId = null; +- let currentBatchPromise = null; ++ let pollTimer = null; ++ let claimInProgress = false; ++ let pumpRequested = false; ++ let inFlight = 0; ++ const inFlightJobs = new Set(); + setLogContext(options.verbose ?? false); + const processJobs = async () => { + if (!running) return 0; +@@ -643,22 +646,75 @@ var createProcessor = (backend, handlers, options = {}, onBeforeBatch, emit) => + if (running) return; + log(`Starting job processor with workerId: ${workerId}`); + running = true; +- const scheduleNext = (immediate) => { ++ const pump = async () => { + if (!running) return; +- if (immediate) { +- intervalId = setTimeout(loop, 0); +- } else { +- intervalId = setTimeout(loop, pollInterval); ++ if (claimInProgress) { ++ pumpRequested = true; ++ return; ++ } ++ if (inFlight >= concurrency) return; ++ claimInProgress = true; ++ pumpRequested = false; ++ let claimLimit = 0; ++ let claimed = 0; ++ try { ++ claimLimit = Math.min(concurrency - inFlight, batchSize); ++ const jobs = await backend.getNextBatch( ++ workerId, ++ claimLimit, ++ jobType, ++ groupConcurrency ++ ); ++ claimed = jobs.length; ++ for (const job of jobs) { ++ emit?.("job:processing", { jobId: job.id, jobType: job.jobType }); ++ inFlight++; ++ const jobPromise = processJobWithHandlers( ++ backend, ++ job, ++ handlers, ++ emit ++ ).catch((err) => { ++ onError(err instanceof Error ? err : new Error(String(err))); ++ }).finally(() => { ++ inFlight--; ++ inFlightJobs.delete(jobPromise); ++ void pump(); ++ }); ++ inFlightJobs.add(jobPromise); ++ } ++ } catch (error) { ++ const err = error instanceof Error ? error : new Error(String(error)); ++ onError(err); ++ emit?.("error", err); ++ } finally { ++ claimInProgress = false; ++ } ++ const moreLikely = claimed === claimLimit || pumpRequested; ++ if (running && moreLikely && inFlight < concurrency) { ++ void pump(); + } + }; +- const loop = async () => { ++ const tick = async () => { + if (!running) return; +- currentBatchPromise = processJobs(); +- const processed = await currentBatchPromise; +- currentBatchPromise = null; +- scheduleNext(processed === batchSize); ++ if (onBeforeBatch) { ++ try { ++ await onBeforeBatch(); ++ } catch (hookError) { ++ log(`onBeforeBatch hook error: ${hookError}`); ++ const err = hookError instanceof Error ? hookError : new Error(String(hookError)); ++ onError(err); ++ emit?.("error", err); ++ } ++ } ++ await pump(); ++ if (running) { ++ pollTimer = setTimeout(() => { ++ void tick(); ++ }, pollInterval); ++ } + }; +- loop(); ++ void tick(); + }, + /** + * Stop the job processor that runs in the background. +@@ -667,9 +723,9 @@ var createProcessor = (backend, handlers, options = {}, onBeforeBatch, emit) => + stop: () => { + log(`Stopping job processor with workerId: ${workerId}`); + running = false; +- if (intervalId) { +- clearTimeout(intervalId); +- intervalId = null; ++ if (pollTimer) { ++ clearTimeout(pollTimer); ++ pollTimer = null; + } + }, + /** +@@ -679,17 +735,15 @@ var createProcessor = (backend, handlers, options = {}, onBeforeBatch, emit) => + stopAndDrain: async (drainTimeoutMs = 3e4) => { + log(`Stopping and draining job processor with workerId: ${workerId}`); + running = false; +- if (intervalId) { +- clearTimeout(intervalId); +- intervalId = null; ++ if (pollTimer) { ++ clearTimeout(pollTimer); ++ pollTimer = null; + } +- if (currentBatchPromise) { ++ if (inFlightJobs.size > 0) { + await Promise.race([ +- currentBatchPromise.catch(() => { +- }), ++ Promise.allSettled(Array.from(inFlightJobs)), + new Promise((resolve) => setTimeout(resolve, drainTimeoutMs)) + ]); +- currentBatchPromise = null; + } + log(`Job processor ${workerId} drained`); + }, +diff --git a/dist/index.js b/dist/index.js +index 2eab8d874c5c99d7470b0f8e0e38f0e618c064e5..5557498b04705214ed460055f12a9e88d7866c60 100644 +--- a/dist/index.js ++++ b/dist/index.js +@@ -586,8 +586,11 @@ var createProcessor = (backend, handlers, options = {}, onBeforeBatch, emit) => + ); + } + let running = false; +- let intervalId = null; +- let currentBatchPromise = null; ++ let pollTimer = null; ++ let claimInProgress = false; ++ let pumpRequested = false; ++ let inFlight = 0; ++ const inFlightJobs = new Set(); + setLogContext(options.verbose ?? false); + const processJobs = async () => { + if (!running) return 0; +@@ -636,22 +639,75 @@ var createProcessor = (backend, handlers, options = {}, onBeforeBatch, emit) => + if (running) return; + log(`Starting job processor with workerId: ${workerId}`); + running = true; +- const scheduleNext = (immediate) => { ++ const pump = async () => { + if (!running) return; +- if (immediate) { +- intervalId = setTimeout(loop, 0); +- } else { +- intervalId = setTimeout(loop, pollInterval); ++ if (claimInProgress) { ++ pumpRequested = true; ++ return; ++ } ++ if (inFlight >= concurrency) return; ++ claimInProgress = true; ++ pumpRequested = false; ++ let claimLimit = 0; ++ let claimed = 0; ++ try { ++ claimLimit = Math.min(concurrency - inFlight, batchSize); ++ const jobs = await backend.getNextBatch( ++ workerId, ++ claimLimit, ++ jobType, ++ groupConcurrency ++ ); ++ claimed = jobs.length; ++ for (const job of jobs) { ++ emit?.("job:processing", { jobId: job.id, jobType: job.jobType }); ++ inFlight++; ++ const jobPromise = processJobWithHandlers( ++ backend, ++ job, ++ handlers, ++ emit ++ ).catch((err) => { ++ onError(err instanceof Error ? err : new Error(String(err))); ++ }).finally(() => { ++ inFlight--; ++ inFlightJobs.delete(jobPromise); ++ void pump(); ++ }); ++ inFlightJobs.add(jobPromise); ++ } ++ } catch (error) { ++ const err = error instanceof Error ? error : new Error(String(error)); ++ onError(err); ++ emit?.("error", err); ++ } finally { ++ claimInProgress = false; ++ } ++ const moreLikely = claimed === claimLimit || pumpRequested; ++ if (running && moreLikely && inFlight < concurrency) { ++ void pump(); + } + }; +- const loop = async () => { ++ const tick = async () => { + if (!running) return; +- currentBatchPromise = processJobs(); +- const processed = await currentBatchPromise; +- currentBatchPromise = null; +- scheduleNext(processed === batchSize); ++ if (onBeforeBatch) { ++ try { ++ await onBeforeBatch(); ++ } catch (hookError) { ++ log(`onBeforeBatch hook error: ${hookError}`); ++ const err = hookError instanceof Error ? hookError : new Error(String(hookError)); ++ onError(err); ++ emit?.("error", err); ++ } ++ } ++ await pump(); ++ if (running) { ++ pollTimer = setTimeout(() => { ++ void tick(); ++ }, pollInterval); ++ } + }; +- loop(); ++ void tick(); + }, + /** + * Stop the job processor that runs in the background. +@@ -660,9 +716,9 @@ var createProcessor = (backend, handlers, options = {}, onBeforeBatch, emit) => + stop: () => { + log(`Stopping job processor with workerId: ${workerId}`); + running = false; +- if (intervalId) { +- clearTimeout(intervalId); +- intervalId = null; ++ if (pollTimer) { ++ clearTimeout(pollTimer); ++ pollTimer = null; + } + }, + /** +@@ -672,17 +728,15 @@ var createProcessor = (backend, handlers, options = {}, onBeforeBatch, emit) => + stopAndDrain: async (drainTimeoutMs = 3e4) => { + log(`Stopping and draining job processor with workerId: ${workerId}`); + running = false; +- if (intervalId) { +- clearTimeout(intervalId); +- intervalId = null; ++ if (pollTimer) { ++ clearTimeout(pollTimer); ++ pollTimer = null; + } +- if (currentBatchPromise) { ++ if (inFlightJobs.size > 0) { + await Promise.race([ +- currentBatchPromise.catch(() => { +- }), ++ Promise.allSettled(Array.from(inFlightJobs)), + new Promise((resolve) => setTimeout(resolve, drainTimeoutMs)) + ]); +- currentBatchPromise = null; + } + log(`Job processor ${workerId} drained`); + }, diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index c16eed6d..d3ebcd97 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -100,6 +100,11 @@ catalogs: specifier: ^3 version: 3.25.76 +patchedDependencies: + '@nicnocquee/dataqueue@1.39.0': + hash: f113eb2707567ba904a1ade4726137c12c52e6049db3eb31f273747bf9311d5c + path: patches/@nicnocquee__dataqueue@1.39.0.patch + importers: .: @@ -252,7 +257,7 @@ importers: version: 16.2.3 '@nicnocquee/dataqueue': specifier: ^1.39.0 - version: 1.39.0(pg@8.18.0) + version: 1.39.0(patch_hash=f113eb2707567ba904a1ade4726137c12c52e6049db3eb31f273747bf9311d5c)(pg@8.18.0) '@workspace/agent-auth-client': specifier: workspace:* version: link:../../../packages/shared/agent-auth-client @@ -297,7 +302,7 @@ importers: version: 0.562.0(react@19.2.4) next: specifier: 'catalog:' - version: 16.2.3(@opentelemetry/api@1.9.0)(react-dom@19.2.4(react@19.2.4))(react@19.2.4) + version: 16.2.3(@babel/core@7.29.0)(@opentelemetry/api@1.9.0)(react-dom@19.2.4(react@19.2.4))(react@19.2.4) next-themes: specifier: 'catalog:' version: 0.4.6(react-dom@19.2.4(react@19.2.4))(react@19.2.4) @@ -315,7 +320,7 @@ importers: version: 6.9.2(@react-email/render@1.4.0(react-dom@19.2.4(react@19.2.4))(react@19.2.4)) route-action-gen: specifier: 'catalog:' - version: 0.0.10(next@16.2.3(@opentelemetry/api@1.9.0)(react-dom@19.2.4(react@19.2.4))(react@19.2.4))(react@19.2.4)(zod@3.25.76) + version: 0.0.10(next@16.2.3(@babel/core@7.29.0)(@opentelemetry/api@1.9.0)(react-dom@19.2.4(react@19.2.4))(react@19.2.4))(react@19.2.4)(zod@3.25.76) sonner: specifier: 'catalog:' version: 2.0.7(react-dom@19.2.4(react@19.2.4))(react@19.2.4) @@ -397,7 +402,7 @@ importers: version: link:../../../packages/hermes/step-input-syntax '@nicnocquee/dataqueue': specifier: ^1.39.0 - version: 1.39.0(pg@8.18.0) + version: 1.39.0(patch_hash=f113eb2707567ba904a1ade4726137c12c52e6049db3eb31f273747bf9311d5c)(pg@8.18.0) '@workspace/agent-auth-client': specifier: workspace:* version: link:../../../packages/shared/agent-auth-client @@ -1077,7 +1082,7 @@ importers: version: 0.562.0(react@19.2.4) next: specifier: 'catalog:' - version: 16.2.3(@opentelemetry/api@1.9.0)(react-dom@19.2.4(react@19.2.4))(react@19.2.4) + version: 16.2.3(@babel/core@7.29.0)(@opentelemetry/api@1.9.0)(react-dom@19.2.4(react@19.2.4))(react@19.2.4) next-themes: specifier: 'catalog:' version: 0.4.6(react-dom@19.2.4(react@19.2.4))(react@19.2.4) @@ -1319,7 +1324,7 @@ importers: version: link:../orchestration-database '@nicnocquee/dataqueue': specifier: ^1.39.0 - version: 1.39.0(pg@8.18.0) + version: 1.39.0(patch_hash=f113eb2707567ba904a1ade4726137c12c52e6049db3eb31f273747bf9311d5c)(pg@8.18.0) ajv: specifier: ^8.17.1 version: 8.17.1 @@ -11919,7 +11924,7 @@ snapshots: '@next/swc-win32-x64-msvc@16.2.3': optional: true - '@nicnocquee/dataqueue@1.39.0(pg@8.18.0)': + '@nicnocquee/dataqueue@1.39.0(patch_hash=f113eb2707567ba904a1ade4726137c12c52e6049db3eb31f273747bf9311d5c)(pg@8.18.0)': dependencies: '@modelcontextprotocol/sdk': 1.27.1(zod@3.25.76) croner: 10.0.1 @@ -17326,7 +17331,7 @@ snapshots: react: 19.2.4 react-dom: 19.2.4(react@19.2.4) - next@16.2.3(@opentelemetry/api@1.9.0)(react-dom@19.2.4(react@19.2.4))(react@19.2.4): + next@16.2.3(@babel/core@7.29.0)(@opentelemetry/api@1.9.0)(react-dom@19.2.4(react@19.2.4))(react@19.2.4): dependencies: '@next/env': 16.2.3 '@swc/helpers': 0.5.15 @@ -17335,7 +17340,7 @@ snapshots: postcss: 8.4.31 react: 19.2.4 react-dom: 19.2.4(react@19.2.4) - styled-jsx: 5.1.6(react@19.2.4) + styled-jsx: 5.1.6(@babel/core@7.29.0)(react@19.2.4) optionalDependencies: '@next/swc-darwin-arm64': 16.2.3 '@next/swc-darwin-x64': 16.2.3 @@ -18461,12 +18466,12 @@ snapshots: '@rollup/rollup-win32-x64-msvc': 4.57.1 fsevents: 2.3.3 - route-action-gen@0.0.10(next@16.2.3(@opentelemetry/api@1.9.0)(react-dom@19.2.4(react@19.2.4))(react@19.2.4))(react@19.2.4)(zod@3.25.76): + route-action-gen@0.0.10(next@16.2.3(@babel/core@7.29.0)(@opentelemetry/api@1.9.0)(react-dom@19.2.4(react@19.2.4))(react@19.2.4))(react@19.2.4)(zod@3.25.76): dependencies: glob: 13.0.2 zod: 3.25.76 optionalDependencies: - next: 16.2.3(@opentelemetry/api@1.9.0)(react-dom@19.2.4(react@19.2.4))(react@19.2.4) + next: 16.2.3(@babel/core@7.29.0)(@opentelemetry/api@1.9.0)(react-dom@19.2.4(react@19.2.4))(react@19.2.4) react: 19.2.4 router@2.2.0: @@ -18954,10 +18959,12 @@ snapshots: stubborn-utils@1.0.2: {} - styled-jsx@5.1.6(react@19.2.4): + styled-jsx@5.1.6(@babel/core@7.29.0)(react@19.2.4): dependencies: client-only: 0.0.1 react: 19.2.4 + optionalDependencies: + '@babel/core': 7.29.0 supports-color@10.2.2: {} diff --git a/scripts/lib/cursor-pr-review.mjs b/scripts/lib/cursor-pr-review.mjs index f4a112dd..8b0a8020 100644 --- a/scripts/lib/cursor-pr-review.mjs +++ b/scripts/lib/cursor-pr-review.mjs @@ -34,8 +34,14 @@ const isTsOrTsx = (filePath) => const isTsx = (filePath) => /\.tsx$/.test(filePath); -/** Markdown docs are excluded from the kebab-case new-file check (naming varies by tool conventions). */ -const isKebabCheckSkippedPath = (filePath) => /\.mdx?$/i.test(filePath); +/** + * Markdown docs and pnpm patch files are excluded from the kebab-case new-file + * check. pnpm generates patch filenames like `@scope__pkg@1.0.0.patch` which + * cannot be kebab-case by convention. + */ +const isKebabCheckSkippedPath = (filePath) => + /\.mdx?$/i.test(filePath) || + /^patches\//i.test(normalizeRepoPath(filePath)); /** Repo-root `scripts/` only (not e.g. `packages/foo/scripts/`). */ const normalizeRepoPath = (filePath) => filePath.replaceAll("\\", "/");