Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
129 changes: 129 additions & 0 deletions packages/dataqueue/src/processor.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,25 @@ async function claimJob(p: Pool, jobId: number) {
);
}

/**
 * Repeatedly evaluates `predicate` until it yields a truthy value.
 *
 * The predicate is always evaluated at least once. After every falsy result
 * the elapsed time is checked; once the deadline has passed the returned
 * promise rejects, so a stuck condition surfaces as a clear error rather than
 * as a test-runner timeout.
 *
 * @param predicate - Sync or async condition to poll.
 * @param timeoutMs - Maximum time to keep polling, in milliseconds.
 * @param pollMs - Delay between two evaluations, in milliseconds (default 20).
 * @throws Error when the condition is still falsy after `timeoutMs`.
 */
async function waitFor(
  predicate: () => boolean | Promise<boolean>,
  timeoutMs: number,
  pollMs = 20,
): Promise<void> {
  const expiresAt = Date.now() + timeoutMs;
  const pause = (): Promise<void> =>
    new Promise<void>((resolve) => {
      setTimeout(resolve, pollMs);
    });

  for (;;) {
    const satisfied = await predicate();
    if (satisfied) {
      return;
    }
    // Only give up after a failed check, so a condition that becomes true
    // right at the deadline is still honored.
    if (Date.now() > expiresAt) {
      throw new Error(`waitFor: condition not met within ${timeoutMs}ms`);
    }
    await pause();
  }
}

// Integration tests for processor

describe('processor integration', () => {
Expand Down Expand Up @@ -494,6 +513,116 @@ describe('concurrency option', () => {
'Processor option "groupConcurrency" must be a positive integer when provided.',
);
});

it('should refill freed group slots while a slow job runs, honoring groupConcurrency (continuous pool)', async () => {
  // Regression guard for the batch-barrier stall on grouped pipelines.
  // groupConcurrency caps each claim below the number of ready jobs, so the
  // rest land in later claims. With the old loop those later claims never
  // happened until the whole in-flight batch drained, so one slow job stalled
  // the entire group. The continuous pool must keep refilling the freed group
  // slot — without ever exceeding groupConcurrency — so every fast job
  // completes while the slow job is still in flight.
  const GROUP = 'pipeline-1';
  const FAST_JOBS = 5;

  // Gate that keeps the slow handler suspended until the test releases it.
  let releaseSlow!: () => void;
  const slowReleased = new Promise<void>((resolve) => {
    releaseSlow = resolve;
  });
  // Flipped by the slow handler on entry. Polled with a deadline (instead of
  // awaiting a bare promise) so a processor that never claims the slow job
  // fails fast rather than hanging until the runner's global timeout.
  let slowStarted = false;
  let fastCompleted = 0;
  // Handlers currently executing, and the high-water mark of that count.
  let inHandler = 0;
  let maxInHandler = 0;

  const handler = async (payload: { slow?: boolean }) => {
    inHandler++;
    maxInHandler = Math.max(maxInHandler, inHandler);
    try {
      if (payload.slow) {
        slowStarted = true;
        await slowReleased;

        return;
      }
      fastCompleted++;
    } finally {
      inHandler--;
    }
  };
  const handlers = { test: handler };

  // High priority so the slow job is always claimed into the first group slot.
  const slowId = await queue.addJob<{ test: { slow?: boolean } }, 'test'>(
    pool,
    {
      jobType: 'test',
      payload: { slow: true },
      priority: 10,
      group: { id: GROUP },
    },
  );
  for (let i = 0; i < FAST_JOBS; i++) {
    await queue.addJob<{ test: { slow?: boolean } }, 'test'>(pool, {
      jobType: 'test',
      payload: {},
      group: { id: GROUP },
    });
  }

  const processor = createProcessor(backend, handlers, {
    batchSize: 10,
    concurrency: 2,
    groupConcurrency: 2,
    pollInterval: 50,
  });
  processor.startInBackground();
  try {
    // Wait (bounded) until the slow job actually occupies a group slot.
    await waitFor(() => slowStarted, 5000);

    // Every fast job must finish while the slow job is still in flight. Under
    // the old barrier this never happens (the loop waits on the slow job
    // before claiming the rest of the group), so waitFor throws instead of
    // the whole test hanging.
    await waitFor(() => fastCompleted === FAST_JOBS, 5000);

    expect(fastCompleted).toBe(FAST_JOBS);
    // groupConcurrency was honored throughout (slow + at most one fast).
    // NOTE(review): concurrency equals groupConcurrency here, so this bound is
    // also implied by the pool size; raising concurrency above 2 would isolate
    // the group cap — confirm against getNextBatch semantics before changing.
    expect(maxInHandler).toBeLessThanOrEqual(2);

    // The slow job never blocked the group — it is still processing.
    const slowJob = await queue.getJob<{ test: { slow?: boolean } }, 'test'>(
      pool,
      slowId,
    );

    expect(slowJob?.status).toBe('processing');

    // Release the slow job and let everything drain cleanly.
    releaseSlow();
    await waitFor(async () => {
      const job = await queue.getJob<{ test: { slow?: boolean } }, 'test'>(
        pool,
        slowId,
      );

      return job?.status === 'completed';
    }, 5000);
  } finally {
    // Always unblock the slow handler (a no-op if already released) so a
    // failed assertion above cannot leave the drain waiting on it.
    releaseSlow();
    await processor.stopAndDrain(5000);
  }

  const completed = await queue.getJobsByStatus<
    { test: { slow?: boolean } },
    'test'
  >(pool, 'completed');

  expect(completed.length).toBe(FAST_JOBS + 1);
});
});

describe('per-job timeout', () => {
Expand Down
135 changes: 107 additions & 28 deletions packages/dataqueue/src/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -858,8 +858,16 @@ export const createProcessor = <PayloadMap = any>(
}

let running = false;
let intervalId: NodeJS.Timeout | null = null;
let currentBatchPromise: Promise<number> | null = null;
// Periodic timer that drives cron enqueueing and idle polling.
let pollTimer: NodeJS.Timeout | null = null;
// True while a getNextBatch claim is in progress, so claims stay serialized.
let claimInProgress = false;
// Set when a refill was requested while a claim was already running.
let pumpRequested = false;
// Number of jobs this processor currently has in flight.
let inFlight = 0;
// Promises for the in-flight jobs, awaited during graceful drain.
const inFlightJobs = new Set<Promise<void>>();

setLogContext(options.verbose ?? false);

Expand Down Expand Up @@ -909,7 +917,10 @@ export const createProcessor = <PayloadMap = any>(
return {
/**
* Start the job processor in the background.
* - This will run periodically (every pollInterval milliseconds or 5 seconds if not provided) and process jobs as they become available.
* - Keeps up to `concurrency` jobs in flight, refilling each slot as soon as
* it frees instead of waiting for a whole batch to settle.
* - Polls every `pollInterval` milliseconds (5 seconds if not provided) for
* new work and to enqueue due cron jobs.
* - You have to call the stop method to stop the processor.
*/
startInBackground: () => {
Expand All @@ -918,28 +929,97 @@ export const createProcessor = <PayloadMap = any>(
log(`Starting job processor with workerId: ${workerId}`);
running = true;

// Single serialized loop: process a batch, then either immediately
// continue (if full batch was returned) or wait pollInterval.
const scheduleNext = (immediate: boolean) => {
// Continuous worker pool: keep up to `concurrency` jobs in flight and
// refill each slot the instant it frees, rather than waiting for a whole
// batch to settle. This removes the head-of-line blocking where one slow
// job stalls the other slots (and, with groupConcurrency, the whole
// group) until it finishes.
const pump = async (): Promise<void> => {
if (!running) return;
if (immediate) {
intervalId = setTimeout(loop, 0);
} else {
intervalId = setTimeout(loop, pollInterval);
// Serialize claims; remember the request so we refill afterwards.
if (claimInProgress) {
pumpRequested = true;
return;
}
// Pool is full; a finishing job will call pump() again.
if (inFlight >= concurrency) return;

claimInProgress = true;
pumpRequested = false;
let claimLimit = 0;
let claimed = 0;
try {
claimLimit = Math.min(concurrency - inFlight, batchSize);
const jobs = await backend.getNextBatch<
PayloadMap,
JobType<PayloadMap>
>(workerId, claimLimit, jobType, groupConcurrency);
claimed = jobs.length;

for (const job of jobs) {
emit?.('job:processing', { jobId: job.id, jobType: job.jobType });
inFlight++;
const jobPromise: Promise<void> = processJobWithHandlers(
backend,
job,
handlers,
emit,
)
.catch((err) => {
onError(err instanceof Error ? err : new Error(String(err)));
})
.finally(() => {
inFlight--;
inFlightJobs.delete(jobPromise);
// A slot just freed — try to claim more work immediately.
void pump();
});
inFlightJobs.add(jobPromise);
}
} catch (error) {
const err = error instanceof Error ? error : new Error(String(error));
onError(err);
emit?.('error', err);
} finally {
claimInProgress = false;
}

// Claim again right away when the queue likely still has ready work
// (we filled the whole request) or when a slot freed mid-claim.
const moreLikely = claimed === claimLimit || pumpRequested;
if (running && moreLikely && inFlight < concurrency) {
void pump();
}
};

const loop = async () => {
// Periodic tick: enqueue due cron jobs (onBeforeBatch) on a steady cadence
// regardless of pool saturation, then top up the pool. Also the idle poll
// so newly added jobs are picked up within pollInterval.
const tick = async (): Promise<void> => {
if (!running) return;
currentBatchPromise = processJobs();
const processed = await currentBatchPromise;
currentBatchPromise = null;
// If we got a full batch, there may be more work — process immediately
scheduleNext(processed === batchSize);
if (onBeforeBatch) {
try {
await onBeforeBatch();
} catch (hookError) {
log(`onBeforeBatch hook error: ${hookError}`);
const err =
hookError instanceof Error
? hookError
: new Error(String(hookError));
onError(err);
emit?.('error', err);
}
}
await pump();
if (running) {
pollTimer = setTimeout(() => {
void tick();
}, pollInterval);
}
};

// Start the first iteration immediately
loop();
// Start the first iteration immediately.
void tick();
},
/**
* Stop the job processor that runs in the background.
Expand All @@ -948,9 +1028,9 @@ export const createProcessor = <PayloadMap = any>(
stop: () => {
log(`Stopping job processor with workerId: ${workerId}`);
running = false;
if (intervalId) {
clearTimeout(intervalId);
intervalId = null;
if (pollTimer) {
clearTimeout(pollTimer);
pollTimer = null;
}
},
/**
Expand All @@ -960,17 +1040,16 @@ export const createProcessor = <PayloadMap = any>(
stopAndDrain: async (drainTimeoutMs = 30000) => {
log(`Stopping and draining job processor with workerId: ${workerId}`);
running = false;
if (intervalId) {
clearTimeout(intervalId);
intervalId = null;
if (pollTimer) {
clearTimeout(pollTimer);
pollTimer = null;
}
// Wait for current batch to finish, with a timeout
if (currentBatchPromise) {
// Wait for all in-flight jobs to finish, with a timeout.
if (inFlightJobs.size > 0) {
await Promise.race([
currentBatchPromise.catch(() => {}),
Promise.allSettled(Array.from(inFlightJobs)),
new Promise<void>((resolve) => setTimeout(resolve, drainTimeoutMs)),
]);
currentBatchPromise = null;
}
log(`Job processor ${workerId} drained`);
},
Expand Down
19 changes: 13 additions & 6 deletions packages/dataqueue/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -567,14 +567,20 @@ export type JobHandlers<PayloadMap> = {
export interface ProcessorOptions {
workerId?: string;
/**
* The number of jobs to process at a time.
* - If not provided, the processor will process 10 jobs at a time.
* - In serverless functions, it's better to process less jobs at a time since serverless functions are charged by the second and have a timeout.
* The maximum number of jobs to claim from the queue per poll.
* - If not provided, up to 10 jobs are claimed per poll.
* - With `startInBackground`, claims are capped to the number of free
* concurrency slots, so this only matters when it is below `concurrency`.
* - In serverless functions, it's better to claim fewer jobs at a time since serverless functions are charged by the second and have a timeout.
*/
batchSize?: number;
/**
* The maximum number of jobs to process in parallel per batch.
* - If not provided, all jobs in the batch are processed in parallel.
* The maximum number of jobs to process in parallel.
* - With `startInBackground`, this is the steady number of jobs kept in
* flight: each slot is refilled as soon as it frees, so a slow job never
* blocks the other slots.
* - With the one-shot `start`, this caps parallelism within the single batch.
* - If not provided, defaults to 3.
* - Set to 1 to process jobs sequentially.
* - Set to a lower value to avoid resource exhaustion.
*/
Expand Down Expand Up @@ -604,7 +610,8 @@ export interface ProcessorOptions {
export interface Processor {
/**
* Start the job processor in the background.
* - This will run periodically (every pollInterval milliseconds or 5 seconds if not provided) and process jobs (as many as batchSize) as they become available.
* - Keeps up to `concurrency` jobs in flight, refilling each slot as soon as it frees instead of waiting for a whole batch to settle.
* - Polls every pollInterval milliseconds (5 seconds if not provided) for new work as it becomes available.
* - **You have to call the stop method to stop the processor.**
* - Handlers are provided per-processor when calling createProcessor.
* - In serverless functions, it's recommended to call start instead and await it to finish.
Expand Down
Loading