1 change: 1 addition & 0 deletions stream-helpers/mod.ts
@@ -14,3 +14,4 @@
export * from "./last.ts";
export * from "./take.ts";
export * from "./take-while.ts";
export * from "./take-until.ts";
export * from "./throttle.ts";
124 changes: 124 additions & 0 deletions stream-helpers/throttle.test.ts
@@ -0,0 +1,124 @@
import { describe, it } from "@effectionx/bdd";
import { createArraySignal, is } from "@effectionx/signals";
import { expect } from "expect";
import { createChannel, sleep, spawn } from "effection";

import { throttle } from "./throttle.ts";
import { forEach } from "./for-each.ts";
import { useFaucet } from "./test-helpers/faucet.ts";

describe("throttle", () => {
it("emits the first value immediately", function* () {
const source = createChannel<number, never>();
const stream = throttle<number>(100)(source);
const subscription = yield* stream;

const next = yield* spawn(() => subscription.next());
yield* source.send(1);

expect((yield* next).value).toBe(1);
});

it("drops intermediate values and emits trailing", function* () {
const faucet = yield* useFaucet<number>({ open: true });
const stream = throttle<number>(50)(faucet);
const results = yield* createArraySignal<number>([]);

yield* spawn(() =>
forEach(function* (value) {
results.push(value);
}, stream),
);

yield* sleep(0);
Comment thread:

Member: Why do we need a sleep? Maybe use a resource for your loop?

Member Author: Made forEach a resource in #209; how does this look? It'd need to be merged before this PR.

Member: I'd hate to lose a synchronous forEach and would prefer it to stay blocking.

I think there are a couple of ways to have our cake and eat it too:

  1. Have both foreground and background versions:
  • forEach<T, TClose>(stream: Stream<T, TClose>, body): Operation<TClose> (foreground)
  • useForEach<T, TClose>(stream: Stream<T, TClose>, body): Future<TClose> (background)
  2. Allow forEach to accept both a subscription and a stream. That lets you subscribe first and then spawn safely in the background.
  3. Add a "live stream" helper that does (2) for you:

useBoundStream(stream: Stream): Stream

Not sure what a good name for this is, but it takes a stream, subscribes to it, and then returns a stream that hands back that subscription. This ensures that any stream interface can be used in the background without missing any items.

Member Author: What about just useStream?

Member: Does that convey enough context? I worry that the programmer (AI or otherwise) might see that and think: "well, of course I want to use the stream."

What differentiates this from a normal stream is that it is laid over a "live" subscription.

  • useSubscribedStream()
  • useLiveStream()
  • useUniqueStream()

Member Author: In the observables world, I believe these are called hot observables. We could use that terminology to reduce naming dissonance for similar concepts.

useHotStream()

Sounds steamy.

Member: Yeah, useHotStream() sounds kinda gross tbqh :)

yield* faucet.pour([1, 2, 3, 4, 5]);

yield* is(results, (r) => r.length >= 2);

const values = results.valueOf();
expect(values[0]).toBe(1);
expect(values[values.length - 1]).toBe(5);
});

it("emits the final value before stream closes", function* () {
    const source = createChannel<number, void>();
const stream = throttle<number>(200)(source);
const results = yield* createArraySignal<number>([]);

yield* spawn(() =>
forEach(function* (value) {
results.push(value);
}, stream),
);

yield* sleep(0);

yield* source.send(1);
yield* source.send(2);
yield* source.send(3);
yield* source.close();

GitHub Actions / Lint & Format, check failure on line 60 in stream-helpers/throttle.test.ts: Expected 1 arguments, but got 0.
yield* is(results, (r) => r.includes(3));

const values = results.valueOf();
expect(values).toContain(1);
expect(values).toContain(3);
});

it("passes through values spaced beyond the delay", function* () {
const faucet = yield* useFaucet<number>({ open: true });
const stream = throttle<number>(20)(faucet);
const results = yield* createArraySignal<number>([]);

yield* spawn(() =>
forEach(function* (value) {
results.push(value);
}, stream),
);

yield* sleep(0);

yield* faucet.pour(function* (send) {
yield* send(1);
yield* sleep(50);
yield* send(2);
yield* sleep(50);
yield* send(3);
});

yield* is(results, (r) => r.length >= 3);

expect(results.valueOf()).toEqual([1, 2, 3]);
});
coderabbitai[bot] marked this conversation as resolved.

it("handles multiple throttle windows", function* () {
const faucet = yield* useFaucet<number>({ open: true });
const stream = throttle<number>(30)(faucet);
const results = yield* createArraySignal<number>([]);

yield* spawn(() =>
forEach(function* (value) {
results.push(value);
}, stream),
);

yield* sleep(0);

yield* faucet.pour([1, 2, 3]);

yield* is(results, (r) => r.length >= 2);

yield* sleep(60);

yield* faucet.pour([10, 20, 30]);

yield* is(results, (r) => r.includes(30));

const values = results.valueOf();
expect(values).toContain(1);
expect(values).toContain(3);
expect(values).toContain(10);
expect(values).toContain(30);
});
});
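The "live stream" helper floated in the review thread above can be illustrated with plain async iterators. This is a hypothetical sketch, not part of effection's API: `bindStream` and `numbers` are made-up names, and the point is only the hot-stream semantics, i.e. subscribe to the source once and let every consumer share that single live subscription, so each value is delivered exactly once overall.

```typescript
// Hypothetical sketch: a "hot" stream that subscribes to its source once
// and hands every consumer the same live iterator.
function bindStream<T>(source: AsyncIterable<T>): AsyncIterable<T> {
  const shared = source[Symbol.asyncIterator](); // single live subscription
  return {
    [Symbol.asyncIterator]() {
      // Every consumer pulls from the shared iterator, so no value is
      // delivered twice and none are buffered per consumer.
      return { next: () => shared.next() };
    },
  };
}

async function* numbers(): AsyncGenerator<number> {
  yield 1;
  yield 2;
  yield 3;
}

async function main() {
  const live = bindStream(numbers());
  const first: number[] = [];
  for await (const v of live) first.push(v); // drains the shared iterator
  const second: number[] = [];
  for await (const v of live) second.push(v); // already exhausted: stays empty
  console.log(first, second);
}

main();
```

A cold stream would instead restart `numbers()` for every consumer; the shared subscription is exactly what distinguishes the proposed helper from a normal stream.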
86 changes: 86 additions & 0 deletions stream-helpers/throttle.ts
@@ -0,0 +1,86 @@
import { timebox } from "@effectionx/timebox";
import { type Stream, type Task, spawn } from "effection";

/**
* Throttles a stream to emit at most one value per `delayMS` milliseconds.
*
* Uses leading+trailing semantics: the first value is emitted immediately,
* intermediate values during the throttle window are dropped, and the most
* recent value is always emitted after the window expires. This ensures the
* final state is never lost when a burst of events ends mid-window.
*
* @param delayMS - The minimum time between emissions in milliseconds
*/
export function throttle<A>(
delayMS: number,
): <TClose>(stream: Stream<A, TClose>) => Stream<A, TClose> {
taras marked this conversation as resolved (outdated).
return <TClose>(stream: Stream<A, TClose>): Stream<A, TClose> => ({
*[Symbol.iterator]() {
const subscription = yield* stream;
let lastPull: Task<IteratorResult<A, TClose>> | undefined;
let pendingTrailing: A | undefined;
let hasTrailing = false;
let doneResult: IteratorResult<A, TClose> | undefined;

return {
*next() {
// Emit stashed trailing value from previous window
if (hasTrailing) {
const value = pendingTrailing as A;
hasTrailing = false;
pendingTrailing = undefined;
return { done: false as const, value };
}

// Stream already ended
if (doneResult) {
return doneResult;
}

// Pull the next upstream value
const result = lastPull
? yield* lastPull
: yield* subscription.next();
lastPull = undefined;

if (result.done) {
return result;
}

const valueToEmit = result.value;

// Throttle window: absorb upstream values for delayMS
const windowStart = performance.now();

while (true) {
const remaining = delayMS - (performance.now() - windowStart);
if (remaining <= 0) break;

lastPull = yield* spawn(() => subscription.next());
const tb = yield* timebox(remaining, () => lastPull!);

if (tb.timeout) {
// Timer expired, lastPull survives for next call
break;
}

const upstream = tb.value;
lastPull = undefined;

if (upstream.done) {
doneResult = upstream;
break;
}

// Overwrite trailing — only the latest matters
pendingTrailing = upstream.value;
hasTrailing = true;
}

// Emit the leading-edge value
return { done: false as const, value: valueToEmit };
},
};
},
});
}
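The leading+trailing semantics documented above can be modeled as a pure function over timestamped events. This is an illustrative model, not part of the library: `simulateThrottle` is a made-up name, and it approximates the operator's behavior (a window's trailing value flushes once the window has expired, or at end of input) without any real timers.

```typescript
// Illustrative model of leading+trailing throttling over (timestamp, value)
// pairs: the first value in each window is emitted immediately, intermediate
// values are dropped, and the latest value is emitted when the window expires.
type Event<T> = { at: number; value: T };

function simulateThrottle<T>(events: Event<T>[], delayMS: number): T[] {
  const out: T[] = [];
  let windowEnd = -Infinity;
  let trailing: T | undefined;
  let hasTrailing = false;
  for (const { at, value } of events) {
    // Flush the previous window's trailing value once that window expired.
    if (hasTrailing && at >= windowEnd) {
      out.push(trailing as T);
      hasTrailing = false;
    }
    if (at >= windowEnd) {
      out.push(value); // leading edge: emit immediately, open a new window
      windowEnd = at + delayMS;
    } else {
      trailing = value; // inside the window: keep only the latest value
      hasTrailing = true;
    }
  }
  if (hasTrailing) out.push(trailing as T); // trailing edge at stream close
  return out;
}

console.log(simulateThrottle(
  [{ at: 0, value: 1 }, { at: 10, value: 2 }, { at: 20, value: 3 }, { at: 120, value: 4 }],
  100,
)); // leading 1, trailing 3 when the window expires, then 4 as a new leading edge
```

Note that 2 is dropped entirely, which is the "final state is never lost, intermediate states may be" guarantee from the doc comment.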