Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@librechat/agents",
"version": "3.2.44",
"version": "3.2.45",
"main": "./dist/cjs/main.cjs",
"module": "./dist/esm/main.mjs",
"types": "./dist/types/index.d.ts",
Expand Down
33 changes: 20 additions & 13 deletions src/graphs/Graph.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,6 @@ import {
makeIsDeferred,
partitionAndMarkAnthropicToolCache,
} from '@/messages';
import {
resolveLangfuseConfig,
shouldTraceToolNodeForLangfuse,
withLangfuseToolOutputTracingConfig,
} from '@/langfuseToolOutputTracing';
import {
createLangfuseHandler,
createLangfuseTraceMetadata,
Expand All @@ -54,6 +49,10 @@ import {
joinKeys,
sleep,
} from '@/utils';
import {
resolveLangfuseRuntimeScope,
withLangfuseRuntimeScope,
} from '@/langfuseRuntimeScope';
import {
GraphNodeKeys,
ContentTypes,
Expand All @@ -67,6 +66,7 @@ import {
type CallbackEntry,
} from '@/utils/callbacks';
import { partitionAndMarkOpenRouterToolCache } from '@/llm/openrouter/toolCache';
import { shouldTraceToolNodeForLangfuse } from '@/langfuseToolOutputTracing';
import { ToolNode as CustomToolNode, toolsCondition } from '@/tools/ToolNode';
import { createLocalCodingToolBundle } from '@/tools/local/LocalCodingTools';
import { SubagentExecutor, resolveSubagentConfigs } from '@/tools/subagent';
Expand All @@ -81,6 +81,7 @@ import { shouldTriggerSummarization } from '@/summarization';
import { resolveLocalToolsForBinding } from '@/tools/local';
import { createSummarizeNode } from '@/summarization/node';
import { messagesStateReducer } from '@/messages/reducer';
import { resolveLangfuseConfig } from '@/langfuseConfig';
import { createSchemaOnlyTools } from '@/tools/schema';
import { AgentContext } from '@/agents/AgentContext';
import { createFakeStreamingLLM } from '@/llm/fake';
Expand Down Expand Up @@ -2079,6 +2080,8 @@ export class StandardGraph extends Graph<t.BaseGraphState, t.GraphNode> {
sessionId: config.configurable?.thread_id as string | undefined,
traceMetadata,
tags: ['librechat', 'agent'],
traceIdSeed:
langfuse?.deterministicTraceId === true ? this.runId : undefined,
});
if (langfuseHandler != null) {
invokeConfig = {
Expand All @@ -2092,8 +2095,11 @@ export class StandardGraph extends Graph<t.BaseGraphState, t.GraphNode> {
const metadata = config.metadata as Record<string, unknown>;

try {
result = await withLangfuseToolOutputTracingConfig(
this.langfuse,
result = await withLangfuseRuntimeScope(
resolveLangfuseRuntimeScope({
runLangfuse: this.langfuse,
langfuseOverlay: agentContext.langfuse,
}),
() =>
attemptInvoke(
{
Expand All @@ -2103,16 +2109,18 @@ export class StandardGraph extends Graph<t.BaseGraphState, t.GraphNode> {
context: this,
},
invokeConfig
),
agentContext.langfuse
)
);
} catch (primaryError) {
clearCurrentDeltaStepMarkers({
graph: this,
metadata,
});
result = await withLangfuseToolOutputTracingConfig(
this.langfuse,
result = await withLangfuseRuntimeScope(
resolveLangfuseRuntimeScope({
runLangfuse: this.langfuse,
langfuseOverlay: agentContext.langfuse,
}),
() =>
tryFallbackProviders({
fallbacks,
Expand All @@ -2121,8 +2129,7 @@ export class StandardGraph extends Graph<t.BaseGraphState, t.GraphNode> {
config: invokeConfig,
primaryError,
context: this,
}),
agentContext.langfuse
})
);
} finally {
await disposeLangfuseHandler(langfuseHandler);
Expand Down
62 changes: 33 additions & 29 deletions src/instrumentation.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { AsyncLocalStorage } from 'node:async_hooks';
import { createHash, randomBytes } from 'node:crypto';
import { setLangfuseTracerProvider } from '@langfuse/tracing';
import { BasicTracerProvider } from '@opentelemetry/sdk-trace-base';
Expand All @@ -14,43 +13,33 @@ import type { LangfuseSpanProcessorParams } from '@langfuse/otel';
import type { Context } from '@opentelemetry/api';
import type * as t from '@/types';
import {
createLibreChatTraceAttributes,
hasLangfuseConfigCredentials,
hasLangfuseEnvCredentials,
hasLangfuseEnvConfig,
} from '@/langfuse';
import {
createLangfuseSpanProcessor,
getContextLangfuseConfig,
} from '@/langfuseToolOutputTracing';
resolveLangfuseConfigForSpan,
resolveTraceIdSeedForSpan,
} from '@/langfuseRuntimeScope';
import { createLangfuseSpanProcessor } from '@/langfuseToolOutputTracing';
import { traceIdFromSeed } from '@/langfuseRuntimeContext';
import { isPresent } from '@/utils/misc';

/**
* Per-run seed for deterministic Langfuse trace ids. When a run opts in
* (`LangfuseConfig.deterministicTraceId`), it executes its stream inside
* `runWithTraceIdSeed(runId, …)` and the IdGenerator below derives the root
* trace id from that seed instead of a random one. This lets external systems
* (e.g. a host app recording user feedback after the fact) attach scores or
* observations to the trace by regenerating the same id from the run/message
* id — no trace lookup required. With no active seed it falls back to random
* ids, so default behavior is unchanged.
* `runWithTraceIdSeed(runId, ...)` from `./langfuseRuntimeContext`, and the
* IdGenerator below derives the root trace id from that seed instead of a
* random one. This lets external systems (e.g. a host app recording user
* feedback after the fact) attach scores or observations to the trace by
* regenerating the same id from the run/message id; no trace lookup required.
* With no active seed it falls back to random ids, so default behavior is
* unchanged.
*/
const traceIdSeedStore = new AsyncLocalStorage<string>();

export function runWithTraceIdSeed<T>(
seed: string | undefined,
fn: () => T
): T {
return isPresent(seed) ? traceIdSeedStore.run(seed, fn) : fn();
}

/** sha256(seed) → first 32 hex chars; matches `@langfuse/tracing` `createTraceId`. */
function traceIdFromSeed(seed: string): string {
return createHash('sha256').update(seed, 'utf8').digest('hex').slice(0, 32);
}

class SeededTraceIdGenerator implements IdGenerator {
generateTraceId(): string {
const seed = traceIdSeedStore.getStore();
const seed = resolveTraceIdSeedForSpan(context.active());
return isPresent(seed)
? traceIdFromSeed(seed)
: randomBytes(16).toString('hex');
Expand Down Expand Up @@ -120,20 +109,29 @@ function getLangfuseSpanProcessorParams(
return undefined;
}

function hashCacheKeyValue(value: string | undefined): string | undefined {
return isPresent(value)
? createHash('sha256').update(value, 'utf8').digest('hex')
: undefined;
}

function getLangfuseTracerProviderKey(
params: LangfuseSpanProcessorParams,
langfuse?: t.LangfuseConfig
): string {
return JSON.stringify({
publicKey: params.publicKey,
secretKey: params.secretKey,
secretKeyHash: hashCacheKeyValue(params.secretKey),
baseUrl: params.baseUrl,
environment: params.environment,
toolOutputTracing: langfuse?.toolOutputTracing,
});
}

class RoutingLangfuseSpanProcessor implements SpanProcessor {
// Processors live for the process lifetime. LibreChat tenant Langfuse
// destinations are expected to be a bounded admin-managed set, and shutdown
// drains every cached processor when the provider is disposed.
private readonly processors = new Map<string, SpanProcessor>();
private readonly spanProcessors = new WeakMap<object, SpanProcessor>();

Expand All @@ -155,13 +153,19 @@ class RoutingLangfuseSpanProcessor implements SpanProcessor {
}

onStart(span: Span, parentContext: Context): void {
const processor = this.ensureProcessor(
getContextLangfuseConfig(parentContext)
);
const langfuse = resolveLangfuseConfigForSpan(parentContext);
const processor = this.ensureProcessor(langfuse);
if (processor == null) {
return;
}

const librechatTraceAttributes = createLibreChatTraceAttributes(
langfuse?.librechatTraceAttributes ?? {}
);
if (Object.keys(librechatTraceAttributes).length > 0) {
span.setAttributes(librechatTraceAttributes);
}

this.spanProcessors.set(span, processor);
processor.onStart(span, parentContext);
}
Expand Down
Loading
Loading