diff --git a/xml/_pipeline.ts b/xml/_pipeline.ts new file mode 100644 index 000000000000..33e6a7d0d388 --- /dev/null +++ b/xml/_pipeline.ts @@ -0,0 +1,42 @@ +// Copyright 2018-2026 the Deno authors. MIT license. +// This module is browser compatible. + +/** + * Internal helper for constructing the tokenizer + parser pair that drives + * both {@linkcode parseXmlStream} and {@linkcode parseXmlRecords}. + * + * @module + */ + +import type { ParseStreamOptions, XmlEventCallbacks } from "./types.ts"; +import { XmlTokenizer } from "./_tokenizer.ts"; +import { XmlEventParser } from "./_parser.ts"; + +/** A configured tokenizer paired with the parser it feeds. */ +export interface XmlPipeline { + /** Tokenizer that consumes raw XML chunks. */ + readonly tokenizer: XmlTokenizer; + /** Event parser that receives tokens and invokes user callbacks. */ + readonly parser: XmlEventParser; +} + +/** + * Constructs a tokenizer/parser pipeline from {@linkcode ParseStreamOptions} + * and {@linkcode XmlEventCallbacks}, applying the canonical defaults used by + * the public streaming APIs. + * + * @param options Stream parse options. + * @param callbacks Event callbacks invoked by the parser. + * @returns The configured tokenizer and parser. + */ +export function createXmlPipeline( + options: ParseStreamOptions, + callbacks: XmlEventCallbacks, +): XmlPipeline { + const trackPosition = options.trackPosition ?? false; + const disallowDoctype = options.disallowDoctype ?? true; + const xml11 = options.xmlVersion === "1.1"; + const tokenizer = new XmlTokenizer({ trackPosition, disallowDoctype, xml11 }); + const parser = new XmlEventParser(callbacks, options, xml11); + return { tokenizer, parser }; +} diff --git a/xml/deno.json b/xml/deno.json index 5e7bfbfe898d..177428fa7432 100644 --- a/xml/deno.json +++ b/xml/deno.json @@ -3,9 +3,10 @@ "version": "0.1.1", "exports": { ".": "./mod.ts", - "./types": "./types.ts", - "./parse-stream": "./parse_stream.ts", "./parse": "./parse.ts", - "./stringify": "./stringify.ts" + "./parse-records": "./parse_records.ts", + "./parse-stream": "./parse_stream.ts", + "./stringify": "./stringify.ts", + "./types": "./types.ts" } } diff --git a/xml/mod.ts b/xml/mod.ts index 3a7da5ee4961..9c478f7dd48a 100644 --- a/xml/mod.ts +++ b/xml/mod.ts @@ -9,12 +9,13 @@ * * ## Parsing APIs * - * Two parsing APIs are provided for different use cases: + * Three parsing APIs are provided for different use cases: * * | API | Use Case | Output | * |-----|----------|--------| * | {@linkcode parse} | Parse a complete XML string | Document tree | * | {@linkcode parseXmlStream} | Streaming with maximum throughput | Direct callbacks | + * | {@linkcode parseXmlRecords} | Streaming records assembled in callbacks | `AsyncGenerator` | * * ## Quick Examples * @@ -82,4 +83,5 @@ export * from "./types.ts"; export * from "./parse_stream.ts"; export * from "./parse.ts"; +export * from "./parse_records.ts"; export * from "./stringify.ts"; diff --git a/xml/parse_records.ts b/xml/parse_records.ts new file mode 100644 index 000000000000..3460d1ba1ef4 --- /dev/null +++ b/xml/parse_records.ts @@ -0,0 +1,106 @@ +// Copyright 2018-2026 the Deno authors. MIT license. +// This module is browser compatible. + +/** + * Async-generator adapter that turns an XML chunk source into a stream of + * application-defined records assembled inside SAX-style event callbacks. + * + * @module + */ + +import type { ParseStreamOptions, XmlEventCallbacks } from "./types.ts"; +import { createXmlPipeline } from "./_pipeline.ts"; + +/** + * Parses an async iterable of XML chunks and yields records assembled inside + * SAX-style event callbacks. + * + * Each input chunk is parsed synchronously and the records emitted from that + * chunk are buffered before any are yielded; the consumer then pulls records + * one at a time. Yield rate (and downstream backpressure) is per-record while + * peak memory is bounded by the records produced by a single chunk. + * + * If parsing throws (XML syntax error or a user callback that throws), the + * iteration rejects immediately; records buffered within the failing chunk + * are discarded. Records yielded by earlier chunks remain visible. + * + * For `pipeThrough` composition, wrap the result with + * {@linkcode ReadableStream.from}. + * + * @typeParam T The type of records yielded by the generator. + * + * @param source An async iterable of XML string chunks (e.g. a + * `ReadableStream` piped through {@linkcode TextDecoderStream}). + * + * @param createCallbacks Factory invoked once with an `emit` function; returns + * the SAX-style callbacks that build records and call `emit` per completed + * record. + * + * @param options Parser options forwarded to the underlying tokenizer/parser. + * + * @returns An async generator that yields records as the document is parsed. + * + * @example Parse items from an XML feed + * ```ts + * import { parseXmlRecords } from "@std/xml/parse-records"; + * import { assertEquals } from "@std/assert"; + * + * const xml = "FirstSecond"; + * + * const titles: string[] = []; + * for await ( + * const title of parseXmlRecords( + * ReadableStream.from([xml]), + * (emit) => { + * let inside = false; + * let text = ""; + * return { + * onStartElement(name) { + * if (name === "item") { + * inside = true; + * text = ""; + * } + * }, + * onText(t) { + * if (inside) text += t; + * }, + * onEndElement(name) { + * if (name === "item") { + * emit(text); + * inside = false; + * } + * }, + * }; + * }, + * ) + * ) { + * titles.push(title); + * } + * + * assertEquals(titles, ["First", "Second"]); + * ``` + */ +export async function* parseXmlRecords( + source: AsyncIterable, + createCallbacks: (emit: (record: T) => void) => XmlEventCallbacks, + options: ParseStreamOptions = {}, +): AsyncGenerator { + const buffer: T[] = []; + const callbacks = createCallbacks((record) => buffer.push(record)); + const { tokenizer, parser } = createXmlPipeline(options, callbacks); + + // Fail-fast contract: parse errors propagate immediately and records + // buffered within the failing chunk are dropped. Wrapping `process` / + // `finalize` in `try { ... } finally { drain }` is tempting but unsafe — + // `iter.return()` from a consumer `break` mid-drain silently swallows + // the pending exception per ECMAScript semantics. + for await (const chunk of source) { + tokenizer.process(chunk, parser); + for (let i = 0; i < buffer.length; i++) yield buffer[i]!; + buffer.length = 0; + } + tokenizer.finalize(parser); + parser.finalize(); + for (let i = 0; i < buffer.length; i++) yield buffer[i]!; + buffer.length = 0; +} diff --git a/xml/parse_records_test.ts b/xml/parse_records_test.ts new file mode 100644 index 000000000000..f8d97ea6507d --- /dev/null +++ b/xml/parse_records_test.ts @@ -0,0 +1,458 @@ +// Copyright 2018-2026 the Deno authors. MIT license. + +import { assert, assertEquals, assertRejects } from "@std/assert"; +import { parseXmlRecords } from "./parse_records.ts"; +import { XmlSyntaxError } from "./types.ts"; + +async function collect(iter: AsyncIterable): Promise { + const out: T[] = []; + for await (const r of iter) out.push(r); + return out; +} + +function items( + xml: string | string[], +): AsyncGenerator> { + const chunks = Array.isArray(xml) ? xml : [xml]; + return parseXmlRecords>( + ReadableStream.from(chunks), + (emit) => { + let insideItem = false; + let currentTag = ""; + let currentItem: Record = {}; + return { + onStartElement(name) { + if (name === "item") { + insideItem = true; + currentItem = {}; + } else if (insideItem) { + currentTag = name; + } + }, + onText(text) { + if (insideItem && currentTag) { + currentItem[currentTag] = (currentItem[currentTag] ?? "") + text; + } + }, + onEndElement(name) { + if (name === "item") { + emit(currentItem); + insideItem = false; + currentItem = {}; + } + currentTag = ""; + }, + }; + }, + { ignoreWhitespace: true }, + ); +} + +// ============================================================================= +// Basic record emission +// ============================================================================= + +Deno.test("parseXmlRecords() yields records assembled from callbacks", async () => { + const xml = + "AB"; + const records = await collect(items(xml)); + assertEquals(records, [{ title: "A" }, { title: "B" }]); +}); + +Deno.test("parseXmlRecords() assembles records spanning chunk boundaries", async () => { + const xml = + "HelloWorld"; + const chunks: string[] = []; + for (let i = 0; i < xml.length; i += 7) chunks.push(xml.slice(i, i + 7)); + const records = await collect(items(chunks)); + assertEquals(records, [{ title: "Hello" }, { title: "World" }]); +}); + +Deno.test("parseXmlRecords() completes cleanly when no records are emitted", async () => { + const xml = "x"; + const records = await collect(items(xml)); + assertEquals(records, []); +}); + +Deno.test("parseXmlRecords() errors on truly empty input (no root element)", async () => { + await assertRejects(() => collect(items("")), XmlSyntaxError); +}); + +Deno.test("parseXmlRecords() preserves record order across many chunks", async () => { + const n = 100; + const xml = `${ + Array.from({ length: n }, (_, i) => `${i}`).join("") + }`; + const chunks: string[] = []; + for (let i = 0; i < xml.length; i += 13) chunks.push(xml.slice(i, i + 13)); + const records = await collect(items(chunks)); + assertEquals(records, Array.from({ length: n }, (_, i) => ({ id: `${i}` }))); +}); + +Deno.test("parseXmlRecords() handles many records emitted in a single chunk", async () => { + const n = 5000; + const xml = `${ + Array.from({ length: n }, (_, i) => `${i}`).join("") + }`; + const records = await collect(items([xml])); + assertEquals(records.length, n); + assertEquals(records[0], { id: "0" }); + assertEquals(records[n - 1], { id: `${n - 1}` }); +}); + +// ============================================================================= +// Parse option forwarding +// ============================================================================= + +Deno.test("parseXmlRecords() ignoreWhitespace suppresses whitespace-only text events", async () => { + const xml = "\n A\n"; + const texts: string[] = []; + await collect( + parseXmlRecords( + ReadableStream.from([xml]), + (emit) => ({ + onText(text) { + texts.push(text); + emit(text); + }, + }), + { ignoreWhitespace: true }, + ), + ); + assertEquals(texts, ["A"]); +}); + +Deno.test("parseXmlRecords() xmlVersion '1.1' enables XML 1.1 character rules", async () => { + const xml = ""; + const records = await collect( + parseXmlRecords( + ReadableStream.from([xml]), + (emit) => ({ + onText(text) { + emit(text); + }, + }), + { xmlVersion: "1.1" }, + ), + ); + assertEquals(records, ["\x01"]); +}); + +Deno.test("parseXmlRecords() rejects DOCTYPE by default", async () => { + const xml = ''; + await assertRejects( + () => + collect( + parseXmlRecords( + ReadableStream.from([xml]), + (_emit) => ({}), + ), + ), + XmlSyntaxError, + ); +}); + +Deno.test("parseXmlRecords() allows DOCTYPE when disallowDoctype is false", async () => { + const xml = "1"; + const records = await collect( + parseXmlRecords( + ReadableStream.from([xml]), + (emit) => ({ + onText(text) { + emit(text); + }, + }), + { disallowDoctype: false }, + ), + ); + assertEquals(records, ["1"]); +}); + +// ============================================================================= +// createCallbacks lifecycle +// ============================================================================= + +Deno.test("parseXmlRecords() invokes createCallbacks exactly once per call", async () => { + let callCount = 0; + const xml = ""; + await collect( + parseXmlRecords( + ReadableStream.from([xml]), + (emit) => { + callCount++; + return { + onStartElement(name) { + if (name === "item") emit(name); + }, + }; + }, + ), + ); + assertEquals(callCount, 1); +}); + +// ============================================================================= +// Error semantics +// ============================================================================= + +Deno.test("parseXmlRecords() throws on malformed XML", async () => { + const xml = ""; + await assertRejects( + () => + collect( + parseXmlRecords( + ReadableStream.from([xml]), + (_emit) => ({}), + ), + ), + XmlSyntaxError, + ); +}); + +Deno.test("parseXmlRecords() propagates errors thrown from a callback", async () => { + const xml = ""; + await assertRejects( + () => + collect( + parseXmlRecords( + ReadableStream.from([xml]), + (_emit) => ({ + onStartElement(name) { + if (name === "item") throw new Error("callback error"); + }, + }), + ), + ), + Error, + "callback error", + ); +}); + +Deno.test("parseXmlRecords() propagates errors thrown from createCallbacks", async () => { + await assertRejects( + () => + collect( + parseXmlRecords( + ReadableStream.from([""]), + (_emit) => { + throw new Error("init error"); + }, + ), + ), + Error, + "init error", + ); +}); + +Deno.test("parseXmlRecords() discards records buffered in a chunk that errors", async () => { + const xml = + "12"; + const received: string[] = []; + await assertRejects(async () => { + for await (const id of ids(xml)) received.push(id); + }, XmlSyntaxError); + // The malformed third item and the preceding items 1, 2 are all parsed in + // the same chunk; on syntax error the buffer is discarded and the + // iteration rejects without yielding anything from that chunk. + assertEquals(received, []); +}); + +Deno.test("parseXmlRecords() yields records from earlier chunks before failing on a later one", async () => { + // Splitting across chunk boundaries lets the first two items drain through + // the iteration cleanly before the third (malformed) chunk is processed. + const chunks = [ + "1", + "2", + "", + ]; + const received: string[] = []; + await assertRejects(async () => { + for await (const id of ids(chunks)) received.push(id); + }, XmlSyntaxError); + assertEquals(received, ["1", "2"]); +}); + +Deno.test("parseXmlRecords() rejects with the parse error even if the consumer breaks early when the next chunk would throw", async () => { + // Iteration consumes one record then breaks. The break completes the + // iteration cleanly; the malformed chunk is never processed because the + // consumer asked to stop. This documents the iterator contract that + // breaking signals "no further values needed". + const chunks = [ + "1", + "", + ]; + const received: string[] = []; + for await (const id of ids(chunks)) { + received.push(id); + break; + } + assertEquals(received, ["1"]); +}); + +Deno.test("parseXmlRecords() discards records buffered during a failing finalize", async () => { + const xml = "hello"; + const received: string[] = []; + const iter = parseXmlRecords( + ReadableStream.from([xml]), + (emit) => ({ + onText(text) { + emit(text); + }, + }), + ); + await assertRejects(async () => { + for await (const r of iter) received.push(r); + }, XmlSyntaxError); + // Pending text "hello" is flushed via onText inside finalize, but the + // unclosed-root error means the buffer is never drained. + assertEquals(received, []); +}); + +Deno.test("parseXmlRecords() rejects with the user error when a callback throws", async () => { + const xml = + "123"; + const iter = parseXmlRecords( + ReadableStream.from([xml]), + (emit) => { + let inside = false; + let text = ""; + let count = 0; + return { + onStartElement(name) { + if (name === "id") { + inside = true; + text = ""; + } + }, + onText(t) { + if (inside) text += t; + }, + onEndElement(name) { + if (name === "id") { + count++; + if (count === 3) throw new Error("boom"); + emit(text); + inside = false; + } + }, + }; + }, + { ignoreWhitespace: true }, + ); + + const received: string[] = []; + await assertRejects( + async () => { + for await (const id of iter) received.push(id); + }, + Error, + "boom", + ); + // Records 1, 2 were buffered in the same chunk as the throw on item 3; + // fail-fast contract drops them along with the throw. + assertEquals(received, []); +}); + +// ============================================================================= +// Per-record yielding and early termination +// ============================================================================= + +function ids(xml: string | string[]): AsyncGenerator { + const chunks = Array.isArray(xml) ? xml : [xml]; + return parseXmlRecords( + ReadableStream.from(chunks), + (emit) => { + let inside = false; + let text = ""; + return { + onStartElement(name) { + if (name === "id") { + inside = true; + text = ""; + } + }, + onText(t) { + if (inside) text += t; + }, + onEndElement(name) { + if (name === "id") { + emit(text); + inside = false; + } + }, + }; + }, + { ignoreWhitespace: true }, + ); +} + +Deno.test("parseXmlRecords() yields records as the document is parsed (per-record backpressure)", async () => { + const n = 20; + const xml = `${ + Array.from({ length: n }, (_, i) => `${i}`).join("") + }`; + const received: string[] = []; + + for await (const id of ids(xml)) { + await new Promise((r) => setTimeout(r, 0)); + received.push(id); + } + + assertEquals(received, Array.from({ length: n }, (_, i) => `${i}`)); +}); + +Deno.test("parseXmlRecords() stops parsing further chunks when the consumer breaks", async () => { + const items = Array.from( + { length: 10 }, + (_, i) => `${i}`, + ); + const chunks = ["", ...items, ""]; + const totalChunks = chunks.length; + let pullCount = 0; + const source = new ReadableStream({ + pull(controller) { + pullCount++; + const next = chunks.shift(); + if (next === undefined) controller.close(); + else controller.enqueue(next); + }, + }); + + const iter = parseXmlRecords( + source, + (emit) => { + let inside = false; + let text = ""; + return { + onStartElement(name) { + if (name === "id") { + inside = true; + text = ""; + } + }, + onText(t) { + if (inside) text += t; + }, + onEndElement(name) { + if (name === "id") { + emit(text); + inside = false; + } + }, + }; + }, + { ignoreWhitespace: true }, + ); + + const received: string[] = []; + for await (const id of iter) { + received.push(id); + if (received.length === 2) break; + } + + assertEquals(received, ["0", "1"]); + assert( + pullCount < totalChunks, + `expected early break to bound upstream pulls, got ${pullCount}/${totalChunks}`, + ); +}); diff --git a/xml/parse_stream.ts b/xml/parse_stream.ts index abd19bd12450..8406bc33521b 100644 --- a/xml/parse_stream.ts +++ b/xml/parse_stream.ts @@ -8,8 +8,7 @@ */ import type { ParseStreamOptions, XmlEventCallbacks } from "./types.ts"; -import { XmlTokenizer } from "./_tokenizer.ts"; -import { XmlEventParser } from "./_parser.ts"; +import { createXmlPipeline } from "./_pipeline.ts"; export type { ParseStreamOptions, XmlEventCallbacks } from "./types.ts"; @@ -64,11 +63,7 @@ export async function parseXmlStream( callbacks: XmlEventCallbacks, options: ParseStreamOptions = {}, ): Promise { - const trackPosition = options.trackPosition ?? false; - const disallowDoctype = options.disallowDoctype ?? true; - const xml11 = options.xmlVersion === "1.1"; - const tokenizer = new XmlTokenizer({ trackPosition, disallowDoctype, xml11 }); - const parser = new XmlEventParser(callbacks, options, xml11); + const { tokenizer, parser } = createXmlPipeline(options, callbacks); for await (const chunk of source) { tokenizer.process(chunk, parser);