From 16d819634374b26bdfcc6cb54de72d1eac1d4dff Mon Sep 17 00:00:00 2001 From: Taras Mankovski Date: Tue, 31 Mar 2026 07:06:07 -0400 Subject: [PATCH 1/6] =?UTF-8?q?=E2=9C=A8=20Add=20throttle=20stream=20opera?= =?UTF-8?q?tor=20with=20trailing-edge=20semantics?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add a throttle helper to @effectionx/stream-helpers that emits at most one value per delay window while guaranteeing the final value is never dropped. Uses leading+trailing semantics following the pattern from batch.ts with spawn + timebox for time-windowed upstream consumption. --- stream-helpers/mod.ts | 1 + stream-helpers/throttle.test.ts | 124 ++++++++++++++++++++++++++++++++ stream-helpers/throttle.ts | 86 ++++++++++++++++++++++ 3 files changed, 211 insertions(+) create mode 100644 stream-helpers/throttle.test.ts create mode 100644 stream-helpers/throttle.ts diff --git a/stream-helpers/mod.ts b/stream-helpers/mod.ts index d6a832d9..ee010da4 100644 --- a/stream-helpers/mod.ts +++ b/stream-helpers/mod.ts @@ -14,3 +14,4 @@ export * from "./last.ts"; export * from "./take.ts"; export * from "./take-while.ts"; export * from "./take-until.ts"; +export * from "./throttle.ts"; diff --git a/stream-helpers/throttle.test.ts b/stream-helpers/throttle.test.ts new file mode 100644 index 00000000..4770f98e --- /dev/null +++ b/stream-helpers/throttle.test.ts @@ -0,0 +1,124 @@ +import { describe, it } from "@effectionx/bdd"; +import { createArraySignal, is } from "@effectionx/signals"; +import { expect } from "expect"; +import { createChannel, sleep, spawn } from "effection"; + +import { throttle } from "./throttle.ts"; +import { forEach } from "./for-each.ts"; +import { useFaucet } from "./test-helpers/faucet.ts"; + +describe("throttle", () => { + it("emits the first value immediately", function* () { + const source = createChannel(); + const stream = throttle(100)(source); + const subscription = yield* stream; + + const next = yield* spawn(() => subscription.next()); + yield* source.send(1); + + expect((yield* next).value).toBe(1); + }); + + it("drops intermediate values and emits trailing", function* () { + const faucet = yield* useFaucet({ open: true }); + const stream = throttle(50)(faucet); + const results = yield* createArraySignal([]); + + yield* spawn(() => + forEach(function* (value) { + results.push(value); + }, stream), + ); + + yield* sleep(0); + + yield* faucet.pour([1, 2, 3, 4, 5]); + + yield* is(results, (r) => r.length >= 2); + + const values = results.valueOf(); + expect(values[0]).toBe(1); + expect(values[values.length - 1]).toBe(5); + }); + + it("emits the final value before stream closes", function* () { + const source = createChannel(); + const stream = throttle(200)(source); + const results = yield* createArraySignal([]); + + yield* spawn(() => + forEach(function* (value) { + results.push(value); + }, stream), + ); + + yield* sleep(0); + + yield* source.send(1); + yield* source.send(2); + yield* source.send(3); + yield* source.close(); + + yield* is(results, (r) => r.includes(3)); + + const values = results.valueOf(); + expect(values).toContain(1); + expect(values).toContain(3); + }); + + it("passes through values spaced beyond the delay", function* () { + const faucet = yield* useFaucet({ open: true }); + const stream = throttle(20)(faucet); + const results = yield* createArraySignal([]); + + yield* spawn(() => + forEach(function* (value) { + results.push(value); + }, stream), + ); + + yield* sleep(0); + + yield* faucet.pour(function* (send) { + yield* send(1); + yield* sleep(50); + yield* send(2); + yield* sleep(50); + yield* send(3); + }); + + yield* is(results, (r) => r.length >= 3); + + expect(results.valueOf()).toEqual([1, 2, 3]); + }); + + it("handles multiple throttle windows", function* () { + const faucet = yield* useFaucet({ open: true }); + const stream = throttle(30)(faucet); + const results = yield* createArraySignal([]); + + yield* spawn(() => + forEach(function* (value) { + results.push(value); + }, stream), + ); + + yield* sleep(0); + + yield* faucet.pour([1, 2, 3]); + + yield* is(results, (r) => r.length >= 2); + + yield* sleep(60); + + yield* faucet.pour([10, 20, 30]); + + yield* is(results, (r) => r.includes(30)); + + const values = results.valueOf(); + expect(values).toContain(1); + expect(values).toContain(3); + expect(values).toContain(10); + expect(values).toContain(30); + }); +}); diff --git a/stream-helpers/throttle.ts b/stream-helpers/throttle.ts new file mode 100644 index 00000000..bd8a2a2e --- /dev/null +++ b/stream-helpers/throttle.ts @@ -0,0 +1,86 @@ +import { timebox } from "@effectionx/timebox"; +import { type Stream, type Task, spawn } from "effection"; + +/** + * Throttles a stream to emit at most one value per `delayMS` milliseconds. + * + * Uses leading+trailing semantics: the first value is emitted immediately, + * intermediate values during the throttle window are dropped, and the most + * recent value is always emitted after the window expires. This ensures the + * final state is never lost when a burst of events ends mid-window. + * + * @param delayMS - The minimum time between emissions in milliseconds + */ +export function throttle( + delayMS: number, +): (stream: Stream) => Stream { + return (stream: Stream): Stream => ({ + *[Symbol.iterator]() { + const subscription = yield* stream; + let lastPull: Task> | undefined; + let pendingTrailing: A | undefined; + let hasTrailing = false; + let doneResult: IteratorResult | undefined; + + return { + *next() { + // Emit stashed trailing value from previous window + if (hasTrailing) { + const value = pendingTrailing as A; + hasTrailing = false; + pendingTrailing = undefined; + return { done: false as const, value }; + } + + // Stream already ended + if (doneResult) { + return doneResult; + } + + // Pull the next upstream value + const result = lastPull + ? yield* lastPull + : yield* subscription.next(); + lastPull = undefined; + + if (result.done) { + return result; + } + + const valueToEmit = result.value; + + // Throttle window: absorb upstream values for delayMS + const windowStart = performance.now(); + + while (true) { + const remaining = delayMS - (performance.now() - windowStart); + if (remaining <= 0) break; + + lastPull = yield* spawn(() => subscription.next()); + const tb = yield* timebox(remaining, () => lastPull!); + + if (tb.timeout) { + // Timer expired, lastPull survives for next call + break; + } + + const upstream = tb.value; + lastPull = undefined; + + if (upstream.done) { + doneResult = upstream; + break; + } + + // Overwrite trailing — only the latest matters + pendingTrailing = upstream.value; + hasTrailing = true; + } + + // Emit the leading-edge value + return { done: false as const, value: valueToEmit }; + }, + }; + }, + }); +} From 6315930a2006c2f71d5ba98033ca5bb0bdee4813 Mon Sep 17 00:00:00 2001 From: Taras Mankovski Date: Tue, 31 Mar 2026 07:09:54 -0400 Subject: [PATCH 2/6] =?UTF-8?q?=F0=9F=90=9B=20Fix=20channel=20close=20type?= =?UTF-8?q?=20in=20throttle=20test?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Use void instead of never for TClose so source.close() is callable. --- stream-helpers/throttle.test.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/stream-helpers/throttle.test.ts b/stream-helpers/throttle.test.ts index 4770f98e..196d6fdd 100644 --- a/stream-helpers/throttle.test.ts +++ b/stream-helpers/throttle.test.ts @@ -42,7 +42,7 @@ describe("throttle", () => { }); it("emits the final value before stream closes", function* () { - const source = createChannel(); + const source = createChannel(); const stream = throttle(200)(source); const results = yield* createArraySignal([]); From 6671b0e0bbf073a3b05923dd4de9dbff62a471e2 Mon Sep 17 00:00:00 2001 From: Taras Mankovski Date: Tue, 31 Mar 2026 07:31:06 -0400 Subject: [PATCH 3/6] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20Rewrite=20throttle=20w?= =?UTF-8?q?ith=20correct=20leading+trailing=20timing?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Fix two bugs in the original throttle operator: 1. Leading value was delayed: the absorption loop ran synchronously before returning, deferring the first emission by delayMS. 2. Trailing value emitted too early: it was returned on the immediate next pull with no delay enforcement, allowing two emissions 0ms apart. The rewrite uses inline absorption deferred to the next next() call: - Leading: returns immediately, records a windowDeadline - Next call: absorbs upstream values until deadline, then emits trailing - Tasks spawned via spawn() live in the subscription scope and survive across next() calls (matching the batch.ts pattern) Also: - Add 8 timing-aware tests covering leading/trailing/window/close - Bump stream-helpers to 0.9.0 (new exported feature) --- stream-helpers/package.json | 2 +- stream-helpers/throttle.test.ts | 175 ++++++++++++++++++++++++-------- stream-helpers/throttle.ts | 109 ++++++++++++-------- 3 files changed, 199 insertions(+), 87 deletions(-) diff --git a/stream-helpers/package.json b/stream-helpers/package.json index 54bbf357..1dd92abb 100644 --- a/stream-helpers/package.json +++ b/stream-helpers/package.json @@ -1,7 +1,7 @@ { "name": "@effectionx/stream-helpers", "description": "Type-safe stream operators like filter, map, reduce, and forEach", - "version": "0.8.2", + "version": "0.9.0", "keywords": ["effection", "effectionx", "streams", "map", "filter", "reduce"], "type": "module", "main": "./dist/mod.js", diff --git a/stream-helpers/throttle.test.ts b/stream-helpers/throttle.test.ts index 196d6fdd..dbf63109 100644 --- a/stream-helpers/throttle.test.ts +++ b/stream-helpers/throttle.test.ts @@ -7,26 +7,36 @@ import { throttle } from "./throttle.ts"; import { forEach } from "./for-each.ts"; import { useFaucet } from "./test-helpers/faucet.ts"; +interface Emission { + value: T; + time: number; +} + describe("throttle", () => { it("emits the first value immediately", function* () { const source = createChannel(); - const stream = throttle(100)(source); + const stream = throttle(200)(source); const subscription = yield* stream; + const start = performance.now(); const next = yield* spawn(() => subscription.next()); yield* source.send(1); - expect((yield* next).value).toBe(1); + const result = yield* next; + const elapsed = performance.now() - start; + + expect(result.value).toBe(1); + expect(elapsed).toBeLessThan(50); }); - it("drops intermediate values and emits trailing", function* () { + it("suppresses intermediate values and emits trailing", function* () { const faucet = yield* useFaucet({ open: true }); - const stream = throttle(50)(faucet); - const results = yield* createArraySignal([]); + const stream = throttle(100)(faucet); + const emissions = yield* createArraySignal>([]); yield* spawn(() => forEach(function* (value) { - results.push(value); + emissions.push({ value, time: performance.now() }); }, stream), ); @@ -34,21 +44,103 @@ describe("throttle", () => { yield* faucet.pour([1, 2, 3, 4, 5]); - yield* is(results, (r) => r.length >= 2); + yield* is(emissions, (e) => e.length >= 2); - const values = results.valueOf(); + const values = emissions.valueOf().map((e) => e.value); expect(values[0]).toBe(1); - expect(values[values.length - 1]).toBe(5); + expect(values[1]).toBe(5); + expect(values).toHaveLength(2); + }); + + it("emits trailing value after the window expires", function* () { + const delay = 80; + const faucet = yield* useFaucet({ open: true }); + const stream = throttle(delay)(faucet); + const emissions = yield* createArraySignal>([]); + + yield* spawn(() => + forEach(function* (value) { + emissions.push({ value, time: performance.now() }); + }, stream), + ); + + yield* sleep(0); + + yield* faucet.pour([1, 2, 3]); + + yield* is(emissions, (e) => e.length >= 2); + + const [leading, trailing] = emissions.valueOf(); + const gap = trailing.time - leading.time; + + expect(leading.value).toBe(1); + expect(trailing.value).toBe(3); + expect(gap).toBeGreaterThanOrEqual(delay * 0.8); }); - it("emits the final value before stream closes", function* () { + it("does not emit trailing before the remaining delay elapses", function* () { + const delay = 100; + const faucet = yield* useFaucet({ open: true }); + const stream = throttle(delay)(faucet); + const emissions = yield* createArraySignal>([]); + + yield* spawn(() => + forEach(function* (value) { + emissions.push({ value, time: performance.now() }); + }, stream), + ); + + yield* sleep(0); + + yield* faucet.pour([1, 2]); + + // Checkpoint inside the window: only the leading value should exist. + // sleep() here creates a timing scenario, not waiting for a result. + yield* sleep(delay * 0.4); + expect(emissions.valueOf()).toHaveLength(1); + expect(emissions.valueOf()[0].value).toBe(1); + + // Now wait for trailing to actually arrive + yield* is(emissions, (e) => e.length >= 2); + expect(emissions.valueOf()[1].value).toBe(2); + }); + + it("emits at most once per delay window", function* () { + const delay = 60; + const faucet = yield* useFaucet({ open: true }); + const stream = throttle(delay)(faucet); + const emissions = yield* createArraySignal>([]); + + yield* spawn(() => + forEach(function* (value) { + emissions.push({ value, time: performance.now() }); + }, stream), + ); + + yield* sleep(0); + + // Two rapid bursts separated by a gap longer than the delay + yield* faucet.pour([1, 2, 3]); + yield* is(emissions, (e) => e.length >= 2); + yield* sleep(delay + 20); + yield* faucet.pour([10, 20, 30]); + yield* is(emissions, (e) => e.some((v) => v.value === 30)); + + const times = emissions.valueOf().map((e) => e.time); + for (let i = 1; i < times.length; i++) { + const gap = times[i] - times[i - 1]; + expect(gap).toBeGreaterThanOrEqual(delay * 0.8); + } + }); + + it("handles upstream completion during the window", function* () { const source = createChannel(); const stream = throttle(200)(source); - const results = yield* createArraySignal([]); + const emissions = yield* createArraySignal>([]); yield* spawn(() => forEach(function* (value) { - results.push(value); + emissions.push({ value, time: performance.now() }); }, stream), ); @@ -59,16 +151,39 @@ describe("throttle", () => { yield* source.send(3); yield* source.close(); - yield* is(results, (r) => r.includes(3)); + yield* is(emissions, (e) => e.some((v) => v.value === 3)); - const values = results.valueOf(); + const values = emissions.valueOf().map((e) => e.value); expect(values).toContain(1); expect(values).toContain(3); }); + it("closes only after trailing emission is handled", function* () { + const source = createChannel(); + const stream = throttle(200)(source); + const subscription = yield* stream; + + yield* spawn(function* () { + yield* sleep(0); + yield* source.send("a"); + yield* source.send("b"); + yield* source.close(42); + }); + + const first = yield* subscription.next(); + expect(first).toEqual({ done: false, value: "a" }); + + const second = yield* subscription.next(); + expect(second).toEqual({ done: false, value: "b" }); + + const third = yield* subscription.next(); + expect(third).toEqual({ done: true, value: 42 }); + }); + it("passes through values spaced beyond the delay", function* () { + const delay = 20; const faucet = yield* useFaucet({ open: true }); - const stream = throttle(20)(faucet); + const stream = throttle(delay)(faucet); const results = yield* createArraySignal([]); yield* spawn(() => @@ -91,34 +206,4 @@ describe("throttle", () => { expect(results.valueOf()).toEqual([1, 2, 3]); }); - - it("handles multiple throttle windows", function* () { - const faucet = yield* useFaucet({ open: true }); - const stream = throttle(30)(faucet); - const results = yield* createArraySignal([]); - - yield* spawn(() => - forEach(function* (value) { - results.push(value); - }, stream), - ); - - yield* sleep(0); - - yield* faucet.pour([1, 2, 3]); - - yield* is(results, (r) => r.length >= 2); - - yield* sleep(60); - - yield* faucet.pour([10, 20, 30]); - - yield* is(results, (r) => r.includes(30)); - - const values = results.valueOf(); - expect(values).toContain(1); - expect(values).toContain(3); - expect(values).toContain(10); - expect(values).toContain(30); - }); }); diff --git a/stream-helpers/throttle.ts b/stream-helpers/throttle.ts index bd8a2a2e..6cbbbcc3 100644 --- a/stream-helpers/throttle.ts +++ b/stream-helpers/throttle.ts @@ -1,15 +1,23 @@ import { timebox } from "@effectionx/timebox"; -import { type Stream, type Task, spawn } from "effection"; +import { type Operation, type Stream, type Task, spawn } from "effection"; /** * Throttles a stream to emit at most one value per `delayMS` milliseconds. * - * Uses leading+trailing semantics: the first value is emitted immediately, - * intermediate values during the throttle window are dropped, and the most - * recent value is always emitted after the window expires. This ensures the - * final state is never lost when a burst of events ends mid-window. + * Uses leading+trailing semantics: + * - The first upstream value is emitted immediately (leading edge). + * - While the throttle window is open, upstream values are consumed and only + * the latest is buffered. + * - After the window expires, the buffered value is emitted (trailing edge), + * which opens a new window. + * - Two emissions are never closer together than `delayMS`. * - * @param delayMS - The minimum time between emissions in milliseconds + * Stream-completion exception: if the upstream closes during an open window, + * the trailing value (if any) is emitted promptly without waiting for the + * remaining delay, and `done` follows on the next pull. This avoids adding + * artificial latency before propagating the close signal. + * + * @param delayMS - minimum milliseconds between emissions */ export function throttle( delayMS: number, @@ -17,27 +25,73 @@ export function throttle( return (stream: Stream): Stream => ({ *[Symbol.iterator]() { const subscription = yield* stream; + + // ── shared state ────────────────────────────────────────────── let lastPull: Task> | undefined; + let windowDeadline: number | undefined; let pendingTrailing: A | undefined; let hasTrailing = false; let doneResult: IteratorResult | undefined; + // ── helpers ─────────────────────────────────────────────────── + + /** Consume upstream values until the window deadline expires. */ + function* absorbUntilDeadline(): Operation { + while (windowDeadline !== undefined) { + const remaining = windowDeadline - performance.now(); + if (remaining <= 0) break; + + if (!lastPull) { + lastPull = yield* spawn(() => subscription.next()); + } + const tb = yield* timebox(remaining, () => lastPull!); + + if (tb.timeout) { + // lastPull survives for the next pull + break; + } + + const upstream = tb.value; + lastPull = undefined; + + if (upstream.done) { + doneResult = upstream; + break; + } + + pendingTrailing = upstream.value; + hasTrailing = true; + } + windowDeadline = undefined; + } + + // ── subscription ───────────────────────────────────────────── return { - *next() { - // Emit stashed trailing value from previous window + *next(): Operation> { + // ── drain active window ─────────────────────────────────── + if (windowDeadline !== undefined) { + yield* absorbUntilDeadline(); + } + + // ── emit buffered trailing value ────────────────────────── if (hasTrailing) { const value = pendingTrailing as A; hasTrailing = false; pendingTrailing = undefined; + + if (!doneResult) { + windowDeadline = performance.now() + delayMS; + } + return { done: false as const, value }; } - // Stream already ended + // ── propagate stream close ──────────────────────────────── if (doneResult) { return doneResult; } - // Pull the next upstream value + // ── pull next upstream value (leading edge) ─────────────── const result = lastPull ? yield* lastPull : yield* subscription.next(); @@ -47,38 +101,11 @@ export function throttle( return result; } - const valueToEmit = result.value; - - // Throttle window: absorb upstream values for delayMS - const windowStart = performance.now(); - - while (true) { - const remaining = delayMS - (performance.now() - windowStart); - if (remaining <= 0) break; - - lastPull = yield* spawn(() => subscription.next()); - const tb = yield* timebox(remaining, () => lastPull!); - - if (tb.timeout) { - // Timer expired, lastPull survives for next call - break; - } - - const upstream = tb.value; - lastPull = undefined; - - if (upstream.done) { - doneResult = upstream; - break; - } - - // Overwrite trailing — only the latest matters - pendingTrailing = upstream.value; - hasTrailing = true; - } + // Record the window deadline. Absorption is deferred to the + // next next() call so this value returns immediately. + windowDeadline = performance.now() + delayMS; - // Emit the leading-edge value - return { done: false as const, value: valueToEmit }; + return { done: false as const, value: result.value }; }, }; }, From ba8f34c1729383400e53087492c22d1076787938 Mon Sep 17 00:00:00 2001 From: Taras Mankovski Date: Tue, 31 Mar 2026 07:48:17 -0400 Subject: [PATCH 4/6] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20Rewrite=20throttle=20w?= =?UTF-8?q?ith=20persistent=20pump=20and=20slow-consumer=20test?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the deferred-absorption design with a persistent pump task (matching the valve.ts pattern). The pump owns all upstream reads and writes ready-to-emit IteratorResults to a shared ArraySignal buffer. next() simply shifts from the buffer. This fixes two issues: 1. Stale intermediate values when the consumer is slower than the window — the pump absorbs eagerly regardless of pull timing, so the trailing value is always the latest seen during the window. 2. Subscription deadlocks from spawning/halting pulls in child scopes then re-calling subscription.next() from the parent scope. Added test: "yields the latest window value when consumer is slower than the window" — sends [1,2,3] within a 100ms window, waits 500ms before the second next(), and asserts 3 (not 2) is returned. --- stream-helpers/throttle.test.ts | 27 +++++++ stream-helpers/throttle.ts | 122 +++++++++++++------------------- 2 files changed, 77 insertions(+), 72 deletions(-) diff --git a/stream-helpers/throttle.test.ts b/stream-helpers/throttle.test.ts index dbf63109..f9ad0f95 100644 --- a/stream-helpers/throttle.test.ts +++ b/stream-helpers/throttle.test.ts @@ -180,6 +180,33 @@ describe("throttle", () => { expect(third).toEqual({ done: true, value: 42 }); }); + it("yields the latest window value when consumer is slower than the window", function* () { + const source = createChannel(); + const stream = throttle(100)(source); + const subscription = yield* stream; + + // Pump three values in a spawned task so they queue up while the + // consumer is idle. + yield* spawn(function* () { + yield* sleep(0); + yield* source.send(1); + yield* source.send(2); + yield* source.send(3); + }); + + // Leading value — returned immediately. + const first = yield* subscription.next(); + expect(first).toEqual({ done: false, value: 1 }); + + // Wait well beyond delayMS so the window has long expired. + yield* sleep(500); + + // Must be the latest value the absorber saw during the window, not + // the oldest queued one. + const second = yield* subscription.next(); + expect(second).toEqual({ done: false, value: 3 }); + }); + it("passes through values spaced beyond the delay", function* () { const delay = 20; const faucet = yield* useFaucet({ open: true }); diff --git a/stream-helpers/throttle.ts b/stream-helpers/throttle.ts index 6cbbbcc3..7c69f1eb 100644 --- a/stream-helpers/throttle.ts +++ b/stream-helpers/throttle.ts @@ -1,13 +1,14 @@ import { timebox } from "@effectionx/timebox"; -import { type Operation, type Stream, type Task, spawn } from "effection"; +import { createArraySignal } from "@effectionx/signals"; +import { type Stream, spawn } from "effection"; /** * Throttles a stream to emit at most one value per `delayMS` milliseconds. * * Uses leading+trailing semantics: * - The first upstream value is emitted immediately (leading edge). - * - While the throttle window is open, upstream values are consumed and only - * the latest is buffered. + * - While the throttle window is open, upstream values are consumed eagerly + * and only the latest is buffered. * - After the window expires, the buffered value is emitted (trailing edge), * which opens a new window. * - Two emissions are never closer together than `delayMS`. @@ -25,87 +26,64 @@ export function throttle( return (stream: Stream): Stream => ({ *[Symbol.iterator]() { const subscription = yield* stream; - - // ── shared state ────────────────────────────────────────────── - let lastPull: Task> | undefined; - let windowDeadline: number | undefined; - let pendingTrailing: A | undefined; - let hasTrailing = false; - let doneResult: IteratorResult | undefined; - - // ── helpers ─────────────────────────────────────────────────── - - /** Consume upstream values until the window deadline expires. */ - function* absorbUntilDeadline(): Operation { - while (windowDeadline !== undefined) { - const remaining = windowDeadline - performance.now(); - if (remaining <= 0) break; - - if (!lastPull) { - lastPull = yield* spawn(() => subscription.next()); - } - const tb = yield* timebox(remaining, () => lastPull!); - - if (tb.timeout) { - // lastPull survives for the next pull - break; + const output = yield* createArraySignal>([]); + + // ── pump ────────────────────────────────────────────────────── + // A persistent background task that owns all upstream reads. + // It alternates between two phases: + // 1. Pull the next upstream value and push it (leading edge). + // 2. Open a window for delayMS: consume upstream, keep only + // the latest, then push it when the window expires + // (trailing edge). + yield* spawn(function* () { + while (true) { + // ── leading edge ──────────────────────────────────────── + const first = yield* subscription.next(); + if (first.done) { + output.push(first); + return; } + output.push({ done: false as const, value: first.value }); - const upstream = tb.value; - lastPull = undefined; + // ── absorption window ─────────────────────────────────── + let trailing: A | undefined; + let hasTrailing = false; + const windowStart = performance.now(); - if (upstream.done) { - doneResult = upstream; - break; - } - - pendingTrailing = upstream.value; - hasTrailing = true; - } - windowDeadline = undefined; - } - - // ── subscription ───────────────────────────────────────────── - return { - *next(): Operation> { - // ── drain active window ─────────────────────────────────── - if (windowDeadline !== undefined) { - yield* absorbUntilDeadline(); - } + while (true) { + const remaining = delayMS - (performance.now() - windowStart); + if (remaining <= 0) break; - // ── emit buffered trailing value ────────────────────────── - if (hasTrailing) { - const value = pendingTrailing as A; - hasTrailing = false; - pendingTrailing = undefined; + const tb = yield* timebox(remaining, () => subscription.next()); - if (!doneResult) { - windowDeadline = performance.now() + delayMS; + if (tb.timeout) { + break; } - return { done: false as const, value }; - } + if (tb.value.done) { + // Stream closed during window — flush trailing, then done + if (hasTrailing) { + output.push({ done: false as const, value: trailing as A }); + } + output.push(tb.value); + return; + } - // ── propagate stream close ──────────────────────────────── - if (doneResult) { - return doneResult; + trailing = tb.value.value; + hasTrailing = true; } - // ── pull next upstream value (leading edge) ─────────────── - const result = lastPull - ? yield* lastPull - : yield* subscription.next(); - lastPull = undefined; - - if (result.done) { - return result; + // ── trailing edge ─────────────────────────────────────── + if (hasTrailing) { + output.push({ done: false as const, value: trailing as A }); } + } + }); - // Record the window deadline. Absorption is deferred to the - // next next() call so this value returns immediately. - windowDeadline = performance.now() + delayMS; - - return { done: false as const, value: result.value }; + // ── subscription ───────────────────────────────────────────── + return { + *next() { + return yield* output.shift(); }, }; }, From 2e057933d85be5641a26105b19076a378687bf69 Mon Sep 17 00:00:00 2001 From: Taras Mankovski Date: Tue, 31 Mar 2026 08:01:42 -0400 Subject: [PATCH 5/6] =?UTF-8?q?=F0=9F=90=9B=20Enforce=20consumer-side=20sp?= =?UTF-8?q?acing=20so=20buffered=20backlog=20is=20not=20drained=20instantl?= =?UTF-8?q?y?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The pump eagerly fills an output buffer, so a slow consumer could drain multiple pre-buffered values back-to-back, violating the documented delayMS spacing guarantee. Fix: add a delay gate in next() that sleeps for the remaining interval when the previous emission was less than delayMS ago. Output items are tagged with a `flush` flag so the stream-completion exception (trailing-before-close) still bypasses the gate. Added test: "enforces spacing when consumer drains a backlog" — pumps two windows while the consumer is idle, then drains and asserts the gap between consecutive emissions respects delayMS. --- stream-helpers/throttle.test.ts | 37 ++++++++++++++++++++ stream-helpers/throttle.ts | 62 +++++++++++++++++++++++++++------ 2 files changed, 88 insertions(+), 11 deletions(-) diff --git a/stream-helpers/throttle.test.ts b/stream-helpers/throttle.test.ts index f9ad0f95..83a070f9 100644 --- a/stream-helpers/throttle.test.ts +++ b/stream-helpers/throttle.test.ts @@ -207,6 +207,43 @@ describe("throttle", () => { expect(second).toEqual({ done: false, value: 3 }); }); + it("enforces spacing when consumer drains a backlog", function* () { + const delay = 60; + const source = createChannel(); + const stream = throttle(delay)(source); + const subscription = yield* stream; + + // Produce two complete windows worth of values while the consumer + // is idle: window 1 → leading 1, trailing 3; window 2 → leading 4, + // trailing 6. + yield* spawn(function* () { + yield* sleep(0); + for (let i = 1; i <= 6; i++) { + yield* source.send(i); + } + }); + + // Wait long enough for the pump to have buffered both windows. + yield* sleep(delay * 3); + + // Now drain rapidly and record emission timestamps. + const emissions: Emission[] = []; + const r1 = yield* subscription.next(); + emissions.push({ value: (r1 as { value: number }).value, time: performance.now() }); + + const r2 = yield* subscription.next(); + emissions.push({ value: (r2 as { value: number }).value, time: performance.now() }); + + // Verify values are the leading+trailing from the windows + expect(emissions[0].value).toBe(1); + expect(emissions[1].value).toBe(6); + + // The gap between the two emissions must respect delayMS even + // though both values were already buffered. + const gap = emissions[1].time - emissions[0].time; + expect(gap).toBeGreaterThanOrEqual(delay * 0.8); + }); + it("passes through values spaced beyond the delay", function* () { const delay = 20; const faucet = yield* useFaucet({ open: true }); diff --git a/stream-helpers/throttle.ts b/stream-helpers/throttle.ts index 7c69f1eb..f853b1dc 100644 --- a/stream-helpers/throttle.ts +++ b/stream-helpers/throttle.ts @@ -1,6 +1,16 @@ import { timebox } from "@effectionx/timebox"; import { createArraySignal } from "@effectionx/signals"; -import { type Stream, spawn } from "effection"; +import { type Operation, type Stream, sleep, spawn } from "effection"; + +/** + * A tagged output item. `flush` marks values that should bypass + * consumer-side delay enforcement (stream-completion trailing and the + * done sentinel). + */ +interface OutputItem { + result: IteratorResult; + flush: boolean; +} /** * Throttles a stream to emit at most one value per `delayMS` milliseconds. @@ -11,7 +21,9 @@ import { type Stream, spawn } from "effection"; * and only the latest is buffered. * - After the window expires, the buffered value is emitted (trailing edge), * which opens a new window. - * - Two emissions are never closer together than `delayMS`. + * - Two emissions are never closer together than `delayMS`, both at the + * pump side (window timing) and at the consumer side (delay gate in + * `next()`), so a slow consumer cannot drain a backlog instantly. * * Stream-completion exception: if the upstream closes during an open window, * the trailing value (if any) is emitted promptly without waiting for the @@ -26,7 +38,7 @@ export function throttle( return (stream: Stream): Stream => ({ *[Symbol.iterator]() { const subscription = yield* stream; - const output = yield* createArraySignal>([]); + const output = yield* createArraySignal>([]); // ── pump ────────────────────────────────────────────────────── // A persistent background task that owns all upstream reads. @@ -40,10 +52,13 @@ export function throttle( // ── leading edge ──────────────────────────────────────── const first = yield* subscription.next(); if (first.done) { - output.push(first); + output.push({ result: first, flush: true }); return; } - output.push({ done: false as const, value: first.value }); + output.push({ + result: { done: false as const, value: first.value }, + flush: false, + }); // ── absorption window ─────────────────────────────────── let trailing: A | undefined; @@ -63,9 +78,12 @@ export function throttle( if (tb.value.done) { // Stream closed during window — flush trailing, then done if (hasTrailing) { - output.push({ done: false as const, value: trailing as A }); + output.push({ + result: { done: false as const, value: trailing as A }, + flush: true, + }); } - output.push(tb.value); + output.push({ result: tb.value, flush: true }); return; } @@ -75,15 +93,37 @@ export function throttle( // ── trailing edge ─────────────────────────────────────── if (hasTrailing) { - output.push({ done: false as const, value: trailing as A }); + output.push({ + result: { done: false as const, value: trailing as A }, + flush: false, + }); } } }); - // ── subscription ───────────────────────────────────────────── + // ── consumer-side delay gate ─────────────────────────────── + let lastEmitTime: number | undefined; + return { - *next() { - return yield* output.shift(); + *next(): Operation> { + const { result, flush } = yield* output.shift(); + + // Enforce minimum spacing between non-flush emissions. + // The first emission (lastEmitTime undefined) passes through + // immediately. Flush items (stream-completion trailing and + // done) bypass the gate so close is not artificially delayed. + if (!result.done && !flush && lastEmitTime !== undefined) { + const wait = delayMS - (performance.now() - lastEmitTime); + if (wait > 0) { + yield* sleep(wait); + } + } + + if (!result.done) { + lastEmitTime = performance.now(); + } + + return result; }, }; }, From 003928de55da07ba78995b98241521a82cdb7e16 Mon Sep 17 00:00:00 2001 From: Taras Mankovski Date: Tue, 31 Mar 2026 08:36:25 -0400 Subject: [PATCH 6/6] =?UTF-8?q?=E2=9C=85=20Remove=20non-zero=20sleep()=20f?= =?UTF-8?q?rom=20test=20sync=20and=20fix=20Biome=20formatting?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace all non-zero sleep() calls in the main test body with deterministic coordination per no-sleep-test-sync policy: - "does not emit trailing before delay" — replaced sleep(delay*0.4) checkpoint with timebox assertion proving next() does not resolve within half the window - "emits at most once per delay window" — removed sleep(delay+20) gap between bursts; is() already syncs, and the pump blocks on its next leading pull - "yields latest window value" — removed sleep(500); renamed to "buffers the latest window value, not the oldest"; output.shift() blocks until the pump pushes - "enforces spacing when consumer drains a backlog" — removed sleep(delay*3); same rationale Fixed Biome formatting on long emissions.push() lines. --- stream-helpers/throttle.test.ts | 72 ++++++++++++++++----------------- 1 file changed, 34 insertions(+), 38 deletions(-) diff --git a/stream-helpers/throttle.test.ts b/stream-helpers/throttle.test.ts index 83a070f9..41214e0f 100644 --- a/stream-helpers/throttle.test.ts +++ b/stream-helpers/throttle.test.ts @@ -1,5 +1,6 @@ import { describe, it } from "@effectionx/bdd"; import { createArraySignal, is } from "@effectionx/signals"; +import { timebox } from "@effectionx/timebox"; import { expect } from "expect"; import { createChannel, sleep, spawn } from "effection"; @@ -80,29 +81,26 @@ describe("throttle", () => { it("does not emit trailing before the remaining delay elapses", function* () { const delay = 100; - const faucet = yield* useFaucet({ open: true }); - const stream = throttle(delay)(faucet); - const emissions = yield* createArraySignal>([]); - - yield* spawn(() => - forEach(function* (value) { - emissions.push({ value, time: performance.now() }); - }, stream), - ); + const source = createChannel(); + const stream = throttle(delay)(source); + const subscription = yield* stream; - yield* sleep(0); + yield* spawn(function* () { + yield* sleep(0); + yield* source.send(1); + yield* source.send(2); + }); - yield* faucet.pour([1, 2]); + const first = yield* subscription.next(); + expect((first as { value: number }).value).toBe(1); - // Checkpoint inside the window: only the leading value should exist. - // sleep() here creates a timing scenario, not waiting for a result. - yield* sleep(delay * 0.4); - expect(emissions.valueOf()).toHaveLength(1); - expect(emissions.valueOf()[0].value).toBe(1); + // Must NOT resolve within the first half of the window. + const mid = yield* timebox(delay * 0.4, () => subscription.next()); + expect(mid.timeout).toBe(true); - // Now wait for trailing to actually arrive - yield* is(emissions, (e) => e.length >= 2); - expect(emissions.valueOf()[1].value).toBe(2); + // After the full window, trailing is available. + const second = yield* subscription.next(); + expect((second as { value: number }).value).toBe(2); }); it("emits at most once per delay window", function* () { @@ -119,10 +117,11 @@ describe("throttle", () => { yield* sleep(0); - // Two rapid bursts separated by a gap longer than the delay + // Two rapid bursts — the pump blocks on its next leading pull + // after the first burst's trailing, so the second pour triggers + // a fresh window. yield* faucet.pour([1, 2, 3]); yield* is(emissions, (e) => e.length >= 2); - yield* sleep(delay + 20); yield* faucet.pour([10, 20, 30]); yield* is(emissions, (e) => e.some((v) => v.value === 30)); @@ -180,13 +179,11 @@ describe("throttle", () => { expect(third).toEqual({ done: true, value: 42 }); }); - it("yields the latest window value when consumer is slower than the window", function* () { + it("buffers the latest window value, not the oldest", function* () { const source = createChannel(); const stream = throttle(100)(source); const subscription = yield* stream; - // Pump three values in a spawned task so they queue up while the - // consumer is idle. yield* spawn(function* () { yield* sleep(0); yield* source.send(1); @@ -198,11 +195,8 @@ describe("throttle", () => { const first = yield* subscription.next(); expect(first).toEqual({ done: false, value: 1 }); - // Wait well beyond delayMS so the window has long expired. - yield* sleep(500); - - // Must be the latest value the absorber saw during the window, not - // the oldest queued one. + // The pump absorbs 2 and 3 during its window. output.shift() + // blocks until the pump pushes the trailing value. const second = yield* subscription.next(); expect(second).toEqual({ done: false, value: 3 }); }); @@ -213,9 +207,6 @@ describe("throttle", () => { const stream = throttle(delay)(source); const subscription = yield* stream; - // Produce two complete windows worth of values while the consumer - // is idle: window 1 → leading 1, trailing 3; window 2 → leading 4, - // trailing 6. yield* spawn(function* () { yield* sleep(0); for (let i = 1; i <= 6; i++) { @@ -223,16 +214,21 @@ describe("throttle", () => { } }); - // Wait long enough for the pump to have buffered both windows. - yield* sleep(delay * 3); - - // Now drain rapidly and record emission timestamps. + // Drain rapidly and record emission timestamps. output.shift() + // blocks until the pump pushes each item, and the delay gate + // enforces spacing between consecutive emissions. const emissions: Emission[] = []; const r1 = yield* subscription.next(); - emissions.push({ value: (r1 as { value: number }).value, time: performance.now() }); + emissions.push({ + value: (r1 as { value: number }).value, + time: performance.now(), + }); const r2 = yield* subscription.next(); - emissions.push({ value: (r2 as { value: number }).value, time: performance.now() }); + emissions.push({ + value: (r2 as { value: number }).value, + time: performance.now(), + }); // Verify values are the leading+trailing from the windows expect(emissions[0].value).toBe(1);