Skip to content
Open
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ run
.DS_Store
.tmp
.vscode
.claude

package-lock.json
yarn.lock
Expand Down
129 changes: 129 additions & 0 deletions plugin/langchain/app/controller/RunsController.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
import {
HTTPController,
HTTPMethod,
HTTPMethodEnum,
HTTPBody,
Context,
Middleware,
} from '@eggjs/tegg';
import type { EggContext } from '@eggjs/tegg';
import type { RunCreateDTO } from './types';
import { streamSSE } from '../../lib/sse';
import { RunCreate } from './schemas';
import { ZodErrorMiddleware } from '../middleware/ZodErrorMiddleware';

/**
* LangGraph Runs Controller
* 处理 Run 相关的 HTTP 请求
*/
@HTTPController({
path: '/api',
})
@Middleware(ZodErrorMiddleware)
export class RunsController {
/**
* POST /api/runs/stream
* 流式创建无状态 Run (SSE)
*
* 对应 LangGraph runs.mts 的 api.post("/runs/stream", ...) 端点
*/
@HTTPMethod({
method: HTTPMethodEnum.POST,
path: '/runs/stream',
})
async streamStatelessRun(@Context() ctx: EggContext, @HTTPBody() payload: RunCreateDTO) {
const validated = RunCreate.parse(payload);
console.log('streamStatelessRun', validated);
// Mock: 生成一个假的 run_id
const runId = `run_${Date.now()}_${Math.random().toString(36).substring(7)}`;

// 设置 Content-Location header
ctx.set('Content-Location', `/runs/${runId}`);

// 类型断言帮助访问 input 中的 messages
const inputData = validated.input as { messages?: Array<{ role: string; content: string }> } | undefined;

// 使用 SSE 流式返回
return streamSSE(ctx, async (stream) => {
// 如果需要在断开连接时取消,创建 AbortSignal
// const cancelOnDisconnect = validated.on_disconnect === 'cancel'
// ? getDisconnectAbortSignal(ctx, stream)
// : undefined;

try {
// TODO: 调用 runs service 的 stream.join 方法获取运行结果
// for await (const { event, data } of runs().stream.join(
// runId,
// undefined,
// {
// cancelOnDisconnect,
// lastEventId: validated.stream_resumable ? "-1" : undefined,
// ignore404: true,
// },
// auth
// )) {
// await stream.writeSSE({ data: JSON.stringify(data), event });
// }

// Mock 实现:模拟 SSE 流式响应
// 1. 发送 metadata 事件
await stream.writeSSE({
event: 'metadata',
data: JSON.stringify({
run_id: runId,
assistant_id: validated.assistant_id || 'mock_assistant',
}),
});

await stream.sleep(100);

// 2. 发送 values 事件 - 模拟开始处理
await stream.writeSSE({
event: 'values',
data: JSON.stringify({
messages: [
{
role: 'user',
content: inputData?.messages?.[0]?.content || 'Hello',
},
],
}),
});

await stream.sleep(500);

// 3. 发送 values 事件 - 模拟 AI 响应
await stream.writeSSE({
event: 'values',
data: JSON.stringify({
messages: [
{
role: 'user',
content: inputData?.messages?.[0]?.content || 'Hello',
},
{
role: 'assistant',
content: `Mock response to: ${inputData?.messages?.[0]?.content || 'Hello'}`,
},
],
}),
});

await stream.sleep(200);

// 4. 发送 end 事件
await stream.writeSSE({
event: 'end',
data: JSON.stringify({
run_id: runId,
status: 'completed',
}),
});
} catch (error) {
console.error('Error streaming run:', error);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

建议使用 ctx.logger.error 代替 console.error 来记录错误。这样可以利用应用统一的日志配置,方便后续的日志收集和分析。

Suggested change
console.error('Error streaming run:', error);
ctx.logger.error('Error streaming run:', error);

throw error;
}
});
}

}
Loading