diff --git a/packages/agent/src/dkg-agent-lifecycle.ts b/packages/agent/src/dkg-agent-lifecycle.ts index a569701bc..648f49999 100644 --- a/packages/agent/src/dkg-agent-lifecycle.ts +++ b/packages/agent/src/dkg-agent-lifecycle.ts @@ -1204,6 +1204,13 @@ export class LifecycleSyncMethods extends DKGAgentBase { `keeping handler active: ${err instanceof Error ? err.message : String(err)}`, ); }, + onDecline: (details) => { + this.log.warn( + attemptCtx, + `V10 StorageACK declined: code=${details.code} ` + + `cg=${details.contextGraphId} reason=${details.message}`, + ); + }, // PR5 ACK-provenance โ€” bind to the agent's host-mode // bookkeeping so every signed ACK carries which of the // four LU-6 Phase B discovery paths brought this CG's diff --git a/packages/agent/src/dkg-agent-publish.ts b/packages/agent/src/dkg-agent-publish.ts index 5c93bb11e..bb4e1de37 100644 --- a/packages/agent/src/dkg-agent-publish.ts +++ b/packages/agent/src/dkg-agent-publish.ts @@ -98,6 +98,7 @@ import { sharedMemoryReadBothFilter, partitionCatalogQuads, } from '@origintrail-official/dkg-core'; +import { normalizeLargeRdfLiteralsForBlazegraph } from '@origintrail-official/dkg-core'; import { GraphManager, PrivateContentStore, createTripleStore, type TripleStore, type TripleStoreConfig, type Quad, type LargeLiteralStorageConfig } from '@origintrail-official/dkg-storage'; import { EVMChainAdapter, NoChainAdapter, enrichEvmError, buildKnowledgeAssetUal, type EVMAdapterConfig, type ChainAdapter, type CreateContextGraphParams, type CreateOnChainContextGraphParams, type CreateOnChainContextGraphResult, type TxResult, type V10PublishingConvictionAccountInfo } from '@origintrail-official/dkg-chain'; import { @@ -1267,6 +1268,10 @@ export class PublishMethods extends DKGAgentBase { ): Promise { const ctx = opts?.operationCtx ?? 
createOperationContext('publish'); const onPhase = opts?.onPhase; + quads = normalizeLargeRdfLiteralsForBlazegraph(quads).quads as Quad[]; + if (privateQuads) { + privateQuads = normalizeLargeRdfLiteralsForBlazegraph(privateQuads).quads as Quad[]; + } this.log.info(ctx, `Starting publish to context graph "${contextGraphId}" with ${quads.length} triples`); const isSystem = contextGraphId === SYSTEM_CONTEXT_GRAPHS.AGENTS || contextGraphId === SYSTEM_CONTEXT_GRAPHS.ONTOLOGY; @@ -2567,6 +2572,10 @@ export class PublishMethods extends DKGAgentBase { privateQuads?: Quad[]; }, ): Promise { + quads = normalizeLargeRdfLiteralsForBlazegraph(quads).quads as Quad[]; + const normalizedPrivateQuads = opts?.privateQuads + ? normalizeLargeRdfLiteralsForBlazegraph(opts.privateQuads).quads as Quad[] + : undefined; if ( opts?.authorAgentAddress != null && opts?.preSignedAuthorAttestation != null @@ -2595,7 +2604,7 @@ export class PublishMethods extends DKGAgentBase { // KC merkle. The order MUST follow the publisher's manifest // iteration over `kaMap`, which is the insertion order โ€” same map // we built two lines up. - const privateQuads = opts?.privateQuads ?? []; + const privateQuads = normalizedPrivateQuads ?? 
[]; const privateRoots: Uint8Array[] = []; for (const rootEntity of kaMap.keys()) { if (privateQuads.length === 0) break; diff --git a/packages/agent/test/storage-ack-protocol-extra.test.ts b/packages/agent/test/storage-ack-protocol-extra.test.ts index c27da51b3..c64cd3001 100644 --- a/packages/agent/test/storage-ack-protocol-extra.test.ts +++ b/packages/agent/test/storage-ack-protocol-extra.test.ts @@ -56,6 +56,15 @@ describe('A-9: storage-ack protocol id (libp2p) pin', () => { expect(combined).toMatch(registerRE); }); + it('agent wires core-side StorageACK decline logging', () => { + const combined = walk(AGENT_SRC) + .map((f) => readFileSync(f, 'utf8')) + .join('\n'); + + expect(combined).toMatch(/onDecline:\s*\(details\)\s*=>/); + expect(combined).toContain('V10 StorageACK declined: code='); + }); + it('agent source never publishes ACKs on GossipSub', () => { // A false-positive here would be any call like // `publish('/dkg/10.0.1/storage-ack', ...)` or diff --git a/packages/cli/src/daemon/http-utils.ts b/packages/cli/src/daemon/http-utils.ts index cd4031f4d..d25d2f07d 100644 --- a/packages/cli/src/daemon/http-utils.ts +++ b/packages/cli/src/daemon/http-utils.ts @@ -67,6 +67,19 @@ export function payloadTooLargeResponseBody(err: unknown): Record { + return { + error: err instanceof Error ? err.message : String(err ?? 
'RDF literal exceeds size limit'), + code: 'RDF_LITERAL_TOO_LARGE', + }; +} + export async function resolveNameToPeerId( agent: DKGAgent, nameOrId: string, diff --git a/packages/cli/src/daemon/routes/knowledge-assets.ts b/packages/cli/src/daemon/routes/knowledge-assets.ts index 1b1b321dd..6f4aa2881 100644 --- a/packages/cli/src/daemon/routes/knowledge-assets.ts +++ b/packages/cli/src/daemon/routes/knowledge-assets.ts @@ -27,8 +27,10 @@ import type { RequestContext } from "./context.js"; import { isPayloadTooLargeError, + isRdfLiteralSizeError, jsonResponse, payloadTooLargeResponseBody, + rdfLiteralSizeResponseBody, readBody, safeParseJson, validateEntities, @@ -650,6 +652,9 @@ export async function handleKnowledgeAssetsRoutes(ctx: RequestContext): Promise< if (isPayloadTooLargeError(e)) { return jsonResponse(res, 413, payloadTooLargeResponseBody(e)); } + if (isRdfLiteralSizeError(e)) { + return jsonResponse(res, 400, rdfLiteralSizeResponseBody(e)); + } return jsonResponse(res, 500, { error: e?.message ?? String(e) }); } } diff --git a/packages/cli/src/daemon/routes/memory.ts b/packages/cli/src/daemon/routes/memory.ts index 4e9c81cb4..28e12b99b 100644 --- a/packages/cli/src/daemon/routes/memory.ts +++ b/packages/cli/src/daemon/routes/memory.ts @@ -226,6 +226,8 @@ import { shortId, sleep, deriveBlockExplorerUrl, + isRdfLiteralSizeError, + rdfLiteralSizeResponseBody, } from '../http-utils.js'; import { normalizeRepo, @@ -1695,6 +1697,9 @@ WHERE { ) { return jsonResponse(res, 400, { error: err.message }); } + if (isRdfLiteralSizeError(err)) { + return jsonResponse(res, 400, rdfLiteralSizeResponseBody(err)); + } // Strict curator-ack gate (OT-RFC-49 curator-leader): the write was NOT // persisted because the curator (the authoritative replica) did not // confirm it. 
import { createHash } from 'node:crypto';

/**
 * Minimal quad shape handled by this module. Terms are plain strings; an
 * object whose first character is `"` is treated as a serialized RDF literal
 * (`"lexical"`, `"lexical"@lang` or `"lexical"^^<datatype>`).
 */
export interface RdfLiteralQuadLike {
  subject: string;
  predicate: string;
  object: string;
  graph?: string;
}

/** Result of {@link parseRdfLiteralTerm}. */
export interface ParsedRdfLiteralTerm {
  /** Literal body with all backslash escapes decoded. */
  lexical: string;
  /** Everything after the closing quote: `''`, `@lang` or `^^<datatype>`. */
  suffix: string;
}

/** Describes one oversized text literal that was replaced by chunk resources. */
export interface RdfLiteralRewrite {
  subject: string;
  predicate: string;
  /** Modified UTF-8 size of the original serialized object term. */
  originalMutf8Bytes: number;
  chunkCount: number;
  /** Subject of the generated text-body resource. */
  bodySubject: string;
  /** Hex SHA-256 of the decoded lexical form, hashed as UTF-8. */
  sha256: string;
}

/** Describes one literal that exceeds the limit and could not be chunked. */
export interface OversizedRdfLiteralViolation {
  index: number;
  subject: string;
  predicate: string;
  mutf8Bytes: number;
  maxBytes: number;
}

export interface RdfLiteralCompatibilityResult {
  quads: RdfLiteralQuadLike[];
  rewrites: RdfLiteralRewrite[];
}

export interface RdfLiteralCompatibilityOptions {
  /** Upper bound for any serialized literal; defaults to {@link BLAZEGRAPH_SAFE_LITERAL_MUTF8_BYTES}. */
  maxLiteralMutf8Bytes?: number;
  /** Upper bound for each generated chunk literal; defaults to `maxLiteralMutf8Bytes`. */
  textChunkMutf8Bytes?: number;
  /** Predicates whose oversized literals may be chunked; defaults to {@link SCHEMA_TEXT_PREDICATES}. */
  textPredicates?: Iterable<string>;
}

export const BLAZEGRAPH_MUTF8_LIMIT = 65_535;
export const BLAZEGRAPH_SAFE_LITERAL_MUTF8_BYTES = 60_000;

export const RDF_TYPE = 'http://www.w3.org/1999/02/22-rdf-syntax-ns#type';
export const XSD_INTEGER = 'http://www.w3.org/2001/XMLSchema#integer';
export const SCHEMA_TEXT_PREDICATES = [
  'http://schema.org/text',
  'https://schema.org/text',
] as const;

export const DKG_TEXT_BODY = 'http://dkg.io/ontology/TextBody';
export const DKG_TEXT_CHUNK = 'http://dkg.io/ontology/TextChunk';
export const DKG_HAS_TEXT_BODY = 'http://dkg.io/ontology/hasTextBody';
export const DKG_HAS_TEXT_CHUNK = 'http://dkg.io/ontology/hasTextChunk';
export const DKG_TEXT_SOURCE_PREDICATE = 'http://dkg.io/ontology/textSourcePredicate';
export const DKG_TEXT_CONTENT_SHA256 = 'http://dkg.io/ontology/textContentSha256';
export const DKG_TEXT_LITERAL_MUTF8_BYTES = 'http://dkg.io/ontology/textLiteralMutf8Bytes';
export const DKG_TEXT_UTF8_BYTES = 'http://dkg.io/ontology/textUtf8Bytes';
export const DKG_TEXT_CHUNK_COUNT = 'http://dkg.io/ontology/textChunkCount';
export const DKG_TEXT_CHUNK_LIMIT = 'http://dkg.io/ontology/textChunkMutf8Limit';
export const DKG_CHUNK_INDEX = 'http://dkg.io/ontology/chunkIndex';

/**
 * Thrown by {@link normalizeLargeRdfLiteralsForBlazegraph} when at least one
 * literal exceeds the configured limit and cannot be rewritten into chunks.
 * `code` and `statusCode` are fixed so callers can map the error without
 * string matching.
 */
export class RdfLiteralSizeError extends Error {
  readonly code = 'RDF_LITERAL_TOO_LARGE';
  readonly statusCode = 400;
  readonly violations: OversizedRdfLiteralViolation[];

  constructor(message: string, violations: OversizedRdfLiteralViolation[]) {
    super(message);
    this.name = 'RdfLiteralSizeError';
    this.violations = violations;
  }
}

/**
 * Java Modified UTF-8 byte length of a string. Blazegraph reaches Java's
 * DataOutputStream.writeUTF() path for string keys, which hard-caps this
 * encoded length at 65,535 bytes.
 *
 * The count is per UTF-16 code unit: U+0000 takes two bytes, and each half of
 * a surrogate pair takes three (six bytes per supplementary character).
 */
export function javaModifiedUtf8Length(str: string): number {
  let len = 0;
  for (let i = 0; i < str.length; i++) {
    const code = str.charCodeAt(i);
    if (code === 0) {
      len += 2;
    } else if (code <= 0x7f) {
      len += 1;
    } else if (code <= 0x7ff) {
      len += 2;
    } else {
      len += 3;
    }
  }
  return len;
}

/**
 * Splits a serialized RDF literal into its decoded lexical form and suffix.
 *
 * @returns `null` when `term` does not start with `"` or has no unescaped
 *   closing quote.
 * @throws Error when the suffix is not empty, a language tag or a datatype
 *   IRI, or when the body contains an invalid escape sequence.
 */
export function parseRdfLiteralTerm(term: string): ParsedRdfLiteralTerm | null {
  if (!term.startsWith('"')) return null;
  let escaped = false;
  for (let i = 1; i < term.length; i++) {
    const ch = term[i]!;
    if (escaped) {
      escaped = false;
      continue;
    }
    if (ch === '\\') {
      escaped = true;
      continue;
    }
    if (ch !== '"') continue;
    // First unescaped quote closes the literal; the remainder is the suffix.
    const body = term.slice(1, i);
    const suffix = term.slice(i + 1);
    validateLiteralSuffix(suffix);
    return {
      lexical: decodeRdfLiteralBody(body),
      suffix,
    };
  }
  return null;
}

/** Everything needed to emit the chunk resources for one oversized text literal. */
interface TextRewritePlan {
  quad: RdfLiteralQuadLike;
  mutf8Bytes: number;
  lexical: string;
  chunkTerms: string[];
  sha256: string;
  /** Body subject derived from the quad subject and the content hash only. */
  baseBodySubject: string;
  /** Identifies the (predicate, suffix) pair, used to detect body-subject collisions. */
  variantKey: string;
}

type TextRewritePlanResult =
  | { ok: true; plan: TextRewritePlan }
  | { ok: false; violation: OversizedRdfLiteralViolation };

type NormalizationStep =
  | { kind: 'keep'; quad: RdfLiteralQuadLike }
  | { kind: 'rewrite'; plan: TextRewritePlan };

/**
 * Returns a copy of `quads` in which every oversized literal on a text
 * predicate is replaced by a text-body resource plus ordered chunk resources,
 * each chunk literal fitting within the configured Modified UTF-8 budget.
 *
 * Quads whose object is not a literal, or whose serialized literal is within
 * `maxLiteralMutf8Bytes`, are shallow-copied unchanged. Output order follows
 * input order.
 *
 * When two oversized literals on the same subject have identical text but a
 * different predicate or suffix, their body subjects receive an extra
 * predicate/suffix discriminator so the chunk resources do not merge.
 * Non-colliding body subjects keep the plain
 * `<subject>/.well-known/genid/text-<sha256[0..16]>` form.
 *
 * NOTE(review): the body subject is built by appending a path to
 * `quad.subject`, which assumes the subject is an IRI — confirm that callers
 * never pass blank-node labels here.
 *
 * @throws Error when `maxLiteralMutf8Bytes` or `textChunkMutf8Bytes` is not a
 *   positive integer within its allowed range.
 * @throws RdfLiteralSizeError when any oversized literal is not on a text
 *   predicate or cannot be parsed or chunked.
 */
export function normalizeLargeRdfLiteralsForBlazegraph(
  quads: readonly RdfLiteralQuadLike[],
  options: RdfLiteralCompatibilityOptions = {},
): RdfLiteralCompatibilityResult {
  const maxBytes = options.maxLiteralMutf8Bytes ?? BLAZEGRAPH_SAFE_LITERAL_MUTF8_BYTES;
  const chunkMaxBytes = options.textChunkMutf8Bytes ?? maxBytes;
  if (!Number.isInteger(maxBytes) || maxBytes <= 0 || maxBytes > BLAZEGRAPH_MUTF8_LIMIT) {
    throw new Error(`Invalid maxLiteralMutf8Bytes: ${maxBytes}`);
  }
  if (!Number.isInteger(chunkMaxBytes) || chunkMaxBytes <= 0 || chunkMaxBytes > maxBytes) {
    throw new Error(`Invalid textChunkMutf8Bytes: ${chunkMaxBytes}`);
  }

  const textPredicates = new Set<string>(options.textPredicates ?? SCHEMA_TEXT_PREDICATES);
  const steps: NormalizationStep[] = [];
  const violations: OversizedRdfLiteralViolation[] = [];
  const variantsByBaseSubject = new Map<string, Set<string>>();

  // Pass 1: classify every quad and plan rewrites, so that body-subject
  // collisions are known before any identifier is emitted.
  quads.forEach((quad, index) => {
    if (!quad.object.startsWith('"')) {
      steps.push({ kind: 'keep', quad: { ...quad } });
      return;
    }

    const mutf8Bytes = javaModifiedUtf8Length(quad.object);
    if (mutf8Bytes <= maxBytes) {
      steps.push({ kind: 'keep', quad: { ...quad } });
      return;
    }

    if (!textPredicates.has(quad.predicate)) {
      violations.push(violationFor(index, quad, mutf8Bytes, maxBytes));
      return;
    }

    const planned = planTextRewrite(index, quad, mutf8Bytes, maxBytes, chunkMaxBytes);
    if (!planned.ok) {
      violations.push(planned.violation);
      return;
    }

    const { plan } = planned;
    const variants = variantsByBaseSubject.get(plan.baseBodySubject) ?? new Set<string>();
    variants.add(plan.variantKey);
    variantsByBaseSubject.set(plan.baseBodySubject, variants);
    steps.push({ kind: 'rewrite', plan });
  });

  // Pass 2: emit quads in input order.
  const out: RdfLiteralQuadLike[] = [];
  const rewrites: RdfLiteralRewrite[] = [];
  for (const step of steps) {
    if (step.kind === 'keep') {
      out.push(step.quad);
      continue;
    }
    const { plan } = step;
    const contested = (variantsByBaseSubject.get(plan.baseBodySubject)?.size ?? 0) > 1;
    // Only contested ids get the discriminator; the common case keeps the
    // plain content-hash id.
    const bodySubject = contested
      ? `${plan.baseBodySubject}-${sha256Hex(plan.variantKey).slice(0, 8)}`
      : plan.baseBodySubject;
    emitTextRewrite(plan, bodySubject, chunkMaxBytes, out);
    rewrites.push({
      subject: plan.quad.subject,
      predicate: plan.quad.predicate,
      originalMutf8Bytes: plan.mutf8Bytes,
      chunkCount: plan.chunkTerms.length,
      bodySubject,
      sha256: plan.sha256,
    });
  }

  // Defensive re-check of everything that will be returned. `index` here is
  // the position in the OUTPUT list, not in the input.
  const remainingViolations = out
    .map((quad, index) => ({
      quad,
      index,
      bytes: quad.object.startsWith('"') ? javaModifiedUtf8Length(quad.object) : 0,
    }))
    .filter(({ bytes }) => bytes > maxBytes)
    .map(({ quad, index, bytes }) => violationFor(index, quad, bytes, maxBytes));
  violations.push(...remainingViolations);

  if (violations.length > 0) {
    throw new RdfLiteralSizeError(formatOversizedLiteralMessage(violations), violations);
  }

  return { quads: out, rewrites };
}

/** Parses and chunks one oversized text literal, or reports why it cannot be rewritten. */
function planTextRewrite(
  index: number,
  quad: RdfLiteralQuadLike,
  mutf8Bytes: number,
  maxBytes: number,
  chunkMaxBytes: number,
): TextRewritePlanResult {
  const reject = (): TextRewritePlanResult => ({
    ok: false,
    violation: violationFor(index, quad, mutf8Bytes, maxBytes),
  });

  let parsed: ParsedRdfLiteralTerm | null;
  try {
    parsed = parseRdfLiteralTerm(quad.object);
  } catch {
    // A malformed literal cannot be chunked safely; report it as oversized.
    parsed = null;
  }
  if (!parsed) return reject();
  const { lexical, suffix } = parsed;

  let chunks: string[];
  try {
    chunks = splitLiteralLexicalIntoSafeChunks(lexical, suffix, chunkMaxBytes);
  } catch {
    return reject();
  }

  const chunkTerms = chunks.map((chunk) => rdfLiteralTerm(chunk, suffix));
  const unsafeChunk = chunkTerms.find((term) => javaModifiedUtf8Length(term) > maxBytes);
  if (unsafeChunk !== undefined) {
    return {
      ok: false,
      violation: {
        ...violationFor(index, quad, javaModifiedUtf8Length(unsafeChunk), maxBytes),
        predicate: `${quad.predicate} chunk`,
      },
    };
  }

  const sha256 = sha256Hex(lexical);
  return {
    ok: true,
    plan: {
      quad,
      mutf8Bytes,
      lexical,
      chunkTerms,
      sha256,
      baseBodySubject: `${quad.subject}/.well-known/genid/text-${sha256.slice(0, 16)}`,
      variantKey: JSON.stringify([quad.predicate, suffix]),
    },
  };
}

/** Appends the text-body metadata quads and the per-chunk quads for `plan` to `out`. */
function emitTextRewrite(
  plan: TextRewritePlan,
  bodySubject: string,
  chunkMaxBytes: number,
  out: RdfLiteralQuadLike[],
): void {
  const { quad, lexical, chunkTerms, sha256, mutf8Bytes } = plan;
  const graph = quad.graph ?? '';
  out.push(
    { subject: quad.subject, predicate: DKG_HAS_TEXT_BODY, object: bodySubject, graph },
    { subject: bodySubject, predicate: RDF_TYPE, object: DKG_TEXT_BODY, graph },
    { subject: bodySubject, predicate: DKG_TEXT_SOURCE_PREDICATE, object: quad.predicate, graph },
    { subject: bodySubject, predicate: DKG_TEXT_CONTENT_SHA256, object: rdfLiteralTerm(sha256), graph },
    { subject: bodySubject, predicate: DKG_TEXT_LITERAL_MUTF8_BYTES, object: xsdInteger(mutf8Bytes), graph },
    {
      subject: bodySubject,
      predicate: DKG_TEXT_UTF8_BYTES,
      object: xsdInteger(new TextEncoder().encode(lexical).length),
      graph,
    },
    { subject: bodySubject, predicate: DKG_TEXT_CHUNK_COUNT, object: xsdInteger(chunkTerms.length), graph },
    { subject: bodySubject, predicate: DKG_TEXT_CHUNK_LIMIT, object: xsdInteger(chunkMaxBytes), graph },
  );

  chunkTerms.forEach((term, chunkIndex) => {
    const chunkSubject = `${bodySubject}/chunk/${chunkIndex}`;
    out.push(
      { subject: bodySubject, predicate: DKG_HAS_TEXT_CHUNK, object: chunkSubject, graph },
      { subject: chunkSubject, predicate: RDF_TYPE, object: DKG_TEXT_CHUNK, graph },
      { subject: chunkSubject, predicate: DKG_CHUNK_INDEX, object: xsdInteger(chunkIndex), graph },
      // The chunk carries the original predicate and the original suffix.
      { subject: chunkSubject, predicate: quad.predicate, object: term, graph },
    );
  });
}

/** Hex SHA-256 of `value` hashed as UTF-8. */
function sha256Hex(value: string): string {
  return createHash('sha256').update(value, 'utf8').digest('hex');
}

/**
 * Splits `lexical` at code-point boundaries so that each piece, once
 * re-serialized with `suffix`, fits within `maxBytes` Modified UTF-8 bytes.
 * An empty lexical yields a single empty chunk.
 *
 * @throws Error when the quotes plus suffix alone reach the budget, or when a
 *   single escaped character does not fit.
 */
function splitLiteralLexicalIntoSafeChunks(lexical: string, suffix: string, maxBytes: number): string[] {
  const baseBytes = javaModifiedUtf8Length(`""${suffix}`);
  if (baseBytes >= maxBytes) {
    throw new Error(`Literal suffix consumes the full MUTF-8 budget (${baseBytes} bytes)`);
  }
  const chunks: string[] = [];
  let current = '';
  let currentBodyBytes = 0;
  // for...of iterates code points, so surrogate pairs are never split.
  for (const ch of lexical) {
    const chBytes = escapedLiteralBodyMutf8Length(ch);
    if (current.length > 0 && baseBytes + currentBodyBytes + chBytes > maxBytes) {
      chunks.push(current);
      current = ch;
      currentBodyBytes = chBytes;
      continue;
    }
    if (current.length === 0 && baseBytes + chBytes > maxBytes) {
      throw new Error(`A single literal character exceeds the MUTF-8 budget (${chBytes} bytes)`);
    }
    current += ch;
    currentBodyBytes += chBytes;
  }
  if (current.length > 0 || lexical.length === 0) chunks.push(current);
  return chunks;
}

/** Modified UTF-8 size of `ch` as it appears inside a literal produced by {@link rdfLiteralTerm}. */
function escapedLiteralBodyMutf8Length(ch: string): number {
  const escaped = JSON.stringify(ch).slice(1, -1);
  return javaModifiedUtf8Length(escaped);
}

/** Serializes a lexical form as a quoted literal (JSON string escaping) followed by `suffix`. */
function rdfLiteralTerm(lexical: string, suffix = ''): string {
  return `${JSON.stringify(lexical)}${suffix}`;
}

/** Serializes `value` as an xsd:integer typed literal. */
function xsdInteger(value: number): string {
  return `"${value}"^^<${XSD_INTEGER}>`;
}

function violationFor(
  index: number,
  quad: RdfLiteralQuadLike,
  mutf8Bytes: number,
  maxBytes: number,
): OversizedRdfLiteralViolation {
  return {
    index,
    subject: quad.subject,
    predicate: quad.predicate,
    mutf8Bytes,
    maxBytes,
  };
}

/** Builds the error message; lists at most five violations with truncated subject and predicate. */
function formatOversizedLiteralMessage(violations: readonly OversizedRdfLiteralViolation[]): string {
  const details = violations
    .slice(0, 5)
    .map((v) =>
      `index=${v.index} subject=${v.subject.slice(0, 120)} predicate=${v.predicate.slice(0, 120)} ` +
      `(${v.mutf8Bytes} MUTF-8 bytes, max ${v.maxBytes})`,
    )
    .join('; ');
  const suffix = violations.length > 5 ? `; +${violations.length - 5} more` : '';
  return `Invalid RDF literal: ${violations.length} literal object(s) exceed the Blazegraph-safe MUTF-8 limit. ` +
    `Use schema.org/text chunking or external content references before publishing. ${details}${suffix}`;
}

/** Accepts an empty suffix, a language tag, or a `^^<iri>` datatype; throws otherwise. */
function validateLiteralSuffix(suffix: string): void {
  if (suffix === '') return;
  if (/^@[A-Za-z]+(?:-[A-Za-z0-9]+)*$/.test(suffix)) return;
  if (/^\^\^<[^<>"{}|\\^`\x00-\x20>]+>$/.test(suffix)) return;
  throw new Error(`Invalid RDF literal suffix: ${suffix.slice(0, 80)}`);
}

/**
 * Decodes the backslash escapes of a literal body (the text between the
 * quotes): `\t \b \n \r \f \" \' \\`, `\uXXXX` and `\UXXXXXXXX`.
 *
 * @throws Error on a trailing backslash, an unknown escape, malformed hex
 *   digits, or a `\U` value above U+10FFFF.
 */
function decodeRdfLiteralBody(body: string): string {
  let out = '';
  for (let i = 0; i < body.length; i++) {
    const ch = body[i]!;
    if (ch !== '\\') {
      out += ch;
      continue;
    }
    i += 1;
    if (i >= body.length) throw new Error('Invalid trailing RDF literal escape');
    const escaped = body[i]!;
    switch (escaped) {
      case 't':
        out += '\t';
        break;
      case 'b':
        out += '\b';
        break;
      case 'n':
        out += '\n';
        break;
      case 'r':
        out += '\r';
        break;
      case 'f':
        out += '\f';
        break;
      case '"':
      case "'":
      case '\\':
        out += escaped;
        break;
      case 'u': {
        const hex = body.slice(i + 1, i + 5);
        if (!/^[0-9a-fA-F]{4}$/.test(hex)) throw new Error('Invalid RDF \\u escape');
        out += String.fromCodePoint(parseInt(hex, 16));
        i += 4;
        break;
      }
      case 'U': {
        const hex = body.slice(i + 1, i + 9);
        if (!/^[0-9a-fA-F]{8}$/.test(hex)) throw new Error('Invalid RDF \\U escape');
        const codePoint = parseInt(hex, 16);
        // String.fromCodePoint would raise a bare RangeError above U+10FFFF.
        if (codePoint > 0x10ffff) throw new Error('Invalid RDF \\U escape');
        out += String.fromCodePoint(codePoint);
        i += 8;
        break;
      }
      default:
        throw new Error(`Invalid RDF literal escape: \\${escaped}`);
    }
  }
  return out;
}
a/packages/core/test/rdf-literal-limits.test.ts b/packages/core/test/rdf-literal-limits.test.ts new file mode 100644 index 000000000..262293970 --- /dev/null +++ b/packages/core/test/rdf-literal-limits.test.ts @@ -0,0 +1,82 @@ +import { createHash } from 'node:crypto'; +import { describe, expect, it } from 'vitest'; +import { + BLAZEGRAPH_SAFE_LITERAL_MUTF8_BYTES, + DKG_CHUNK_INDEX, + DKG_HAS_TEXT_BODY, + DKG_HAS_TEXT_CHUNK, + DKG_TEXT_CHUNK_COUNT, + DKG_TEXT_CONTENT_SHA256, + javaModifiedUtf8Length, + normalizeLargeRdfLiteralsForBlazegraph, + parseRdfLiteralTerm, + RdfLiteralSizeError, +} from '../src/rdf-literal-limits.js'; + +describe('RDF literal Blazegraph compatibility', () => { + it('measures Java modified UTF-8 length', () => { + expect(javaModifiedUtf8Length('abc')).toBe(3); + expect(javaModifiedUtf8Length('\0')).toBe(2); + expect(javaModifiedUtf8Length('รฉ')).toBe(2); + expect(javaModifiedUtf8Length('๐Ÿ˜€')).toBe(6); + }); + + it('chunks oversized schema:text literals into ordered safe child resources', () => { + const body = `${'x'.repeat(35)}\n"${'๐Ÿ˜€'.repeat(10)}\\${'y'.repeat(35)}`; + const result = normalizeLargeRdfLiteralsForBlazegraph([ + { + subject: 'urn:dkg:test:computer-history', + predicate: 'http://schema.org/text', + object: JSON.stringify(body), + graph: 'did:dkg:context-graph:0x599BF63E/computer-history', + }, + ], { + maxLiteralMutf8Bytes: 90, + textChunkMutf8Bytes: 48, + }); + + expect(result.rewrites).toHaveLength(1); + expect(result.rewrites[0].chunkCount).toBeGreaterThan(1); + expect(result.quads.some((q) => q.predicate === 'http://schema.org/text' && q.subject === 'urn:dkg:test:computer-history')).toBe(false); + + const bodySubject = result.quads.find((q) => q.predicate === DKG_HAS_TEXT_BODY)?.object; + expect(bodySubject).toMatch(/^urn:dkg:test:computer-history\/\.well-known\/genid\/text-/); + + const count = result.quads.find((q) => q.subject === bodySubject && q.predicate === DKG_TEXT_CHUNK_COUNT)?.object; + 
expect(count).toBe(`"${result.rewrites[0].chunkCount}"^^`); + + const sha = result.quads.find((q) => q.subject === bodySubject && q.predicate === DKG_TEXT_CONTENT_SHA256)?.object; + expect(sha).toBe(JSON.stringify(createHash('sha256').update(body, 'utf8').digest('hex'))); + + const chunkSubjects = result.quads + .filter((q) => q.subject === bodySubject && q.predicate === DKG_HAS_TEXT_CHUNK) + .map((q) => q.object); + const reconstructed = chunkSubjects + .map((chunkSubject) => { + const indexLiteral = result.quads.find((q) => q.subject === chunkSubject && q.predicate === DKG_CHUNK_INDEX)?.object; + const index = Number(indexLiteral?.match(/\d+/)?.[0] ?? 'NaN'); + const text = result.quads.find((q) => q.subject === chunkSubject && q.predicate === 'http://schema.org/text')?.object; + expect(text).toBeDefined(); + expect(javaModifiedUtf8Length(text!)).toBeLessThanOrEqual(48); + return { index, text: parseRdfLiteralTerm(text!)!.lexical }; + }) + .sort((a, b) => a.index - b.index) + .map((chunk) => chunk.text) + .join(''); + + expect(reconstructed).toBe(body); + }); + + it('rejects oversized non-text literals before publishing', () => { + expect(() => + normalizeLargeRdfLiteralsForBlazegraph([ + { + subject: 'urn:dkg:test:bad', + predicate: 'http://schema.org/name', + object: JSON.stringify('x'.repeat(BLAZEGRAPH_SAFE_LITERAL_MUTF8_BYTES + 1)), + graph: 'did:dkg:context-graph:test', + }, + ]), + ).toThrow(RdfLiteralSizeError); + }); +}); diff --git a/packages/publisher/src/ack-collector.ts b/packages/publisher/src/ack-collector.ts index 181fa859b..731e8d4cd 100644 --- a/packages/publisher/src/ack-collector.ts +++ b/packages/publisher/src/ack-collector.ts @@ -547,6 +547,17 @@ export class ACKCollector { // and follow the legacy retry path below โ€” declines are strictly // additive on the wire. const declines = new Map(); + // ACKs that arrived over the protocol but failed publisher-side validation + // are not "no response". 
Track them separately so QuorumUnmetError can + // distinguish transport silence from an ACK the publisher rejected locally + // (signature, merkle root, or chain/RPC identity verification). + const ackFailures = new Map(); + const recordACKFailure = (peerId: string, reason: string): void => { + ackFailures.set(peerId, { reason }); + // A later terminal ACK-validation failure is more actionable than an + // earlier transient decline from the same peer. + declines.delete(peerId); + }; const formatDeclineDetail = (): string => { if (declines.size === 0) return ''; @@ -593,6 +604,15 @@ export class ACKCollector { reason: ack.subscriptionSource ? `ACK:${ack.subscriptionSource}` : 'ACK', }; } + const ackFailure = ackFailures.get(peerId); + if (ackFailure) { + return { + peerId, + dialOk: true, + protocolSupported: true, + reason: ackFailure.reason, + }; + } const decline = declines.get(peerId); if (decline) { const code = decline.code; @@ -653,6 +673,7 @@ export class ACKCollector { // prior entry is intentional โ€” operators care most about // why the peer ultimately could not ACK. declines.set(peerId, { code, message: declineMessage }); + ackFailures.delete(peerId); // Transient declines (SWM replication catching up via // gossip) can resolve on a retry, so re-send with backoff @@ -703,11 +724,13 @@ export class ACKCollector { const recoveredAddress = this.recoverACKSigner(ack, ackDigest); if (!recoveredAddress) { + recordACKFailure(peerId, 'INVALID_SIGNATURE'); log(`[ACKCollector] Invalid ACK signature from ${peerId.slice(-8)}`); return null; } if (!this.merkleRootsMatch(ack.merkleRoot, merkleRoot)) { + recordACKFailure(peerId, 'MERKLE_ROOT_MISMATCH'); log(`[ACKCollector] Merkle root mismatch from ${peerId.slice(-8)}`); return null; } @@ -726,7 +749,8 @@ export class ACKCollector { if (this.deps.verifyIdentityDetailed) { const verdict = await this.deps.verifyIdentityDetailed(recoveredAddress, identityId); if (!verdict.valid) { - const reason = verdict.reason ?? 
'unknown'; + const reason = sanitizeDeclineField(verdict.reason ?? 'unknown', MAX_DECLINE_CODE_CHARS) || 'unknown'; + recordACKFailure(peerId, `ACK_VERIFY:${reason}`); log( `[ACKCollector] ACK from ${peerId.slice(-8)} rejected: ${reason}` + ` (signer=${recoveredAddress.slice(0, 10)}..., identity=${identityId})`, @@ -736,6 +760,7 @@ export class ACKCollector { } else if (this.deps.verifyIdentity) { const valid = await this.deps.verifyIdentity(recoveredAddress, identityId); if (!valid) { + recordACKFailure(peerId, 'ACK_VERIFY:key-not-registered'); log(`[ACKCollector] Signer ${recoveredAddress.slice(0, 10)}... not registered for identity ${identityId} โ€” rejecting ACK from ${peerId.slice(-8)}`); return null; } @@ -746,6 +771,7 @@ export class ACKCollector { // stale decline would still appear in `storage_ack_insufficient` // if quorum fails for unrelated reasons. declines.delete(peerId); + ackFailures.delete(peerId); // PR5: capture the peer-reported ACK-provenance source if // present. Pre-PR5 cores never set the field; treat any diff --git a/packages/publisher/src/dkg-publisher.ts b/packages/publisher/src/dkg-publisher.ts index fd800a981..4c4c59aca 100644 --- a/packages/publisher/src/dkg-publisher.ts +++ b/packages/publisher/src/dkg-publisher.ts @@ -3,6 +3,7 @@ import type { ChainAdapter, OnChainPublishResult, AddBatchToContextGraphParams } import { enrichEvmError } from '@origintrail-official/dkg-chain'; import type { EventBus, OperationContext } from '@origintrail-official/dkg-core'; import { DKGEvent, Logger, createOperationContext, sha256, encodeWorkspacePublishRequest, encodeEncryptedWorkspacePayload, encryptWorkspacePayload, contextGraphDataUri, contextGraphDataGraphUri, contextGraphMetaUri, contextGraphAssertionUri, contextGraphLayerUri, MemoryLayer, assertionLifecycleUri, contextGraphSubGraphUri, contextGraphSubGraphMetaUri, SYSTEM_CONTEXT_GRAPHS, validateSubGraphName, isSafeIri, assertSafeIri, assertSafeRdfTerm, DKG_GOSSIP_MAX_MESSAGE_BYTES, 
SwmGossipPayloadTooLargeError, type Ed25519Keypair, buildAuthorAttestationTypedData, buildUpdateAuthorAttestationTypedData, AUTHOR_SCHEME_VERSION_V1, TrustLevel, TRUST_LEVEL_PREDICATE, assertNoUserAuthoredTrustLevelQuads, buildTrustLevelQuads, isTrustLevelQuad, DKG_ENTITY, DKG_ROOT_ENTITY_LEGACY, ENTITY_PRED_ALT, parseAssertionSealQuads, ASSERTION_SEAL_PREDICATES, sharedMemoryReadBothFilter } from '@origintrail-official/dkg-core'; +import { normalizeLargeRdfLiteralsForBlazegraph } from '@origintrail-official/dkg-core'; import { GraphManager, PrivateContentStore } from '@origintrail-official/dkg-storage'; import { DEFAULT_PUBLISH_EPOCHS, MAX_PUBLISH_EPOCHS, type Publisher, type PublishOptions, type PublishResult, type KAManifestEntry, type PhaseCallback, type V10CoreNodeACK } from './publisher.js'; import { skolemizeByEntity } from './auto-partition.js'; @@ -376,6 +377,10 @@ function rejectUserAuthoredProtocolMetadata(quads: Quad[]): void { assertNoUserAuthoredTrustLevelQuads(quads); } +function normalizeProducerRdfLiterals(quads: Quad[]): Quad[] { + return normalizeLargeRdfLiteralsForBlazegraph(quads).quads as Quad[]; +} + async function stampTrustLevel( store: TripleStore, graph: string, @@ -866,17 +871,15 @@ export class DKGPublisher implements Publisher { quads: Quad[], options: ShareOptions, ): Promise { - // Round 9 Bug 25: reject user-authored quads with reserved URN - // prefixes at the TOP of the Bucket A entry point, before any - // other processing (lock acquisition, partitioning, etc.) per - // spec `19_MARKDOWN_CONTENT_TYPE.md ยง10.2`. Short-circuit so a - // reserved-namespace violation cannot be masked by a lock timeout - // or subject-level validation error downstream. 
- rejectUserAuthoredProtocolMetadata(quads); - const subjects = [...new Set(quads.map(q => q.subject))]; + // Normalize oversized text literals and reject reserved/protocol + // metadata before lock acquisition or subject-level validation, so + // malformed producer data cannot poison SWM or VM replication. + const normalizedQuads = normalizeProducerRdfLiterals(quads); + rejectUserAuthoredProtocolMetadata(normalizedQuads); + const subjects = [...new Set(normalizedQuads.map(q => q.subject))]; const lockPrefix = options.subGraphName ? `${contextGraphId}\0${options.subGraphName}` : contextGraphId; const lockKeys = subjects.map(s => `${lockPrefix}\0${s}`); - return this.withWriteLocks(lockKeys, () => this._shareImpl(contextGraphId, quads, options)); + return this.withWriteLocks(lockKeys, () => this._shareImpl(contextGraphId, normalizedQuads, options)); } /** @deprecated Use share() */ @@ -1156,12 +1159,11 @@ export class DKGPublisher implements Publisher { quads: Quad[], options: ConditionalShareOptions, ): Promise { - // Round 9 Bug 25: reject user-authored quads with reserved URN - // prefixes at the TOP of the Bucket A entry point, before the - // CAS condition check (which could otherwise mask the namespace - // violation with a StaleWriteError). Short-circuit per - // `19_MARKDOWN_CONTENT_TYPE.md ยง10.2`. - rejectUserAuthoredProtocolMetadata(quads); + // Normalize oversized text literals and reject reserved/protocol + // metadata before the CAS condition check, so producer data issues + // cannot be masked by a StaleWriteError. 
+ const normalizedQuads = normalizeProducerRdfLiterals(quads); + rejectUserAuthoredProtocolMetadata(normalizedQuads); for (const cond of options.conditions) { assertSafeIri(cond.subject); assertSafeIri(cond.predicate); @@ -1171,11 +1173,11 @@ export class DKGPublisher implements Publisher { } const conditionSubjects = options.conditions.map(c => c.subject); - const quadSubjects = [...new Set(quads.map(q => q.subject))]; + const quadSubjects = [...new Set(normalizedQuads.map(q => q.subject))]; const lockPrefix = options.subGraphName ? `${contextGraphId}\0${options.subGraphName}` : contextGraphId; const lockKeys = [...new Set([...conditionSubjects, ...quadSubjects])].map(s => `${lockPrefix}\0${s}`); - return this.withWriteLocks(lockKeys, () => this._executeConditionalWrite(contextGraphId, quads, options)); + return this.withWriteLocks(lockKeys, () => this._executeConditionalWrite(contextGraphId, normalizedQuads, options)); } /** @deprecated Use conditionalShare() */ @@ -1757,6 +1759,14 @@ export class DKGPublisher implements Publisher { }; } + options = { + ...options, + quads: normalizeProducerRdfLiterals(options.quads), + ...(options.privateQuads + ? { privateQuads: normalizeProducerRdfLiterals(options.privateQuads) } + : {}), + }; + const { contextGraphId, quads, @@ -5089,9 +5099,9 @@ export class DKGPublisher implements Publisher { ): Promise { await this.ensureSubGraphRegistered(contextGraphId, subGraphName); const graphUri = await this.wmGraphUri(contextGraphId, agentAddress, name, subGraphName); - const quads = input.map((t) => ({ + const quads = normalizeProducerRdfLiterals(input.map((t) => ({ subject: t.subject, predicate: t.predicate, object: t.object, graph: graphUri, - })); + }))); // Round 9 Bug 25: reject user-authored quads whose subject is in a // protocol-reserved URN namespace. See RESERVED_SUBJECT_PREFIXES above. 
rejectUserAuthoredProtocolMetadata(quads); diff --git a/packages/publisher/src/storage-ack-handler.ts b/packages/publisher/src/storage-ack-handler.ts index d0e6d2681..05a40229c 100644 --- a/packages/publisher/src/storage-ack-handler.ts +++ b/packages/publisher/src/storage-ack-handler.ts @@ -24,6 +24,8 @@ type PeerId = { toString(): string }; const MAX_DECLINE_ENTITY_COUNT = 5; const MAX_DECLINE_ENTITY_CHARS = 120; +const MAX_DECLINE_LOG_CG_ID_CHARS = 160; +const MAX_DECLINE_LOG_MESSAGE_CHARS = 240; function compactDeclineText(value: string, maxChars: number): string { const compacted = value.replace(/[\u0000-\u001f\u007f]+/g, ' ').replace(/\s+/g, ' ').trim(); @@ -92,6 +94,16 @@ export interface StorageACKHandlerConfig { * registered on-chain at signing time. */ onSignerRegistrationLookupFailed?: (err: unknown) => void | Promise; + /** + * Called whenever the handler returns a typed StorageACK decline. The hook + * receives bounded, log-safe text; the wire decline message is encoded + * unchanged below so publisher-visible behavior stays stable. + */ + onDecline?: (details: { + code: StorageACKDeclineCode; + contextGraphId: string; + message: string; + }) => void | Promise; /** * Codex PR #608: independent curation oracle. The handler MUST verify a * publisher's `isEncryptedPayload=true` claim against the CG's real @@ -262,6 +274,18 @@ export class StorageACKHandler { code: StorageACKDeclineCode, message: string, ): Uint8Array { + if (this.config.onDecline) { + const details = { + code, + contextGraphId: compactDeclineText(cgId, MAX_DECLINE_LOG_CG_ID_CHARS), + message: compactDeclineText(message, MAX_DECLINE_LOG_MESSAGE_CHARS), + }; + try { + void Promise.resolve(this.config.onDecline(details)).catch(() => undefined); + } catch { + // Logging must never change ACK wire behavior. 
+ } + } return encodeStorageACK({ merkleRoot: new Uint8Array(0), coreNodeSignatureR: new Uint8Array(0), diff --git a/packages/publisher/test/dkg-publisher-compat.test.ts b/packages/publisher/test/dkg-publisher-compat.test.ts index 829527fd6..34cea102a 100644 --- a/packages/publisher/test/dkg-publisher-compat.test.ts +++ b/packages/publisher/test/dkg-publisher-compat.test.ts @@ -1,7 +1,9 @@ import { describe, expect, it } from 'vitest'; import { NoChainAdapter } from '@origintrail-official/dkg-chain'; import { + DKG_HAS_TEXT_CHUNK, TypedEventBus, + javaModifiedUtf8Length, generateEd25519Keypair, } from '@origintrail-official/dkg-core'; import { OxigraphStore, type Quad } from '@origintrail-official/dkg-storage'; @@ -35,4 +37,45 @@ describe('DKGPublisher compatibility aliases', () => { expect(publisher.autoPartition(quads)).toEqual(publisher.skolemizeByEntity(quads)); }); + + it('rewrites oversized schema:text literals before storing a direct publish', async () => { + const store = new OxigraphStore(); + const publisher = new DKGPublisher({ + store, + chain: new NoChainAdapter(), + eventBus: new TypedEventBus(), + keypair: await generateEd25519Keypair(), + }); + const root = 'urn:compat:computer-history'; + const body = 'computer history '.repeat(5_000); + + await publisher.publish({ + contextGraphId: 'test', + quads: [q(root, 'http://schema.org/text', JSON.stringify(body))], + }); + + const result = await store.query( + 'CONSTRUCT { ?s ?p ?o } WHERE { GRAPH { ?s ?p ?o } }', + ); + expect(result.type).toBe('quads'); + if (result.type !== 'quads') return; + + expect(result.quads.some((quad) => + quad.subject === root && + quad.predicate === 'http://schema.org/text' && + quad.object === JSON.stringify(body), + )).toBe(false); + + const chunkSubjects = result.quads + .filter((quad) => quad.predicate === DKG_HAS_TEXT_CHUNK) + .map((quad) => quad.object); + expect(chunkSubjects.length).toBeGreaterThan(1); + + const chunkTextQuads = result.quads.filter((quad) => + 
chunkSubjects.includes(quad.subject) && + quad.predicate === 'http://schema.org/text' + ); + expect(chunkTextQuads).toHaveLength(chunkSubjects.length); + expect(chunkTextQuads.every((quad) => javaModifiedUtf8Length(quad.object) <= 60_000)).toBe(true); + }); }); diff --git a/packages/publisher/test/storage-ack-handler.test.ts b/packages/publisher/test/storage-ack-handler.test.ts index e6e842cdc..0269cfc2b 100644 --- a/packages/publisher/test/storage-ack-handler.test.ts +++ b/packages/publisher/test/storage-ack-handler.test.ts @@ -331,6 +331,56 @@ describe('StorageACKHandler', () => { expect(decoded.declineMessage).toContain('urn:entity:1'); }); + it('calls the decline hook with typed, bounded details when returning a decline', async () => { + const onDecline = vi.fn(); + const handler = await createHandler([], { onDecline }); + const intent = encodePublishIntent({ + merkleRoot, + contextGraphId, + publisherPeerId: 'publisher-0', + publicByteSize: 300, + isPrivate: false, + kaCount: 1, + rootEntities: ['urn:entity:1'], + }); + + const response = await handler.handler(intent, fakePeerId); + const decoded = decodeStorageACK(response); + + expect(isStorageACKDecline(decoded)).toBe(true); + expect(decoded.declineCode).toBe(STORAGE_ACK_DECLINE_CODES.NO_DATA_IN_SWM); + expect(onDecline).toHaveBeenCalledOnce(); + expect(onDecline).toHaveBeenCalledWith({ + code: STORAGE_ACK_DECLINE_CODES.NO_DATA_IN_SWM, + contextGraphId, + message: expect.stringContaining('No data found in SWM'), + }); + const details = onDecline.mock.calls[0]?.[0]; + expect(details.message.length).toBeLessThanOrEqual(240); + }); + + it('ignores decline hook failures and preserves the encoded decline', async () => { + const handler = await createHandler([], { + onDecline: async () => { throw new Error('logger unavailable'); }, + }); + const intent = encodePublishIntent({ + merkleRoot, + contextGraphId, + publisherPeerId: 'publisher-0', + publicByteSize: 300, + isPrivate: false, + kaCount: 1, + rootEntities: 
['urn:entity:1'], + }); + + const response = await handler.handler(intent, fakePeerId); + const decoded = decodeStorageACK(response); + + expect(isStorageACKDecline(decoded)).toBe(true); + expect(decoded.declineCode).toBe(STORAGE_ACK_DECLINE_CODES.NO_DATA_IN_SWM); + expect(decoded.declineMessage).toContain('No data found in SWM'); + }); + it('declines (MERKLE_MISMATCH_IN_SWM) when SWM data does not match the publisher merkle root', async () => { const differentQuads = [makeQuad('urn:other', 'urn:p', 'urn:val')]; const handler = await createHandler(differentQuads); diff --git a/packages/publisher/test/storage-update-ack.test.ts b/packages/publisher/test/storage-update-ack.test.ts index c10d5ce57..3c8341870 100644 --- a/packages/publisher/test/storage-update-ack.test.ts +++ b/packages/publisher/test/storage-update-ack.test.ts @@ -1,4 +1,4 @@ -import { describe, it, expect } from 'vitest'; +import { describe, it, expect, vi } from 'vitest'; import { StorageACKHandler, type StorageACKHandlerConfig } from '../src/storage-ack-handler.js'; import { ACKCollector, type ACKCollectorDeps } from '../src/ack-collector.js'; import { @@ -51,7 +51,11 @@ describe('V10 UPDATE StorageACK โ€” peer handler + collector quorum', () => { const fakePeerId = { toString: () => 'publisher-peer' }; - function makeConfig(wallet: ethers.Wallet, identityId: bigint): StorageACKHandlerConfig { + function makeConfig( + wallet: ethers.Wallet, + identityId: bigint, + overrides: Partial = {}, + ): StorageACKHandlerConfig { return { nodeRole: 'core', nodeIdentityId: identityId, @@ -61,6 +65,7 @@ describe('V10 UPDATE StorageACK โ€” peer handler + collector quorum', () => { chainId: TEST_CHAIN_ID, kav10Address: TEST_KAV10_ADDR, isCgCurated: async () => true, + ...overrides, }; } @@ -263,6 +268,39 @@ describe('V10 UPDATE StorageACK โ€” peer handler + collector quorum', () => { expect(ack.declineCode).toBe(STORAGE_ACK_DECLINE_CODES.MERKLE_MISMATCH_IN_SWM); }); + it('calls the decline hook for UPDATE typed 
declines', async () => { + const wallet = ethers.Wallet.createRandom(); + const onDecline = vi.fn(); + const handler = new StorageACKHandler( + new OxigraphStore() as any, + makeConfig(wallet, 42n, { onDecline }), + new TypedEventBus() as any, + ); + const intent = encodeUpdateIntent({ + kaId: kaId.toString(), + contextGraphId, + preUpdateMerkleRootCount: Number(preUpdateMerkleRootCount), + newMerkleRoot, + newByteSize: Number(newByteSize), + newTokenAmount: newTokenAmount.toString(), + mintAmount: Number(mintAmount), + burnTokenIds: burnTokenIds.map((b) => b.toString()), + newMerkleLeafCount, + publisherPeerId: 'publisher-0', + }); + + const ack = decodeStorageACK(await handler.updateHandler(intent, fakePeerId)); + + expect(isStorageACKDecline(ack)).toBe(true); + expect(ack.declineCode).toBe(STORAGE_ACK_DECLINE_CODES.NO_DATA_IN_SWM); + expect(onDecline).toHaveBeenCalledOnce(); + expect(onDecline).toHaveBeenCalledWith({ + code: STORAGE_ACK_DECLINE_CODES.NO_DATA_IN_SWM, + contextGraphId, + message: expect.stringContaining('UpdateStorageACK: no data found in SWM graph'), + }); + }); + // #1283 — PUBLIC-update byteSize floor. 
The pre-fix updateHandler verified // only the new Merkle root and signed the publisher-supplied `newByteSize` // with NO floor, so a publisher could ship a correct new root while diff --git a/packages/publisher/test/v10-ack-edge-cases.test.ts b/packages/publisher/test/v10-ack-edge-cases.test.ts index 046969459..2da2124e4 100644 --- a/packages/publisher/test/v10-ack-edge-cases.test.ts +++ b/packages/publisher/test/v10-ack-edge-cases.test.ts @@ -369,13 +369,78 @@ describe('ACKCollector identity verification', () => { }; const collector = new ACKCollector(deps); - await expect(collector.collect(buildCollectParams())) - .rejects.toThrow('storage_ack_insufficient'); + let captured: any; + try { + await collector.collect(buildCollectParams()); + } catch (err) { + captured = err; + } + expect(captured).toBeInstanceOf(Error); + expect(captured.message).toContain('storage_ack_insufficient'); + expect(captured.message).toContain('ACK_VERIFY:rpc-error'); + expect(captured.peerOutcomes.map((p: { reason?: string }) => p.reason)) + .toEqual(['ACK_VERIFY:rpc-error', 'ACK_VERIFY:rpc-error', 'ACK_VERIFY:rpc-error']); expect(log.calls.some( (c: unknown[]) => /rpc-error/.test(c[0] as string), )).toBe(true); }); + it('invalid ACK signatures surface as terminal peer outcomes, not no_response', async () => { + const deps: ACKCollectorDeps = { + gossipPublish: noop(), + sendP2P: async (peerId) => { + const idx = parseInt(peerId.replace('peer-', ''), 10); + return encodeStorageACK({ + merkleRoot, + coreNodeSignatureR: new Uint8Array(32), + coreNodeSignatureVS: new Uint8Array(32), + contextGraphId: testCGIdStr, + nodeIdentityId: idx + 1, + }); + }, + getConnectedCorePeers: () => ['peer-0', 'peer-1', 'peer-2'], + log: noop(), + }; + const collector = new ACKCollector(deps); + + let captured: any; + try { + await collector.collect(buildCollectParams()); + } catch (err) { + captured = err; + } + + expect(captured).toBeInstanceOf(Error); + 
expect(captured.message).toContain('storage_ack_insufficient'); + expect(captured.message).toContain('INVALID_SIGNATURE'); + expect(captured.peerOutcomes.map((p: { reason?: string }) => p.reason)) + .toEqual(['INVALID_SIGNATURE', 'INVALID_SIGNATURE', 'INVALID_SIGNATURE']); + }); + + it('ACK merkle mismatches surface as terminal peer outcomes, not no_response', async () => { + const wrongRoot = ethers.getBytes(ethers.keccak256(ethers.toUtf8Bytes('wrong-root'))); + const deps: ACKCollectorDeps = { + gossipPublish: noop(), + sendP2P: buildSendP2P({ merkleRootOverride: wrongRoot }), + getConnectedCorePeers: () => ['peer-0', 'peer-1', 'peer-2'], + log: noop(), + }; + const collector = new ACKCollector(deps); + + let captured: any; + try { + await collector.collect(buildCollectParams()); + } catch (err) { + captured = err; + } + + expect(captured).toBeInstanceOf(Error); + expect(captured.message).toContain('storage_ack_insufficient'); + expect(captured.message).toContain('MERKLE_ROOT_MISMATCH'); + expect(captured.peerOutcomes.map((p: { reason?: string }) => p.reason)) + .toEqual(['MERKLE_ROOT_MISMATCH', 'MERKLE_ROOT_MISMATCH', 'MERKLE_ROOT_MISMATCH']); + }); + it('detailed verifier takes precedence over legacy boolean verifier', async () => { const log = noop(); const verifyIdentity = tracked(async () => true); diff --git a/packages/storage/src/adapters/blazegraph.ts b/packages/storage/src/adapters/blazegraph.ts index ce9aafb50..458ba868a 100644 --- a/packages/storage/src/adapters/blazegraph.ts +++ b/packages/storage/src/adapters/blazegraph.ts @@ -10,6 +10,10 @@ import type { } from '../triple-store.js'; import { registerTripleStoreAdapter } from '../triple-store.js'; import { buildBlankNodeSafeDelete } from './sparql-http.js'; +import { + BLAZEGRAPH_MUTF8_LIMIT, + javaModifiedUtf8Length, +} from '@origintrail-official/dkg-core'; /** * BlazegraphStore — TripleStore adapter backed by a remote Blazegraph @@ -336,43 +340,6 @@ function escapeString(s: string): string { // 
Oversized-literal guard // ===================================================================== -/** - * Java Modified UTF-8 byte length of a string. - * - * Blazegraph uses `DataOutputStream.writeUTF()` for index keys, which - * encodes strings in Java's Modified UTF-8 (MUTF-8). The key - * differences from standard UTF-8: - * - U+0000 (NUL) is encoded as 2 bytes (0xC0, 0x80) instead of 1 - * - Supplementary codepoints (U+10000–U+10FFFF) are encoded as a - * UTF-16 surrogate pair, each surrogate taking 3 MUTF-8 bytes = - * 6 bytes total (vs 4 in standard UTF-8) - * - * `writeUTF()` hard-caps the encoded length at 65 535 bytes. - * Exceeding this triggers `java.io.UTFDataFormatException` and causes - * the entire batch to fail with HTTP 500. - */ -const BLAZEGRAPH_MUTF8_LIMIT = 65_535; - -function javaModifiedUtf8Length(str: string): number { - let len = 0; - for (let i = 0; i < str.length; i++) { - const code = str.charCodeAt(i); - if (code === 0) { - len += 2; - } else if (code <= 0x7f) { - len += 1; - } else if (code <= 0x7ff) { - len += 2; - } else if (code >= 0xd800 && code <= 0xdbff) { - // High surrogate — the low surrogate at i+1 will add another 3 - len += 3; - } else { - len += 3; - } - } - return len; -} - /** * Check quads for literal objects that would exceed Blazegraph's * MUTF-8 index key limit. Throws with details about the offending diff --git a/packages/storage/test/blazegraph.unit.test.ts b/packages/storage/test/blazegraph.unit.test.ts index 4e3969b40..407f2f92f 100644 --- a/packages/storage/test/blazegraph.unit.test.ts +++ b/packages/storage/test/blazegraph.unit.test.ts @@ -3,6 +3,10 @@ * no live Blazegraph required). 
*/ import { describe, it, expect, beforeEach, afterEach } from 'vitest'; +import { + javaModifiedUtf8Length, + normalizeLargeRdfLiteralsForBlazegraph, +} from '@origintrail-official/dkg-core'; import { BlazegraphStore } from '../src/adapters/blazegraph.js'; describe('BlazegraphStore (mocked HTTP)', () => { @@ -312,6 +316,29 @@ describe('BlazegraphStore (mocked HTTP)', () => { expect(body).toContain('http://s1'); }); + it('insert accepts producer-chunked schema:text that started as a 140KB literal', async () => { + setFetch(async () => new Response(null, { status: 200 })); + const s = new BlazegraphStore(baseUrl); + const root = 'urn:bg-unit:computer-history'; + const oversizedText = 'x'.repeat(140_951); + expect(javaModifiedUtf8Length(JSON.stringify(oversizedText))).toBe(140_953); + + const normalized = normalizeLargeRdfLiteralsForBlazegraph([ + { + subject: root, + predicate: 'http://schema.org/text', + object: JSON.stringify(oversizedText), + graph: 'did:dkg:context-graph:0x599BF63E/computer-history', + }, + ]); + + await s.insert(normalized.quads as any); + expect(fetchCalls).toHaveLength(1); + const body = String(fetchCalls[0][1]?.body); + expect(body).toContain('http://dkg.io/ontology/hasTextChunk'); + expect(body).not.toContain(oversizedText); + }); + it('insert keeps non-literal quads regardless of size', async () => { setFetch(async () => new Response(null, { status: 200 })); const s = new BlazegraphStore(baseUrl);