Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 33 additions & 32 deletions ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ classDiagram
<<test double>>
+writeWorkingMemory()
+liftWorkingMemoryToSharedMemory()
+sharedMemoryWrite()
+publishFromSharedMemory()
+publisherEnqueue()
+createKnowledgeAsset()
+publishAssertion()
+knowledgeAssetPublishAsync()
+publisherJob()
+query()
}
Expand Down Expand Up @@ -81,7 +81,7 @@ classDiagram

class PublisherRuntime {
<<runtime>>
+publishFromSharedMemory()
+knowledgeAssetPublishAsync()
+enqueue()
+processNext()
+finalize()
Expand Down Expand Up @@ -165,18 +165,18 @@ sequenceDiagram

loop warmups and measured iterations
Script->>Script: create unique sync payload
Script->>Client: sharedMemoryWrite(sync quads)
Client->>Daemon: POST shared memory write
Daemon->>Agent: write benchmark triples
Agent->>Store: persist SWM payload
Script->>Client: createKnowledgeAsset(sync quads, alsoShareSwm)
Client->>Daemon: POST /api/knowledge-assets
Daemon->>Agent: create + write + finalize + share
Agent->>Store: persist named KA and SWM share
Store-->>Agent: share operation recorded
Agent-->>Daemon: write accepted
Daemon-->>Client: share operation id
Agent-->>Daemon: named KA shared
Daemon-->>Client: assertion + share metadata

Script->>Client: publishFromSharedMemory(sync root)
Client->>Daemon: POST shared memory publish
Daemon->>Publisher: publish synchronously
Publisher->>Store: read staged triples
Script->>Client: publishAssertion(sync name)
Client->>Daemon: POST /api/knowledge-assets/:name/vm/publish
Daemon->>Publisher: publish finalized named KA
Publisher->>Store: read sealed SWM share
Publisher->>Chain: anchor knowledge asset
Chain-->>Publisher: commitment finalized
Publisher-->>Daemon: kc id
Expand All @@ -192,17 +192,17 @@ sequenceDiagram
Script->>Script: validate returned marker

Script->>Script: create unique async payload
Script->>Client: sharedMemoryWrite(async quads)
Client->>Daemon: POST shared memory write
Daemon->>Agent: write benchmark triples
Agent->>Store: persist SWM payload
Script->>Client: createKnowledgeAsset(async quads, alsoShareSwm)
Client->>Daemon: POST /api/knowledge-assets
Daemon->>Agent: create + write + finalize + share
Agent->>Store: persist named KA and SWM share
Store-->>Agent: share operation recorded
Agent-->>Daemon: write accepted
Daemon-->>Client: share operation id
Agent-->>Daemon: named KA shared
Daemon-->>Client: assertion + share metadata

Script->>Client: publisherEnqueue(share operation id)
Client->>Daemon: POST publisher enqueue
Daemon->>Publisher: enqueue job
Script->>Client: knowledgeAssetPublishAsync(name)
Client->>Daemon: POST /api/knowledge-assets/:name/vm/publish-async
Daemon->>Publisher: enqueue named KA VM publish job
Publisher-->>Daemon: job id
Daemon-->>Client: enqueue result

Expand Down Expand Up @@ -273,13 +273,13 @@ sequenceDiagram
else synchronous publish with finalization
Client->>WM: write payload
Client->>SWM: lift payload
Suite->>Client: publishFromSharedMemory(root)
Suite->>Client: publishAssertion(name)
Client->>VM: promote root with kc id
Client-->>Suite: finalized publish result
else asynchronous publish enqueue and finalization
Client->>WM: write payload
Client->>SWM: lift payload
Suite->>Client: publisherEnqueue(share operation)
Suite->>Client: knowledgeAssetPublishAsync(name)
Client->>Jobs: create queued job
Suite->>Client: publisherJob(job id)
Client->>VM: promote queued roots
Expand Down Expand Up @@ -715,10 +715,11 @@ decision to keep private-store RDF plaintext after message decryption.
daemon-reserved metadata, partitions root entities, inserts the promoted quads
into `_shared_memory`, removes the promoted rows from the assertion graph, and
updates lifecycle metadata in `_meta`.
- **SWM writes** from `/api/shared-memory/write`,
`/api/shared-memory/conditional-write`, and assertion promotion persist
normalized quads directly in `did:dkg:context-graph:<cg>/_shared_memory` or
the sub-graph equivalent. SWM operation metadata and ownership live in
- **SWM substrate writes** are produced by named KA sharing, gossip receive,
catch-up, and host-mode replication. Product callers share through
`POST /api/knowledge-assets/:name/swm/share` or `/swm/share-async`; those
paths persist normalized quads in `did:dkg:context-graph:<cg>/_shared_memory`
or the sub-graph equivalent. SWM operation metadata and ownership live in
`_shared_memory_meta`; the public snapshot store records the same public quads
for replay and catch-up. The store rows are not `enc:gcm:v1` envelopes.
- **Private content** lives in `_private` graphs through `PrivateContentStore`.
Expand Down Expand Up @@ -814,9 +815,9 @@ loop each configured source
Runner-->>Runner: skip processSource
else source content changed or retry is needed
Runner->>Handler: processSource(source, fingerprint, priorState)
Handler->>SWM: POST /api/shared-memory/write
SWM-->>Handler: shareOperationId
Handler->>Publisher: POST /api/publisher/enqueue
Handler->>KA: POST /api/knowledge-assets
KA-->>Handler: name + shareOperationId
Handler->>Publisher: POST /api/knowledge-assets/:name/vm/publish-async
Publisher-->>Handler: jobId
Handler-->>Runner: nextState
end
Expand Down
5 changes: 2 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -250,8 +250,7 @@ dkg assertion query <name> -c <cg> # read assertion quads from
dkg assertion promote <name> -c <cg> # WM → SWM

# Shared memory (team-visible) and publishing
dkg shared-memory write <cg> ... # write triples directly to SWM
dkg shared-memory publish <cg> # SWM → Verifiable Memory (costs TRAC)
dkg assertion promote <name> -c <cg> # share a named KA from WM to SWM
dkg publish <cg> -f <file> # one-shot RDF publish to a context graph
dkg verify <batchId> --context-graph <cg> --verified-graph <id> # propose M-of-N verification
dkg endorse <ual> --context-graph <cg> --agent <addr> # endorse a published KA
Expand All @@ -264,7 +263,7 @@ dkg subscribe <cg> # subscribe to a CG's gossip topics

# Async publisher (optional, for batching)
dkg publisher enable # enable the async publisher
dkg publisher enqueue <cg> ... # enqueue a publish job
dkg publisher publish-async <cg> <name> # enqueue a named KA VM publish job
dkg publisher jobs # list publisher jobs
dkg publisher stats # publisher throughput stats

Expand Down
37 changes: 16 additions & 21 deletions bench/analyze-publish-async-get.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ const DEFAULT_OUTPUT_DIR = 'bench/results/profiles';
const PUBLISH_ASYNC_GET_PAGES: Array<[string, string]> = [
['get/read retrieval', 'bench/results/publish-async-get/get-read-retrieval.html'],
['synchronous publish with finalization', 'bench/results/publish-async-get/sync-publish-finalization.html'],
['asynchronous publish enqueue and finalization', 'bench/results/publish-async-get/async-publish-finalization.html'],
['asynchronous VM publish request and finalization', 'bench/results/publish-async-get/async-publish-finalization.html'],
['upload payload to local working memory', 'bench/results/publish-async-get/working-memory-upload.html'],
['lift local working memory to shared working memory', 'bench/results/publish-async-get/working-to-shared-memory.html'],
];
Expand Down Expand Up @@ -197,25 +197,15 @@ async function analyzeSyncPublishFlow(config: BenchmarkConfig, payloadSize: Payl

async function analyzeAsyncPublishFlow(config: BenchmarkConfig, payloadSize: PayloadSizeLabel): Promise<FlowAnalysis> {
const client = new LayeredDkgBenchmarkClient();
return analyzeFlow('asynchronous publish enqueue and finalization', payloadSize, async (trace) => {
return analyzeFlow('asynchronous VM publish request and finalization', payloadSize, async (trace) => {
const payload = traceSync(trace, 'setup', 'createPayload', [], 'Generate the payload for async publish.', () => (
createPayload(config, `analysis-async-${payloadSize}`, 1, 'async', false)
));
const prepared = await traceSharedMemoryWrite(trace, client, config, payload);
const shareOperationId = prepared.shareOperationId ?? '';
const queued = await traceAsync(trace, 'measured', 'publisherEnqueue', ['publisherJobs.set'], 'Enqueue the publish request through the publisher runtime path.', () => (
client.publisherEnqueue({
contextGraphId: config.contextGraphId,
shareOperationId,
roots: [payload.rootEntity],
namespace: config.namespace,
scope: config.scope,
authorityProofRef: config.authorityProofRef,
swmId: 'swm-main',
transitionType: 'CREATE',
authorityType: 'owner',
})
), { rootEntity: payload.rootEntity, shareOperationId });
const name = `analysis-async-${payloadSize}`;
await traceCreateSharedKnowledgeAsset(trace, client, config, name, payload);
const queued = await traceAsync(trace, 'measured', 'knowledgeAssetPublishAsync', ['publisherJobs.set'], 'Queue VM publish for the named knowledge asset.', () => (
client.knowledgeAssetPublishAsync(config.contextGraphId, name)
), { rootEntity: payload.rootEntity, name });
await traceAsync(trace, 'measured', 'publisherJob', ['promoteSharedRoot'], 'Poll the publisher job and finalize queued content.', () => (
client.publisherJob(queued.jobId ?? '')
), { jobId: queued.jobId });
Expand Down Expand Up @@ -268,15 +258,20 @@ async function analyzeFlow(
return { flow, payloadSize, totalMs, measuredMs, traces: normalizedTraces };
}

async function traceSharedMemoryWrite(
async function traceCreateSharedKnowledgeAsset(
traces: MethodTrace[],
client: LayeredDkgBenchmarkClient,
config: BenchmarkConfig,
name: string,
payload: BenchmarkPayload,
) {
return traceAsync(traces, 'setup', 'sharedMemoryWrite', ['writeWorkingMemory', 'liftWorkingMemoryToSharedMemory'], 'Stage generated quads in local memory and lift them into shared working memory.', () => (
client.sharedMemoryWrite(config.contextGraphId, payload.quads)
), payloadContext(payload));
return traceAsync(traces, 'setup', 'createKnowledgeAsset', ['writeWorkingMemory', 'liftWorkingMemoryToSharedMemory'], 'Create, seal, and share the named knowledge asset into shared working memory.', () => (
client.createKnowledgeAsset(config.contextGraphId, name, {
quads: payload.quads,
finalize: true,
alsoShareSwm: true,
})
), { ...payloadContext(payload), name });
}

async function traceAsync<T>(
Expand Down
40 changes: 16 additions & 24 deletions bench/publish-async-get.bench.ts
Original file line number Diff line number Diff line change
Expand Up @@ -94,11 +94,9 @@ export default defineSuite({
},
{
beforeIteration: async () => {
// The measured publishAssertion stages the quads internally no separate
// The measured publishAssertion stages the quads internally; no separate
// sharedMemoryWrite here, or the sync flow would double-write SWM and stop
// matching the canonical create/write/share/publish path (cf. analyze-* and
// the get/read flow). The async flow below keeps its write — it needs the
// returned shareOperationId for the publisher enqueue/lift path.
// matching the canonical create/write/share/publish path.
syncName = `esbench-sync-${sequence++}`;
syncPayload = createPayload(config, syncName, 1, 'sync', false);
},
Expand All @@ -110,26 +108,16 @@ export default defineSuite({
);

let asyncPayload: BenchmarkPayload | undefined;
let asyncShareOperationId: string | undefined;
let asyncName: string | undefined;
const asyncClient = new LayeredDkgBenchmarkClient();
benchAsyncWithHooks(
scene,
'asynchronous publish enqueue and finalization',
'asynchronous VM publish request and finalization',
async () => {
const payload = requirePayload(asyncPayload, 'asynchronous publish enqueue and finalization');
if (!asyncShareOperationId) throw new Error('async setup did not produce a share operation id');

const queued = await asyncClient.publisherEnqueue({
contextGraphId: config.contextGraphId,
shareOperationId: asyncShareOperationId,
roots: [payload.rootEntity],
namespace: config.namespace,
scope: config.scope,
authorityProofRef: config.authorityProofRef,
swmId: 'swm-main',
transitionType: 'CREATE',
authorityType: 'owner',
});
const payload = requirePayload(asyncPayload, 'asynchronous VM publish request and finalization');
if (!asyncName) throw new Error('async setup did not produce a knowledge asset name');

const queued = await asyncClient.knowledgeAssetPublishAsync(config.contextGraphId, asyncName);
if (!queued.jobId) throw new Error('async publisher did not return a job id');

const completed = await asyncClient.publisherJob(queued.jobId);
Expand All @@ -139,13 +127,17 @@ export default defineSuite({
},
{
beforeIteration: async () => {
asyncPayload = createPayload(config, `esbench-async-${sequence++}`, 1, 'async', false);
const prepared = await asyncClient.sharedMemoryWrite(config.contextGraphId, asyncPayload.quads);
asyncShareOperationId = prepared.shareOperationId;
asyncName = `esbench-async-${sequence++}`;
asyncPayload = createPayload(config, asyncName, 1, 'async', false);
await asyncClient.createKnowledgeAsset(config.contextGraphId, asyncName, {
quads: asyncPayload.quads,
finalize: true,
alsoShareSwm: true,
});
},
afterIteration: () => {
asyncPayload = undefined;
asyncShareOperationId = undefined;
asyncName = undefined;
asyncClient.clear();
},
},
Expand Down
2 changes: 1 addition & 1 deletion bench/support/cpu-profile-report.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ function labelForWidth(label, width) {
}

function formatValue(value, unit) {
if (unit === 'ms') return `${value.toLocaleString(undefined, { maximumFractionDigits: 2, minimumFractionDigits: value < 10 ? 2 : 0 })} ms`;
if (unit === 'ms') return `${value.toLocaleString('en-US', { maximumFractionDigits: 2, minimumFractionDigits: value < 10 ? 2 : 0 })} ms`;
return `${Math.round(value).toLocaleString()} samples`;
}

Expand Down
46 changes: 36 additions & 10 deletions bench/support/layered-dkg-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ interface MemoryRecord {
marker: string;
quads: Quad[];
shareOperationId?: string;
assertionName?: string;
kaId?: string;
}

interface PublisherJob {
contextGraphId: string;
assertionName: string;
jobId: string;
roots: string[];
status: 'queued' | 'finalized' | 'failed';
Expand All @@ -27,6 +29,7 @@ export class LayeredDkgBenchmarkClient implements BenchmarkClient {
readonly workingMemory = new Map<string, MemoryRecord>();
readonly sharedWorkingMemory = new Map<string, MemoryRecord>();
readonly verifiableMemory = new Map<string, MemoryRecord>();
readonly knowledgeAssets = new Map<string, { contextGraphId: string; name: string; roots: string[] }>();
readonly publisherJobs = new Map<string, PublisherJob>();

private shareSequence = 0;
Expand All @@ -50,21 +53,31 @@ export class LayeredDkgBenchmarkClient implements BenchmarkClient {
this.workingMemory.clear();
this.sharedWorkingMemory.clear();
this.verifiableMemory.clear();
this.knowledgeAssets.clear();
this.publisherJobs.clear();
}

async sharedMemoryWrite(contextGraphId: string, quads: Quad[]) {
const working = await this.writeWorkingMemory(contextGraphId, quads);
const shared = await this.liftWorkingMemoryToSharedMemory(contextGraphId, uniqueSubjects(quads));
async createKnowledgeAsset(
contextGraphId: string,
name: string,
options: { quads: Quad[]; finalize: true; alsoShareSwm: true; subGraphName?: string },
) {
const roots = uniqueSubjects(options.quads);
await this.writeWorkingMemory(contextGraphId, options.quads, { assertionName: name });
const shared = await this.liftWorkingMemoryToSharedMemory(contextGraphId, roots);
this.knowledgeAssets.set(assetKey(contextGraphId, name), { contextGraphId, name, roots });
return {
assertionUri: `urn:benchmark:assertion:${encodeURIComponent(contextGraphId)}:${encodeURIComponent(name)}`,
promotedCount: roots.length,
publishReady: true,
shareOperationId: shared.shareOperationId,
};
}

async writeWorkingMemory(contextGraphId: string, quads: Quad[]) {
async writeWorkingMemory(contextGraphId: string, quads: Quad[], ids: Pick<MemoryRecord, 'assertionName'> = {}) {
const shareOperationId = `draft-${++this.draftSequence}`;
for (const rootEntity of uniqueSubjects(quads)) {
const record = createMemoryRecord(contextGraphId, rootEntity, quads, { shareOperationId });
const record = createMemoryRecord(contextGraphId, rootEntity, quads, { shareOperationId, ...ids });
this.workingMemory.set(rootEntity, record);
}
return { shareOperationId };
Expand Down Expand Up @@ -92,7 +105,11 @@ export class LayeredDkgBenchmarkClient implements BenchmarkClient {
// Named-KA one-shot: stage the quads (create → write → share) then publish
// its roots to verifiable memory. Mirrors the create → /vm/publish flow the
// real benchmark client (ApiClient.publishAssertion) drives.
await this.sharedMemoryWrite(contextGraphId, quads);
await this.createKnowledgeAsset(contextGraphId, _name, {
quads,
finalize: true,
alsoShareSwm: true,
});
const roots = uniqueSubjects(quads);
const kaId = `kc-${++this.kcSequence}`;
for (const rootEntity of roots) {
Expand All @@ -106,18 +123,23 @@ export class LayeredDkgBenchmarkClient implements BenchmarkClient {
};
}

async publisherEnqueue(request: Parameters<BenchmarkClient['publisherEnqueue']>[0]) {
for (const rootEntity of request.roots) {
async knowledgeAssetPublishAsync(contextGraphId: string, name: string) {
const asset = this.knowledgeAssets.get(assetKey(contextGraphId, name));
if (!asset) {
throw new Error(`Knowledge asset ${name} is missing from context graph ${contextGraphId}`);
}
for (const rootEntity of asset.roots) {
if (!this.sharedWorkingMemory.has(rootEntity)) {
throw new Error(`Root ${rootEntity} is missing from shared working memory`);
}
}

const jobId = `job-${++this.jobSequence}`;
this.publisherJobs.set(jobId, {
contextGraphId: request.contextGraphId,
contextGraphId,
assertionName: name,
jobId,
roots: [...request.roots],
roots: [...asset.roots],
status: 'queued',
});
return { jobId };
Expand Down Expand Up @@ -196,6 +218,10 @@ function uniqueSubjects(quads: Quad[]): string[] {
return [...new Set(quads.map((quad) => quad.subject))];
}

function assetKey(contextGraphId: string, name: string): string {
return `${contextGraphId}\0${name}`;
}

function markerFromQuads(quads: Quad[]): string {
const markerQuad = quads.find((quad) => quad.predicate === 'http://schema.org/identifier');
if (!markerQuad) throw new Error('Benchmark payload is missing a marker quad');
Expand Down
Loading