From 289f263bdb8b72f164e7adebc6c6821263e2e0e5 Mon Sep 17 00:00:00 2001 From: Kevin Hermawan Date: Fri, 12 Jun 2026 07:47:19 +0700 Subject: [PATCH] Refill freed concurrency slots continuously instead of per batch startInBackground processed jobs in fixed batches behind a barrier: it claimed up to a batch, then waited for every job in that batch to settle before claiming again. A single slow job left the other concurrency slots idle until it finished. With groupConcurrency it stalled the whole group, because each claim is capped below the number of ready jobs, so the rest never got claimed until the in-flight batch fully drained. Replace the batch-barrier loop with a continuous worker pool that keeps up to `concurrency` jobs in flight and refills each slot the moment it frees, never exceeding groupConcurrency. A periodic tick still enqueues due cron jobs and polls for new work every pollInterval, and stopAndDrain now waits on the set of in-flight jobs. No public API change. The one-shot start() path is untouched. Add a regression test that reproduces the group stall: a slow job plus several fast jobs in one group under groupConcurrency. It times out on the old barrier and passes with the continuous pool. --- packages/dataqueue/src/processor.test.ts | 129 ++++++++++++++++++++++ packages/dataqueue/src/processor.ts | 135 ++++++++++++++++++----- packages/dataqueue/src/types.ts | 19 +++- 3 files changed, 249 insertions(+), 34 deletions(-) diff --git a/packages/dataqueue/src/processor.test.ts b/packages/dataqueue/src/processor.test.ts index 734b386..7d567c5 100644 --- a/packages/dataqueue/src/processor.test.ts +++ b/packages/dataqueue/src/processor.test.ts @@ -33,6 +33,25 @@ async function claimJob(p: Pool, jobId: number) { ); } +/** + * Polls `predicate` until it is truthy or the timeout elapses. + * Throws if the condition is never met, so a hung pool fails fast instead of + * timing out the whole test. + */ +async function waitFor( + predicate: () => boolean | Promise, + timeoutMs: number, + pollMs = 20, +): Promise { + const deadline = Date.now() + timeoutMs; + while (!(await predicate())) { + if (Date.now() > deadline) { + throw new Error(`waitFor: condition not met within ${timeoutMs}ms`); + } + await new Promise((r) => setTimeout(r, pollMs)); + } +} + // Integration tests for processor describe('processor integration', () => { @@ -494,6 +513,116 @@ describe('concurrency option', () => { 'Processor option "groupConcurrency" must be a positive integer when provided.', ); }); + + it('should refill freed group slots while a slow job runs, honoring groupConcurrency (continuous pool)', async () => { + // Regression guard for the batch-barrier stall on grouped pipelines. + // groupConcurrency caps each claim below the number of ready jobs, so the + // rest land in later claims. With the old loop those later claims never + // happened until the whole in-flight batch drained, so one slow job stalled + // the entire group. The continuous pool must keep refilling the freed group + // slot — without ever exceeding groupConcurrency — so every fast job + // completes while the slow job is still in flight. + const GROUP = 'pipeline-1'; + const FAST_JOBS = 5; + + let releaseSlow!: () => void; + const slowReleased = new Promise((resolve) => { + releaseSlow = resolve; + }); + let markSlowStarted!: () => void; + const slowStarted = new Promise((resolve) => { + markSlowStarted = resolve; + }); + let fastCompleted = 0; + let inHandler = 0; + let maxInHandler = 0; + + const handler = async (payload: { slow?: boolean }) => { + inHandler++; + maxInHandler = Math.max(maxInHandler, inHandler); + try { + if (payload.slow) { + markSlowStarted(); + await slowReleased; + + return; + } + fastCompleted++; + } finally { + inHandler--; + } + }; + const handlers = { test: handler }; + + // High priority so the slow job is always claimed into the first group slot. + const slowId = await queue.addJob<{ test: { slow?: boolean } }, 'test'>( + pool, + { + jobType: 'test', + payload: { slow: true }, + priority: 10, + group: { id: GROUP }, + }, + ); + for (let i = 0; i < FAST_JOBS; i++) { + await queue.addJob<{ test: { slow?: boolean } }, 'test'>(pool, { + jobType: 'test', + payload: {}, + group: { id: GROUP }, + }); + } + + const processor = createProcessor(backend, handlers, { + batchSize: 10, + concurrency: 2, + groupConcurrency: 2, + pollInterval: 50, + }); + processor.startInBackground(); + try { + // Wait until the slow job actually occupies a group slot. + await slowStarted; + + // Every fast job must finish while the slow job is still in flight. Under + // the old barrier this never happens (the loop waits on the slow job + // before claiming the rest of the group), so waitFor throws instead of + // the whole test hanging. + await waitFor(() => fastCompleted === FAST_JOBS, 5000); + + expect(fastCompleted).toBe(FAST_JOBS); + // groupConcurrency was honored throughout (slow + at most one fast). + expect(maxInHandler).toBeLessThanOrEqual(2); + + // The slow job never blocked the group — it is still processing. + const slowJob = await queue.getJob<{ test: { slow?: boolean } }, 'test'>( + pool, + slowId, + ); + + expect(slowJob?.status).toBe('processing'); + + // Release the slow job and let everything drain cleanly. + releaseSlow(); + await waitFor(async () => { + const job = await queue.getJob<{ test: { slow?: boolean } }, 'test'>( + pool, + slowId, + ); + + return job?.status === 'completed'; + }, 5000); + } finally { + releaseSlow(); + await processor.stopAndDrain(5000); + } + + const completed = await queue.getJobsByStatus< + { test: { slow?: boolean } }, + 'test' + >(pool, 'completed'); + + expect(completed.length).toBe(FAST_JOBS + 1); + }); }); describe('per-job timeout', () => { diff --git a/packages/dataqueue/src/processor.ts b/packages/dataqueue/src/processor.ts index feb8d43..49b6808 100644 --- a/packages/dataqueue/src/processor.ts +++ b/packages/dataqueue/src/processor.ts @@ -858,8 +858,16 @@ export const createProcessor = ( } let running = false; - let intervalId: NodeJS.Timeout | null = null; - let currentBatchPromise: Promise | null = null; + // Periodic timer that drives cron enqueueing and idle polling. + let pollTimer: NodeJS.Timeout | null = null; + // True while a getNextBatch claim is in progress, so claims stay serialized. + let claimInProgress = false; + // Set when a refill was requested while a claim was already running. + let pumpRequested = false; + // Number of jobs this processor currently has in flight. + let inFlight = 0; + // Promises for the in-flight jobs, awaited during graceful drain. + const inFlightJobs = new Set>(); setLogContext(options.verbose ?? false); @@ -909,7 +917,10 @@ export const createProcessor = ( return { /** * Start the job processor in the background. - * - This will run periodically (every pollInterval milliseconds or 5 seconds if not provided) and process jobs as they become available. + * - Keeps up to `concurrency` jobs in flight, refilling each slot as soon as + * it frees instead of waiting for a whole batch to settle. + * - Polls every `pollInterval` milliseconds (5 seconds if not provided) for + * new work and to enqueue due cron jobs. * - You have to call the stop method to stop the processor. */ startInBackground: () => { @@ -918,28 +929,97 @@ export const createProcessor = ( log(`Starting job processor with workerId: ${workerId}`); running = true; - // Single serialized loop: process a batch, then either immediately - // continue (if full batch was returned) or wait pollInterval. - const scheduleNext = (immediate: boolean) => { + // Continuous worker pool: keep up to `concurrency` jobs in flight and + // refill each slot the instant it frees, rather than waiting for a whole + // batch to settle. This removes the head-of-line blocking where one slow + // job stalls the other slots (and, with groupConcurrency, the whole + // group) until it finishes. + const pump = async (): Promise => { if (!running) return; - if (immediate) { - intervalId = setTimeout(loop, 0); - } else { - intervalId = setTimeout(loop, pollInterval); + // Serialize claims; remember the request so we refill afterwards. + if (claimInProgress) { + pumpRequested = true; + return; + } + // Pool is full; a finishing job will call pump() again. + if (inFlight >= concurrency) return; + + claimInProgress = true; + pumpRequested = false; + let claimLimit = 0; + let claimed = 0; + try { + claimLimit = Math.min(concurrency - inFlight, batchSize); + const jobs = await backend.getNextBatch< + PayloadMap, + JobType + >(workerId, claimLimit, jobType, groupConcurrency); + claimed = jobs.length; + + for (const job of jobs) { + emit?.('job:processing', { jobId: job.id, jobType: job.jobType }); + inFlight++; + const jobPromise: Promise = processJobWithHandlers( + backend, + job, + handlers, + emit, + ) + .catch((err) => { + onError(err instanceof Error ? err : new Error(String(err))); + }) + .finally(() => { + inFlight--; + inFlightJobs.delete(jobPromise); + // A slot just freed — try to claim more work immediately. + void pump(); + }); + inFlightJobs.add(jobPromise); + } + } catch (error) { + const err = error instanceof Error ? error : new Error(String(error)); + onError(err); + emit?.('error', err); + } finally { + claimInProgress = false; + } + + // Claim again right away when the queue likely still has ready work + // (we filled the whole request) or when a slot freed mid-claim. + const moreLikely = claimed === claimLimit || pumpRequested; + if (running && moreLikely && inFlight < concurrency) { + void pump(); } }; - const loop = async () => { + // Periodic tick: enqueue due cron jobs (onBeforeBatch) on a steady cadence + // regardless of pool saturation, then top up the pool. Also the idle poll + // so newly added jobs are picked up within pollInterval. + const tick = async (): Promise => { if (!running) return; - currentBatchPromise = processJobs(); - const processed = await currentBatchPromise; - currentBatchPromise = null; - // If we got a full batch, there may be more work — process immediately - scheduleNext(processed === batchSize); + if (onBeforeBatch) { + try { + await onBeforeBatch(); + } catch (hookError) { + log(`onBeforeBatch hook error: ${hookError}`); + const err = + hookError instanceof Error + ? hookError + : new Error(String(hookError)); + onError(err); + emit?.('error', err); + } + } + await pump(); + if (running) { + pollTimer = setTimeout(() => { + void tick(); + }, pollInterval); + } }; - // Start the first iteration immediately - loop(); + // Start the first iteration immediately. + void tick(); }, /** * Stop the job processor that runs in the background. @@ -948,9 +1028,9 @@ export const createProcessor = ( stop: () => { log(`Stopping job processor with workerId: ${workerId}`); running = false; - if (intervalId) { - clearTimeout(intervalId); - intervalId = null; + if (pollTimer) { + clearTimeout(pollTimer); + pollTimer = null; } }, /** @@ -960,17 +1040,16 @@ export const createProcessor = ( stopAndDrain: async (drainTimeoutMs = 30000) => { log(`Stopping and draining job processor with workerId: ${workerId}`); running = false; - if (intervalId) { - clearTimeout(intervalId); - intervalId = null; + if (pollTimer) { + clearTimeout(pollTimer); + pollTimer = null; } - // Wait for current batch to finish, with a timeout - if (currentBatchPromise) { + // Wait for all in-flight jobs to finish, with a timeout. + if (inFlightJobs.size > 0) { await Promise.race([ - currentBatchPromise.catch(() => {}), + Promise.allSettled(Array.from(inFlightJobs)), new Promise((resolve) => setTimeout(resolve, drainTimeoutMs)), ]); - currentBatchPromise = null; } log(`Job processor ${workerId} drained`); }, diff --git a/packages/dataqueue/src/types.ts b/packages/dataqueue/src/types.ts index 66edf6e..57686eb 100644 --- a/packages/dataqueue/src/types.ts +++ b/packages/dataqueue/src/types.ts @@ -567,14 +567,20 @@ export type JobHandlers = { export interface ProcessorOptions { workerId?: string; /** - * The number of jobs to process at a time. - * - If not provided, the processor will process 10 jobs at a time. - * - In serverless functions, it's better to process less jobs at a time since serverless functions are charged by the second and have a timeout. + * The maximum number of jobs to claim from the queue per poll. + * - If not provided, up to 10 jobs are claimed per poll. + * - With `startInBackground`, claims are capped to the number of free + * concurrency slots, so this only matters when it is below `concurrency`. + * - In serverless functions, it's better to claim fewer jobs at a time since serverless functions are charged by the second and have a timeout. */ batchSize?: number; /** - * The maximum number of jobs to process in parallel per batch. - * - If not provided, all jobs in the batch are processed in parallel. + * The maximum number of jobs to process in parallel. + * - With `startInBackground`, this is the steady number of jobs kept in + * flight: each slot is refilled as soon as it frees, so a slow job never + * blocks the other slots. + * - With the one-shot `start`, this caps parallelism within the single batch. + * - If not provided, defaults to 3. * - Set to 1 to process jobs sequentially. * - Set to a lower value to avoid resource exhaustion. */ @@ -604,7 +610,8 @@ export interface ProcessorOptions { export interface Processor { /** * Start the job processor in the background. - * - This will run periodically (every pollInterval milliseconds or 5 seconds if not provided) and process jobs (as many as batchSize) as they become available. + * - Keeps up to `concurrency` jobs in flight, refilling each slot as soon as it frees instead of waiting for a whole batch to settle. + * - Polls every pollInterval milliseconds (5 seconds if not provided) for new work as it becomes available. * - **You have to call the stop method to stop the processor.** * - Handlers are provided per-processor when calling createProcessor. * - In serverless functions, it's recommended to call start instead and await it to finish.