Skip to content

Commit a2d3d10

Browse files
authored
fix(langchain): emit tool observations for LangChain tool runs (#787)
1 parent 829eccd commit a2d3d10

2 files changed

Lines changed: 117 additions & 23 deletions

File tree

packages/langchain/src/CallbackHandler.ts

Lines changed: 50 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ import {
1717
startActiveObservation,
1818
LangfuseGeneration,
1919
LangfuseSpan,
20+
LangfuseTool,
21+
LangfuseObservationAttributes,
2022
LangfuseGenerationAttributes,
2123
LangfuseSpanAttributes,
2224
propagateAttributes,
@@ -49,6 +51,8 @@ type ConstructorParams = {
4951
traceMetadata?: Record<string, unknown>; // added to all traces
5052
};
5153

54+
type TrackedObservation = LangfuseSpan | LangfuseGeneration | LangfuseTool;
55+
5256
export class CallbackHandler extends BaseCallbackHandler {
5357
name = "LangfuseCallbackHandler";
5458

@@ -60,7 +64,7 @@ export class CallbackHandler extends BaseCallbackHandler {
6064

6165
private completionStartTimes: Record<string, Date> = {};
6266
private promptToParentRunMap;
63-
private runMap: Map<string, LangfuseSpan | LangfuseGeneration> = new Map();
67+
private runMap: Map<string, TrackedObservation> = new Map();
6468

6569
public last_trace_id: string | null = null;
6670

@@ -457,6 +461,7 @@ export class CallbackHandler extends BaseCallbackHandler {
457461
this.logger.debug(`Tool start with ID: ${runId}`);
458462

459463
this.startAndRegisterOtelSpan({
464+
type: "tool",
460465
runId,
461466
parentRunId,
462467
runName: name ?? tool.id.at(-1)?.toString() ?? "Tool execution",
@@ -716,7 +721,7 @@ export class CallbackHandler extends BaseCallbackHandler {
716721
runName: string;
717722
runId: string;
718723
parentRunId?: string;
719-
attributes: LangfuseGenerationAttributes;
724+
attributes: LangfuseSpanAttributes;
720725
metadata?: Record<string, unknown>;
721726
tags?: string[];
722727
}): LangfuseSpan;
@@ -730,14 +735,23 @@ export class CallbackHandler extends BaseCallbackHandler {
730735
tags?: string[];
731736
}): LangfuseGeneration;
732737
private startAndRegisterOtelSpan(params: {
733-
type?: "span" | "generation";
738+
type: "tool";
734739
runName: string;
735740
runId: string;
736741
parentRunId?: string;
737-
attributes: LangfuseGenerationAttributes;
742+
attributes: LangfuseSpanAttributes;
743+
metadata?: Record<string, unknown>;
744+
tags?: string[];
745+
}): LangfuseTool;
746+
private startAndRegisterOtelSpan(params: {
747+
type?: "span" | "generation" | "tool";
748+
runName: string;
749+
runId: string;
750+
parentRunId?: string;
751+
attributes: LangfuseObservationAttributes;
738752
metadata?: Record<string, unknown>;
739753
tags?: string[];
740-
}): LangfuseSpan | LangfuseGeneration {
754+
}): TrackedObservation {
741755
const { type, runName, runId, parentRunId, attributes, metadata, tags } =
742756
params;
743757

@@ -746,37 +760,50 @@ export class CallbackHandler extends BaseCallbackHandler {
746760
metadata: this.joinTagsAndMetaData(tags, metadata),
747761
level: tags && tags.includes(LANGSMITH_HIDDEN_TAG) ? "DEBUG" : undefined,
748762
...attributes,
749-
} as LangfuseGenerationAttributes;
763+
} as LangfuseObservationAttributes;
764+
765+
const parentSpanContext = parentRunId
766+
? this.runMap.get(parentRunId)?.otelSpan.spanContext()
767+
: undefined;
750768

751769
const observation =
752770
type === "generation"
753771
? startActiveObservation(
754772
runName,
755773
(gen) => {
756-
gen.update(observationAttributes);
774+
gen.update(observationAttributes as LangfuseGenerationAttributes);
757775
return gen;
758776
},
759777
{
760778
asType: "generation",
761-
parentSpanContext: parentRunId
762-
? this.runMap.get(parentRunId)?.otelSpan.spanContext()
763-
: undefined,
779+
parentSpanContext,
764780
endOnExit: false,
765781
},
766782
)
767-
: startActiveObservation(
768-
runName,
769-
(span) => {
770-
span.update(observationAttributes);
771-
return span;
772-
},
773-
{
774-
parentSpanContext: parentRunId
775-
? this.runMap.get(parentRunId)?.otelSpan.spanContext()
776-
: undefined,
777-
endOnExit: false,
778-
},
779-
);
783+
: type === "tool"
784+
? startActiveObservation(
785+
runName,
786+
(tool) => {
787+
tool.update(observationAttributes as LangfuseSpanAttributes);
788+
return tool;
789+
},
790+
{
791+
asType: "tool",
792+
parentSpanContext,
793+
endOnExit: false,
794+
},
795+
)
796+
: startActiveObservation(
797+
runName,
798+
(span) => {
799+
span.update(observationAttributes as LangfuseSpanAttributes);
800+
return span;
801+
},
802+
{
803+
parentSpanContext,
804+
endOnExit: false,
805+
},
806+
);
780807
this.runMap.set(runId, observation);
781808

782809
return observation;
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
import { DynamicTool } from "@langchain/core/tools";
2+
import { CallbackHandler } from "@langfuse/langchain";
3+
import { LangfuseOtelSpanAttributes } from "@langfuse/tracing";
4+
import { afterEach, beforeEach, describe, expect, it } from "vitest";
5+
6+
import { SpanAssertions } from "./helpers/assertions.js";
7+
import {
8+
setupTestEnvironment,
9+
teardownTestEnvironment,
10+
waitForSpanExport,
11+
type TestEnvironment,
12+
} from "./helpers/testSetup.js";
13+
14+
/**
 * Integration suite for the LangChain `CallbackHandler`.
 *
 * Each test gets a fresh test environment (set up in `beforeEach`, torn down
 * in `afterEach`) and a `SpanAssertions` helper bound to that environment's
 * mock exporter.
 */
describe("LangChain callback handler integration tests", () => {
  let testEnv: TestEnvironment;
  let assertions: SpanAssertions;

  beforeEach(async () => {
    testEnv = await setupTestEnvironment();
    assertions = new SpanAssertions(testEnv.mockExporter);
  });

  afterEach(async () => {
    await teardownTestEnvironment(testEnv);
  });

  // Invokes a LangChain tool with the handler attached and checks that the
  // single exported span carries the "tool" observation type plus the tool's
  // input and output.
  it("should mark LangChain tool runs as tool observations", async () => {
    const toolName = "calculator";
    const expression = "25*4";
    const expectedOutput = "The result is: 100";

    // Tool body: every character other than digits, + - * / ( ) and . is
    // removed before evaluation, so only plain arithmetic reaches `eval`.
    // NOTE(review): `eval` is tolerable here only because this is a test
    // fixture fed a hard-coded expression; do not copy into production code.
    const evaluateExpression = async (input: string) => {
      const arithmeticOnly = input.replace(/[^0-9+\-*/().]/g, "");
      const value = eval(arithmeticOnly);
      return `The result is: ${value}`;
    };

    const calculatorTool = new DynamicTool({
      name: toolName,
      description:
        "Perform basic arithmetic operations. Input should be a mathematical expression.",
      func: evaluateExpression,
    });

    const handler = new CallbackHandler();
    const result = await calculatorTool.invoke(expression, {
      callbacks: [handler],
    });

    expect(result).toBe(expectedOutput);

    // Wait until the mock exporter has received the one expected span.
    const { mockExporter } = testEnv;
    await waitForSpanExport(mockExporter, 1);

    assertions.expectSpanCount(1);
    assertions.expectSpanWithName(toolName);
    assertions.expectSpanAttribute(
      toolName,
      LangfuseOtelSpanAttributes.OBSERVATION_TYPE,
      "tool",
    );
    assertions.expectSpanAttributeContains(
      toolName,
      LangfuseOtelSpanAttributes.OBSERVATION_INPUT,
      expression,
    );
    assertions.expectSpanAttribute(
      toolName,
      LangfuseOtelSpanAttributes.OBSERVATION_OUTPUT,
      expectedOutput,
    );
  });
});

0 commit comments

Comments
 (0)