Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
6cc97d0
feat(telemetry): opt-in OTLP log collection for core nodes (PoC)
Bojan131 Jun 24, 2026
ab56bf7
feat(telemetry): per-node Grafana selection + dashboard
Bojan131 Jun 24, 2026
4cdeef6
feat(telemetry): production ingest path for existing Loki (Alloy bridge)
Bojan131 Jun 24, 2026
242c54f
docs(telemetry): production deploy bundle + manager handoff
Bojan131 Jun 24, 2026
a3c5257
docs(telemetry): exact Cloudflare WAF expression + real-Grafana verif…
Bojan131 Jun 24, 2026
9a76a4a
feat(telemetry): fleet dashboard + operator guide + example alerts
Bojan131 Jun 24, 2026
d9b557b
chore: drop accidentally-committed localhost hardhat deploy artifacts
Bojan131 Jun 24, 2026
10da035
chore: restore tracked localhost_contracts.json (only the untracked l…
Bojan131 Jun 24, 2026
a30318d
feat(telemetry): OTel SDK foundation — traces+metrics providers, log …
Bojan131 Jun 25, 2026
80e536d
feat(telemetry): instrument spans + metrics across agent/publisher/ch…
Bojan131 Jun 25, 2026
a841d00
chore(telemetry): collector traces+metrics pipelines + mock OTLP coll…
Bojan131 Jun 25, 2026
2aa31f7
feat(telemetry): add protocol_router.send span + P2P send-duration me…
Bojan131 Jun 25, 2026
414936b
fix(telemetry): address bug-bot review (1 bug + 3 issues + 2 nits)
Bojan131 Jun 25, 2026
9f3625d
Merge remote-tracking branch 'origin/main' into feat/core-node-log-co…
Bojan131 Jun 25, 2026
f78d06f
fix(telemetry): two CI regressions from instrumentation
Bojan131 Jun 25, 2026
908e298
Merge remote-tracking branch 'origin/main' into feat/core-node-log-co…
Bojan131 Jun 26, 2026
0bfdadd
chore: drop accidentally-committed localhost Hardhat deploy artifacts…
Bojan131 Jun 26, 2026
6a8c971
feat(telemetry): address #1317 review — unify log resource attrs + fu…
Bojan131 Jun 26, 2026
66ef5b7
Merge remote-tracking branch 'origin/main' into feat/core-node-log-co…
Bojan131 Jun 26, 2026
2771d2e
fix(telemetry): tie OTel traces/metrics to the master gate + prove in…
Bojan131 Jun 26, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ packages/evm-module/typechain/

packages/evm-module/deployments/hardhat_contracts.json
packages/evm-module/deployments/localhost_contracts.json
packages/evm-module/deployments/localhost/
snapshots/_cache_phase1_neuroweb_epoch16.json
.claude/
.orchestrator/
Expand Down
1 change: 1 addition & 0 deletions packages/agent/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"clean": "rm -rf dist tsconfig.tsbuildinfo"
},
"dependencies": {
"@opentelemetry/api": "^1.9.1",
"@libp2p/peer-id": "^6.0.9",
"@multiformats/multiaddr": "^13.0.3",
"@noble/ciphers": "^2.2.0",
Expand Down
42 changes: 42 additions & 0 deletions packages/agent/src/dkg-agent-publish.ts
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,11 @@ import {
pickNetworkTunables,
sharedMemoryReadBothFilter,
partitionCatalogQuads,
withSpan,
getMetrics,
assertQuadLiteralsMutf8Safe,
} from '@origintrail-official/dkg-core';
import { SpanStatusCode } from '@opentelemetry/api';
import { GraphManager, PrivateContentStore, createTripleStore, type TripleStore, type TripleStoreConfig, type Quad, type LargeLiteralStorageConfig } from '@origintrail-official/dkg-storage';
import { EVMChainAdapter, NoChainAdapter, enrichEvmError, buildKnowledgeAssetUal, type EVMAdapterConfig, type ChainAdapter, type CreateContextGraphParams, type CreateOnChainContextGraphParams, type CreateOnChainContextGraphResult, type TxResult, type V10PublishingConvictionAccountInfo } from '@origintrail-official/dkg-chain';
import {
Expand Down Expand Up @@ -1290,6 +1293,15 @@ export class PublishMethods extends DKGAgentBase {
privateQuads?: Quad[],
opts?: PublishOpts,
): Promise<PublishResult> {
return withSpan('agent.publish', async (span) => {

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Issue: Publish telemetry is duplicated across both publish entry points

What's wrong
The same cross-cutting observability sequence is pasted into two very busy publish flows in a 4.5k-line file. That makes it easy for a future telemetry change to be applied to one path and missed in the other, and it obscures the publish logic itself without introducing a real abstraction.

Example
The direct path (lines 1296-1304 and 1440-1459) mirrors the SWM path (lines 4143-4150 and 4282-4380): both derive the chain id, start a timer, set the publish status, mark failed spans, and emit publishTotal / publishDuration.

Suggested direction
Use a helper such as withPublishTelemetry({ spanName, source, contextGraphId, chainId, attributes }, fn) that records status and metrics around the existing publish body. Let each method supply only the attributes that differ.

For Agents
In packages/agent/src/dkg-agent-publish.ts, factor the added publish telemetry into a small helper used by _publish and publishFromSharedMemory. Preserve span names (agent.publish, agent.publish_from_swm), source labels (direct, swm), status handling, and metric attributes.

const chainId = typeof this.chain?.chainId === 'string' && this.chain.chainId !== 'none' ? this.chain.chainId : undefined;
const publishStartedAt = Date.now();
span.setAttributes({
'dkg.context_graph_id': contextGraphId,
'dkg.triple_count': quads.length,
'dkg.has_private': !!privateQuads && privateQuads.length > 0,
...(chainId ? { 'dkg.chain_id': chainId } : {}),
});
const ctx = opts?.operationCtx ?? createOperationContext('publish');
const onPhase = opts?.onPhase;
this.log.info(ctx, `Starting publish to context graph "${contextGraphId}" with ${quads.length} triples`);
Expand Down Expand Up @@ -1425,6 +1437,12 @@ export class PublishMethods extends DKGAgentBase {
encryptInlineChunked,
});

span.setAttribute('dkg.publish_status', result.status);
if (result.status === 'failed') {
span.setStatus({ code: SpanStatusCode.ERROR });
span.addEvent('publish_failed', { error: String(result.contextGraphError ?? '') });
}

onPhase?.('broadcast', 'start');
this.log.info(ctx, `Local publish complete, broadcasting to peers`);
await this.broadcastPublish(contextGraphId, result, ctx);
Expand All @@ -1436,7 +1454,12 @@ export class PublishMethods extends DKGAgentBase {
// it can never affect the publish just completed.
await this.emitPublicProjectionAfterPublish(contextGraphId, result, ctx);

const publishMetricAttrs = { outcome: result.status, source: 'direct', ...(chainId ? { chain_id: chainId } : {}) };
getMetrics().publishTotal.add(1, publishMetricAttrs);
getMetrics().publishDuration.record(Date.now() - publishStartedAt, publishMetricAttrs);

return result;
});
}

/**
Expand Down Expand Up @@ -4117,6 +4140,14 @@ export class PublishMethods extends DKGAgentBase {
schemeVersion?: number;
},
): Promise<PublishResult> {
return withSpan('agent.publish_from_swm', async (span) => {
const chainId = typeof this.chain?.chainId === 'string' && this.chain.chainId !== 'none' ? this.chain.chainId : undefined;
const publishStartedAt = Date.now();
span.setAttributes({
'dkg.context_graph_id': contextGraphId,
'dkg.selection': selection === 'all' ? 'all' : 'roots',
...(chainId ? { 'dkg.chain_id': chainId } : {}),
});
const ctx = options?.operationCtx ?? createOperationContext('publishFromSWM');
const effectiveSubCG = options?.subContextGraphId ?? options?.contextGraphId;
// `ctxGraphIdStr` doubles as `publishContextGraphId` for REMAP-flow
Expand Down Expand Up @@ -4248,6 +4279,12 @@ export class PublishMethods extends DKGAgentBase {
encryptInlineChunked,
});

span.setAttribute('dkg.publish_status', result.status);
if (result.status === 'failed') {
span.setStatus({ code: SpanStatusCode.ERROR });
span.addEvent('publish_failed', { error: String(result.contextGraphError ?? '') });
}

if (result.status === 'confirmed' && result.onChainResult) {
const rootEntities = result.kaManifest.map(ka => ka.rootEntity);

Expand Down Expand Up @@ -4338,7 +4375,12 @@ export class PublishMethods extends DKGAgentBase {
}
}

const publishMetricAttrs = { outcome: result.status, source: 'swm', ...(chainId ? { chain_id: chainId } : {}) };
getMetrics().publishTotal.add(1, publishMetricAttrs);
getMetrics().publishDuration.record(Date.now() - publishStartedAt, publishMetricAttrs);

return result;
});
}

/** @deprecated Use publishFromSharedMemory. Will be removed in V10.1. */
Expand Down
17 changes: 15 additions & 2 deletions packages/agent/src/p2p/sync-transport.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { randomUUID } from 'node:crypto';
import { withRetry } from '@origintrail-official/dkg-core';
import { withRetry, withSpan, getMetrics } from '@origintrail-official/dkg-core';
import { markSyncTransportFailure } from '../sync/error-tags.js';

/**
Expand Down Expand Up @@ -82,7 +82,11 @@ interface SyncSendParams {
}

export async function sendSyncRequest(params: SyncSendParams): Promise<Uint8Array> {
return withRetry(
return withSpan(
'sync.request',
async () => {
try {
const out = await withRetry(
async () => {
throwIfAborted(params.signal);
const requestBytes = await params.requestFactory();
Expand Down Expand Up @@ -114,6 +118,15 @@ export async function sendSyncRequest(params: SyncSendParams): Promise<Uint8Arra
isRetryable: () => params.signal?.aborted !== true,
onRetry: params.onRetry,
},
);
getMetrics().syncRequestTotal.add(1, { outcome: 'ok', protocol_id: params.protocolId });
return out;
} catch (err) {
getMetrics().syncRequestTotal.add(1, { outcome: 'error', protocol_id: params.protocolId });
throw err;
}
},
{ attributes: { 'dkg.protocol_id': params.protocolId } },
);
}

Expand Down
15 changes: 13 additions & 2 deletions packages/agent/src/sync/responder/sync-handler.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import {
createOperationContext,
QuietRetryableHandlerError,
withSpan,
getMetrics,
type OperationContext,
} from '@origintrail-official/dkg-core';
import type { TripleStore } from '@origintrail-official/dkg-storage';
Expand Down Expand Up @@ -297,7 +299,8 @@ export function registerSyncHandler(params: RegisterSyncHandlerParams): void {
};
};

register(protocolSync, async (data, peerId, options) => {
register(protocolSync, async (data, peerId, options) => withSpan('sync.response', async (span) => {
span.setAttribute('dkg.protocol_id', protocolSync);
const signal = options?.signal;
const handlerStartedAt = Date.now();
const request = parseSyncRequest(data);
Expand Down Expand Up @@ -495,12 +498,19 @@ export function registerSyncHandler(params: RegisterSyncHandlerParams): void {
logDebug(createOperationContext('sync'), `Sync responder total for "${contextGraphId}" (phase=${phase}, workspace=${isWorkspace}): ${totalDurationMs}ms`);
}
return new TextEncoder().encode(nquads.join('\n'));
}).then((res) => {
getMetrics().syncResponseTotal.add(1, { outcome: 'ok' });
return res;
}).catch((err) => {
if (err instanceof SyncResponderBusyError) {
getMetrics().syncResponseTotal.add(1, { outcome: 'busy' });
span.setAttribute('dkg.sync_response_outcome', 'busy');
logDebug(createOperationContext('sync'), `Sync responder busy for "${contextGraphId}" from peer ${peerId} (phase=${phase}): ${err.message}`);
throw new QuietRetryableHandlerError(err.message);
}
if (err instanceof SyncRowSnapshotLimitError) {
getMetrics().syncResponseTotal.add(1, { outcome: 'limit' });
span.setAttribute('dkg.sync_response_outcome', 'limit');
logWarn(
createOperationContext('sync'),
`Sync responder snapshot limit for "${contextGraphId}" from peer ${peerId} (phase=${phase}, workspace=${isWorkspace}): active=${err.activeEntries}/${err.maxEntries} cached=${err.cachedEntries} inflight=${err.inflightEntries} key=${err.key}`,
Expand All @@ -509,7 +519,8 @@ export function registerSyncHandler(params: RegisterSyncHandlerParams): void {
`sync responder snapshot limit exceeded (active=${err.activeEntries}/${err.maxEntries})`,
);
}
getMetrics().syncResponseTotal.add(1, { outcome: 'error' });
throw err;
});
});
}));
}
2 changes: 2 additions & 0 deletions packages/chain/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
"@origintrail-official/dkg-evm-module": "workspace:*"
},
"devDependencies": {
"@opentelemetry/api": "^1.9.1",
"@opentelemetry/sdk-metrics": "^2.8.0",
"@vitest/coverage-v8": "^4.0.18",
"vitest": "^4.0.18"
},
Expand Down
Loading
Loading